/*
 * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.amqp.client.impl.AmqpConnectionImpl;

import hunt.amqp.client.AmqpClientOptions;
import hunt.amqp.client.AmqpSender;
import hunt.amqp.client.AmqpReceiver;
import hunt.amqp.client.impl.AmqpClientImpl;
import hunt.amqp.client.AmqpReceiverOptions;
import hunt.amqp.client.impl.AmqpReceiverImpl;
import hunt.amqp.client.impl.AmqpSenderImpl;
import hunt.amqp.client.AmqpSenderOptions;
import hunt.amqp.client.AmqpConnection;

//import hunt.core.*;
import hunt.amqp.ProtonConnection;
import hunt.amqp.impl.ProtonConnectionImpl;
import hunt.amqp.Handler;
import hunt.amqp.ProtonQoS;
import hunt.amqp.ProtonClient;
import hunt.amqp.ProtonLinkOptions;
import hunt.amqp.ProtonReceiver;
import hunt.amqp.ProtonSender;
import hunt.amqp.impl.ProtonReceiverImpl;
import hunt.amqp.impl.ProtonLinkImpl;
import hunt.proton.amqp.Symbol;
import hunt.proton.amqp.messaging.TerminusDurability;
import hunt.proton.amqp.messaging.TerminusExpiryPolicy;
import hunt.proton.engine.EndpointState;
import hunt.proton.amqp.messaging.Source;

import hunt.collection.HashMap;
import hunt.collection.List;
import hunt.collection.Map;
import hunt.collection.ArrayList;

import hunt.logging;
import hunt.Exceptions;
import hunt.Object;

import hunt.String;
import std.concurrency : initOnce;
import std.uni;
import std.range;

/**
 * AmqpConnection implementation backed by a ProtonConnection.
 *
 * The constructor immediately starts connecting through the supplied
 * ProtonClient; the outcome is reported to the connection handler
 * (this instance once the connection is opened, `null` on failure).
 */
class AmqpConnectionImpl : AmqpConnection {

    // static Symbol PRODUCT_KEY = Symbol.valueOf("product");

    /**
     * Value advertised under the "product" connection property.
     * Created once on first use (via initOnce) and shared afterwards.
     */
    static String PRODUCT() {
        __gshared String m;
        return initOnce!(m)(new String("hunt-amqp-client"));
    }

    /**
     * Symbol key ("product") of the connection property holding PRODUCT.
     * Created once on first use (via initOnce) and shared afterwards.
     */
    static Symbol PRODUCT_KEY() {
        __gshared Symbol m;
        return initOnce!(m)(Symbol.valueOf("product"));
    }

    private AmqpClientOptions _options;
    //private AtomicBoolean _isClosed = new AtomicBoolean();
    private bool _isClosed;
    //private AtomicReference<ProtonConnection> connection = new AtomicReference<>();
    private ProtonConnection connection;
    // private Context context;

    private List!AmqpSender senders; //= new CopyOnWriteArrayList<>();
    private List!AmqpReceiver receivers; // = new CopyOnWriteArrayList<>();
    /**
     * The exception handler, protected by the monitor lock.
     */
    private Handler!Throwable _exceptionHandler;

    /**
     * Creates the connection wrapper and starts connecting.
     *
     * Params:
     *   client            = owning client; when not `null` this connection
     *                       registers itself with it once opened
     *   options           = client options (host, port, credentials,
     *                       container id, virtual host)
     *   proton            = proton client used to establish the connection;
     *                       must not be `null`
     *   connectionHandler = receives this instance once the connection is
     *                       opened, or `null` on failure; must not be `null`
     */
    this(AmqpClientImpl client, AmqpClientOptions options, ProtonClient proton,
            Handler!AmqpConnection connectionHandler) {

        assert(proton !is null, "proton cannot be `null`");
        assert(connectionHandler !is null, "connection handler cannot be `null`");

        this._options = options;
        senders = new ArrayList!AmqpSender;
        receivers = new ArrayList!AmqpReceiver;
        connect(client, proton, connectionHandler);
    }

