1 /*
2  * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.client.impl.AmqpConnectionImpl;
12 
13 import hunt.amqp.client.AmqpClientOptions;
14 import hunt.amqp.client.AmqpSender;
15 import hunt.amqp.client.AmqpReceiver;
16 import hunt.amqp.client.impl.AmqpClientImpl;
17 import hunt.amqp.client.AmqpReceiverOptions;
18 import hunt.amqp.client.impl.AmqpReceiverImpl;
19 import hunt.amqp.client.impl.AmqpSenderImpl;
20 import hunt.amqp.client.AmqpSenderOptions;
21 import hunt.amqp.client.AmqpConnection;
22 
23 //import hunt.core.*;
24 import hunt.amqp.ProtonConnection;
25 import hunt.amqp.impl.ProtonConnectionImpl;
26 import hunt.amqp.Handler;
27 import hunt.amqp.ProtonQoS;
28 import hunt.amqp.ProtonClient;
29 import hunt.amqp.ProtonLinkOptions;
30 import hunt.amqp.ProtonReceiver;
31 import hunt.amqp.ProtonSender;
32 import hunt.amqp.impl.ProtonReceiverImpl;
33 import hunt.amqp.impl.ProtonLinkImpl;
34 import hunt.proton.amqp.Symbol;
35 import hunt.proton.amqp.messaging.TerminusDurability;
36 import hunt.proton.amqp.messaging.TerminusExpiryPolicy;
37 import hunt.proton.engine.EndpointState;
38 import hunt.proton.amqp.messaging.Source;
39 
40 import hunt.collection.HashMap;
41 import hunt.collection.List;
42 import hunt.collection.Map;
43 import hunt.collection.ArrayList;
44 
45 import hunt.logging;
46 import hunt.Exceptions;
47 import hunt.Object;
48 
49 import hunt.String;
50 import std.concurrency : initOnce;
51 import std.uni;
52 import std.range;
53 
54 class AmqpConnectionImpl : AmqpConnection {
55 
56     // static  Symbol PRODUCT_KEY = Symbol.valueOf("product");
57 
58     static String PRODUCT() {
59         __gshared String m;
60         return initOnce!(m)(new String("hunt-amqp-client"));
61     }
62 
63     static Symbol PRODUCT_KEY() {
64         __gshared Symbol m;
65         return initOnce!(m)(Symbol.valueOf("product"));
66     }
67 
68     private AmqpClientOptions _options;
69     //private  AtomicBoolean _isClosed = new AtomicBoolean();
70     private bool _isClosed;
71     //private  AtomicReference<ProtonConnection> connection = new AtomicReference<>();
72     private ProtonConnection connection;
73     // private  Context context;
74 
75     private List!AmqpSender senders; //= new CopyOnWriteArrayList<>();
76     private List!AmqpReceiver receivers; // = new CopyOnWriteArrayList<>();
77     /**
78      * The exception handler, protected by the monitor lock.
79      */
80     private Handler!Throwable _exceptionHandler;
81 
82     this(AmqpClientImpl client, AmqpClientOptions options, ProtonClient proton,
83             Handler!AmqpConnection connectionHandler) {
84 
85         assert(proton !is null, "proton cannot be `null`");
86         assert(connectionHandler !is null, "connection handler cannot be `null`");
87 
88         this._options = options;
89         senders = new ArrayList!AmqpSender;
90         receivers = new ArrayList!AmqpReceiver;
91         connect(client, proton, connectionHandler);
92     }
93 
94     bool isClosed() {
95         return _isClosed;
96     }
97 
98     // dfmt off
99     private void connect(AmqpClientImpl client, ProtonClient proton, Handler!AmqpConnection connectionHandler) {
100     //void connect(ProtonClientOptions options, string host, int port, string username, string password,
101     //Handler!ProtonConnection connectionHandler);
102         proton.connect(_options, _options.getHost(), _options.getPort(), 
103             _options.getUsername(), _options.getPassword(),
104             new class Handler!ProtonConnection {
105                 // Called on the connection context.
106                 void handle(ProtonConnection ar)
107                 {
108                     if (ar !is null)
109                     {
110                         if (connection !is null) {
111                             connectionHandler.handle(null);
112                             logError("Unable to connect - already holding a connection");
113                             return;
114                         }else
115                         {
116                             connection = ar;
117                         }
118 
119                         Map!(Symbol, Object) map = new HashMap!(Symbol,Object)();
120                         map.put(AmqpConnectionImpl.PRODUCT_KEY, AmqpConnectionImpl.PRODUCT);
121                         if (_options.getContainerId() !is null) {
122                             connection.setContainer(_options.getContainerId());
123                         }
124 
125                         if (_options.getVirtualHost() !is null) {
126                             connection.setHostname(_options.getVirtualHost());
127                         }
128 
129                         connection
130                         .setProperties(map)
131                         .disconnectHandler(new class Handler!ProtonConnection {
132                             void handle(ProtonConnection var1)
133                             {
134                                 try {
135                                     onDisconnect();
136                                 } finally {
137                                     _isClosed = true;
138                                 }
139                             }
140                             })
141                             .closeHandler(new class Handler!ProtonConnection {
142                                 // Not expected closing, consider it failed
143                                 void handle(ProtonConnection var1)
144                                 {
145                                     try {
146                                         onDisconnect();
147                                     } finally {
148                                         _isClosed = true;
149                                     }
150                                 }
151                             })
152                             .openHandler(new class Handler!ProtonConnection {
153                                 void handle(ProtonConnection conn)
154                                 {
155                                     if (conn !is null) {
156                                         if(client !is null)
157                                             client.register(this.outer.outer);
158                                         _isClosed = false;
159                                         connectionHandler.handle(this.outer.outer);
160                                     } else {
161                                         _isClosed = true;
162                                         connectionHandler.handle(null);
163                                     }
164                                 }
165                             });
166 
167                         connection.open();
168                         // }
169                     } else {
170                         connectionHandler.handle(null);
171                     }
172                 }
173             }
174         );
175     }
176 
177 // dfmt on
178 
179     /**
180      * Must be called on context.
181      */
182     private void onDisconnect() {
183         Handler!Throwable h = null;
184         ProtonConnection conn = connection;
185         connection = null;
186         synchronized (this) {
187             if (_exceptionHandler !is null) {
188                 h = _exceptionHandler;
189             }
190         }
191 
192         if (h !is null) {
193             string message = getErrorMessage(conn);
194             h.handle(new Exception(message));
195         }
196     }
197 
198     private string getErrorMessage(ProtonConnection conn) {
199         string message = "Connection disconnected";
200         if (conn !is null) {
201             if (conn.getCondition() !is null && conn.getCondition().getDescription() !is null) {
202                 message ~= " - " ~ (conn.getCondition().getDescription().value);
203             } else if (conn.getRemoteCondition() !is null
204                     && conn.getRemoteCondition().getDescription() !is null) {
205                 message ~= " - " ~ conn.getRemoteCondition().getDescription().value;
206             }
207         }
208         return message;
209     }
210 
211     void runOnContext(Handler!Void action) {
212         implementationMissing(false);
213         // context.runOnContext(action);
214     }
215 
216     void runWithTrampoline(Handler!Void action) {
217         implementationMissing(false);
218         //if (Vertx.currentContext() == context) {
219         //  action.handle(null);
220         //} else {
221         //  runOnContext(action);
222         //}
223     }
224 
225     /**
226      * Must be called on context.
227      */
228     private bool isLocalOpen() {
229         ProtonConnection conn = this.connection;
230         return conn !is null && (cast(ProtonConnectionImpl) conn)
231             .getLocalState() == EndpointState.ACTIVE;
232     }
233 
234     /**
235      * Must be called on context.
236      */
237     private bool isRemoteOpen() {
238         ProtonConnection conn = this.connection;
239         return conn !is null && (cast(ProtonConnectionImpl) conn)
240             .getRemoteState() == EndpointState.ACTIVE;
241     }
242 
243     override AmqpConnection exceptionHandler(Handler!Throwable handler) {
244         this._exceptionHandler = handler;
245         return this;
246     }
247 
248     // dfmt off
249     override AmqpConnection close(Handler!Void done) {
250      // context.runOnContext(ignored -> {
251             ProtonConnection actualConnection = connection;
252             if (actualConnection is null || _isClosed || (!isLocalOpen() && !isRemoteOpen())) {
253                 if (done !is null) {
254                     done.handle(new String(""));
255                 }
256                 return null;
257             } else {
258                 _isClosed = true;
259             }
260 
261             //Promise<Void> future = Promise.promise();
262             //if (done !is null) {
263             //  future.future().setHandler(done);
264             //}
265             if (actualConnection.isDisconnected()) {
266              // future.complete();
267             } else {
268                 try {
269                     actualConnection
270                         .disconnectHandler(new class Handler!ProtonConnection{
271                         //  future.tryFail(getErrorMessage(con));
272                          void handle(ProtonConnection var1)
273                          {
274                              _isClosed = true;
275                          }
276                         })
277                         .closeHandler(new class Handler!ProtonConnection {
278                             void handle(ProtonConnection var1)
279                             {
280                                 infof("Close handling");
281                                 _isClosed = true;
282                                 //if (res.succeeded()) {
283                                 //  future.tryComplete();
284                                 //} else {
285                                 //  future.tryFail(res.cause());
286                                 //}
287                             }
288                         })
289                         .close();
290                 } catch (Exception e) {
291                     //future.fail(e);
292                     logError("AmqpConnection close error");
293                 }
294             }
295     //  });
296 
297         return this;
298     }
299 
300 // dfmt on
301 
302     //Future<Void> close() {
303     //  Promise<Void> promise = Promise.promise();
304     //  close(promise);
305     //  return promise.future();
306     //}
307 
308     void unregister(AmqpSender sender) {
309         senders.remove(sender);
310     }
311 
312     void unregister(AmqpReceiver receiver) {
313         receivers.remove(receiver);
314     }
315 
316     override AmqpConnection createDynamicReceiver(Handler!AmqpReceiver completionHandler) {
317         return createReceiver(null, new AmqpReceiverOptions().setDynamic(true), completionHandler);
318     }
319 
320     //Future<AmqpReceiver> createDynamicReceiver() {
321     //  Promise<AmqpReceiver> promise = Promise.promise();
322     //  createDynamicReceiver(promise);
323     //  return promise.future();
324     //}
325 
326     override AmqpConnection createReceiver(string address, Handler!AmqpReceiver completionHandler) {
327         assert(!address.empty(), "The address must not be `null`");
328         assert(completionHandler !is null, "The completion handler must not be `null`");
329 
330         ProtonLinkOptions opts = new ProtonLinkOptions();
331         //runWithTrampoline(x -> {
332         ProtonReceiver receiver = connection.createReceiver(address, opts);
333         new AmqpReceiverImpl(address, this, new AmqpReceiverOptions(), receiver, completionHandler);
334         //});
335         return this;
336     }
337 
338     //Future<AmqpReceiver> createReceiver(String address) {
339     //  Promise<AmqpReceiver> promise = Promise.promise();
340     //  createReceiver(address, promise);
341     //  return promise.future();
342     //}
343 
344     override AmqpConnection createReceiver(string address,
345             AmqpReceiverOptions receiverOptions, Handler!AmqpReceiver completionHandler) {
346         ProtonLinkOptions opts = new ProtonLinkOptions();
347         AmqpReceiverOptions recOpts = receiverOptions is null ? new AmqpReceiverOptions()
348             : receiverOptions;
349         opts.setDynamic(recOpts.isDynamic()).setLinkName(recOpts.getLinkName());
350 
351         //   runWithTrampoline(v -> {
352         ProtonReceiver receiver = connection.createReceiver(address, opts);
353 
354         if (receiverOptions !is null) {
355             if (receiverOptions.getQos() !is null) {
356                 //receiver.setQoS(ProtonQoS.valueOf(receiverOptions.getQos().toUpper));
357                 if (receiverOptions.getQos().toUpper == "AT_MOST_ONCE") {
358                     receiver.setQoS(ProtonQoS.AT_MOST_ONCE);
359                 } else if (receiverOptions.getQos().toUpper == "AT_LEAST_ONCE") {
360                     receiver.setQoS(ProtonQoS.AT_LEAST_ONCE);
361                 }
362             }
363 
364             configureTheSource(recOpts, receiver);
365         }
366 
367         new AmqpReceiverImpl(address, this, recOpts, receiver, completionHandler);
368         //   });
369         return this;
370     }
371 
372     //Future<AmqpReceiver> createReceiver(String address, AmqpReceiverOptions receiverOptions) {
373     //  Promise<AmqpReceiver> promise = Promise.promise();
374     //  createReceiver(address, receiverOptions, promise);
375     //  return promise.future();
376     //}
377 
378     private void configureTheSource(AmqpReceiverOptions receiverOptions, ProtonReceiver receiver) {
379         hunt.proton.amqp.messaging.Source.Source source = cast(
380                 hunt.proton.amqp.messaging.Source.Source) receiver.getSource();
381 
382         List!string capabilities = receiverOptions.getCapabilities();
383         if (!capabilities.isEmpty()) {
384             //source.setCapabilities(capabilities.stream().map(Symbol::valueOf).toArray(Symbol[]::new));
385             List!Symbol tmpLst = new ArrayList!Symbol;
386             foreach (string s; capabilities) {
387                 tmpLst.add(Symbol.valueOf(s));
388             }
389             source.setCapabilities(tmpLst);
390         }
391 
392         if (receiverOptions.isDurable()) {
393             source.setExpiryPolicy(TerminusExpiryPolicy.NEVER);
394             source.setDurable(TerminusDurability.UNSETTLED_STATE);
395         }
396     }
397 
398     override AmqpConnection createSender(string address, Handler!AmqpSender completionHandler) {
399         // Objects.requireNonNull(address, "The address must be set");
400         if (address is null || address.length == 0) {
401             logError("The address must be set");
402             return null;
403         }
404         return createSender(address, new AmqpSenderOptions(), completionHandler);
405     }
406 
407     //Future<AmqpSender> createSender(String address) {
408     //  Promise<AmqpSender> promise = Promise.promise();
409     //  createSender(address, promise);
410     //  return promise.future();
411     //}
412 
413     override AmqpConnection createSender(string address, AmqpSenderOptions options,
414             Handler!AmqpSender completionHandler) {
415         if (address is null && !options.isDynamic()) {
416             throw new IllegalArgumentException("Address must be set if the link is not dynamic");
417         }
418 
419         // Objects.requireNonNull(completionHandler, "The completion handler must be set");
420 
421         if (completionHandler is null) {
422             logError("The completion handler must be set");
423             return null;
424         }
425 
426         // runWithTrampoline(x -> {
427 
428         ProtonSender sender;
429         if (options !is null) {
430             ProtonLinkOptions opts = new ProtonLinkOptions();
431             opts.setLinkName(options.getLinkName());
432             opts.setDynamic(options.isDynamic());
433 
434             sender = connection.createSender(address, opts);
435             sender.setAutoDrained(options.isAutoDrained());
436         } else {
437             sender = connection.createSender(address);
438         }
439 
440         // TODO durable?
441 
442         AmqpSenderImpl.create(sender, this, completionHandler);
443         // });
444         return this;
445     }
446 
447     //Future<AmqpSender> createSender(String address, AmqpSenderOptions options) {
448     //  Promise<AmqpSender> promise = Promise.promise();
449     //  createSender(address, options, promise);
450     //  return promise.future();
451     //}
452 
453     override AmqpConnection createAnonymousSender(Handler!AmqpSender completionHandler) {
454         // Objects.requireNonNull(completionHandler, "The completion handler must be set");
455         if (completionHandler is null) {
456             logError("The completion handler must be set");
457             return null;
458         }
459         // runWithTrampoline(x -> {
460         ProtonSender sender = connection.createSender(null);
461         AmqpSenderImpl.create(sender, this, completionHandler);
462         //  });
463         return this;
464     }
465 
466     //Future<AmqpSender> createAnonymousSender() {
467     //  Promise<AmqpSender> promise = Promise.promise();
468     //  createAnonymousSender(promise);
469     //  return promise.future();
470     //}
471 
472     ProtonConnection unwrap() {
473         return this.connection;
474     }
475 
476     AmqpClientOptions options() {
477         return _options;
478     }
479 
480     void register(AmqpSenderImpl sender) {
481         senders.add(sender);
482     }
483 
484     void register(AmqpReceiverImpl receiver) {
485         receivers.add(receiver);
486     }
487 }