/*
 * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.amqp.client.impl.AmqpConnectionImpl;

import hunt.amqp.client.AmqpClientOptions;
import hunt.amqp.client.AmqpSender;
import hunt.amqp.client.AmqpReceiver;
import hunt.amqp.client.impl.AmqpClientImpl;
import hunt.amqp.client.AmqpReceiverOptions;
import hunt.amqp.client.impl.AmqpReceiverImpl;
import hunt.amqp.client.impl.AmqpSenderImpl;
import hunt.amqp.client.AmqpSenderOptions;
import hunt.amqp.client.AmqpConnection;

//import hunt.core.*;
import hunt.amqp.ProtonConnection;
import hunt.amqp.impl.ProtonConnectionImpl;
import hunt.amqp.Handler;
import hunt.amqp.ProtonQoS;
import hunt.amqp.ProtonClient;
import hunt.amqp.ProtonLinkOptions;
import hunt.amqp.ProtonReceiver;
import hunt.amqp.ProtonSender;
import hunt.amqp.impl.ProtonReceiverImpl;
import hunt.amqp.impl.ProtonLinkImpl;
import hunt.proton.amqp.Symbol;
import hunt.proton.amqp.messaging.TerminusDurability;
import hunt.proton.amqp.messaging.TerminusExpiryPolicy;
import hunt.proton.engine.EndpointState;
import hunt.proton.amqp.messaging.Source;

import hunt.collection.HashMap;
import hunt.collection.List;
import hunt.collection.Map;
import hunt.collection.ArrayList;

import hunt.concurrency.Future;
import hunt.concurrency.FuturePromise;
import hunt.Exceptions;
import hunt.logging;
import hunt.net.AsyncResult;
import hunt.Object;

import hunt.String;
import std.concurrency : initOnce;
import core.atomic;
import std.uni;
import std.range;

/**
 * Implementation of `AmqpConnection` that wraps a `ProtonConnection`.
 *
 * The constructor immediately starts connecting through the supplied
 * `ProtonClient`; the outcome is reported to the connection handler passed to
 * the constructor. Senders and receivers created from this connection register
 * themselves through the `register` / `unregister` overloads.
 */
class AmqpConnectionImpl : AmqpConnection {

    // static Symbol PRODUCT_KEY = Symbol.valueOf("product");

    /**
     * Lazily created, process-wide product name ("hunt-amqp-client") that is
     * put into the connection properties under `PRODUCT_KEY`.
     */
    static String PRODUCT() {
        __gshared String m;
        return initOnce!(m)(new String("hunt-amqp-client"));
    }

    /**
     * Lazily created, process-wide `Symbol` ("product") used as the key of the
     * product entry in the connection properties.
     */
    static Symbol PRODUCT_KEY() {
        __gshared Symbol m;
        return initOnce!(m)(Symbol.valueOf("product"));
    }

    private AmqpClientOptions _options;
    //private AtomicBoolean _isClosed = new AtomicBoolean();
    private shared bool _isClosing;
    private shared bool _isClosed;
    //private AtomicReference<ProtonConnection> connection = new AtomicReference<>();
    private ProtonConnection connection;
    // private Context context;

    private List!AmqpSender senders; //= new CopyOnWriteArrayList<>();
    private List!AmqpReceiver receivers; // = new CopyOnWriteArrayList<>();
    /**
     * The exception handler, protected by the monitor lock.
     */
    private Handler!Throwable _exceptionHandler;

    /**
     * Creates the connection object and starts connecting.
     *
     * Params:
     *   client            = owning client; when not `null` this connection is
     *                       registered with it once the connection is opened
     *   options           = client options (host, port, credentials, container id, ...)
     *   proton            = proton client used to establish the connection; must not be `null`
     *   connectionHandler = receives the succeeded or failed connection result; must not be `null`
     */
    this(AmqpClientImpl client, AmqpClientOptions options, ProtonClient proton,
            AsyncResultHandler!AmqpConnection connectionHandler) {

        assert(proton !is null, "proton cannot be `null`");
        assert(connectionHandler !is null, "connection handler cannot be `null`");

        _isClosing = false;
        _options = options;
        senders = new ArrayList!AmqpSender;
        receivers = new ArrayList!AmqpReceiver;
        connect(client, proton, connectionHandler);
    }