    /// Returns: the current value of the closed flag.
    bool isClosed() {
        return _isClosed;
    }

    // dfmt off
    /**
     * Asks proton to connect using the stored options and, on success,
     * configures the connection (properties, container id, hostname,
     * disconnect/close/open handlers) and opens it.
     *
     * `connectionHandler` receives `null` when proton yields no connection,
     * when this instance already holds a connection, or when the open
     * handler is invoked with a `null` connection.
     */
    private void connect(AmqpClientImpl client, ProtonClient proton, Handler!AmqpConnection connectionHandler) {
        //void connect(ProtonClientOptions options, string host, int port, string username, string password,
        //Handler!ProtonConnection connectionHandler);
        proton.connect(_options, _options.getHost(), _options.getPort(),
            _options.getUsername(), _options.getPassword(),
            new class Handler!ProtonConnection {
                // Called on the connection context.
                void handle(ProtonConnection ar)
                {
                    if (ar is null) {
                        connectionHandler.handle(null);
                        return;
                    }

                    if (connection !is null) {
                        connectionHandler.handle(null);
                        logError("Unable to connect - already holding a connection");
                        return;
                    }
                    connection = ar;

                    Map!(Symbol, Object) map = new HashMap!(Symbol,Object)();
                    map.put(AmqpConnectionImpl.PRODUCT_KEY, AmqpConnectionImpl.PRODUCT);
                    if (_options.getContainerId() !is null) {
                        connection.setContainer(_options.getContainerId());
                    }

                    if (_options.getVirtualHost() !is null) {
                        connection.setHostname(_options.getVirtualHost());
                    }

                    connection
                        .setProperties(map)
                        .disconnectHandler(new class Handler!ProtonConnection {
                            void handle(ProtonConnection var1)
                            {
                                handleUnexpectedClose();
                            }
                        })
                        .closeHandler(new class Handler!ProtonConnection {
                            // Not expected closing, consider it failed
                            void handle(ProtonConnection var1)
                            {
                                handleUnexpectedClose();
                            }
                        })
                        .openHandler(new class Handler!ProtonConnection {
                            void handle(ProtonConnection conn)
                            {
                                if (conn !is null) {
                                    // this.outer.outer is the enclosing AmqpConnectionImpl
                                    // (two anonymous-class levels up).
                                    if(client !is null)
                                        client.register(this.outer.outer);
                                    _isClosed = false;
                                    connectionHandler.handle(this.outer.outer);
                                } else {
                                    _isClosed = true;
                                    connectionHandler.handle(null);
                                }
                            }
                        });

                    connection.open();
                }
            }
        );
    }

    // dfmt on

    /**
     * Shared body of the disconnect and close handlers installed by
     * connect(): runs onDisconnect() and marks the connection closed even
     * if the exception handler throws.
     */
    private void handleUnexpectedClose() {
        try {
            onDisconnect();
        } finally {
            _isClosed = true;
        }
    }

    /**
     * Must be called on context.
     *
     * Drops the held connection and, if an exception handler is set,
     * passes it an Exception whose message is built by getErrorMessage().
     */
    private void onDisconnect() {
        Handler!Throwable h = null;
        ProtonConnection conn = connection;
        connection = null;
        synchronized (this) {
            if (_exceptionHandler !is null) {
                h = _exceptionHandler;
            }
        }

        // Invoke the handler outside the lock.
        if (h !is null) {
            string message = getErrorMessage(conn);
            h.handle(new Exception(message));
        }
    }

    /**
     * Builds the disconnect message: "Connection disconnected", followed by
     * " - <description>" taken from the local condition if it has a
     * description, otherwise from the remote condition if it has one.
     */
    private string getErrorMessage(ProtonConnection conn) {
        string message = "Connection disconnected";
        if (conn !is null) {
            if (conn.getCondition() !is null && conn.getCondition().getDescription() !is null) {
                message ~= " - " ~ (conn.getCondition().getDescription().value);
            } else if (conn.getRemoteCondition() !is null
                    && conn.getRemoteCondition().getDescription() !is null) {
                message ~= " - " ~ conn.getRemoteCondition().getDescription().value;
            }
        }
        return message;
    }

