1 /*
2  * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.client.impl.AmqpConnectionImpl;
12 
13 import hunt.amqp.client.AmqpClientOptions;
14 import hunt.amqp.client.AmqpSender;
15 import hunt.amqp.client.AmqpReceiver;
16 import hunt.amqp.client.impl.AmqpClientImpl;
17 import hunt.amqp.client.AmqpReceiverOptions;
18 import hunt.amqp.client.impl.AmqpReceiverImpl;
19 import hunt.amqp.client.impl.AmqpSenderImpl;
20 import hunt.amqp.client.AmqpSenderOptions;
21 import hunt.amqp.client.AmqpConnection;
22 
23 //import hunt.core.*;
24 import hunt.amqp.ProtonConnection;
25 import hunt.amqp.impl.ProtonConnectionImpl;
26 import hunt.amqp.Handler;
27 import hunt.amqp.ProtonQoS;
28 import hunt.amqp.ProtonClient;
29 import hunt.amqp.ProtonLinkOptions;
30 import hunt.amqp.ProtonReceiver;
31 import hunt.amqp.ProtonSender;
32 import hunt.amqp.impl.ProtonReceiverImpl;
33 import hunt.amqp.impl.ProtonLinkImpl;
34 import hunt.proton.amqp.Symbol;
35 import hunt.proton.amqp.messaging.TerminusDurability;
36 import hunt.proton.amqp.messaging.TerminusExpiryPolicy;
37 import hunt.proton.engine.EndpointState;
38 import hunt.proton.amqp.messaging.Source;
39 
40 import hunt.collection.HashMap;
41 import hunt.collection.List;
42 import hunt.collection.Map;
43 import hunt.collection.ArrayList;
44 
45 import hunt.concurrency.Future;
46 import hunt.concurrency.FuturePromise;
47 import hunt.Exceptions;
48 import hunt.logging;
49 import hunt.net.AsyncResult;
50 import hunt.Object;
51 
52 import hunt.String;
53 import std.concurrency : initOnce;
54 import core.atomic;
55 import std.uni;
56 import std.range;
57 
58 class AmqpConnectionImpl : AmqpConnection {
59 
60     // static  Symbol PRODUCT_KEY = Symbol.valueOf("product");
61 
62     static String PRODUCT() {
63         __gshared String m;
64         return initOnce!(m)(new String("hunt-amqp-client"));
65     }
66 
67     static Symbol PRODUCT_KEY() {
68         __gshared Symbol m;
69         return initOnce!(m)(Symbol.valueOf("product"));
70     }
71 
72     private AmqpClientOptions _options;
73     //private  AtomicBoolean _isClosed = new AtomicBoolean();
74     private shared bool _isClosing;
75     private shared bool _isClosed;
76     //private  AtomicReference<ProtonConnection> connection = new AtomicReference<>();
77     private ProtonConnection connection;
78     // private  Context context;
79 
80     private List!AmqpSender senders; //= new CopyOnWriteArrayList<>();
81     private List!AmqpReceiver receivers; // = new CopyOnWriteArrayList<>();
82     /**
83      * The exception handler, protected by the monitor lock.
84      */
85     private Handler!Throwable _exceptionHandler;
86 
87     this(AmqpClientImpl client, AmqpClientOptions options, ProtonClient proton,
88             AsyncResultHandler!AmqpConnection connectionHandler) {
89 
90         assert(proton !is null, "proton cannot be `null`");
91         assert(connectionHandler !is null, "connection handler cannot be `null`");
92 
93         _isClosing = false;
94         _options = options;
95         senders = new ArrayList!AmqpSender;
96         receivers = new ArrayList!AmqpReceiver;
97         connect(client, proton, connectionHandler);
98     }
99 
100     bool isClosed() {
101         return _isClosed;
102     }
103 
104     // dfmt off
105     private void connect(AmqpClientImpl client, ProtonClient proton, 
106         AsyncResultHandler!AmqpConnection connectionHandler) {
107 
108         proton.connect(_options, _options.getHost(), _options.getPort(), 
109             _options.getUsername(), _options.getPassword(),
110             new class Handler!ProtonConnection {
111                 // Called on the connection context.
112                 void handle(ProtonConnection ar)
113                 {
114                     if (ar !is null)
115                     {
116                         if (connection !is null) {
117                             string msg = "Unable to connect - already holding a connection";
118                             error(msg);
119                             if(connectionHandler !is null) {
120                                 connectionHandler(failedResult!AmqpConnection(new Exception(msg)));
121                             }
122                             return;
123                         }else
124                         {
125                             connection = ar;
126                         }
127 
128                         Map!(Symbol, Object) map = new HashMap!(Symbol,Object)();
129                         map.put(AmqpConnectionImpl.PRODUCT_KEY, AmqpConnectionImpl.PRODUCT);
130                         if (_options.getContainerId() !is null) {
131                             connection.setContainer(_options.getContainerId());
132                         }
133 
134                         if (_options.getVirtualHost() !is null) {
135                             connection.setHostname(_options.getVirtualHost());
136                         }
137 
138                         connection
139                         .setProperties(map)
140                         .disconnectHandler((ProtonConnection var1) {
141                                 try {
142                                     onDisconnect();
143                                 } finally {
144                                     _isClosed = true;
145                                 }
146                             })
147                             .closeHandler( (x) {
148                                 // Not expected closing, consider it failed
149                                 try {
150                                     onDisconnect();
151                                 } finally {
152                                     _isClosed = true;
153                                 }
154                             })
155                             .openHandler(new class Handler!ProtonConnection {
156                                 void handle(ProtonConnection conn)
157                                 {
158                                     if (conn !is null) {
159                                         if(client !is null)
160                                             client.register(this.outer.outer);
161                                         _isClosed = false;
162                                         connectionHandler(succeededResult!(AmqpConnection)(this.outer.outer));
163                                     } else {
164                                         _isClosed = true;
165                                         connectionHandler(failedResult!(AmqpConnection)(null));
166                                     }
167                                 }
168                             });
169 
170                         connection.open();
171                         // }
172                     } else {
173                         connectionHandler(failedResult!(AmqpConnection)(null));
174                     }
175                 }
176             }
177         );
178     }
179 
180 // dfmt on
181 
182     /**
183      * Must be called on context.
184      */
185     private void onDisconnect() {
186         Handler!Throwable h = null;
187         ProtonConnection conn = connection;
188         connection = null;
189         synchronized (this) {
190             if (_exceptionHandler !is null) {
191                 h = _exceptionHandler;
192             }
193         }
194 
195         version(HUNT_DEBUG) trace("handling...");
196 
197         if (h !is null) {
198             string message = getErrorMessage(conn);
199             h.handle(new Exception(message));
200         }
201     }
202 
203     private string getErrorMessage(ProtonConnection conn) {
204         string message = "Connection disconnected";
205         if (conn !is null) {
206             if (conn.getCondition() !is null && conn.getCondition().getDescription() !is null) {
207                 message ~= " - " ~ (conn.getCondition().getDescription().value);
208             } else if (conn.getRemoteCondition() !is null
209                     && conn.getRemoteCondition().getDescription() !is null) {
210                 message ~= " - " ~ conn.getRemoteCondition().getDescription().value;
211             }
212         }
213         return message;
214     }
215 
216     void runOnContext(Handler!Void action) {
217         implementationMissing(false);
218         // context.runOnContext(action);
219     }
220 
221     void runWithTrampoline(Handler!Void action) {
222         implementationMissing(false);
223         //if (Vertx.currentContext() == context) {
224         //  action.handle(null);
225         //} else {
226         //  runOnContext(action);
227         //}
228     }
229 
230     /**
231      * Must be called on context.
232      */
233     private bool isLocalOpen() {
234         ProtonConnection conn = this.connection;
235         return conn !is null && (cast(ProtonConnectionImpl) conn)
236             .getLocalState() == EndpointState.ACTIVE;
237     }
238 
239     /**
240      * Must be called on context.
241      */
242     private bool isRemoteOpen() {
243         ProtonConnection conn = this.connection;
244         return conn !is null && (cast(ProtonConnectionImpl) conn)
245             .getRemoteState() == EndpointState.ACTIVE;
246     }
247 
248     override AmqpConnection exceptionHandler(Handler!Throwable handler) {
249         this._exceptionHandler = handler;
250         return this;
251     }
252 
253     // dfmt off
254     override AmqpConnection close(AsyncResultHandler!Void done) {
255         ProtonConnection actualConnection = connection;
256         if (actualConnection is null || _isClosed || _isClosing || (!isLocalOpen() && !isRemoteOpen())) {
257             if (done !is null) {
258                 done(succeededResult!(Void)(null));
259             }
260             return null;
261         } else {
262             _isClosing = true;
263         }
264 
265         void onClosed(Exception ex) {
266             _isClosing = false;
267 
268             if(done !is null) {
269                 if(ex is null) {
270                     done(succeededResult!(Void)(null));
271                 } else {
272                     done(failedResult!(Void)(ex));
273                 }
274             }
275         }
276 
277         if (actualConnection.isDisconnected()) {
278             onClosed(null);
279         } else {
280             try {
281                 actualConnection
282                     .disconnectHandler( (ProtonConnection conn) {
283                         if(cas(&_isClosed, false, true)) {
284                             string msg = getErrorMessage(conn);
285                             version(HUNT_DEBUG) warning(msg);
286                             onClosed(new Exception(msg));
287                         }
288                     })
289                     .closeHandler((res) {
290                         version(HUNT_DEBUG) infof("Close handling");
291                         if(cas(&_isClosed, false, true)) {
292                             if (res.succeeded()) {
293                                 onClosed(null);
294                             } else {
295                                 onClosed(cast(Exception)res.cause());
296                             }
297                         }
298                     })
299                     .close();
300             } catch (Exception e) {
301                 warning(e.msg);
302                 version(HUNT_DEBUG) warning(e);
303                 onClosed(cast(Exception)e);
304             }
305         }
306 
307         return this;
308     }
309 
310 // dfmt on
311 
312     //Future<Void> close() {
313     //  Promise<Void> promise = Promise.promise();
314     //  close(promise);
315     //  return promise.future();
316     //}
317 
318     void unregister(AmqpSender sender) {
319         senders.remove(sender);
320     }
321 
322     void unregister(AmqpReceiver receiver) {
323         receivers.remove(receiver);
324     }
325 
326     override AmqpConnection createDynamicReceiver(Handler!AmqpReceiver completionHandler) {
327         return createReceiver(null, new AmqpReceiverOptions().setDynamic(true), completionHandler);
328     }
329 
330     //Future<AmqpReceiver> createDynamicReceiver() {
331     //  Promise<AmqpReceiver> promise = Promise.promise();
332     //  createDynamicReceiver(promise);
333     //  return promise.future();
334     //}
335 
336     override AmqpConnection createReceiver(string address, Handler!AmqpReceiver completionHandler) {
337         assert(!address.empty(), "The address must not be `null`");
338         assert(completionHandler !is null, "The completion handler must not be `null`");
339 
340         ProtonLinkOptions opts = new ProtonLinkOptions();
341         //runWithTrampoline(x -> {
342         ProtonReceiver receiver = connection.createReceiver(address, opts);
343         new AmqpReceiverImpl(address, this, new AmqpReceiverOptions(), receiver, completionHandler);
344         //});
345         return this;
346     }
347 
348     //Future<AmqpReceiver> createReceiver(String address) {
349     //  Promise<AmqpReceiver> promise = Promise.promise();
350     //  createReceiver(address, promise);
351     //  return promise.future();
352     //}
353 
354     override AmqpConnection createReceiver(string address,
355             AmqpReceiverOptions receiverOptions, Handler!AmqpReceiver completionHandler) {
356         ProtonLinkOptions opts = new ProtonLinkOptions();
357         AmqpReceiverOptions recOpts = receiverOptions is null ? new AmqpReceiverOptions()
358             : receiverOptions;
359         opts.setDynamic(recOpts.isDynamic()).setLinkName(recOpts.getLinkName());
360 
361         //   runWithTrampoline(v -> {
362         ProtonReceiver receiver = connection.createReceiver(address, opts);
363 
364         if (receiverOptions !is null) {
365             if (receiverOptions.getQos() !is null) {
366                 //receiver.setQoS(ProtonQoS.valueOf(receiverOptions.getQos().toUpper));
367                 if (receiverOptions.getQos().toUpper == "AT_MOST_ONCE") {
368                     receiver.setQoS(ProtonQoS.AT_MOST_ONCE);
369                 } else if (receiverOptions.getQos().toUpper == "AT_LEAST_ONCE") {
370                     receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
371                 }
372             }
373 
374             configureTheSource(recOpts, receiver);
375         }
376 
377         new AmqpReceiverImpl(address, this, recOpts, receiver, completionHandler);
378         //   });
379         return this;
380     }
381 
382     //Future<AmqpReceiver> createReceiver(String address, AmqpReceiverOptions receiverOptions) {
383     //  Promise<AmqpReceiver> promise = Promise.promise();
384     //  createReceiver(address, receiverOptions, promise);
385     //  return promise.future();
386     //}
387 
388     private void configureTheSource(AmqpReceiverOptions receiverOptions, ProtonReceiver receiver) {
389         hunt.proton.amqp.messaging.Source.Source source = cast(
390                 hunt.proton.amqp.messaging.Source.Source) receiver.getSource();
391 
392         List!string capabilities = receiverOptions.getCapabilities();
393         if (!capabilities.isEmpty()) {
394             //source.setCapabilities(capabilities.stream().map(Symbol::valueOf).toArray(Symbol[]::new));
395             List!Symbol tmpLst = new ArrayList!Symbol;
396             foreach (string s; capabilities) {
397                 tmpLst.add(Symbol.valueOf(s));
398             }
399             source.setCapabilities(tmpLst);
400         }
401 
402         if (receiverOptions.isDurable()) {
403             source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
404             source.setDurable(TerminusDurability.UNSETTLED_STATE);
405         }
406     }
407 
408     override AmqpConnection createSender(string address, Handler!AmqpSender completionHandler) {
409         // Objects.requireNonNull(address, "The address must be set");
410         if (address is null || address.length == 0) {
411             logError("The address must be set");
412             return null;
413         }
414         return createSender(address, new AmqpSenderOptions(), completionHandler);
415     }
416 
417     //Future<AmqpSender> createSender(String address) {
418     //  Promise<AmqpSender> promise = Promise.promise();
419     //  createSender(address, promise);
420     //  return promise.future();
421     //}
422 
423     override AmqpConnection createSender(string address, AmqpSenderOptions options,
424             Handler!AmqpSender completionHandler) {
425         if (address is null && !options.isDynamic()) {
426             throw new IllegalArgumentException("Address must be set if the link is not dynamic");
427         }
428 
429         // Objects.requireNonNull(completionHandler, "The completion handler must be set");
430 
431         if (completionHandler is null) {
432             logError("The completion handler must be set");
433             return null;
434         }
435 
436         // runWithTrampoline(x -> {
437 
438         ProtonSender sender;
439         if (options !is null) {
440             ProtonLinkOptions opts = new ProtonLinkOptions();
441             opts.setLinkName(options.getLinkName());
442             opts.setDynamic(options.isDynamic());
443 
444             sender = connection.createSender(address, opts);
445             sender.setAutoDrained(options.isAutoDrained());
446         } else {
447             sender = connection.createSender(address);
448         }
449 
450         // TODO durable?
451 
452         AmqpSenderImpl.create(sender, this, completionHandler);
453         // });
454         return this;
455     }
456 
457     //Future<AmqpSender> createSender(String address, AmqpSenderOptions options) {
458     //  Promise<AmqpSender> promise = Promise.promise();
459     //  createSender(address, options, promise);
460     //  return promise.future();
461     //}
462 
463     override AmqpConnection createAnonymousSender(Handler!AmqpSender completionHandler) {
464         // Objects.requireNonNull(completionHandler, "The completion handler must be set");
465         if (completionHandler is null) {
466             logError("The completion handler must be set");
467             return null;
468         }
469         // runWithTrampoline(x -> {
470         ProtonSender sender = connection.createSender(null);
471         AmqpSenderImpl.create(sender, this, completionHandler);
472         //  });
473         return this;
474     }
475 
476     //Future<AmqpSender> createAnonymousSender() {
477     //  Promise<AmqpSender> promise = Promise.promise();
478     //  createAnonymousSender(promise);
479     //  return promise.future();
480     //}
481 
482     ProtonConnection unwrap() {
483         return this.connection;
484     }
485 
486     AmqpClientOptions options() {
487         return _options;
488     }
489 
490     void register(AmqpSenderImpl sender) {
491         senders.add(sender);
492     }
493 
494     void register(AmqpReceiverImpl receiver) {
495         receivers.add(receiver);
496     }
497 }