    /**
     * Returns: the current value of the closed flag. The flag is set by the
     * disconnect/close handlers and cleared when the connection is opened.
     */
    bool isClosed() {
        return _isClosed;
    }

    // dfmt off
    /**
     * Connects through `proton` using the host, port, username and password
     * from the options, then configures and opens the resulting connection.
     *
     * Every failure is reported to `connectionHandler` with a non-null cause so
     * that consumers can safely read the cause of a failed result.
     */
    private void connect(AmqpClientImpl client, ProtonClient proton,
            AsyncResultHandler!AmqpConnection connectionHandler) {

        proton.connect(_options, _options.getHost(), _options.getPort(),
            _options.getUsername(), _options.getPassword(),
            new class Handler!ProtonConnection {
                // Called on the connection context.
                void handle(ProtonConnection ar)
                {
                    if (ar !is null)
                    {
                        if (connection !is null) {
                            string msg = "Unable to connect - already holding a connection";
                            error(msg);
                            if(connectionHandler !is null) {
                                connectionHandler(failedResult!AmqpConnection(new Exception(msg)));
                            }
                            return;
                        } else {
                            connection = ar;
                        }

                        Map!(Symbol, Object) map = new HashMap!(Symbol,Object)();
                        map.put(AmqpConnectionImpl.PRODUCT_KEY, AmqpConnectionImpl.PRODUCT);
                        if (_options.getContainerId() !is null) {
                            connection.setContainer(_options.getContainerId());
                        }

                        if (_options.getVirtualHost() !is null) {
                            connection.setHostname(_options.getVirtualHost());
                        }

                        connection
                            .setProperties(map)
                            .disconnectHandler((ProtonConnection var1) {
                                try {
                                    onDisconnect();
                                } finally {
                                    _isClosed = true;
                                }
                            })
                            .closeHandler( (x) {
                                // Not expected closing, consider it failed
                                try {
                                    onDisconnect();
                                } finally {
                                    _isClosed = true;
                                }
                            })
                            .openHandler(new class Handler!ProtonConnection {
                                void handle(ProtonConnection conn)
                                {
                                    if (conn !is null) {
                                        if(client !is null)
                                            client.register(this.outer.outer);
                                        _isClosed = false;
                                        connectionHandler(succeededResult!(AmqpConnection)(this.outer.outer));
                                    } else {
                                        _isClosed = true;
                                        // Report a real cause instead of a null one.
                                        connectionHandler(failedResult!(AmqpConnection)(
                                            new Exception("Unable to open the connection")));
                                    }
                                }
                            });

                        connection.open();
                    } else {
                        // Report a real cause instead of a null one.
                        connectionHandler(failedResult!(AmqpConnection)(
                            new Exception("Unable to connect")));
                    }
                }
            }
        );
    }

    // dfmt on

    /**
     * Must be called on context.
     *
     * Drops the reference to the current connection and, when an exception
     * handler is installed, notifies it with an `Exception` whose message is
     * built by `getErrorMessage` from the dropped connection.
     */
    private void onDisconnect() {
        Handler!Throwable h = null;
        ProtonConnection conn = connection;
        connection = null;
        // Take a snapshot under the lock; the handler itself is invoked outside of it.
        synchronized (this) {
            if (_exceptionHandler !is null) {
                h = _exceptionHandler;
            }
        }

        version(HUNT_DEBUG) trace("handling...");

        if (h !is null) {
            string message = getErrorMessage(conn);
            h.handle(new Exception(message));
        }
    }

    /**
     * Builds the disconnect message for `conn`.
     *
     * Returns: "Connection disconnected", followed by " - " and the description
     * of the local condition or, failing that, of the remote condition when one
     * of them is available. `conn` may be `null`.
     */
    private string getErrorMessage(ProtonConnection conn) {
        string message = "Connection disconnected";
        if (conn !is null) {
            if (conn.getCondition() !is null && conn.getCondition().getDescription() !is null) {
                message ~= " - " ~ (conn.getCondition().getDescription().value);
            } else if (conn.getRemoteCondition() !is null
                    && conn.getRemoteCondition().getDescription() !is null) {
                message ~= " - " ~ conn.getRemoteCondition().getDescription().value;
            }
        }
        return message;
    }