    /// Not implemented in this port; `action` is not run.
    void runOnContext(Handler!Void action) {
        implementationMissing(false);
        // context.runOnContext(action);
    }

    /// Not implemented in this port; `action` is not run.
    void runWithTrampoline(Handler!Void action) {
        implementationMissing(false);
        //if (Vertx.currentContext() == context) {
        //  action.handle(null);
        //} else {
        //  runOnContext(action);
        //}
    }

    /**
     * Must be called on context.
     *
     * Returns: `true` when a connection is held and its local endpoint
     * state is ACTIVE.
     */
    private bool isLocalOpen() {
        ProtonConnection conn = this.connection;
        return conn !is null && (cast(ProtonConnectionImpl) conn)
            .getLocalState() == EndpointState.ACTIVE;
    }

    /**
     * Must be called on context.
     *
     * Returns: `true` when a connection is held and its remote endpoint
     * state is ACTIVE.
     */
    private bool isRemoteOpen() {
        ProtonConnection conn = this.connection;
        return conn !is null && (cast(ProtonConnectionImpl) conn)
            .getRemoteState() == EndpointState.ACTIVE;
    }

    /**
     * Sets the handler notified by onDisconnect().
     *
     * The field is written under the monitor lock, the same lock
     * onDisconnect() takes to read it.
     *
     * Returns: this connection.
     */
    override AmqpConnection exceptionHandler(Handler!Throwable handler) {
        synchronized (this) {
            this._exceptionHandler = handler;
        }
        return this;
    }

    /// Invokes the close-completion handler, if one was supplied.
    private void notifyClosed(Handler!Void done) {
        if (done !is null) {
            done.handle(new String(""));
        }
    }

    // dfmt off
    /**
     * Closes the underlying connection.
     *
     * `done` (may be `null`) is invoked at most once per call:
     * immediately when there is nothing to close (no connection, already
     * flagged closed, or neither endpoint ACTIVE) or the connection is
     * already disconnected; otherwise when the close or disconnect handler
     * installed here fires. If issuing the close throws, the error is
     * logged and `done` is not invoked, since Handler!Void offers no way to
     * report a failure.
     *
     * Returns: this connection.
     */
    override AmqpConnection close(Handler!Void done) {
        ProtonConnection actualConnection = connection;
        if (actualConnection is null || _isClosed || (!isLocalOpen() && !isRemoteOpen())) {
            notifyClosed(done);
            return this;
        }
        _isClosed = true;

        if (actualConnection.isDisconnected()) {
            // Nothing left to close on the wire; complete right away.
            notifyClosed(done);
            return this;
        }

        // Either handler below may fire (possibly both); notify `done` only once.
        bool notified = false;

        try {
            actualConnection
                .disconnectHandler(new class Handler!ProtonConnection {
                    void handle(ProtonConnection var1)
                    {
                        _isClosed = true;
                        if (!notified) {
                            notified = true;
                            notifyClosed(done);
                        }
                    }
                })
                .closeHandler(new class Handler!ProtonConnection {
                    void handle(ProtonConnection var1)
                    {
                        infof("Close handling");
                        _isClosed = true;
                        if (!notified) {
                            notified = true;
                            notifyClosed(done);
                        }
                    }
                })
                .close();
        } catch (Exception e) {
            logError("AmqpConnection close error: " ~ e.msg);
        }

        return this;
    }

    // dfmt on

    //Future<Void> close() {
    //  Promise<Void> promise = Promise.promise();
    //  close(promise);
    //  return promise.future();
    //}

    /// Removes `sender` from the list of senders tracked by this connection.
    void unregister(AmqpSender sender) {
        senders.remove(sender);
    }

    /// Removes `receiver` from the list of receivers tracked by this connection.
    void unregister(AmqpReceiver receiver) {
        receivers.remove(receiver);
    }

    /**
     * Creates a receiver with no address and the `dynamic` option set.
     *
     * Returns: this connection.
     */
    override AmqpConnection createDynamicReceiver(Handler!AmqpReceiver completionHandler) {
        return createReceiver(null, new AmqpReceiverOptions().setDynamic(true), completionHandler);
    }

