1 /* 2 * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.amqp.client.impl.AmqpClientImpl; 12 13 import hunt.amqp.ProtonClient; 14 15 import hunt.collection.ArrayList; 16 import hunt.collection.List; 17 import hunt.amqp.client.AmqpClient; 18 import hunt.amqp.client.AmqpClientOptions; 19 import hunt.amqp.client.AmqpConnection; 20 import hunt.amqp.Handler; 21 import hunt.amqp.client.impl.AmqpConnectionImpl; 22 import hunt.amqp.client.AmqpReceiver; 23 import hunt.amqp.client.AmqpSender; 24 import hunt.amqp.client.AmqpReceiverOptions; 25 import hunt.amqp.client.AmqpSenderOptions; 26 27 import hunt.Assert; 28 import hunt.concurrency.CompletableFuture; 29 import hunt.concurrency.Future; 30 import hunt.concurrency.FuturePromise; 31 import hunt.concurrency.Promise; 32 import hunt.logging; 33 import hunt.net.AsyncResult; 34 import hunt.Object; 35 import hunt.Exceptions; 36 import hunt.String; 37 38 /** 39 * 40 */ 41 class AmqpClientImpl : AmqpClient { 42 43 private ProtonClient proton; 44 private AmqpClientOptions options; 45 46 private List!AmqpConnection connections; //= new CopyOnWriteArrayList<>(); 47 private bool mustCloseVertxOnClose; 48 49 this(AmqpClientOptions options, bool mustCloseVertxOnClose) { 50 if (options is null) { 51 this.options = new AmqpClientOptions(); 52 } else { 53 this.options = options; 54 } 55 this.proton = ProtonClient.create(); 56 this.mustCloseVertxOnClose = mustCloseVertxOnClose; 57 this.connections = new ArrayList!AmqpConnection; 58 } 59 60 AmqpClient connect(AsyncResultHandler!AmqpConnection connectionHandler) { 61 if (options.getHost() is null) { 62 logError("Host must be set"); 63 } 64 if (connectionHandler is null) { 65 logError("Handler must not be null"); 66 } 67 // options.getHost(), "Host must be set"); 68 // Objects.requireNonNull(connectionHandler, "Handler must not be null"); 69 new AmqpConnectionImpl(this, options, proton, connectionHandler); 70 return this; 71 } 72 73 Future!AmqpConnection connectAsync() { 74 // dfmt off 75 auto promise = new FuturePromise!AmqpConnection(); 76 77 connect((ar) { 78 if(ar.succeeded()) { 79 promise.succeeded(ar.result()); 80 } else { 81 Throwable th = ar.cause(); 82 warning(th.msg); 83 version(HUNT_DEWBUG) warning(th); 84 promise.failed(new Exception("Unable to connect to the broker.")); 85 } 86 // void handle(AmqpConnection conn) { 87 // if (conn is null) 88 // promise.failed(new Exception("Unable to connect to the broker.")); 89 // else 90 // promise.succeeded(conn); 91 // } 92 }); 93 94 // dfmt on 95 return promise; 96 } 97 98 AmqpConnection connect() { 99 Future!AmqpConnection promise = connectAsync(); 100 version (HUNT_AMQP_DEBUG) 101 warning("try to get a result"); 102 return promise.get(options.getIdleTimeout()); 103 104 } 105 106 // dfmt off 107 void close(AsyncResultHandler!Void handler) { 108 if(handler is null) return; 109 110 Future!(void)[] actions; 111 foreach (AmqpConnection connection ; connections) { 112 FuturePromise!(void) future = new FuturePromise!void(); 113 connection.close((ar) { 114 if (ar.succeeded()) { 115 future.succeeded(); 116 } else { 117 future.failed(cast(Exception) ar.cause()); 118 } 119 }); 120 121 actions ~= future; 122 } 123 124 CompletableFuture!bool cf = supplyAsync!(bool)(() { 125 foreach(Future!void f; actions) { 126 try { 127 f.get(); 128 } catch(Throwable th) { 129 warning(th.msg); 130 version(HUNT_DEBUG) warning(th); 131 return false; 132 } 133 } 134 135 return true; 136 }); 137 138 cf.thenAccept((done) { 139 version(HUNT_DEBUG) info("All connections closed."); 140 connections = null; 141 if(done) { 142 handler(succeededResult!(Void)(null)); 143 } else { 144 Exception ex = new Exception("Failed to close the connections. See the log for more details."); 145 handler(failedResult!(Void)(ex)); 146 } 147 }); 148 } 149 150 //Future<Void> close() { 151 // Promise<Void> promise = Promise.promise(); 152 // close(promise); 153 // return promise.future(); 154 //} 155 156 AmqpClient createReceiver(string address, Handler!AmqpReceiver completionHandler) { 157 return connect((res) { 158 if (res.failed()) { 159 completionHandler.handle(null); 160 } else { 161 res.result().createReceiver(address, completionHandler); 162 } 163 }); 164 165 // return connect(new class Handler!AmqpConnection { 166 // void handle(AmqpConnection conn) { 167 // if (conn !is null) { 168 // conn.createReceiver(address, completionHandler); 169 // } else { 170 // completionHandler.handle(null); 171 // } 172 // } 173 // }); 174 } 175 176 //Future<AmqpReceiver> createReceiver(String address) { 177 // Promise<AmqpReceiver> promise = Promise.promise(); 178 // createReceiver(address, promise); 179 // return promise.future(); 180 //} 181 182 AmqpClient createReceiver(string address, AmqpReceiverOptions receiverOptions, 183 Handler!AmqpReceiver completionHandler) { 184 return connect((res) { 185 if(res.succeeded()) { 186 res.result().createReceiver(address, receiverOptions, completionHandler); 187 } else { 188 completionHandler.handle(null); 189 } 190 191 // void handle(AmqpConnection conn) { 192 // if (conn !is null) { 193 // conn.createReceiver(address, receiverOptions, completionHandler); 194 // } else { 195 // completionHandler.handle(null); 196 // } 197 // } 198 }); 199 } 200 201 //Future<AmqpReceiver> createReceiver(String address, AmqpReceiverOptions receiverOptions) { 202 // Promise<AmqpReceiver> promise = Promise.promise(); 203 // createReceiver(address, receiverOptions, promise); 204 // return promise.future(); 205 //} 206 207 AmqpClient createSender(string address, Handler!AmqpSender completionHandler) { 208 return connect((ar) { 209 if(ar.succeeded()) { 210 auto conn = ar.result(); 211 conn.createSender(address, completionHandler); 212 } else { 213 Throwable th = ar.cause(); 214 warning(th.msg); 215 version(HUNT_DEWBUG) warning(th); 216 completionHandler.handle(null); 217 } 218 }); 219 } 220 221 //Future<AmqpSender> createSender(String address) { 222 // Promise<AmqpSender> promise = Promise.promise(); 223 // createSender(address, promise); 224 // return promise.future(); 225 //} 226 227 AmqpClient createSender(string address, AmqpSenderOptions options, 228 Handler!AmqpSender completionHandler) { 229 230 return connect((ar) { 231 if(ar.succeeded()) { 232 auto conn = ar.result(); 233 conn.createSender(address, options, completionHandler); 234 } else { 235 Throwable th = ar.cause(); 236 warning(th.msg); 237 version(HUNT_DEWBUG) warning(th); 238 completionHandler.handle(null); 239 } 240 }); 241 } 242 243 // dfmt on 244 245 //Future<AmqpSender> createSender(String address, AmqpSenderOptions options) { 246 // Promise<AmqpSender> promise = Promise.promise(); 247 // createSender(address, options, promise); 248 // return promise.future(); 249 //} 250 // 251 void register(AmqpConnectionImpl connection) { 252 synchronized(this) { 253 connections.add(connection); 254 } 255 } 256 }