module sender;

import hunt.amqp.client.AmqpClientOptions;
import hunt.amqp.client.AmqpClient;
import hunt.amqp.client.AmqpSender;
import hunt.amqp.client.AmqpMessage;
import hunt.amqp.client.AmqpPool;
import hunt.amqp.Handler;
import hunt.amqp.client.AmqpReceiver;
import hunt.amqp.client.AmqpConnection;

import hunt.logging.ConsoleLogger;
import hunt.Object;

import core.thread;
import core.time;

import std.conv;
import std.datetime;
import std.format;
import std.parallelism;
import std.stdio;

import hunt.amqp.impl.ProtonTransport;
import hunt.proton.engine.Event;

/// Number of messages sent when no usable count is given on the command line.
enum Total = 1;

/**
 * Determines how many messages to send from the command-line arguments.
 *
 * Params:
 *   args = the program arguments as received by `main`; `args[1]`, when
 *          present, is read as the message count.
 *
 * Returns: the parsed count when it is a positive integer; otherwise `Total`
 *          (argument missing, not a valid `int`, or less than or equal to zero).
 */
private int parseMessageCount(string[] args) {
    if (args.length < 2) {
        return Total;
    }

    try {
        immutable int requested = to!int(args[1]);
        return requested > 0 ? requested : Total;
    } catch (ConvException e) {
        // to!int throws ConvException (including its overflow subclass) for
        // input that is not a valid int; treat that like any other unusable
        // count instead of aborting the program.
        logWarning(format("Invalid message count '%s' (%s); using %d instead",
                args[1], e.msg, Total));
        return Total;
    }
}

/**
 * Connects to an AMQP broker, sends a number of text messages to the
 * "my-queue" address and then closes the client.
 *
 * Params:
 *   args = program arguments; an optional first argument gives the number of
 *          messages to send (see `parseMessageCount`).
 */
void main(string[] args) {

    int messageCount = parseMessageCount(args);

    // NOTE(review): broker address and credentials are hard-coded; consider
    // reading them from the environment or a config file. Do not keep
    // alternative credentials around in comments.
    AmqpClientOptions options = new AmqpClientOptions().setHost("10.1.222.110")
        .setPort(5672).setUsername("admin").setPassword("admin");

    AmqpClient client = AmqpClient.create(options);
    AmqpConnection conn = client.connect();

    if (conn is null) {
        logWarning("Unable to connect to the broker");
        return;
    }

    logInfo("Connection succeeded");
    // dfmt off
    conn.createSender("my-queue", new class Handler!AmqpSender {
        void handle(AmqpSender sender) {
            if (sender is null) {
                logWarning("Unable to create a sender");
                return;
            }

            foreach (index; 0 .. messageCount) {
                // Each message carries its index and the time it was built.
                DateTime now = cast(DateTime) Clock.currTime();
                string message = format("[%d] Say hello at %s", index, now.toSimpleString());

                AmqpMessage amqpMessage = AmqpMessage.create().withBody(message).build();
                sender.send(amqpMessage);
                tracef("Message %d sent. The content is: '%s'", index, message);
            }

            trace("All message sent.");

            // The sender is not ended or closed individually; closing the
            // client is the only teardown this example performs.
            client.close((VoidAsyncResult ar) {
                if (ar.succeeded()) {
                    warning("Connection closed.");
                } else {
                    Throwable th = ar.cause();
                    warning(th);
                }
            });
        }
    });
    // dfmt on

    warning("Done.");
}