1 /*
2  * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.client.impl.AmqpSenderImpl;
12 
13 import hunt.amqp.client.AmqpConnection;
14 import hunt.amqp.client.AmqpMessage;
15 import hunt.amqp.client.AmqpSender;
16 import hunt.proton.amqp.transport.DeliveryState;
17 import hunt.amqp.ProtonDelivery;
18 import hunt.amqp.ProtonSender;
19 import hunt.amqp.impl.ProtonSenderImpl;
20 import hunt.amqp.Handler;
21 import hunt.amqp.client.impl.AmqpConnectionImpl;
22 import hunt.Object;
23 import hunt.logging;
24 import hunt.String;
25 import hunt.Exceptions;
26 
27 /**
28  * 
29  */
30 class AmqpSenderImpl : AmqpSender {
31     private  ProtonSender sender;
32     private  AmqpConnectionImpl _connection;
33     private bool closed;
34     private Handler!Throwable _exceptionHandler;
35     private Handler!Void _drainHandler;
36     private long remoteCredit = 0;
37 
38     this(ProtonSender sender, AmqpConnectionImpl connection,
39         Handler!AmqpSender completionHandler) {
40         this.sender = sender;
41         this._connection = connection;
42 
43         //sender
44         //  .closeHandler(res -> onClose(sender, res, false))
45         //  .detachHandler(res -> onClose(sender, res, true));
46         sender
47             .closeHandler(new class Handler!ProtonSender {
48                 void handle(ProtonSender var1)
49                 {
50                             onClose(sender,var1,false);
51                 }
52             })
53             .detachHandler(new class Handler!ProtonSender{
54                 void handle(ProtonSender var1)
55                 {
56                     onClose(sender,var1,true);
57                 }
58             });
59 
60 
61 
62         //      Handler<Void> dh = null;
63         //  synchronized (AmqpSenderImpl.this) {
64         //    // Update current state of remote credit
65         //    remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit();
66         //
67         //    // check the user drain handler, fire it outside synchronized block if not null
68         //    if (drainHandler !is null) {
69         //      dh = drainHandler;
70         //    }
71         //  }
72         //
73         //  if (dh !is null) {
74         //    dh.handle(null);
75         //  }
76         //}
77 
78         sender.sendQueueDrainHandler(new class Handler!ProtonSender {
79             void handle(ProtonSender var1)
80             {
81                 Handler!Void dh = null;
82                 synchronized(this)
83                 {
84                      remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit();
85                      if (_drainHandler !is null) {
86                          dh = _drainHandler;
87                      }
88                 }
89                 if (dh !is null) {
90                     dh.handle(null);
91                 }
92             }
93         });
94 
95 
96     //  if (done.failed()) {
97     //    completionHandler.handle(done.mapEmpty());
98     //  } else {
99     //    connection.register(this);
100     //    completionHandler.handle(Future.succeededFuture(this));
101     //  }
102     //}
103         sender.openHandler(new class Handler!ProtonSender{
104             void handle(ProtonSender var1)
105             {
106                     if(var1 is null)
107                     {
108                         completionHandler.handle(null);
109                     }else
110                     {
111                         _connection.register(this.outer);
112                         completionHandler.handle(this.outer);
113                     }
114             }
115         });
116         sender.open();
117     }
118 
119     /**
120      * Creates a new instance of {@link AmqpSenderImpl}. The created sender is passed into the {@code completionHandler}
121      * once opened. This method must be called on the connection context.
122      *
123      * @param sender            the underlying proton sender
124      * @param connection        the connection
125      * @param completionHandler the completion handler
126      */
127     static void create(ProtonSender sender, AmqpConnectionImpl connection,
128         Handler!AmqpSender completionHandler) {
129         new AmqpSenderImpl(sender, connection, completionHandler);
130     }
131 
132     private void onClose(ProtonSender sender, ProtonSender res, bool detach) {
133         Handler!Throwable eh = null;
134         bool closeSender = false;
135 
136         synchronized (this) {
137             if (!closed && _exceptionHandler !is null) {
138                 eh = _exceptionHandler;
139             }
140 
141             if (!closed) {
142                 closed = true;
143                 closeSender = true;
144             }
145         }
146 
147         if (eh !is null) {
148             if (res !is null) {
149                 eh.handle(new Exception("Sender closed remotely"));
150             } else {
151                 eh.handle(new Exception("Sender closed remotely with error"));
152             }
153         }
154 
155         if (closeSender) {
156             if (detach) {
157                 sender.detach();
158             } else {
159                 sender.close();
160             }
161         }
162     }
163 
164 
165     bool writeQueueFull() {
166         return remoteCredit <= 0;
167     }
168 
169 
170     AmqpConnection connection() {
171         return _connection;
172     }
173 
174 
175     AmqpSender send(AmqpMessage message) {
176         return doSend(message, null);
177     }
178 
179     private AmqpSender doSend(AmqpMessage message, Handler!Void acknowledgmentHandler) {
180         AmqpMessage updated;
181         if (message.address() is null) {
182             updated = AmqpMessage.create(message).address(address()).build();
183         } else {
184             updated = message;
185         }
186 
187 
188 
189         Handler!ProtonDelivery ack = new class Handler!ProtonDelivery{
190             void handle(ProtonDelivery delivery)
191             {
192                 Handler!Void handler = acknowledgmentHandler;
193                 if (acknowledgmentHandler is null) {
194                     handler = new class Handler!Void{
195                         void handle(Void v)
196                         {
197                             if(v is null)
198                             {
199                                 logWarning("Message rejected by remote peer");
200                             }
201                         }
202                     };
203                 }
204                 switch (delivery.getRemoteState().getType()) {
205                     case DeliveryStateType.Rejected:
206                      // handler.handle(Future.failedFuture("message rejected (REJECTED"));
207                         logInfo("message rejected (REJECTED");
208                         handler.handle(null);
209                         break;
210                     case DeliveryStateType.Modified:
211                         //handler.handle(Future.failedFuture("message rejected (MODIFIED)"));
212                         logInfo("message rejected (MODIFIED)");
213                         handler.handle(null);
214                         break;
215                     case DeliveryStateType.Released:
216                      // handler.handle(Future.failedFuture("message rejected (RELEASED)"));
217                         logInfo("message rejected (RELEASED)");
218                         handler.handle(null);
219                         break;
220                     case DeliveryStateType.Accepted:
221                         handler.handle(new String("Accepted"));
222                         break;
223                     default:
224                         //handler.handle(Future.failedFuture("Unsupported delivery type: " + delivery.getRemoteState().getType()));
225                         logError("Unsupported delivery type %d",delivery.getRemoteState().getType());
226                         handler.handle(null);
227                 }
228             }
229         };
230 
231 
232         //Handler!ProtonDelivery ack = delivery -> {
233         //  Handler<AsyncResult<Void>> handler = acknowledgmentHandler;
234         //  if (acknowledgmentHandler is null) {
235         //    handler = ar -> {
236         //      if (ar.failed()) {
237         //        LOGGER.warn("Message rejected by remote peer", ar.cause());
238         //      }
239         //    };
240         //  }
241         //
242         //  switch (delivery.getRemoteState().getType()) {
243         //    case Rejected:
244         //      handler.handle(Future.failedFuture("message rejected (REJECTED"));
245         //      break;
246         //    case Modified:
247         //      handler.handle(Future.failedFuture("message rejected (MODIFIED)"));
248         //      break;
249         //    case Released:
250         //      handler.handle(Future.failedFuture("message rejected (RELEASED)"));
251         //      break;
252         //    case Accepted:
253         //      handler.handle(Future.succeededFuture());
254         //      break;
255         //    default:
256         //      handler.handle(Future.failedFuture("Unsupported delivery type: " + delivery.getRemoteState().getType()));
257         //  }
258         //};
259 
260         synchronized (this) {
261             // Update the credit tracking. We only need to adjust this here because the sends etc may not be on the context
262             // thread and if that is the case we can't use the ProtonSender sendQueueFull method to check that credit has been
263             // exhausted following this doSend call since we will have only scheduled the actual send for later.
264             remoteCredit--;
265         }
266 
267 
268         sender.send(updated.unwrap(), ack);
269         synchronized (this)
270         {
271             remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit();
272         }
273 
274         //connection.runWithTrampoline(new class Handler!Void {
275         //  void handle(Void var1)
276         //  {
277         //    sender.send(updated.unwrap(), ack);
278         //    synchronized (this)
279         //    {
280         //      remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit();
281         //    }
282         //  }
283         //});
284 
285         //connection.runWithTrampoline(x -> {
286         //  sender.send(updated.unwrap(), ack);
287         //
288         //  synchronized (AmqpSenderImpl.this) {
289         //    // Update the credit tracking *again*. We need to reinitialise it here in case the doSend call was performed on
290         //    // a thread other than the client context, to ensure we didn't fall foul of a race between the above pre-send
291         //    // update on that thread, the above send on the context thread, and the sendQueueDrainHandler based updates on
292         //    // the context thread.
293         //    remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit();
294         //  }
295         //});
296 
297 
298         return this;
299     }
300 
301 
302     AmqpSender exceptionHandler(Handler!Throwable handler) {
303         _exceptionHandler = handler;
304         return this;
305     }
306 
307 
308     //Future<Void> write(AmqpMessage data) {
309     //  Promise<Void> promise = Promise.promise();
310     //  doSend(data, promise);
311     //  return promise.future();
312     //}
313 
314 
315     void write(AmqpMessage data, Handler!Void handler) {
316         doSend(data, handler);
317     }
318 
319 
320     AmqpSender setWriteQueueMaxSize(int maxSize) {
321         // No-op, available sending credit is controlled by recipient peer in AMQP 1.0.
322         return this;
323     }
324 
325 
326     override
327     void end(Handler!Void handler) {
328         close(handler);
329     }
330 
331 
332     AmqpSender drainHandler(Handler!Void handler) {
333         _drainHandler = handler;
334         return this;
335     }
336 
337 
338     AmqpSender sendWithAck(AmqpMessage message, Handler!Void acknowledgementHandler) {
339         return doSend(message, acknowledgementHandler);
340     }
341 
342 
343     //Future<Void> sendWithAck(AmqpMessage message) {
344     //  Promise<Void> promise = Promise.promise();
345     //  sendWithAck(message, promise);
346     //  return promise.future();
347     //}
348 
349 
350     void close(Handler!Void handler) {
351         Handler!Void actualHandler;
352         if (handler is null) {
353             implementationMissing(false);
354          // actualHandler = x -> { /* NOOP */ };
355         } else {
356             actualHandler = handler;
357         }
358 
359         synchronized (this) {
360             if (closed) {
361                 actualHandler.handle(new String(""));
362                 return;
363             }
364             closed = true;
365         }
366 
367 
368         //actualHandler.handle(v.mapEmpty()
369         _connection.unregister(this);
370         if (sender.isOpen()) {
371                 try {
372                     sender
373                     .closeHandler(new class Handler!ProtonSender{
374                         void handle(ProtonSender var1)
375                         {
376                             if(actualHandler !is null) {
377                                 actualHandler.handle(null);
378                             }
379                         }
380                     }).close();
381                 } catch (Exception e) {
382                     // Somehow closed remotely
383                     actualHandler.handle(null);
384                 }
385             } else {
386                      actualHandler.handle(new String(""));
387                  }
388         }
389         //connection.runWithTrampoline(x -> {
390         //  if (sender.isOpen()) {
391         //    try {
392         //      sender
393         //        .closeHandler(v -> actualHandler.handle(v.mapEmpty()))
394         //        .close();
395         //    } catch (Exception e) {
396         //      // Somehow closed remotely
397         //      actualHandler.handle(Future.failedFuture(e));
398         //    }
399         //  } else {
400         //    actualHandler.handle(Future.succeededFuture());
401         //  }
402         //});
403     //}
404 
405 
406     //Future<Void> close() {
407     //  Promise<Void> promise = Promise.promise();
408     //  close(promise);
409     //  return promise.future();
410     //}
411 
412 
413     string address() {
414         return sender.getRemoteAddress();
415     }
416 }