1 module receiver; 2 3 import std.stdio; 4 import hunt.amqp.client.AmqpClientOptions; 5 import hunt.amqp.client.AmqpClient; 6 import hunt.amqp.client.AmqpSender; 7 import hunt.amqp.client.AmqpMessage; 8 import hunt.amqp.client.AmqpPool; 9 import hunt.amqp.Handler; 10 import hunt.amqp.client.AmqpReceiver; 11 import hunt.amqp.client.AmqpConnection; 12 import hunt.amqp.client.AmqpReceiverOptions; 13 14 import hunt.logging.ConsoleLogger; 15 import core.thread; 16 import std.algorithm; 17 import std.parallelism; 18 import std.string; 19 20 void senderTask(AmqpSender sender) { 21 // while(true) 22 { 23 sender.send(AmqpMessage.create().withBody("hello world").build()); 24 trace("send completed"); 25 // Thread.sleep(500.msecs); 26 } 27 } 28 29 void main(string[] agrs) { 30 31 // dfmt off 32 // AmqpClientOptions options = new AmqpClientOptions() 33 // .setHost("10.1.223.62") 34 // .setPort(5672) 35 // .setUsername("test") 36 // .setPassword("123"); 37 38 AmqpClientOptions options = new AmqpClientOptions() 39 .setHost("10.1.222.110") 40 .setPort(5672) 41 .setUsername("admin") 42 .setPassword("admin"); 43 44 AmqpPool pool = new AmqpPool(options); 45 AmqpConnection conn = pool.borrowObject(); 46 47 if (conn is null) { 48 logWarning("Unable to connect to the broker"); 49 return; 50 } 51 52 logInfo("Connection succeeded"); 53 54 AmqpReceiverOptions receiverOptions = new AmqpReceiverOptions(); 55 receiverOptions.setAutoAcknowledgement(false); 56 57 conn.createReceiver("my-queue", new class Handler!AmqpReceiver { 58 void handle(AmqpReceiver recv) { 59 if (recv is null) { 60 logWarning("Unable to create a receiver"); 61 return; 62 } 63 64 int counter = 0; 65 recv.handler(new class Handler!AmqpMessage { 66 void handle(AmqpMessage msg) { 67 counter++; 68 69 string content = msg.bodyAsString(); 70 71 if(content.canFind("[2]")) { 72 if(counter < 10) { 73 74 warningf("%d => modified: %s", counter, content); 75 msg.modified(true, false); 76 // msg.rejected(); 77 78 // warningf("%d => released: %s", counter, content); 79 // msg.released(); 80 } else { 81 infof("%d => accepted forcedly: %s", counter, content); 82 msg.accepted(); 83 } 84 } else { 85 tracef("%d => accepted: %s", counter, content); 86 msg.accepted(); 87 } 88 // msg.rejected(); 89 } 90 }); 91 } 92 }); 93 // dfmt on 94 // pool.returnObject(conn); 95 96 warning("Done."); 97 }