/*
 * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.amqp.client.impl.AmqpSenderImpl;

import hunt.amqp.client.AmqpConnection;
import hunt.amqp.client.AmqpMessage;
import hunt.amqp.client.AmqpSender;
import hunt.amqp.ProtonDelivery;
import hunt.amqp.ProtonSender;
import hunt.amqp.impl.ProtonSenderImpl;
import hunt.amqp.Handler;
import hunt.amqp.client.impl.AmqpConnectionImpl;
import hunt.proton.amqp.transport.DeliveryState;

import hunt.concurrency.Future;
import hunt.concurrency.FuturePromise;
import hunt.Object;
import hunt.logging;
import hunt.net.AsyncResult;
import hunt.String;
import hunt.Exceptions;
import std.format;

/**
 * AmqpSender implementation that wraps a ProtonSender.
 *
 * The instance keeps a local copy of the remote credit reported by the
 * underlying ProtonSenderImpl, plus the user supplied exception and drain
 * handlers. The fields `closed`, `remoteCredit`, `_exceptionHandler` and
 * `_drainHandler` are always accessed while holding this object's monitor;
 * user callbacks are invoked after the monitor has been released.
 */
class AmqpSenderImpl : AmqpSender {
    private ProtonSender sender;
    private AmqpConnectionImpl _connection;
    private bool closed;
    private Handler!Throwable _exceptionHandler;
    private Handler!Void _drainHandler;
    private long remoteCredit = 0;

    /**
     * Wires the close, detach, drain and open handlers onto the given proton
     * sender and then opens it.
     *
     * Params:
     *   sender            = the underlying proton sender
     *   connection        = the connection this sender is registered with once opened
     *   completionHandler = receives this instance when the open handler is
     *                       called with a non-null sender, or null otherwise
     */
    this(ProtonSender sender, AmqpConnectionImpl connection,
            Handler!AmqpSender completionHandler) {
        this.sender = sender;
        this._connection = connection;

        // Remote close and remote detach share the same handling; only the
        // final local action (close vs. detach) differs.
        sender
            .closeHandler(new class Handler!ProtonSender {
                void handle(ProtonSender var1)
                {
                    onClose(sender, var1, false);
                }
            })
            .detachHandler(new class Handler!ProtonSender {
                void handle(ProtonSender var1)
                {
                    onClose(sender, var1, true);
                }
            });

        sender.sendQueueDrainHandler(new class Handler!ProtonSender {
            void handle(ProtonSender var1)
            {
                Handler!Void dh = null;
                // Lock the enclosing AmqpSenderImpl, not this anonymous
                // handler object: remoteCredit and _drainHandler are guarded
                // by the sender's monitor everywhere else in the class.
                synchronized (this.outer)
                {
                    // Update current state of remote credit.
                    remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit();
                    dh = _drainHandler;
                }
                // Fire the user drain handler outside the synchronized block.
                if (dh !is null) {
                    dh.handle(null);
                }
            }
        });

        sender.openHandler(new class Handler!ProtonSender {
            void handle(ProtonSender var1)
            {
                if (var1 is null) {
                    // No sender delivered: report the failure as a null result.
                    completionHandler.handle(null);
                } else {
                    _connection.register(this.outer);
                    completionHandler.handle(this.outer);
                }
            }
        });
        sender.open();
    }

    /**
     * Creates a new instance of AmqpSenderImpl. The created sender is passed
     * into the completionHandler once opened. This method must be called on
     * the connection context.
     *
     * Params:
     *   sender            = the underlying proton sender
     *   connection        = the connection
     *   completionHandler = the completion handler
     */
    static void create(ProtonSender sender, AmqpConnectionImpl connection,
            Handler!AmqpSender completionHandler) {
        new AmqpSenderImpl(sender, connection, completionHandler);
    }

