module sender;

import hunt.amqp.client.AmqpClientOptions;
import hunt.amqp.client.AmqpClient;
import hunt.amqp.client.AmqpSender;
import hunt.amqp.client.AmqpMessage;
import hunt.amqp.client.AmqpPool;
import hunt.amqp.Handler;
import hunt.amqp.client.AmqpReceiver;
import hunt.amqp.client.AmqpConnection;

import hunt.logging.ConsoleLogger;

import core.thread;
import core.time;

import std.datetime;
import std.format;
import std.parallelism;
import std.stdio;

/**
 * Example AMQP sender.
 *
 * Borrows a connection from an AmqpPool, creates a sender on "my-queue",
 * sends a timestamped greeting, closes the sender and hands the connection
 * back to the pool. Pool statistics (active / idle / waiters) are logged
 * before sending, after sending, and after the connection has been released.
 *
 * The result of createSender is delivered through the handler; main does not
 * wait for it before logging "Done.".
 * NOTE(review): presumably the handler may run after main has logged "Done."
 * -- confirm against the hunt-amqp threading model.
 */
void main() {

    AmqpClientOptions options = new AmqpClientOptions().setHost("10.1.223.62")
        .setPort(5672).setUsername("test").setPassword("123");

    AmqpPool pool = new AmqpPool(options);
    AmqpConnection conn = pool.borrowObject();

    if (conn is null) {
        logWarning("Unable to connect to the broker");
        return;
    }

    logInfo("Connection succeeded");
    // dfmt off
    conn.createSender("my-queue", new class Handler!AmqpSender {
        void handle(AmqpSender sender) {
            if (sender is null) {
                logWarning("Unable to create a sender");
                // The connection was borrowed above; give it back on the
                // failure path too, otherwise it stays checked out forever.
                pool.returnObject(conn);
                return;
            }

            // Pool statistics while the connection is checked out.
            warningf("active: %d, idle: %d, waiters: %d",
                pool.getNumActive(), pool.getNumIdle(), pool.getNumWaiters());

            foreach (index; 0 .. 1) {
                DateTime dt = cast(DateTime) Clock.currTime();
                string message = format("[%d] Say hello at %s", index, dt.toSimpleString());
                sender.send(AmqpMessage.create().withBody(message).build());
                tracef("Message %d sent.", index);
                // Thread.sleep(1.seconds);
            }

            trace("All message sent.");
            warningf("active: %d, idle: %d, waiters: %d",
                pool.getNumActive(), pool.getNumIdle(), pool.getNumWaiters());

            // FIXME: Needing refactor or cleanup -@zhangxueping at 2020-03-27T17:24:05+08:00
            // more tests
            // Close the sender BEFORE releasing the connection, so the
            // connection is not handed back to the pool while a sender
            // created on it is still open.
            sender.close(null);
            pool.returnObject(conn);

            // Pool statistics after the connection has been released.
            warningf("active: %d, idle: %d, waiters: %d",
                pool.getNumActive(), pool.getNumIdle(), pool.getNumWaiters());
        }
    });
    // dfmt on

    // The connection is returned to the pool inside the handler above,
    // once the sender has finished with it; do not return it here.

    warning("Done.");
}