    /**
     * Not implemented: only calls `implementationMissing(false)`; `action` is
     * never invoked.
     */
    void runOnContext(Handler!Void action) {
        implementationMissing(false);
        // context.runOnContext(action);
    }

    /**
     * Not implemented: only calls `implementationMissing(false)`; `action` is
     * never invoked.
     */
    void runWithTrampoline(Handler!Void action) {
        implementationMissing(false);
        //if (Vertx.currentContext() == context) {
        //    action.handle(null);
        //} else {
        //    runOnContext(action);
        //}
    }

    /**
     * Must be called on context.
     *
     * Returns: `true` when a connection is held and its local endpoint state is
     * `EndpointState.ACTIVE`.
     */
    private bool isLocalOpen() {
        ProtonConnection conn = this.connection;
        return conn !is null && (cast(ProtonConnectionImpl) conn)
            .getLocalState() == EndpointState.ACTIVE;
    }

    /**
     * Must be called on context.
     *
     * Returns: `true` when a connection is held and its remote endpoint state is
     * `EndpointState.ACTIVE`.
     */
    private bool isRemoteOpen() {
        ProtonConnection conn = this.connection;
        return conn !is null && (cast(ProtonConnectionImpl) conn)
            .getRemoteState() == EndpointState.ACTIVE;
    }

    /**
     * Installs the handler notified by `onDisconnect`.
     *
     * The write is done under the monitor lock because `onDisconnect` reads the
     * field under the same lock.
     *
     * Returns: this connection, for chaining.
     */
    override AmqpConnection exceptionHandler(Handler!Throwable handler) {
        synchronized (this) {
            this._exceptionHandler = handler;
        }
        return this;
    }

    // dfmt off
    /**
     * Closes the connection.
     *
     * When there is nothing to close (no connection, already closed, a close in
     * progress, or neither endpoint open) `done` is completed successfully right
     * away. Otherwise the disconnect and close handlers of the connection are
     * replaced so that `done` is completed once, with the failure cause if any.
     *
     * Params:
     *   done = optional completion handler; may be `null`
     *
     * Returns: this connection, for chaining, on every path.
     */
    override AmqpConnection close(AsyncResultHandler!Void done) {
        ProtonConnection actualConnection = connection;
        if (actualConnection is null || _isClosed || _isClosing || (!isLocalOpen() && !isRemoteOpen())) {
            if (done !is null) {
                done(succeededResult!(Void)(null));
            }
            // This path reports success, so keep the fluent contract instead of returning null.
            return this;
        } else {
            _isClosing = true;
        }

        void onClosed(Exception ex) {
            _isClosing = false;

            if(done !is null) {
                if(ex is null) {
                    done(succeededResult!(Void)(null));
                } else {
                    done(failedResult!(Void)(ex));
                }
            }
        }

        if (actualConnection.isDisconnected()) {
            onClosed(null);
        } else {
            try {
                actualConnection
                    .disconnectHandler( (ProtonConnection conn) {
                        // cas ensures only the first of the two handlers completes `done`.
                        if(cas(&_isClosed, false, true)) {
                            string msg = getErrorMessage(conn);
                            version(HUNT_DEBUG) warning(msg);
                            onClosed(new Exception(msg));
                        }
                    })
                    .closeHandler((res) {
                        version(HUNT_DEBUG) infof("Close handling");
                        if(cas(&_isClosed, false, true)) {
                            if (res.succeeded()) {
                                onClosed(null);
                            } else {
                                onClosed(cast(Exception)res.cause());
                            }
                        }
                    })
                    .close();
            } catch (Exception e) {
                warning(e.msg);
                version(HUNT_DEBUG) warning(e);
                onClosed(e);
            }
        }

        return this;
    }

    // dfmt on

    //Future<Void> close() {
    //    Promise<Void> promise = Promise.promise();
    //    close(promise);
    //    return promise.future();
    //}

    /** Removes `sender` from the list of senders tracked by this connection. */
    void unregister(AmqpSender sender) {
        senders.remove(sender);
    }

    /** Removes `receiver` from the list of receivers tracked by this connection. */
    void unregister(AmqpReceiver receiver) {
        receivers.remove(receiver);
    }