    /**
     * Handles a close or detach notification from the proton sender.
     *
     * If this sender was not already marked closed, it is marked closed, the
     * registered exception handler (if any) is notified, and the proton sender
     * is closed or detached locally.
     *
     * Params:
     *   sender = the proton sender to close or detach
     *   res    = the value delivered to the close/detach handler; it only
     *            selects which message the exception handler receives
     *   detach = true to detach the proton sender, false to close it
     */
    private void onClose(ProtonSender sender, ProtonSender res, bool detach) {
        Handler!Throwable eh = null;
        bool closeSender = false;

        synchronized (this) {
            if (!closed) {
                eh = _exceptionHandler;
                closed = true;
                closeSender = true;
            }
        }

        // Notify the user outside the lock.
        if (eh !is null) {
            if (res !is null) {
                eh.handle(new Exception("Sender closed remotely"));
            } else {
                eh.handle(new Exception("Sender closed remotely with error"));
            }
        }

        if (closeSender) {
            if (detach) {
                sender.detach();
            } else {
                sender.close();
            }
        }
    }

    /**
     * Returns: true when the locally tracked remote credit is exhausted
     * (zero or negative).
     */
    bool writeQueueFull() {
        synchronized (this) {
            return remoteCredit <= 0;
        }
    }

    /**
     * Returns: the connection this sender was created with.
     */
    AmqpConnection connection() {
        return _connection;
    }

    /**
     * Sends a message without a caller supplied acknowledgement handler.
     * A non-accepted delivery is only logged as a warning.
     *
     * Returns: this sender, for chaining.
     */
    AmqpSender send(AmqpMessage message) {
        return doSend(message, null);
    }

    /**
     * Sends the message through the proton sender and maps the remote
     * delivery state onto the acknowledgement handler.
     *
     * Params:
     *   message                = the message; when it carries no address, a copy
     *                            addressed to this sender's address() is sent instead
     *   acknowledgmentHandler  = receives a succeeded result for Accepted and a
     *                            failed result for every other delivery state;
     *                            when null, failures are logged as warnings
     *
     * Returns: this sender, for chaining.
     */
    private AmqpSender doSend(AmqpMessage message, VoidAsyncHandler acknowledgmentHandler) {
        AmqpMessage updated;
        if (message.address() is null) {
            updated = AmqpMessage.create(message).address(address()).build();
        } else {
            updated = message;
        }

        Handler!ProtonDelivery ack = new class Handler!ProtonDelivery {
            void handle(ProtonDelivery delivery)
            {
                VoidAsyncHandler handler = acknowledgmentHandler;
                if (acknowledgmentHandler is null) {
                    // Default: just report rejected messages in the log.
                    handler = (VoidAsyncResult ar) {
                        if (ar.failed()) {
                            Throwable th = ar.cause();
                            warningf("Message rejected by remote peer: %s", th.msg);
                            version(HUNT_DEBUG) warning(th);
                        }
                    };
                }

                auto stateType = delivery.getRemoteState().getType();
                switch (stateType) {
                    case DeliveryStateType.Rejected:
                        version(HUNT_AMQP_DEBUG) trace("message rejected (REJECTED)");
                        handler(failedResult!(Void)(new Exception("message rejected (REJECTED)")));
                        break;
                    case DeliveryStateType.Modified:
                        version(HUNT_AMQP_DEBUG) trace("message rejected (MODIFIED)");
                        handler(failedResult!(Void)(new Exception("message rejected (MODIFIED)")));
                        break;
                    case DeliveryStateType.Released:
                        version(HUNT_AMQP_DEBUG) trace("message rejected (RELEASED)");
                        handler(failedResult!(Void)(new Exception("message rejected (RELEASED)")));
                        break;
                    case DeliveryStateType.Accepted:
                        version(HUNT_AMQP_DEBUG) trace("Accepted");
                        handler(succeededResult!(Void)(null));
                        break;

                    default:
                        string msg = format("Unsupported delivery type %d", stateType);
                        logError(msg);

                        handler(failedResult!(Void)(new Exception(msg)));
                        break;
                }
            }
        };

        synchronized (this) {
            // Consume one credit up front so that writeQueueFull() reflects
            // this send even before the proton sender has been consulted.
            remoteCredit--;
        }

        // NOTE: the send runs directly on the calling thread. An earlier draft
        // scheduled it through connection.runWithTrampoline; that variant is
        // not enabled.
        sender.send(updated.unwrap(), ack);

        synchronized (this) {
            // Re-read the credit from the proton sender so the local counter
            // cannot drift from the value updated by the drain handler.
            remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit();
        }

        return this;
    }

