1 module receiver;
2 
3 import std.stdio;
4 import hunt.amqp.client.AmqpClientOptions;
5 import hunt.amqp.client.AmqpClient;
6 import hunt.amqp.client.AmqpSender;
7 import hunt.amqp.client.AmqpMessage;
8 import hunt.amqp.client.AmqpPool;
9 import hunt.amqp.Handler;
10 import hunt.amqp.client.AmqpReceiver;
11 import hunt.amqp.client.AmqpConnection;
12 import hunt.amqp.client.AmqpReceiverOptions;
13 
14 import hunt.logging.ConsoleLogger;
15 import core.thread;
16 import std.algorithm;
17 import std.parallelism;
18 import std.string;
19 
20 void senderTask(AmqpSender sender) {
21     // while(true)
22     {
23         sender.send(AmqpMessage.create().withBody("hello world").build());
24         trace("send completed");
25         // Thread.sleep(500.msecs);
26     }
27 }
28 
29 void main(string[] agrs) {
30 
31     // dfmt off
32     // AmqpClientOptions options = new AmqpClientOptions()
33     //     .setHost("10.1.223.62")
34     //     .setPort(5672)
35     //     .setUsername("test")
36     //     .setPassword("123");
37 
38     AmqpClientOptions options = new AmqpClientOptions()
39         .setHost("10.1.222.110")
40         .setPort(5672)
41         .setUsername("admin")
42         .setPassword("admin");    
43 
44     AmqpPool pool = new AmqpPool(options);
45     AmqpConnection conn = pool.borrowObject();
46 
47     if (conn is null) {
48         logWarning("Unable to connect to the broker");
49         return;
50     }
51 
52     logInfo("Connection succeeded");
53 
54     AmqpReceiverOptions receiverOptions = new AmqpReceiverOptions();
55     receiverOptions.setAutoAcknowledgement(false);
56     
57     conn.createReceiver("my-queue",  new class Handler!AmqpReceiver {
58         void handle(AmqpReceiver recv) {    
59             if (recv is null) {
60                 logWarning("Unable to create a receiver");
61                 return;
62             }
63 
64             int counter = 0;
65             recv.handler(new class Handler!AmqpMessage {
66                 void handle(AmqpMessage msg) {
67                     counter++;
68 
69                     string content = msg.bodyAsString();
70 
71                     if(content.canFind("[2]")) {
72                         if(counter < 10) {
73 
74                             warningf("%d => modified: %s", counter, content);
75                             msg.modified(true, false);
76                             // msg.rejected();
77 
78                             // warningf("%d => released: %s", counter, content);
79                             // msg.released();
80                         } else {
81                             infof("%d => accepted forcedly: %s", counter, content);
82                             msg.accepted();
83                         }
84                     } else {
85                         tracef("%d => accepted: %s", counter, content);
86                         msg.accepted();
87                     }
88                     // msg.rejected();
89                 }
90             });          
91         }
92     });
93     // dfmt on
94     // pool.returnObject(conn);
95 
96     warning("Done.");
97 }