module AmqpPoolTest;

import std.stdio;
import hunt.amqp.client.AmqpClientOptions;
import hunt.amqp.client.AmqpClient;
import hunt.amqp.client.AmqpSender;
import hunt.amqp.client.AmqpMessage;
import hunt.amqp.client.AmqpPool;
import hunt.amqp.Handler;
import hunt.amqp.client.AmqpReceiver;
import hunt.amqp.client.AmqpConnection;

import hunt.logging.ConsoleLogger;
import core.thread;
import std.parallelism;

/**
 * Sends one "hello world" message through the given sender and logs that
 * the send call completed.
 *
 * The surrounding loop and the sleep are intentionally left disabled, so
 * exactly one message is sent per call.
 *
 * Params:
 *   sender = sender used to publish the message
 */
void senderTask(AmqpSender sender) {
    // while(true)
    {
        sender.send(AmqpMessage.create().withBody("hello world").build());
        trace("send completed");
        // Thread.sleep(500.msecs);
    }
}

/**
 * Borrows a connection from an AmqpPool, creates a sender on "my-queue",
 * sends one message, and returns the connection to the pool, logging the
 * pool statistics before and after the return.
 *
 * If no connection can be borrowed, a warning is logged and the function
 * returns without doing anything else.
 */
void run() {

    // NOTE(review): broker address and credentials are hard-coded here;
    // presumably these point at a test broker -- confirm before reuse.
    AmqpClientOptions options = new AmqpClientOptions().setHost("10.1.223.62")
        .setPort(5672).setUsername("test").setPassword("123");

    AmqpPool pool = new AmqpPool(options);
    AmqpConnection conn = pool.borrowObject();

    if (conn is null) {
        logWarning("Unable to connect to the broker");
        return;
    }

    logInfo("Connection succeeded");
    // dfmt off
    conn.createSender("my-queue", new class Handler!AmqpSender{
        void handle(AmqpSender sender) {
            if(sender is null) {
                logWarning("Unable to create a sender");
                // Hand the borrowed connection back even on failure so it
                // is not leaked from the pool.
                pool.returnObject(conn);
                return;
            }

            sender.send(AmqpMessage.create().withBody("hello world").build());
            trace("send completed");

            // Pool statistics while the connection is still borrowed.
            warningf("active: %d, idle: %d, waiters: %d",
                pool.getNumActive(), pool.getNumIdle(), pool.getNumWaiters());

            pool.returnObject(conn);

            // Once returned, the connection is handed back to the pool and
            // must not be closed or otherwise used here: closing it would
            // leave a dead connection in the pool for the next borrower.
            // Pool statistics after the connection has been returned.
            warningf("active: %d, idle: %d, waiters: %d",
                pool.getNumActive(), pool.getNumIdle(), pool.getNumWaiters());
        }
    });
    // dfmt on

    // Disabled receiver example, kept for reference:
    //conn.createReceiver("my-queue", new class Handler!AmqpReceiver {
    //    void handle(AmqpReceiver recv)
    //    {
    //        if(recv is null)
    //        {
    //            logWarning("Unable to create a receiver");
    //            return;
    //        }
    //        recv.handler(new class Handler!AmqpMessage {
    //            void handle(AmqpMessage msg){
    //                logInfo("Received %s" , msg.bodyAsString());
    //            }
    //        });
    //    }
    //});

    // pool.returnObject(conn);

    warning("ok");

}