1 /*
2  * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.client.impl.AmqpSenderImpl;
12 
13 import hunt.amqp.client.AmqpConnection;
14 import hunt.amqp.client.AmqpMessage;
15 import hunt.amqp.client.AmqpSender;
16 import hunt.amqp.ProtonDelivery;
17 import hunt.amqp.ProtonSender;
18 import hunt.amqp.impl.ProtonSenderImpl;
19 import hunt.amqp.Handler;
20 import hunt.amqp.client.impl.AmqpConnectionImpl;
21 import hunt.proton.amqp.transport.DeliveryState;
22 
23 import hunt.concurrency.Future;
24 import hunt.concurrency.FuturePromise;
25 import hunt.Object;
26 import hunt.logging;
27 import hunt.net.AsyncResult;
28 import hunt.String;
29 import hunt.Exceptions;
30 import std.format;
31 
32 /**
33  * 
34  */
35 class AmqpSenderImpl : AmqpSender {
36     private  ProtonSender sender;
37     private  AmqpConnectionImpl _connection;
38     private bool closed;
39     private Handler!Throwable _exceptionHandler;
40     private Handler!Void _drainHandler;
41     private long remoteCredit = 0;
42 
43     this(ProtonSender sender, AmqpConnectionImpl connection,
44         Handler!AmqpSender completionHandler) {
45         this.sender = sender;
46         this._connection = connection;
47 
48         //sender
49         //  .closeHandler(res -> onClose(sender, res, false))
50         //  .detachHandler(res -> onClose(sender, res, true));
51         sender
52             .closeHandler(new class Handler!ProtonSender {
53                 void handle(ProtonSender var1)
54                 {
55                             onClose(sender,var1,false);
56                 }
57             })
58             .detachHandler(new class Handler!ProtonSender{
59                 void handle(ProtonSender var1)
60                 {
61                     onClose(sender,var1,true);
62                 }
63             });
64 
65 
66 
67         //      Handler<Void> dh = null;
68         //  synchronized (AmqpSenderImpl.this) {
69         //    // Update current state of remote credit
70         //    remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit();
71         //
72         //    // check the user drain handler, fire it outside synchronized block if not null
73         //    if (drainHandler !is null) {
74         //      dh = drainHandler;
75         //    }
76         //  }
77         //
78         //  if (dh !is null) {
79         //    dh.handle(null);
80         //  }
81         //}
82 
83         sender.sendQueueDrainHandler(new class Handler!ProtonSender {
84             void handle(ProtonSender var1)
85             {
86                 Handler!Void dh = null;
87                 synchronized(this)
88                 {
89                      remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit();
90                      if (_drainHandler !is null) {
91                          dh = _drainHandler;
92                      }
93                 }
94                 if (dh !is null) {
95                     dh.handle(null);
96                 }
97             }
98         });
99 
100 
101     //  if (done.failed()) {
102     //    completionHandler.handle(done.mapEmpty());
103     //  } else {
104     //    connection.register(this);
105     //    completionHandler.handle(Future.succeededFuture(this));
106     //  }
107     //}
108         sender.openHandler(new class Handler!ProtonSender{
109             void handle(ProtonSender var1)
110             {
111                     if(var1 is null)
112                     {
113                         completionHandler.handle(null);
114                     }else
115                     {
116                         _connection.register(this.outer);
117                         completionHandler.handle(this.outer);
118                     }
119             }
120         });
121         sender.open();
122     }
123 
124     /**
125      * Creates a new instance of {@link AmqpSenderImpl}. The created sender is passed into the {@code completionHandler}
126      * once opened. This method must be called on the connection context.
127      *
128      * @param sender            the underlying proton sender
129      * @param connection        the connection
130      * @param completionHandler the completion handler
131      */
132     static void create(ProtonSender sender, AmqpConnectionImpl connection,
133         Handler!AmqpSender completionHandler) {
134         new AmqpSenderImpl(sender, connection, completionHandler);
135     }
136 
137     private void onClose(ProtonSender sender, ProtonSender res, bool detach) {
138         Handler!Throwable eh = null;
139         bool closeSender = false;
140 
141         synchronized (this) {
142             if (!closed && _exceptionHandler !is null) {
143                 eh = _exceptionHandler;
144             }
145 
146             if (!closed) {
147                 closed = true;
148                 closeSender = true;
149             }
150         }
151 
152         if (eh !is null) {
153             if (res !is null) {
154                 eh.handle(new Exception("Sender closed remotely"));
155             } else {
156                 eh.handle(new Exception("Sender closed remotely with error"));
157             }
158         }
159 
160         if (closeSender) {
161             if (detach) {
162                 sender.detach();
163             } else {
164                 sender.close();
165             }
166         }
167     }
168 
169 
170     bool writeQueueFull() {
171         return remoteCredit <= 0;
172     }
173 
174 
175     AmqpConnection connection() {
176         return _connection;
177     }
178 
179 
180     AmqpSender send(AmqpMessage message) {
181         return doSend(message, null);
182     }
183 
184     private AmqpSender doSend(AmqpMessage message, VoidAsyncHandler acknowledgmentHandler) {
185         AmqpMessage updated;
186         if (message.address() is null) {
187             updated = AmqpMessage.create(message).address(address()).build();
188         } else {
189             updated = message;
190         }
191 
192 
193         Handler!ProtonDelivery ack = new class Handler!ProtonDelivery{
194             void handle(ProtonDelivery delivery)
195             {
196                 VoidAsyncHandler handler = acknowledgmentHandler;
197                 if (acknowledgmentHandler is null) {
198                     handler = (VoidAsyncResult ar) {
199                         if (ar.failed()) {
200                             Throwable th = ar.cause();
201                             warningf("Message rejected by remote peer: %s", th.msg);
202                             version(HUNT_DEBUG) warning(th);
203                         } 
204                     };
205                 }
206 
207                 switch (delivery.getRemoteState().getType()) {
208                     case DeliveryStateType.Rejected:
209                         version(HUNT_AMQP_DEBUG) trace("message rejected (REJECTED)");
210                         handler(failedResult!(Void)(new Exception("message rejected (REJECTED)")));
211                         break;
212                     case DeliveryStateType.Modified:
213                         version(HUNT_AMQP_DEBUG) trace("message rejected (MODIFIED)");
214                         handler(failedResult!(Void)(new Exception("message rejected (MODIFIED)")));
215                         break;
216                     case DeliveryStateType.Released:
217                         version(HUNT_AMQP_DEBUG) trace("message rejected (RELEASED)");
218                         handler(failedResult!(Void)(new Exception("message rejected (RELEASED)")));
219                         break;
220                     case DeliveryStateType.Accepted:
221                         version(HUNT_AMQP_DEBUG) trace("Accepted");
222                         handler(succeededResult!(Void)(null));
223                         break;
224 
225                     default:
226                         string msg = format("Unsupported delivery type %d", delivery.getRemoteState().getType());
227                         logError(msg);
228 
229                         handler(failedResult!(Void)(new Exception(msg)));
230                 }
231             }
232         };
233 
234         synchronized (this) {
235             // Update the credit tracking. We only need to adjust this here because the sends etc may not be on the context
236             // thread and if that is the case we can't use the ProtonSender sendQueueFull method to check that credit has been
237             // exhausted following this doSend call since we will have only scheduled the actual send for later.
238             remoteCredit--;
239         }
240 
241 
242         sender.send(updated.unwrap(), ack);
243         synchronized (this)
244         {
245             remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit();
246         }
247 
248         //connection.runWithTrampoline(new class Handler!Void {
249         //  void handle(Void var1)
250         //  {
251         //    sender.send(updated.unwrap(), ack);
252         //    synchronized (this)
253         //    {
254         //      remoteCredit = (cast(ProtonSenderImpl) sender).getRemoteCredit();
255         //    }
256         //  }
257         //});
258 
259         //connection.runWithTrampoline(x -> {
260         //  sender.send(updated.unwrap(), ack);
261         //
262         //  synchronized (AmqpSenderImpl.this) {
263         //    // Update the credit tracking *again*. We need to reinitialise it here in case the doSend call was performed on
264         //    // a thread other than the client context, to ensure we didn't fall foul of a race between the above pre-send
265         //    // update on that thread, the above send on the context thread, and the sendQueueDrainHandler based updates on
266         //    // the context thread.
267         //    remoteCredit = ((ProtonSenderImpl) sender).getRemoteCredit();
268         //  }
269         //});
270 
271 
272         return this;
273     }
274 
275 
276     AmqpSender exceptionHandler(Handler!Throwable handler) {
277         _exceptionHandler = handler;
278         return this;
279     }
280 
281 
282     Future!Void write(AmqpMessage data) {
283         FuturePromise!Void f = new FuturePromise!Void();
284         doSend(data, (VoidAsyncResult ar) {
285             if (ar.succeeded()) {
286                 f.succeeded(ar.result());
287             } else {
288                 f.failed(cast(Exception) ar.cause());
289             }
290         });
291 
292         return f;        
293     }
294 
295     void write(AmqpMessage data, VoidAsyncHandler handler) {
296         doSend(data, handler);
297     }
298 
299     AmqpSender setWriteQueueMaxSize(int maxSize) {
300         // No-op, available sending credit is controlled by recipient peer in AMQP 1.0.
301         return this;
302     }
303 
304     override
305     void end(VoidAsyncHandler handler) {
306         close(handler);
307     }
308 
309     AmqpSender drainHandler(Handler!Void handler) {
310         _drainHandler = handler;
311         return this;
312     }
313 
314 
315     AmqpSender sendWithAck(AmqpMessage message, VoidAsyncHandler acknowledgementHandler) {
316         return doSend(message, acknowledgementHandler);
317     }
318 
319 
320     //Future<Void> sendWithAck(AmqpMessage message) {
321     //  Promise<Void> promise = Promise.promise();
322     //  sendWithAck(message, promise);
323     //  return promise.future();
324     //}
325 
326 
327     void close(VoidAsyncHandler handler) {
328         
329         VoidAsyncHandler actualHandler;
330         if (handler is null) {
331             actualHandler = (VoidAsyncResult ar) {
332                 /* NOOP */
333             };
334         } else {
335             actualHandler = handler;
336         }
337 
338         synchronized (this) {
339             if (closed) {
340                 actualHandler(succeededResult!(Void)(null));
341                 return;
342             }
343             closed = true;
344         }
345 
346         _connection.unregister(this);
347 
348         if (sender.isOpen()) {
349             try {
350                 sender.closeHandler(new class Handler!ProtonSender{
351                     void handle(ProtonSender var1) {
352                         actualHandler(succeededResult!(Void)(null));
353                     }
354                 }).close();
355             } catch (Exception e) {
356                 // Somehow closed remotely
357                 actualHandler(failedResult!(Void)(e));
358             }
359         } else {
360             actualHandler(succeededResult!(Void)(null));
361         }
362     }
363 
364 
365     Future!Void close() {
366 
367         FuturePromise!Void f = new FuturePromise!Void();
368         close((VoidAsyncResult ar) {
369             if (ar.succeeded()) {
370                 f.succeeded(ar.result());
371             } else {
372                 f.failed(cast(Exception) ar.cause());
373             }
374         });
375 
376         return f; 
377     }
378 
379 
380     string address() {
381         return sender.getRemoteAddress();
382     }
383 }