1 module QueueTest;
2 
3 import std.stdio;
4 import hunt.amqp.client.AmqpClientOptions;
5 import hunt.amqp.client.AmqpClient;
6 import hunt.amqp.client.AmqpSender;
7 import hunt.amqp.client.AmqpMessage;
8 import hunt.amqp.Handler;
9 import hunt.amqp.client.AmqpReceiver;
10 import hunt.amqp.client.AmqpConnection;
11 import hunt.logging;
12 import core.thread;
13 import std.parallelism;
14 
/**
 * Sends a single "hello world" message through the given sender and
 * emits a trace line afterwards.
 *
 * Params:
 *   sender = the AMQP sender used to send the message.
 *
 * Note: a repeating variant (an endless loop with a 500 ms pause between
 * sends) was disabled in the original code; only one message is sent.
 */
void senderTask(AmqpSender sender)
{
    // Build the message first, then hand it to the sender.
    auto message = AmqpMessage.create().withBody("hello world").build();
    sender.send(message);
    trace("send completed");
}
24 
25 
26 
/**
 * Connects to the AMQP broker at 10.1.223.62:5672 (user "test"), creates a
 * sender on the "my-queue" address and schedules `senderTask` on the shared
 * `taskPool` with that sender.
 *
 * A null connection or a null sender passed to the respective handler is
 * treated as a failure: a warning is logged and nothing further happens.
 */
void run()
{
    AmqpClientOptions clientOptions = new AmqpClientOptions()
        .setHost("10.1.223.62")
        .setPort(5672)
        .setUsername("test")
        .setPassword("123");

    AmqpClient amqpClient = AmqpClient.create(clientOptions);

    amqpClient.connect(new class Handler!AmqpConnection {
        void handle(AmqpConnection connection)
        {
            if (connection is null)
            {
                logWarning("Unable to connect to the broker");
                return;
            }

            logInfo("Connection succeeded");

            // Handler invoked with the sender created for "my-queue".
            auto onSenderCreated = new class Handler!AmqpSender {
                void handle(AmqpSender createdSender)
                {
                    if (createdSender is null)
                    {
                        logWarning("Unable to create a sender");
                        return;
                    }

                    // Hand the sender to the task pool; senderTask does the send.
                    taskPool.put(task!senderTask(createdSender));

                    // Disabled alternative: send 100 messages inline.
                    //foreach (i; 0 .. 100)
                    //{
                    //    createdSender.send(AmqpMessage.create().withBody("hello world").build());
                    //    logInfo("send complete");
                    //}
                }
            };

            connection.createSender("my-queue", onSenderCreated);

            // Disabled example: consume messages from the same queue.
            //connection.createReceiver("my-queue", new class Handler!AmqpReceiver {
            //    void handle(AmqpReceiver recv)
            //    {
            //        if (recv is null)
            //        {
            //            logWarning("Unable to create a receiver");
            //            return;
            //        }
            //        recv.handler(new class Handler!AmqpMessage {
            //            void handle(AmqpMessage msg)
            //            {
            //                logInfo("Received %s", msg.bodyAsString());
            //            }
            //        });
            //    }
            //});
        }
    });
}
84