    //Future<AmqpReceiver> createDynamicReceiver() {
    //  Promise<AmqpReceiver> promise = Promise.promise();
    //  createDynamicReceiver(promise);
    //  return promise.future();
    //}

    /**
     * Creates a receiver on `address` with default receiver options.
     *
     * Params:
     *   address           = source address; asserted to be non-empty
     *   completionHandler = passed to the new AmqpReceiverImpl; asserted to
     *                       be non-null
     * Returns: this connection.
     */
    override AmqpConnection createReceiver(string address, Handler!AmqpReceiver completionHandler) {
        assert(!address.empty(), "The address must not be `null`");
        assert(completionHandler !is null, "The completion handler must not be `null`");

        ProtonLinkOptions opts = new ProtonLinkOptions();
        //runWithTrampoline(x -> {
        ProtonReceiver receiver = connection.createReceiver(address, opts);
        new AmqpReceiverImpl(address, this, new AmqpReceiverOptions(), receiver, completionHandler);
        //});
        return this;
    }

    //Future<AmqpReceiver> createReceiver(String address) {
    //  Promise<AmqpReceiver> promise = Promise.promise();
    //  createReceiver(address, promise);
    //  return promise.future();
    //}

    /**
     * Creates a receiver on `address` configured from `receiverOptions`.
     *
     * A `null` `receiverOptions` is replaced by default options. When
     * options are supplied, a QoS of "AT_MOST_ONCE" or "AT_LEAST_ONCE"
     * (compared after upper-casing) is applied to the receiver, any other
     * value is ignored, and the source is configured through
     * configureTheSource().
     *
     * Returns: this connection.
     */
    override AmqpConnection createReceiver(string address,
            AmqpReceiverOptions receiverOptions, Handler!AmqpReceiver completionHandler) {
        ProtonLinkOptions opts = new ProtonLinkOptions();
        AmqpReceiverOptions recOpts = receiverOptions is null ? new AmqpReceiverOptions()
            : receiverOptions;
        opts.setDynamic(recOpts.isDynamic()).setLinkName(recOpts.getLinkName());

        // runWithTrampoline(v -> {
        ProtonReceiver receiver = connection.createReceiver(address, opts);

        if (receiverOptions !is null) {
            if (receiverOptions.getQos() !is null) {
                //receiver.setQoS(ProtonQoS.valueOf(receiverOptions.getQos().toUpper));
                string qos = receiverOptions.getQos().toUpper;
                if (qos == "AT_MOST_ONCE") {
                    receiver.setQoS(ProtonQoS.AT_MOST_ONCE);
                } else if (qos == "AT_LEAST_ONCE") {
                    receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
                }
            }

            configureTheSource(recOpts, receiver);
        }

        new AmqpReceiverImpl(address, this, recOpts, receiver, completionHandler);
        // });
        return this;
    }

    //Future<AmqpReceiver> createReceiver(String address, AmqpReceiverOptions receiverOptions) {
    //  Promise<AmqpReceiver> promise = Promise.promise();
    //  createReceiver(address, receiverOptions, promise);
    //  return promise.future();
    //}

    /**
     * Applies capability and durability settings from `receiverOptions` to
     * the receiver's source: non-empty capabilities are converted to
     * Symbols and set on the source; durable receivers get the NEVER expiry
     * policy and UNSETTLED_STATE durability.
     */
    private void configureTheSource(AmqpReceiverOptions receiverOptions, ProtonReceiver receiver) {
        hunt.proton.amqp.messaging.Source.Source source = cast(
                hunt.proton.amqp.messaging.Source.Source) receiver.getSource();

        List!string capabilities = receiverOptions.getCapabilities();
        if (!capabilities.isEmpty()) {
            //source.setCapabilities(capabilities.stream().map(Symbol::valueOf).toArray(Symbol[]::new));
            List!Symbol tmpLst = new ArrayList!Symbol;
            foreach (string s; capabilities) {
                tmpLst.add(Symbol.valueOf(s));
            }
            source.setCapabilities(tmpLst);
        }

        if (receiverOptions.isDurable()) {
            source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
            source.setDurable(TerminusDurability.UNSETTLED_STATE);
        }
    }

