1 module sender;
2 
3 import hunt.amqp.client.AmqpClientOptions;
4 import hunt.amqp.client.AmqpClient;
5 import hunt.amqp.client.AmqpSender;
6 import hunt.amqp.client.AmqpMessage;
7 import hunt.amqp.client.AmqpPool;
8 import hunt.amqp.Handler;
9 import hunt.amqp.client.AmqpReceiver;
10 import hunt.amqp.client.AmqpConnection;
11 
12 import hunt.logging.ConsoleLogger;
13 import hunt.Object;
14 
15 import core.thread;
16 import core.time;
17 
18 import std.conv;
19 import std.datetime;
20 import std.format;
21 import std.parallelism;
22 import std.stdio;
23 
24 import hunt.amqp.impl.ProtonTransport;
25 import hunt.proton.engine.Event;
26 
27 enum Total = 1;
28 
29 
30 void main(string[] agrs) {
31 
32     int number = Total;
33 
34     if(agrs.length >=2) {
35         number = to!int(agrs[1]);
36         if(number <=0 ) number = Total;
37     }
38 
39     AmqpClientOptions options = new AmqpClientOptions().setHost("10.1.222.110")
40         .setPort(5672).setUsername("admin").setPassword("admin");
41 
42     // AmqpClientOptions options = new AmqpClientOptions().setHost("10.1.223.62")
43     //     .setPort(5672).setUsername("test").setPassword("123");
44 
45     // AmqpClientOptions options = new AmqpClientOptions().setHost("121.40.16.40")
46     //     .setPort(5672).setUsername("admin").setPassword("RzNKT565Twof");    
47 
48     // AmqpPool pool = new AmqpPool(options);
49     // AmqpConnection conn = pool.borrowObject();
50 
51     AmqpClient client = AmqpClient.create(options);
52     AmqpConnection conn = client.connect();
53 
54     if (conn is null) {
55         logWarning("Unable to connect to the broker");
56         return;
57     }
58 
59     logInfo("Connection succeeded");
60     // dfmt off
61     conn.createSender("my-queue", new class Handler!AmqpSender {
62         void handle(AmqpSender sender) {
63             if(sender is null) {
64                 logWarning("Unable to create a sender");
65                 return;
66             }
67 
68 			foreach(index; 0..number) {
69 				DateTime dt = cast(DateTime)Clock.currTime();
70 				string message = format("[%d] Say hello at %s", index, dt.toSimpleString());
71                 message = "xxx123";
72                 AmqpMessage amqpMessage = AmqpMessage.create().withBody(message).build();
73 				sender.send(amqpMessage);
74 				tracef("Message %d sent. The content is: '%s'", index, message);
75 				// Thread.sleep(1.seconds);
76 			}
77 
78 			trace("All message sent.");
79             // pool.returnObject(conn);
80 
81             // sender.end( (VoidAsyncResult ar) {
82             //     if(ar.succeeded()) {
83             //         warning("Sender ended.");
84             //     } else {
85             //         Throwable th = ar.cause();
86             //         errorf("Error occured: %s", th.msg);
87             //         warning(th);
88             //     }
89             // });
90 
91             // sender.close( (ar) {
92             //     warning("Sender closed.");
93             // });
94 
95             client.close( (VoidAsyncResult ar) {
96                 if(ar.succeeded()) {
97                     warning("Connection closed.");
98                 } else {
99                     Throwable th = ar.cause();
100                     warning(th);
101                 }
102             });
103         }
104     });
105     // dfmt on
106 
107     // pool.returnObject(conn);
108 
109     warning("Done.");
110 }