1 module AmqpPoolTest;
2 
3 import std.stdio;
4 import hunt.amqp.client.AmqpClientOptions;
5 import hunt.amqp.client.AmqpClient;
6 import hunt.amqp.client.AmqpSender;
7 import hunt.amqp.client.AmqpMessage;
8 import hunt.amqp.client.AmqpPool;
9 import hunt.amqp.Handler;
10 import hunt.amqp.client.AmqpReceiver;
11 import hunt.amqp.client.AmqpConnection;
12 
13 import hunt.logging.ConsoleLogger;
14 import core.thread;
15 import std.parallelism;
16 
/**
 * Sends one "hello world" message through the given sender and logs
 * a trace line once the send call has returned.
 *
 * Params:
 *   sender = the AMQP sender used to publish the message
 */
void senderTask(AmqpSender sender) {
    // Build the message first so the send call reads as a single step.
    auto greeting = AmqpMessage.create().withBody("hello world").build();
    sender.send(greeting);
    trace("send completed");
}
25 
/**
 * Demo/test entry point for AmqpPool.
 *
 * Borrows a connection from a freshly created pool, asks it to create a
 * sender on "my-queue", sends a single "hello world" message from the
 * sender handler, and returns the connection to the pool. Pool statistics
 * (active / idle / waiters) are logged before and after the return.
 *
 * The borrowed connection is returned to the pool on both the success and
 * the sender-creation-failure path, and it is never closed by this code
 * after being returned.
 */
void run() {

    // NOTE(review): broker address and credentials are hard-coded test values.
    AmqpClientOptions options = new AmqpClientOptions().setHost("10.1.223.62")
        .setPort(5672).setUsername("test").setPassword("123");

    AmqpPool pool = new AmqpPool(options);
    AmqpConnection conn = pool.borrowObject();

    if (conn is null) {
        logWarning("Unable to connect to the broker");
        return;
    }

    logInfo("Connection succeeded");
    // dfmt off
    conn.createSender("my-queue", new class Handler!AmqpSender{
        void handle(AmqpSender sender) {
            if(sender is null) {
                logWarning("Unable to create a sender");
                // The connection was borrowed above; hand it back so it does
                // not stay counted as active when sender creation fails.
                // NOTE(review): if the pool offers an "invalidate" operation,
                // that may be more appropriate for a broken connection.
                pool.returnObject(conn);
                return;
            }

            sender.send(AmqpMessage.create().withBody("hello world").build());
            trace("send completed");

            // Statistics while the connection is still borrowed.
            warningf("active: %d, idle: %d, waiters: %d", 
                pool.getNumActive(), pool.getNumIdle(), pool.getNumWaiters());

            pool.returnObject(conn);

            // Do not close `conn` here: once it has been returned, the pool
            // owns it again. The previous code closed it after the return,
            // which could leave a closed connection in the pool.

            // Statistics after the connection has been returned.
            warningf("active: %d, idle: %d, waiters: %d", 
                pool.getNumActive(), pool.getNumIdle(), pool.getNumWaiters());
        }
    });
    // dfmt on

    // Receiver example, kept for reference (disabled):
    //conn.createReceiver("my-queue", new class Handler!AmqpReceiver {
    //   void handle(AmqpReceiver recv)
    //   {
    //       if(recv is null)
    //       {
    //         logWarning("Unable to create a receiver");
    //         return;
    //       }
    //       recv.handler(new class Handler!AmqpMessage {
    //         void handle(AmqpMessage msg){
    //           logInfo("Received %s" , msg.bodyAsString());
    //         }
    //       });
    //   }
    //});

    warning("ok");

}