module QueueTest;

import std.stdio;
import hunt.amqp.client.AmqpClientOptions;
import hunt.amqp.client.AmqpClient;
import hunt.amqp.client.AmqpSender;
import hunt.amqp.client.AmqpMessage;
import hunt.amqp.Handler;
import hunt.amqp.client.AmqpReceiver;
import hunt.amqp.client.AmqpConnection;
import hunt.logging;
import core.thread;
import std.parallelism;

/**
 * Sends one "hello world" message through the given sender and traces
 * that the send call has returned.
 *
 * Params:
 *   sender = the sender the message is handed to
 */
void senderTask(AmqpSender sender)
{
    // Disabled variant kept for reference: wrap the two statements below in
    // `while (true)` and add `Thread.sleep(500.msecs);` to send repeatedly.
    auto message = AmqpMessage.create().withBody("hello world").build();
    sender.send(message);
    trace("send completed");
}

/**
 * Connects to the broker configured below, creates a sender for
 * "my-queue" and schedules `senderTask` on the shared `taskPool`.
 *
 * Failures are reported through the handlers receiving `null`; each such
 * case is logged as a warning and nothing further is done.
 */
void run()
{
    // NOTE(review): host, port and credentials are hard-coded here.
    AmqpClientOptions brokerOptions = new AmqpClientOptions()
        .setHost("10.1.223.62")
        .setPort(5672)
        .setUsername("test")
        .setPassword("123");

    AmqpClient amqpClient = AmqpClient.create(brokerOptions);

    amqpClient.connect(new class Handler!AmqpConnection
    {
        /// Invoked with the connection, or with `null` when connecting failed.
        void handle(AmqpConnection connection)
        {
            if (connection !is null)
            {
                logInfo("Connection succeeded");

                connection.createSender("my-queue", new class Handler!AmqpSender
                {
                    /// Invoked with the sender, or with `null` when creation failed.
                    void handle(AmqpSender readySender)
                    {
                        if (readySender !is null)
                        {
                            // Hand the send over to the task pool instead of
                            // calling senderTask directly from this handler.
                            auto sendJob = task!senderTask(readySender);
                            taskPool.put(sendJob);

                            // Disabled alternative: send 100 messages inline.
                            //for (int i = 0; i < 100; ++i)
                            //{
                            //    readySender.send(AmqpMessage.create().withBody("hello world").build());
                            //    logInfo("send complete");
                            //}
                        }
                        else
                        {
                            logWarning("Unable to create a sender");
                        }
                    }
                });

                // Disabled alternative: consume from the same queue.
                //connection.createReceiver("my-queue", new class Handler!AmqpReceiver {
                //    void handle(AmqpReceiver recv)
                //    {
                //        if (recv is null)
                //        {
                //            logWarning("Unable to create a receiver");
                //            return;
                //        }
                //        recv.handler(new class Handler!AmqpMessage {
                //            void handle(AmqpMessage msg) {
                //                logInfo("Received %s", msg.bodyAsString());
                //            }
                //        });
                //    }
                //});
            }
            else
            {
                logWarning("Unable to connect to the broker");
            }
        }
    });
}