1 /*
2  * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.client.impl.AmqpReceiverImpl;
12 
13 import hunt.amqp.client.AmqpConnection;
14 import hunt.amqp.client.AmqpMessage;
15 import hunt.amqp.client.AmqpReceiver;
16 import hunt.amqp.client.AmqpReceiverOptions;
17 import hunt.amqp.Handler;
18 import hunt.amqp.ProtonReceiver;
19 import hunt.Long;
20 import hunt.Object;
21 import hunt.collection.ArrayDeque;
22 import hunt.collection.Queue;
23 import hunt.amqp.client.impl.AmqpMessageImpl;
24 import hunt.collection.List;
25 import hunt.collection.ArrayList;
26 import hunt.amqp.ProtonMessageHandler;
27 import hunt.proton.message.Message;
28 import hunt.amqp.ProtonDelivery;
29 import hunt.logging;
30 
31 import hunt.amqp.client.impl.AmqpConnectionImpl;
32 
/**
 * Receiver side of an AMQP link, layered on top of a {@link ProtonReceiver}.
 *
 * Auto-accept and prefetch are switched off on the underlying receiver; this class
 * grants credit and settles deliveries itself. Incoming messages are handed to the
 * registered message handler while there is demand, and are kept in an in-memory
 * FIFO buffer otherwise (no handler registered, or paused).
 *
 * All mutable state (handlers, demand, buffer, closed flag, address) is guarded by
 * the monitor of this object. User callbacks are always invoked outside of it.
 */
class AmqpReceiverImpl : AmqpReceiver {

    private ProtonReceiver receiver;
    private AmqpConnectionImpl _connection;
    /** Messages received while they could not be dispatched, oldest first. */
    private List!AmqpMessageImpl buffered;
    private bool durable;
    private bool autoAck;
    /**
     * The address.
     * Not final because for dynamic links the address is only known once the receiver is opened.
     */
    private string _address;
    private Handler!AmqpMessage _handler;
    /** Number of messages that may still be dispatched; Long.MAX_VALUE means "unbounded". */
    private long demand;
    private bool closed;
    private Handler!Void _endHandler;
    private Handler!Throwable _exceptionHandler;
    private bool initialCreditGiven;
    /** Credit flowed when the first message handler is registered. */
    private int initialCredit = 1000;

    /**
     * Creates a new instance of {@link AmqpReceiverImpl} and opens the underlying receiver.
     * This method must be called on the connection context.
     *
     * @param address           the address, may be {@code null} or empty for dynamic links
     * @param connection        the connection
     * @param options           the receiver options, must not be {@code null}
     * @param receiver          the underlying proton receiver
     * @param completionHandler called when the receiver is opened; it receives this instance,
     *                          or {@code null} when the open handler was invoked with {@code null}
     */
    this(
        string address,
        AmqpConnectionImpl connection,
        AmqpReceiverOptions options,
        ProtonReceiver receiver,
        Handler!AmqpReceiver completionHandler) {
        this._address = address;
        this.receiver = receiver;
        this._connection = connection;
        this.durable = options.isDurable();
        this.autoAck = options.isAutoAcknowledgement();
        int maxBufferedMessages = options.getMaxBufferedMessages();
        if (maxBufferedMessages > 0) {
            this.initialCredit = maxBufferedMessages;
        }

        this.demand = Long.MAX_VALUE;
        this.buffered = new ArrayList!AmqpMessageImpl;

        // Disable auto-accept and automated prefetch, we manage disposition and credit
        // manually to allow for delayed handler registration and pause/resume functionality.
        this.receiver
            .setAutoAccept(false)
            .setPrefetch(0);

        this.receiver.handler(new class ProtonMessageHandler {
            void handle(ProtonDelivery delivery, Message message) {
                handleMessage(new AmqpMessageImpl(message, delivery));
            }
        });

        this.receiver.closeHandler(new class Handler!ProtonReceiver {
            void handle(ProtonReceiver var1) {
                onClose(_address, receiver, var1, false);
            }
        })
            .detachHandler(new class Handler!ProtonReceiver {
            void handle(ProtonReceiver var1) {
                onClose(_address, receiver, var1, true);
            }
        });

        this.receiver
            .openHandler(new class Handler!ProtonReceiver {
            void handle(ProtonReceiver var1) {
                if (var1 is null) {
                    if (completionHandler !is null) {
                        completionHandler.handle(null);
                    }
                    return;
                }

                _connection.register(this.outer);
                // Lock the receiver itself (not this anonymous handler) so the write is
                // guarded by the same monitor as every other access to the shared state.
                synchronized (this.outer) {
                    if (_address is null || _address.length == 0) {
                        _address = var1.getRemoteAddress();
                    }
                }
                if (completionHandler !is null) {
                    completionHandler.handle(this.outer);
                }
            }
        });

        this.receiver.open();
    }

