1 /* 2 * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.amqp.client.impl.AmqpReceiverImpl; 12 13 import hunt.amqp.client.AmqpConnection; 14 import hunt.amqp.client.AmqpMessage; 15 import hunt.amqp.client.AmqpReceiver; 16 import hunt.amqp.client.AmqpReceiverOptions; 17 import hunt.amqp.Handler; 18 import hunt.amqp.ProtonReceiver; 19 import hunt.Long; 20 import hunt.Object; 21 import hunt.collection.ArrayDeque; 22 import hunt.collection.Queue; 23 import hunt.amqp.client.impl.AmqpMessageImpl; 24 import hunt.collection.List; 25 import hunt.collection.ArrayList; 26 import hunt.amqp.ProtonMessageHandler; 27 import hunt.proton.message.Message; 28 import hunt.amqp.ProtonDelivery; 29 import hunt.logging; 30 31 import hunt.amqp.client.impl.AmqpConnectionImpl; 32 33 class AmqpReceiverImpl : AmqpReceiver { 34 35 private ProtonReceiver receiver; 36 private AmqpConnectionImpl _connection; 37 private List!AmqpMessageImpl buffered ;//= new ArrayDeque<>(); 38 private bool durable; 39 private bool autoAck; 40 /** 41 * The address. 42 * Not because for dynamic link the address is set when the createReceiver is opened. 43 */ 44 private string _address; 45 private Handler!AmqpMessage _handler; 46 private long demand ;//= Long.MAX_VALUE; 47 private bool closed; 48 private Handler!Void _endHandler; 49 private Handler!Throwable _exceptionHandler; 50 private bool initialCreditGiven; 51 private int initialCredit = 1000; 52 53 /** 54 * Creates a new instance of {@link AmqpReceiverImpl}. 55 * This method must be called on the connection context. 56 * 57 * @param address the address, may be {@code null} for dynamic links 58 * @param connection the connection 59 * @param options the receiver options, must not be {@code null} 60 * @param receiver the underlying proton createReceiver 61 * @param completionHandler called when the createReceiver is opened 62 */ 63 this( 64 string address, 65 AmqpConnectionImpl connection, 66 AmqpReceiverOptions options, 67 ProtonReceiver receiver, 68 Handler!AmqpReceiver completionHandler) { 69 this._address = address; 70 this.receiver = receiver; 71 this._connection = connection; 72 this.durable = options.isDurable(); 73 this.autoAck = options.isAutoAcknowledgement(); 74 int maxBufferedMessages = options.getMaxBufferedMessages(); 75 if (maxBufferedMessages > 0) { 76 this.initialCredit = maxBufferedMessages; 77 } 78 79 this.demand = Long.MAX_VALUE; 80 this.buffered = new ArrayList!AmqpMessageImpl; 81 // Disable auto-accept and automated prefetch, we manage disposition and credit 82 // manually to allow for delayed handler registration and pause/resume functionality. 83 this.receiver 84 .setAutoAccept(false) 85 .setPrefetch(0); 86 87 //this.receiver.handler((delivery, message) -> handleMessage(new AmqpMessageImpl(message, delivery))); 88 //if (this.handler !is null) { 89 // handler(this.handler); 90 //} 91 //this.receiver.handler((delivery, message) -> handleMessage(new AmqpMessageImpl(message, delivery))); 92 this.receiver.handler(new class ProtonMessageHandler{ 93 void handle(ProtonDelivery delivery, Message message) 94 { 95 handleMessage(new AmqpMessageImpl(message, delivery)); 96 } 97 }); 98 99 if (this._handler !is null) { 100 handler(this._handler); 101 } 102 103 // this.receiver.closeHandler(res -> { 104 // onClose(address, receiver, res, false); 105 //}) 106 // .detachHandler(res -> { 107 // onClose(address, receiver, res, true); 108 // }); 109 110 this.receiver.closeHandler(new class Handler!ProtonReceiver{ 111 void handle(ProtonReceiver var1) 112 { 113 onClose(_address, receiver, var1, false); 114 } 115 }) 116 .detachHandler(new class Handler!ProtonReceiver{ 117 void handle(ProtonReceiver var1) 118 { 119 onClose(_address, receiver, var1, true); 120 } 121 }); 122 123 124 // this.receiver 125 // .openHandler(res -> { 126 // if (res.failed()) { 127 // completionHandler.handle(res.mapEmpty()); 128 //} else { 129 // this.connection.register(this); 130 // synchronized (this) { 131 // if (this.address is null) { 132 // this.address = res.result().getRemoteAddress(); 133 // } 134 // } 135 // completionHandler.handle(Future.succeededFuture(this)); 136 // } 137 // }); 138 this.receiver 139 .openHandler(new class Handler!ProtonReceiver { 140 void handle(ProtonReceiver var1) 141 { 142 if(var1 is null) 143 { 144 completionHandler.handle(null); 145 }else 146 { 147 _connection.register(this.outer); 148 synchronized (this) 149 { 150 if (_address is null || _address.length == 0) { 151 _address = var1.getRemoteAddress(); 152 } 153 } 154 completionHandler.handle(this.outer); 155 } 156 } 157 }); 158 159 this.receiver.open(); 160 } 161 162 private void onClose(string address, ProtonReceiver receiver, ProtonReceiver res, bool detach) { 163 Handler!Void endh = null; 164 Handler!Throwable exh = null; 165 bool closeReceiver = false; 166 167 synchronized (this) { 168 if (!closed && _endHandler !is null) { 169 endh = _endHandler; 170 } else if (!closed && _exceptionHandler !is null) { 171 exh = _exceptionHandler; 172 } 173 174 if (!closed) { 175 closed = true; 176 closeReceiver = true; 177 } 178 } 179 180 if (endh !is null) { 181 endh.handle(null); 182 } else if (exh !is null) { 183 if (res !is null) { 184 exh.handle(new Exception("Consumer closed remotely")); 185 } else { 186 exh.handle(new Exception("Consumer closed remotely with error")); 187 } 188 } else { 189 if (res) { 190 // LOGGER.warn("Consumer for address " + address + " unexpectedly closed remotely"); 191 logError("Consumer for address %s unexpectedly closed remotely",address); 192 } else { 193 // LOGGER.warn("Consumer for address " + address + " unexpectedly closed remotely with error", res.cause()); 194 logError("Consumer for address %s unexpectedly closed remotely with error", address); 195 } 196 } 197 198 if (closeReceiver) { 199 if (detach) { 200 receiver.detach(); 201 } else { 202 receiver.close(); 203 } 204 } 205 } 206 207 private void handleMessage(AmqpMessageImpl message) { 208 bool schedule = false; 209 bool dispatchNow = false; 210 211 synchronized (this) { 212 if (_handler !is null && demand > 0L && buffered.isEmpty()) { 213 if (demand != Long.MAX_VALUE) { 214 demand--; 215 } 216 dispatchNow = true; 217 } else if (_handler !is null && demand > 0L) { 218 // Buffered messages present, deliver the oldest of those instead 219 buffered.add(message); 220 // message = buffered.poll(); 221 message = buffered.get(0); 222 buffered.removeAt(0); 223 224 if (demand != Long.MAX_VALUE) { 225 demand--; 226 } 227 228 // Schedule a delivery for the next buffered message 229 schedule = true; 230 } else { 231 // Buffer message until we aren't paused 232 buffered.add(message); 233 } 234 } 235 236 // schedule next delivery if appropriate, after earlier delivery to allow chance to pause etc. 237 if (schedule) { 238 scheduleBufferedMessageDelivery(); 239 } else if (dispatchNow) { 240 deliverMessageToHandler(message); 241 } 242 } 243 244 245 public AmqpReceiver exceptionHandler(Handler!Throwable handler) { 246 _exceptionHandler = handler; 247 return this; 248 } 249 250 251 public AmqpReceiver handler( Handler!AmqpMessage handler) { 252 int creditToFlow = 0; 253 bool schedule = false; 254 255 synchronized (this) { 256 this._handler = handler; 257 if (handler !is null) { 258 schedule = true; 259 260 // Flow initial credit if needed 261 if (!initialCreditGiven) { 262 initialCreditGiven = true; 263 creditToFlow = initialCredit; 264 } 265 } 266 } 267 268 if (creditToFlow > 0) { 269 int c = creditToFlow; 270 receiver.flow(c); 271 //connection.runWithTrampoline(v -> receiver.flow(c)); 272 } 273 274 if (schedule) { 275 scheduleBufferedMessageDelivery(); 276 } 277 278 return this; 279 } 280 281 282 AmqpReceiverImpl pause() { 283 demand = 0L; 284 return this; 285 } 286 287 288 AmqpReceiverImpl fetch(long amount) { 289 if (amount > 0) { 290 demand += amount; 291 if (demand < 0L) { 292 demand = Long.MAX_VALUE; 293 } 294 scheduleBufferedMessageDelivery(); 295 } 296 return this; 297 } 298 299 300 AmqpReceiverImpl resume() { 301 return fetch(Long.MAX_VALUE); 302 } 303 304 305 AmqpReceiverImpl endHandler(Handler!Void endHandler) { 306 this._endHandler = endHandler; 307 return this; 308 } 309 310 private void deliverMessageToHandler(AmqpMessageImpl message) { 311 Handler!AmqpMessage h; 312 synchronized (this) { 313 h = _handler; 314 } 315 316 try { 317 h.handle(message); 318 if (autoAck) { 319 message.accepted(); 320 } 321 } catch (Throwable e) { 322 logError("Unable to dispatch the AMQP message"); 323 version(HUNT_DEBUG) warning(e); 324 if (autoAck) { 325 message.rejected(); 326 } 327 } 328 329 this.receiver.flow(1); 330 } 331 332 private void scheduleBufferedMessageDelivery() { 333 bool schedule; 334 335 synchronized (this) { 336 schedule = !buffered.isEmpty() && demand > 0L; 337 } 338 339 if (schedule) { 340 // connection.runOnContext(v -> { 341 AmqpMessageImpl message = null; 342 343 synchronized (this) { 344 if (demand > 0L) { 345 if (demand != Long.MAX_VALUE) { 346 demand--; 347 } 348 //message = buffered.poll(); 349 message = buffered.get(0); 350 buffered.removeAt(0); 351 } 352 } 353 354 if (message !is null) { 355 // Delivering outside the synchronized block 356 deliverMessageToHandler(message); 357 358 // Schedule a delivery for a further buffered message if any 359 scheduleBufferedMessageDelivery(); 360 } 361 }//); 362 } 363 364 365 string address() { 366 return _address; 367 } 368 369 370 public AmqpConnection connection() { 371 return _connection; 372 } 373 374 375 public void close(Handler!Void handler) { 376 Handler!Void actualHandler; 377 if (handler is null) { 378 actualHandler = new class Handler!Void{ 379 void handle(Void var1) 380 {} 381 }; 382 } else { 383 actualHandler = handler; 384 } 385 386 synchronized (this) { 387 if (closed) { 388 actualHandler.handle(this); 389 return; 390 } 391 closed = true; 392 } 393 394 // receiver.detachHandler(done -> actualHandler.handle(done.mapEmpty())) 395 //done -> actualHandler.handle(done.mapEmpty()) 396 _connection.unregister(this); 397 // connection.runWithTrampoline(x -> { 398 if (this.receiver.isOpen()) { 399 try { 400 if (isDurable()) { 401 receiver.detachHandler(new class Handler!ProtonReceiver{ 402 void handle(ProtonReceiver var1) 403 { 404 actualHandler.handle(null); 405 } 406 }).detach(); 407 } else { 408 receiver 409 .closeHandler(new class Handler!ProtonReceiver{ 410 void handle(ProtonReceiver var1) 411 { 412 actualHandler.handle(null); 413 } 414 }) 415 .close(); 416 } 417 } catch (Exception e) { 418 // Somehow closed remotely 419 actualHandler.handle(null); 420 } 421 } else { 422 actualHandler.handle(this); 423 } 424 //}//); 425 426 } 427 428 429 //public Future<Void> close() { 430 // Promise<Void> promise = Promise.promise(); 431 // close(promise); 432 // return promise.future(); 433 //} 434 435 private bool isDurable() { 436 return durable; 437 } 438 }