1 /* 2 * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.amqp.client.impl.AmqpSenderImpl; 12 13 import hunt.amqp.client.AmqpConnection; 14 import hunt.amqp.client.AmqpMessage; 15 import hunt.amqp.client.AmqpSender; 16 import hunt.proton.amqp.transport.DeliveryState; 17 import hunt.amqp.ProtonDelivery; 18 import hunt.amqp.ProtonSender; 19 import hunt.amqp.impl.ProtonSenderImpl; 20 import hunt.amqp.Handler; 21 import hunt.amqp.client.impl.AmqpConnectionImpl; 22 import hunt.Object; 23 import hunt.logging; 24 import hunt.String; 25 import hunt.Exceptions; 26 27 /** 28 * 29 */ 30 class AmqpSenderImpl : AmqpSender { 31 private ProtonSender sender; 32 private AmqpConnectionImpl _connection; 33 private bool closed; 34 private Handler!Throwable _exceptionHandler; 35 private Handler!Void _drainHandler; 36 private long remoteCredit = 0; 37 38 this(ProtonSender sender, AmqpConnectionImpl connection, 39 Handler!AmqpSender completionHandler) { 40 this.sender = sender; 41 this._connection = connection; 42 43 //sender 44 // .closeHandler(res -> onClose(sender, res, false)) 45 // .detachHandler(res -> onClose(sender, res, true)); 46 sender 47 .closeHandler(new class Handler!ProtonSender { 48 void handle(ProtonSender var1) 49 { 50 onClose(sender,var1,false); 51 } 52 }) 53 .detachHandler(new class Handler!ProtonSender{ 54 void handle(ProtonSender var1) 55 { 56 onClose(sender,var1,true); 57 } 58 }); 59 60 61 62 // Handler<Void> dh = null; 63 // synchronized (AmqpSenderImpl.this) { 64 // // Update current state of remote credit 65 // remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit(); 66 // 67 // // check the user drain handler, fire it outside synchronized block if not null 68 // if (drainHandler !is null) { 69 // dh = drainHandler; 70 // } 71 // } 72 // 73 // if (dh !is null) { 74 // dh.handle(null); 75 // } 76 //} 77 78 sender.sendQueueDrainHandler(new class Handler!ProtonSender { 79 void handle(ProtonSender var1) 80 { 81 Handler!Void dh = null; 82 synchronized(this) 83 { 84 remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit(); 85 if (_drainHandler !is null) { 86 dh = _drainHandler; 87 } 88 } 89 if (dh !is null) { 90 dh.handle(null); 91 } 92 } 93 }); 94 95 96 // if (done.failed()) { 97 // completionHandler.handle(done.mapEmpty()); 98 // } else { 99 // connection.register(this); 100 // completionHandler.handle(Future.succeededFuture(this)); 101 // } 102 //} 103 sender.openHandler(new class Handler!ProtonSender{ 104 void handle(ProtonSender var1) 105 { 106 if(var1 is null) 107 { 108 completionHandler.handle(null); 109 }else 110 { 111 _connection.register(this.outer); 112 completionHandler.handle(this.outer); 113 } 114 } 115 }); 116 sender.open(); 117 } 118 119 /** 120 * Creates a new instance of {@link AmqpSenderImpl}. The created sender is passed into the {@code completionHandler} 121 * once opened. This method must be called on the connection context. 122 * 123 * @param sender the underlying proton sender 124 * @param connection the connection 125 * @param completionHandler the completion handler 126 */ 127 static void create(ProtonSender sender, AmqpConnectionImpl connection, 128 Handler!AmqpSender completionHandler) { 129 new AmqpSenderImpl(sender, connection, completionHandler); 130 } 131 132 private void onClose(ProtonSender sender, ProtonSender res, bool detach) { 133 Handler!Throwable eh = null; 134 bool closeSender = false; 135 136 synchronized (this) { 137 if (!closed && _exceptionHandler !is null) { 138 eh = _exceptionHandler; 139 } 140 141 if (!closed) { 142 closed = true; 143 closeSender = true; 144 } 145 } 146 147 if (eh !is null) { 148 if (res !is null) { 149 eh.handle(new Exception("Sender closed remotely")); 150 } else { 151 eh.handle(new Exception("Sender closed remotely with error")); 152 } 153 } 154 155 if (closeSender) { 156 if (detach) { 157 sender.detach(); 158 } else { 159 sender.close(); 160 } 161 } 162 } 163 164 165 bool writeQueueFull() { 166 return remoteCredit <= 0; 167 } 168 169 170 AmqpConnection connection() { 171 return _connection; 172 } 173 174 175 AmqpSender send(AmqpMessage message) { 176 return doSend(message, null); 177 } 178 179 private AmqpSender doSend(AmqpMessage message, Handler!Void acknowledgmentHandler) { 180 AmqpMessage updated; 181 if (message.address() is null) { 182 updated = AmqpMessage.create(message).address(address()).build(); 183 } else { 184 updated = message; 185 } 186 187 188 189 Handler!ProtonDelivery ack = new class Handler!ProtonDelivery{ 190 void handle(ProtonDelivery delivery) 191 { 192 Handler!Void handler = acknowledgmentHandler; 193 if (acknowledgmentHandler is null) { 194 handler = new class Handler!Void{ 195 void handle(Void v) 196 { 197 if(v is null) 198 { 199 logWarning("Message rejected by remote peer"); 200 } 201 } 202 }; 203 } 204 switch (delivery.getRemoteState().getType()) { 205 case DeliveryStateType.Rejected: 206 // handler.handle(Future.failedFuture("message rejected (REJECTED")); 207 logInfo("message rejected (REJECTED"); 208 handler.handle(null); 209 break; 210 case DeliveryStateType.Modified: 211 //handler.handle(Future.failedFuture("message rejected (MODIFIED)")); 212 logInfo("message rejected (MODIFIED)"); 213 handler.handle(null); 214 break; 215 case DeliveryStateType.Released: 216 // handler.handle(Future.failedFuture("message rejected (RELEASED)")); 217 logInfo("message rejected (RELEASED)"); 218 handler.handle(null); 219 break; 220 case DeliveryStateType.Accepted: 221 handler.handle(new String("Accepted")); 222 break; 223 default: 224 //handler.handle(Future.failedFuture("Unsupported delivery type: " + delivery.getRemoteState().getType())); 225 logError("Unsupported delivery type %d",delivery.getRemoteState().getType()); 226 handler.handle(null); 227 } 228 } 229 }; 230 231 232 //Handler!ProtonDelivery ack = delivery -> { 233 // Handler<AsyncResult<Void>> handler = acknowledgmentHandler; 234 // if (acknowledgmentHandler is null) { 235 // handler = ar -> { 236 // if (ar.failed()) { 237 // LOGGER.warn("Message rejected by remote peer", ar.cause()); 238 // } 239 // }; 240 // } 241 // 242 // switch (delivery.getRemoteState().getType()) { 243 // case Rejected: 244 // handler.handle(Future.failedFuture("message rejected (REJECTED")); 245 // break; 246 // case Modified: 247 // handler.handle(Future.failedFuture("message rejected (MODIFIED)")); 248 // break; 249 // case Released: 250 // handler.handle(Future.failedFuture("message rejected (RELEASED)")); 251 // break; 252 // case Accepted: 253 // handler.handle(Future.succeededFuture()); 254 // break; 255 // default: 256 // handler.handle(Future.failedFuture("Unsupported delivery type: " + delivery.getRemoteState().getType())); 257 // } 258 //}; 259 260 synchronized (this) { 261 // Update the credit tracking. We only need to adjust this here because the sends etc may not be on the context 262 // thread and if that is the case we can't use the ProtonSender sendQueueFull method to check that credit has been 263 // exhausted following this doSend call since we will have only scheduled the actual send for later. 264 remoteCredit--; 265 } 266 267 268 sender.send(updated.unwrap(), ack); 269 synchronized (this) 270 { 271 remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit(); 272 } 273 274 //connection.runWithTrampoline(new class Handler!Void { 275 // void handle(Void var1) 276 // { 277 // sender.send(updated.unwrap(), ack); 278 // synchronized (this) 279 // { 280 // remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit(); 281 // } 282 // } 283 //}); 284 285 //connection.runWithTrampoline(x -> { 286 // sender.send(updated.unwrap(), ack); 287 // 288 // synchronized (AmqpSenderImpl.this) { 289 // // Update the credit tracking *again*. We need to reinitialise it here in case the doSend call was performed on 290 // // a thread other than the client context, to ensure we didn't fall foul of a race between the above pre-send 291 // // update on that thread, the above send on the context thread, and the sendQueueDrainHandler based updates on 292 // // the context thread. 293 // remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit(); 294 // } 295 //}); 296 297 298 return this; 299 } 300 301 302 AmqpSender exceptionHandler(Handler!Throwable handler) { 303 _exceptionHandler = handler; 304 return this; 305 } 306 307 308 //Future<Void> write(AmqpMessage data) { 309 // Promise<Void> promise = Promise.promise(); 310 // doSend(data, promise); 311 // return promise.future(); 312 //} 313 314 315 void write(AmqpMessage data, Handler!Void handler) { 316 doSend(data, handler); 317 } 318 319 320 AmqpSender setWriteQueueMaxSize(int maxSize) { 321 // No-op, available sending credit is controlled by recipient peer in AMQP 1.0. 322 return this; 323 } 324 325 326 override 327 void end(Handler!Void handler) { 328 close(handler); 329 } 330 331 332 AmqpSender drainHandler(Handler!Void handler) { 333 _drainHandler = handler; 334 return this; 335 } 336 337 338 AmqpSender sendWithAck(AmqpMessage message, Handler!Void acknowledgementHandler) { 339 return doSend(message, acknowledgementHandler); 340 } 341 342 343 //Future<Void> sendWithAck(AmqpMessage message) { 344 // Promise<Void> promise = Promise.promise(); 345 // sendWithAck(message, promise); 346 // return promise.future(); 347 //} 348 349 350 void close(Handler!Void handler) { 351 Handler!Void actualHandler; 352 if (handler is null) { 353 implementationMissing(false); 354 // actualHandler = x -> { /* NOOP */ }; 355 } else { 356 actualHandler = handler; 357 } 358 359 synchronized (this) { 360 if (closed) { 361 actualHandler.handle(new String("")); 362 return; 363 } 364 closed = true; 365 } 366 367 368 //actualHandler.handle(v.mapEmpty() 369 _connection.unregister(this); 370 if (sender.isOpen()) { 371 try { 372 sender 373 .closeHandler(new class Handler!ProtonSender{ 374 void handle(ProtonSender var1) 375 { 376 if(actualHandler !is null) { 377 actualHandler.handle(null); 378 } 379 } 380 }).close(); 381 } catch (Exception e) { 382 // Somehow closed remotely 383 actualHandler.handle(null); 384 } 385 } else { 386 actualHandler.handle(new String("")); 387 } 388 } 389 //connection.runWithTrampoline(x -> { 390 // if (sender.isOpen()) { 391 // try { 392 // sender 393 // .closeHandler(v -> actualHandler.handle(v.mapEmpty())) 394 // .close(); 395 // } catch (Exception e) { 396 // // Somehow closed remotely 397 // actualHandler.handle(Future.failedFuture(e)); 398 // } 399 // } else { 400 // actualHandler.handle(Future.succeededFuture()); 401 // } 402 //}); 403 //} 404 405 406 //Future<Void> close() { 407 // Promise<Void> promise = Promise.promise(); 408 // close(promise); 409 // return promise.future(); 410 //} 411 412 413 string address() { 414 return sender.getRemoteAddress(); 415 } 416 }