    /**
     * Creates a receiver with a `null` address and options marked as dynamic.
     *
     * Returns: the result of the delegated `createReceiver` call.
     */
    override AmqpConnection createDynamicReceiver(Handler!AmqpReceiver completionHandler) {
        return createReceiver(null, new AmqpReceiverOptions().setDynamic(true), completionHandler);
    }

    //Future<AmqpReceiver> createDynamicReceiver() {
    //    Promise<AmqpReceiver> promise = Promise.promise();
    //    createDynamicReceiver(promise);
    //    return promise.future();
    //}

    /**
     * Creates a receiver on `address` with default receiver options.
     *
     * Params:
     *   address           = source address; asserted to be non-empty
     *   completionHandler = handed to the new `AmqpReceiverImpl`; asserted to be non-null
     *
     * Returns: this connection, for chaining.
     */
    override AmqpConnection createReceiver(string address, Handler!AmqpReceiver completionHandler) {
        assert(!address.empty(), "The address must not be `null`");
        assert(completionHandler !is null, "The completion handler must not be `null`");

        ProtonLinkOptions opts = new ProtonLinkOptions();
        //runWithTrampoline(x -> {
        ProtonReceiver receiver = connection.createReceiver(address, opts);
        new AmqpReceiverImpl(address, this, new AmqpReceiverOptions(), receiver, completionHandler);
        //});
        return this;
    }

    //Future<AmqpReceiver> createReceiver(String address) {
    //    Promise<AmqpReceiver> promise = Promise.promise();
    //    createReceiver(address, promise);
    //    return promise.future();
    //}

    /**
     * Creates a receiver on `address` configured from `receiverOptions`.
     *
     * The link name and dynamic flag always come from the effective options
     * (defaults when `receiverOptions` is `null`). QoS and source configuration
     * are applied only when `receiverOptions` was supplied; QoS values other
     * than "AT_MOST_ONCE" / "AT_LEAST_ONCE" (compared upper-cased) are ignored.
     *
     * Returns: this connection, for chaining.
     */
    override AmqpConnection createReceiver(string address,
            AmqpReceiverOptions receiverOptions, Handler!AmqpReceiver completionHandler) {
        ProtonLinkOptions opts = new ProtonLinkOptions();
        AmqpReceiverOptions recOpts = receiverOptions is null ? new AmqpReceiverOptions()
            : receiverOptions;
        opts.setDynamic(recOpts.isDynamic()).setLinkName(recOpts.getLinkName());

        // runWithTrampoline(v -> {
        ProtonReceiver receiver = connection.createReceiver(address, opts);

        if (receiverOptions !is null) {
            if (receiverOptions.getQos() !is null) {
                //receiver.setQoS(ProtonQoS.valueOf(receiverOptions.getQos().toUpper));
                if (receiverOptions.getQos().toUpper == "AT_MOST_ONCE") {
                    receiver.setQoS(ProtonQoS.AT_MOST_ONCE);
                } else if (receiverOptions.getQos().toUpper == "AT_LEAST_ONCE") {
                    receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
                }
            }

            configureTheSource(recOpts, receiver);
        }

        new AmqpReceiverImpl(address, this, recOpts, receiver, completionHandler);
        // });
        return this;
    }

    //Future<AmqpReceiver> createReceiver(String address, AmqpReceiverOptions receiverOptions) {
    //    Promise<AmqpReceiver> promise = Promise.promise();
    //    createReceiver(address, receiverOptions, promise);
    //    return promise.future();
    //}

    /**
     * Applies capabilities and durability from `receiverOptions` to the source
     * of `receiver`.
     *
     * A `null` or empty capability list leaves the source capabilities
     * untouched. Durable options set the expiry policy to `NEVER` and the
     * durability to `UNSETTLED_STATE`.
     */
    private void configureTheSource(AmqpReceiverOptions receiverOptions, ProtonReceiver receiver) {
        hunt.proton.amqp.messaging.Source.Source source = cast(
                hunt.proton.amqp.messaging.Source.Source) receiver.getSource();

        List!string capabilities = receiverOptions.getCapabilities();
        // Treat a missing list like an empty one instead of dereferencing null.
        if (capabilities !is null && !capabilities.isEmpty()) {
            //source.setCapabilities(capabilities.stream().map(Symbol::valueOf).toArray(Symbol[]::new));
            List!Symbol tmpLst = new ArrayList!Symbol;
            foreach (string s; capabilities) {
                tmpLst.add(Symbol.valueOf(s));
            }
            source.setCapabilities(tmpLst);
        }

        if (receiverOptions.isDurable()) {
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
        }
    }

