1 module receiver; 2 3 import std.stdio; 4 import hunt.amqp.client.AmqpClientOptions; 5 import hunt.amqp.client.AmqpClient; 6 import hunt.amqp.client.AmqpSender; 7 import hunt.amqp.client.AmqpMessage; 8 import hunt.amqp.client.AmqpPool; 9 import hunt.amqp.Handler; 10 import hunt.amqp.client.AmqpReceiver; 11 import hunt.amqp.client.AmqpConnection; 12 13 import hunt.logging.ConsoleLogger; 14 import core.thread; 15 import std.parallelism; 16 17 void senderTask(AmqpSender sender) { 18 // while(true) 19 { 20 sender.send(AmqpMessage.create().withBody("hello world").build()); 21 trace("send completed"); 22 // Thread.sleep(500.msecs); 23 } 24 } 25 26 void main() { 27 28 // dfmt off 29 AmqpClientOptions options = new AmqpClientOptions() 30 .setHost("10.1.223.62") 31 .setPort(5672) 32 .setUsername("test") 33 .setPassword("123"); 34 35 AmqpPool pool = new AmqpPool(options); 36 AmqpConnection conn = pool.borrowObject(); 37 38 if (conn is null) { 39 logWarning("Unable to connect to the broker"); 40 return; 41 } 42 43 logInfo("Connection succeeded"); 44 45 conn.createReceiver("my-queue", new class Handler!AmqpReceiver { 46 void handle(AmqpReceiver recv) { 47 if (recv is null) { 48 logWarning("Unable to create a receiver"); 49 return; 50 } 51 52 int counter = 0; 53 recv.handler(new class Handler!AmqpMessage { 54 void handle(AmqpMessage msg) { 55 counter++; 56 tracef("%d => Received: %s", counter, msg.bodyAsString()); 57 } 58 }); 59 } 60 }); 61 // dfmt on 62 // pool.returnObject(conn); 63 64 warning("Done."); 65 }