    /**
     * Creates a sender on `address` with default sender options.
     *
     * Returns: this connection, or `null` (after logging an error) when
     * `address` is null or empty.
     */
    override AmqpConnection createSender(string address, Handler!AmqpSender completionHandler) {
        // Objects.requireNonNull(address, "The address must be set");
        if (address is null || address.length == 0) {
            logError("The address must be set");
            return null;
        }
        return createSender(address, new AmqpSenderOptions(), completionHandler);
    }

    //Future<AmqpSender> createSender(String address) {
    //  Promise<AmqpSender> promise = Promise.promise();
    //  createSender(address, promise);
    //  return promise.future();
    //}

    /**
     * Creates a sender on `address`, optionally configured by `options`.
     *
     * Params:
     *   address           = target address; may be `null` only when
     *                       `options` requests a dynamic link
     *   options           = sender options (link name, dynamic, auto-drained);
     *                       may be `null`, in which case the sender is
     *                       created without link options
     *   completionHandler = passed to AmqpSenderImpl.create
     * Returns: this connection, or `null` (after logging an error) when
     *          `completionHandler` is `null`.
     * Throws: IllegalArgumentException when `address` is `null` and the
     *         link is not dynamic (including when `options` is `null`).
     */
    override AmqpConnection createSender(string address, AmqpSenderOptions options,
            Handler!AmqpSender completionHandler) {
        // `options` may be null (handled below), so it must not be
        // dereferenced unconditionally here.
        bool dynamicLink = options !is null && options.isDynamic();
        if (address is null && !dynamicLink) {
            throw new IllegalArgumentException("Address must be set if the link is not dynamic");
        }

        // Objects.requireNonNull(completionHandler, "The completion handler must be set");

        if (completionHandler is null) {
            logError("The completion handler must be set");
            return null;
        }

        // runWithTrampoline(x -> {

        ProtonSender sender;
        if (options !is null) {
            ProtonLinkOptions opts = new ProtonLinkOptions();
            opts.setLinkName(options.getLinkName());
            opts.setDynamic(options.isDynamic());

            sender = connection.createSender(address, opts);
            sender.setAutoDrained(options.isAutoDrained());
        } else {
            sender = connection.createSender(address);
        }

        // TODO durable?

        AmqpSenderImpl.create(sender, this, completionHandler);
        // });
        return this;
    }

    //Future<AmqpSender> createSender(String address, AmqpSenderOptions options) {
    //  Promise<AmqpSender> promise = Promise.promise();
    //  createSender(address, options, promise);
    //  return promise.future();
    //}

    /**
     * Creates a sender with no address.
     *
     * Returns: this connection, or `null` (after logging an error) when
     * `completionHandler` is `null`.
     */
    override AmqpConnection createAnonymousSender(Handler!AmqpSender completionHandler) {
        // Objects.requireNonNull(completionHandler, "The completion handler must be set");
        if (completionHandler is null) {
            logError("The completion handler must be set");
            return null;
        }
        // runWithTrampoline(x -> {
        ProtonSender sender = connection.createSender(null);
        AmqpSenderImpl.create(sender, this, completionHandler);
        // });
        return this;
    }

    //Future<AmqpSender> createAnonymousSender() {
    //  Promise<AmqpSender> promise = Promise.promise();
    //  createAnonymousSender(promise);
    //  return promise.future();
    //}

    /// Returns: the underlying ProtonConnection (`null` after a disconnect).
    ProtonConnection unwrap() {
        return this.connection;
    }

    /// Returns: the client options this connection was created with.
    AmqpClientOptions options() {
        return _options;
    }

    /// Adds `sender` to the list of senders tracked by this connection.
    void register(AmqpSenderImpl sender) {
        senders.add(sender);
    }

    /// Adds `receiver` to the list of receivers tracked by this connection.
    void register(AmqpReceiverImpl receiver) {
        receivers.add(receiver);
    }
}