    /**
     * Creates a sender on `address` with default sender options.
     *
     * Returns: the result of the delegated `createSender` call, or `null`
     * (after logging an error) when `address` is `null` or empty.
     */
    override AmqpConnection createSender(string address, Handler!AmqpSender completionHandler) {
        // Objects.requireNonNull(address, "The address must be set");
        if (address is null || address.length == 0) {
            logError("The address must be set");
            return null;
        }
        return createSender(address, new AmqpSenderOptions(), completionHandler);
    }

    //Future<AmqpSender> createSender(String address) {
    //    Promise<AmqpSender> promise = Promise.promise();
    //    createSender(address, promise);
    //    return promise.future();
    //}

    /**
     * Creates a sender on `address`, optionally configured from `options`.
     *
     * Params:
     *   address           = target address; may be `null` only for a dynamic link
     *   options           = sender options; may be `null`, in which case the
     *                       sender is created without link options
     *   completionHandler = handed to `AmqpSenderImpl.create`
     *
     * Returns: this connection, for chaining, or `null` (after logging an
     * error) when `completionHandler` is `null`.
     *
     * Throws: `IllegalArgumentException` when `address` is `null` and the link
     * is not dynamic (including when `options` is `null`).
     */
    override AmqpConnection createSender(string address, AmqpSenderOptions options,
            Handler!AmqpSender completionHandler) {
        // `options` may legitimately be null (see below), so never dereference it unchecked.
        bool isDynamic = options !is null && options.isDynamic();
        if (address is null && !isDynamic) {
            throw new IllegalArgumentException("Address must be set if the link is not dynamic");
        }

        // Objects.requireNonNull(completionHandler, "The completion handler must be set");

        if (completionHandler is null) {
            logError("The completion handler must be set");
            return null;
        }

        // runWithTrampoline(x -> {

        ProtonSender sender;
        if (options !is null) {
            ProtonLinkOptions opts = new ProtonLinkOptions();
            opts.setLinkName(options.getLinkName());
            opts.setDynamic(options.isDynamic());

            sender = connection.createSender(address, opts);
            sender.setAutoDrained(options.isAutoDrained());
        } else {
            sender = connection.createSender(address);
        }

        // TODO durable?

        AmqpSenderImpl.create(sender, this, completionHandler);
        // });
        return this;
    }

    //Future<AmqpSender> createSender(String address, AmqpSenderOptions options) {
    //    Promise<AmqpSender> promise = Promise.promise();
    //    createSender(address, options, promise);
    //    return promise.future();
    //}

    /**
     * Creates a sender with a `null` address.
     *
     * Returns: this connection, for chaining, or `null` (after logging an
     * error) when `completionHandler` is `null`.
     */
    override AmqpConnection createAnonymousSender(Handler!AmqpSender completionHandler) {
        // Objects.requireNonNull(completionHandler, "The completion handler must be set");
        if (completionHandler is null) {
            logError("The completion handler must be set");
            return null;
        }
        // runWithTrampoline(x -> {
        ProtonSender sender = connection.createSender(null);
        AmqpSenderImpl.create(sender, this, completionHandler);
        // });
        return this;
    }

    //Future<AmqpSender> createAnonymousSender() {
    //    Promise<AmqpSender> promise = Promise.promise();
    //    createAnonymousSender(promise);
    //    return promise.future();
    //}

    /** Returns: the wrapped `ProtonConnection`; `null` once disconnected. */
    ProtonConnection unwrap() {
        return this.connection;
    }

    /** Returns: the client options this connection was created with. */
    AmqpClientOptions options() {
        return _options;
    }

    /** Adds `sender` to the list of senders tracked by this connection. */
    void register(AmqpSenderImpl sender) {
        senders.add(sender);
    }

    /** Adds `receiver` to the list of receivers tracked by this connection. */
    void register(AmqpReceiverImpl receiver) {
        receivers.add(receiver);
    }
}