    /**
     * Reacts to the link being closed or detached by the remote peer.
     *
     * If this receiver was not already closed, the end handler is notified when one is set,
     * otherwise the exception handler; when neither applies the event is only logged.
     * The underlying receiver is then detached or closed, but only on the first transition
     * to the closed state.
     *
     * @param address  the address used in the log message
     * @param receiver the proton receiver to detach/close
     * @param res      the value passed to the close/detach handler; {@code null} is treated as an error
     * @param detach   {@code true} to detach the receiver, {@code false} to close it
     */
    private void onClose(string address, ProtonReceiver receiver, ProtonReceiver res, bool detach) {
        Handler!Void endh = null;
        Handler!Throwable exh = null;
        bool closeReceiver = false;

        synchronized (this) {
            if (!closed) {
                if (_endHandler !is null) {
                    endh = _endHandler;
                } else if (_exceptionHandler !is null) {
                    exh = _exceptionHandler;
                }
                closed = true;
                closeReceiver = true;
            }
        }

        const bool withError = res is null;

        if (endh !is null) {
            endh.handle(null);
        } else if (exh !is null) {
            if (withError) {
                exh.handle(new Exception("Consumer closed remotely with error"));
            } else {
                exh.handle(new Exception("Consumer closed remotely"));
            }
        } else {
            // Build the text by concatenation so it reads correctly whether or not
            // logError applies printf-style formatting to its arguments.
            if (withError) {
                logError("Consumer for address " ~ address ~ " unexpectedly closed remotely with error");
            } else {
                logError("Consumer for address " ~ address ~ " unexpectedly closed remotely");
            }
        }

        if (closeReceiver) {
            if (detach) {
                receiver.detach();
            } else {
                receiver.close();
            }
        }
    }

    /**
     * Entry point for every message received on the link.
     *
     * With a handler and demand available, the oldest pending message (the new one when the
     * buffer is empty) is dispatched right away; otherwise the new message is buffered.
     *
     * @param message the message that just arrived
     */
    private void handleMessage(AmqpMessageImpl message) {
        AmqpMessageImpl toDeliver = null;
        Handler!AmqpMessage target = null;
        bool drainAfterwards = false;

        synchronized (this) {
            if (_handler is null || demand <= 0L) {
                // Buffer message until we aren't paused
                buffered.add(message);
            } else {
                if (buffered.isEmpty()) {
                    toDeliver = message;
                } else {
                    // Buffered messages present, deliver the oldest of those instead
                    // to keep the FIFO order.
                    buffered.add(message);
                    toDeliver = buffered.get(0);
                    buffered.removeAt(0);
                    drainAfterwards = true;
                }
                if (demand != Long.MAX_VALUE) {
                    demand--;
                }
                target = _handler;
            }
        }

        if (toDeliver is null) {
            return;
        }

        deliverMessageToHandler(target, toDeliver);

        // Drain the remaining buffered messages after the delivery above, so the handler
        // had a chance to pause first.
        if (drainAfterwards) {
            scheduleBufferedMessageDelivery();
        }
    }


    /**
     * Sets the handler notified when the link is closed remotely and no end handler is set.
     *
     * @param handler the exception handler, may be {@code null}
     * @return this receiver
     */
    public AmqpReceiver exceptionHandler(Handler!Throwable handler) {
        synchronized (this) {
            _exceptionHandler = handler;
        }
        return this;
    }


    /**
     * Registers (or, with {@code null}, removes) the message handler.
     *
     * The first time a non-null handler is registered the initial credit is flowed to the
     * underlying receiver. Any buffered messages are then dispatched as far as demand allows.
     *
     * @param handler the message handler, may be {@code null}
     * @return this receiver
     */
    public AmqpReceiver handler(Handler!AmqpMessage handler) {
        int creditToFlow = 0;
        bool schedule = false;

        synchronized (this) {
            this._handler = handler;
            if (handler !is null) {
                schedule = true;

                // Flow initial credit if needed
                if (!initialCreditGiven) {
                    initialCreditGiven = true;
                    creditToFlow = initialCredit;
                }
            }
        }

        if (creditToFlow > 0) {
            receiver.flow(creditToFlow);
        }

        if (schedule) {
            scheduleBufferedMessageDelivery();
        }

        return this;
    }


    /**
     * Stops dispatching messages by setting the demand to zero; messages arriving
     * afterwards are buffered.
     *
     * @return this receiver
     */
    AmqpReceiverImpl pause() {
        synchronized (this) {
            demand = 0L;
        }
        return this;
    }