    /**
     * Sets the handler notified when the sender is closed or detached by a
     * close/detach notification from the proton sender.
     *
     * Returns: this sender, for chaining.
     */
    AmqpSender exceptionHandler(Handler!Throwable handler) {
        // Written under the same monitor onClose() reads it with.
        synchronized (this) {
            _exceptionHandler = handler;
        }
        return this;
    }

    /**
     * Sends the message and exposes the acknowledgement as a future.
     *
     * Returns: a future that succeeds when the delivery is accepted and fails
     * with the acknowledgement's cause otherwise.
     */
    Future!Void write(AmqpMessage data) {
        FuturePromise!Void f = new FuturePromise!Void();
        doSend(data, (VoidAsyncResult ar) {
            if (ar.succeeded()) {
                f.succeeded(ar.result());
            } else {
                f.failed(cast(Exception) ar.cause());
            }
        });

        return f;
    }

    /**
     * Sends the message and reports the acknowledgement to the handler.
     */
    void write(AmqpMessage data, VoidAsyncHandler handler) {
        doSend(data, handler);
    }

    /**
     * No-op, available sending credit is controlled by recipient peer in AMQP 1.0.
     *
     * Returns: this sender, for chaining.
     */
    AmqpSender setWriteQueueMaxSize(int maxSize) {
        return this;
    }

    /**
     * Ends the stream by closing the sender; equivalent to close(handler).
     */
    override
    void end(VoidAsyncHandler handler) {
        close(handler);
    }

    /**
     * Sets the handler invoked, outside the sender's monitor, each time the
     * proton sender fires its send-queue-drain handler.
     *
     * Returns: this sender, for chaining.
     */
    AmqpSender drainHandler(Handler!Void handler) {
        // Written under the same monitor the drain callback reads it with.
        synchronized (this) {
            _drainHandler = handler;
        }
        return this;
    }

    /**
     * Sends the message and reports the remote acknowledgement to the handler.
     *
     * NOTE: a Future-returning sendWithAck(message) overload is not
     * implemented in this class.
     *
     * Returns: this sender, for chaining.
     */
    AmqpSender sendWithAck(AmqpMessage message, VoidAsyncHandler acknowledgementHandler) {
        return doSend(message, acknowledgementHandler);
    }

    /**
     * Closes the sender and unregisters it from the connection.
     *
     * Calling this on an already closed sender immediately reports success.
     *
     * Params:
     *   handler = receives the outcome of the close; may be null
     */
    void close(VoidAsyncHandler handler) {
        VoidAsyncHandler actualHandler;
        if (handler is null) {
            actualHandler = (VoidAsyncResult ar) {
                /* NOOP */
            };
        } else {
            actualHandler = handler;
        }

        synchronized (this) {
            if (closed) {
                actualHandler(succeededResult!(Void)(null));
                return;
            }
            closed = true;
        }

        _connection.unregister(this);

        if (sender.isOpen()) {
            try {
                // Replaces the close handler installed in the constructor so
                // the close is reported to the caller's handler instead.
                sender.closeHandler(new class Handler!ProtonSender {
                    void handle(ProtonSender var1) {
                        actualHandler(succeededResult!(Void)(null));
                    }
                }).close();
            } catch (Exception e) {
                // Somehow closed remotely
                actualHandler(failedResult!(Void)(e));
            }
        } else {
            actualHandler(succeededResult!(Void)(null));
        }
    }

    /**
     * Closes the sender and exposes the outcome as a future.
     *
     * Returns: a future completed with the result of close(handler).
     */
    Future!Void close() {
        FuturePromise!Void f = new FuturePromise!Void();
        close((VoidAsyncResult ar) {
            if (ar.succeeded()) {
                f.succeeded(ar.result());
            } else {
                f.failed(cast(Exception) ar.cause());
            }
        });

        return f;
    }

    /**
     * Returns: the remote address reported by the underlying proton sender.
     */
    string address() {
        return sender.getRemoteAddress();
    }
}