    /**
     * Adds {@code amount} to the current demand (saturating at Long.MAX_VALUE) and
     * dispatches buffered messages accordingly. Non-positive amounts are ignored.
     *
     * @param amount the number of additional messages that may be dispatched
     * @return this receiver
     */
    AmqpReceiverImpl fetch(long amount) {
        if (amount <= 0) {
            return this;
        }

        synchronized (this) {
            demand += amount;
            if (demand < 0L) {
                // The addition wrapped around: treat it as unbounded demand.
                demand = Long.MAX_VALUE;
            }
        }

        // Dispatch outside of the lock so the handler never runs while it is held.
        scheduleBufferedMessageDelivery();
        return this;
    }


    /**
     * Resumes dispatching with unbounded demand.
     *
     * @return this receiver
     */
    AmqpReceiverImpl resume() {
        return fetch(Long.MAX_VALUE);
    }


    /**
     * Sets the handler notified when the link is closed remotely.
     *
     * @param endHandler the end handler, may be {@code null}
     * @return this receiver
     */
    AmqpReceiverImpl endHandler(Handler!Void endHandler) {
        synchronized (this) {
            this._endHandler = endHandler;
        }
        return this;
    }

    /**
     * Hands a message to the given handler, settles it when auto-acknowledgement is
     * enabled (accepted on success, rejected when the handler throws), and finally
     * grants one more credit to the underlying receiver.
     *
     * @param h       the handler captured together with the decision to deliver, never {@code null}
     * @param message the message to dispatch
     */
    private void deliverMessageToHandler(Handler!AmqpMessage h, AmqpMessageImpl message) {
        try {
            h.handle(message);
            if (autoAck) {
                message.accepted();
            }
        } catch (Throwable e) {
            logError("Unable to dispatch the AMQP message");
            version(HUNT_DEBUG) warning(e);
            if (autoAck) {
                message.rejected();
            }
        }

        this.receiver.flow(1);
    }

    /**
     * Dispatches buffered messages, oldest first, for as long as a handler is registered,
     * demand is available and the buffer is not empty.
     *
     * The check and the dequeue happen in a single critical section, and the handler is
     * captured in that same section, so a concurrent pause(), handler(null) or competing
     * drain cannot lead to reading from an empty buffer or to a null handler.
     */
    private void scheduleBufferedMessageDelivery() {
        while (true) {
            AmqpMessageImpl message = null;
            Handler!AmqpMessage target = null;

            synchronized (this) {
                if (_handler is null || demand <= 0L || buffered.isEmpty()) {
                    return;
                }
                if (demand != Long.MAX_VALUE) {
                    demand--;
                }
                message = buffered.get(0);
                buffered.removeAt(0);
                target = _handler;
            }

            // Delivering outside the synchronized block
            deliverMessageToHandler(target, message);
        }
    }


    /**
     * @return the link address; for dynamic links this is the remote address obtained
     *         when the receiver was opened
     */
    string address() {
        synchronized (this) {
            return _address;
        }
    }


    /**
     * @return the connection this receiver was created with
     */
    public AmqpConnection connection() {
        return _connection;
    }


    /**
     * Closes this receiver.
     *
     * The receiver is unregistered from its connection, then the underlying link is
     * detached (durable receivers) or closed (non-durable receivers) if it is still open.
     * The completion handler is always invoked with {@code null}: immediately when the
     * receiver is already closed or the link is not open, otherwise from the proton
     * detach/close handler.
     *
     * @param handler the completion handler, may be {@code null}
     */
    public void close(Handler!Void handler) {
        Handler!Void actualHandler = handler;
        if (actualHandler is null) {
            actualHandler = new class Handler!Void {
                void handle(Void var1) {
                }
            };
        }

        bool alreadyClosed;
        synchronized (this) {
            alreadyClosed = closed;
            closed = true;
        }

        // Notify outside of the lock: the callback is user code.
        if (alreadyClosed) {
            actualHandler.handle(null);
            return;
        }

        _connection.unregister(this);

        if (!this.receiver.isOpen()) {
            actualHandler.handle(null);
            return;
        }

        try {
            if (isDurable()) {
                receiver.detachHandler(new class Handler!ProtonReceiver {
                    void handle(ProtonReceiver var1) {
                        actualHandler.handle(null);
                    }
                }).detach();
            } else {
                receiver.closeHandler(new class Handler!ProtonReceiver {
                    void handle(ProtonReceiver var1) {
                        actualHandler.handle(null);
                    }
                }).close();
            }
        } catch (Exception e) {
            // Somehow closed remotely
            actualHandler.handle(null);
        }
    }

    /**
     * @return whether the receiver options requested a durable link
     */
    private bool isDurable() {
        return durable;
    }
}