1 /*
2  * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.client.impl.AmqpClientImpl;
12 
13 import hunt.amqp.ProtonClient;
14 
15 import hunt.collection.ArrayList;
16 import hunt.collection.List;
17 import hunt.amqp.client.AmqpClient;
18 import hunt.amqp.client.AmqpClientOptions;
19 import hunt.amqp.client.AmqpConnection;
20 import hunt.amqp.Handler;
21 import hunt.amqp.client.impl.AmqpConnectionImpl;
22 import hunt.amqp.client.AmqpReceiver;
23 import hunt.amqp.client.AmqpSender;
24 import hunt.amqp.client.AmqpReceiverOptions;
25 import hunt.amqp.client.AmqpSenderOptions;
26 
27 import hunt.Assert;
28 import hunt.concurrency.CompletableFuture;
29 import hunt.concurrency.Future;
30 import hunt.concurrency.FuturePromise;
31 import hunt.concurrency.Promise;
32 import hunt.logging;
33 import hunt.net.AsyncResult;
34 import hunt.Object;
35 import hunt.Exceptions;
36 import hunt.String;
37 
/**
 * AmqpClient implementation that opens AMQP connections through a
 * ProtonClient and keeps track of the connections handed to register()
 * so that close() can shut them all down.
 *
 * Every connect(...) / createSender(...) / createReceiver(...) call builds a
 * new AmqpConnectionImpl; connections are not pooled or reused here.
 */
class AmqpClientImpl : AmqpClient {

    private ProtonClient proton;
    private AmqpClientOptions options;

    // Connections added through register(); accessed under synchronized(this).
    private List!AmqpConnection connections;
    // Stored from the constructor; not read anywhere in this class.
    private bool mustCloseVertxOnClose;

    /**
     * Creates a client.
     *
     * Params:
     *   options = client settings; when null a default AmqpClientOptions is used.
     *   mustCloseVertxOnClose = flag kept for API compatibility (see field note).
     */
    this(AmqpClientOptions options, bool mustCloseVertxOnClose) {
        this.options = (options is null) ? new AmqpClientOptions() : options;
        this.proton = ProtonClient.create();
        this.mustCloseVertxOnClose = mustCloseVertxOnClose;
        this.connections = new ArrayList!AmqpConnection();
    }

    /**
     * Starts a connection attempt and reports the outcome to connectionHandler.
     *
     * A missing host is reported to the handler as a failed result instead of
     * attempting the connection. A null handler is logged and nothing is
     * attempted, because there would be no way to deliver the result.
     *
     * Params:
     *   connectionHandler = callback receiving the connection or the failure.
     *
     * Returns: this client, to allow call chaining.
     */
    AmqpClient connect(AsyncResultHandler!AmqpConnection connectionHandler) {
        immutable bool hostMissing = options.getHost() is null;
        if (hostMissing) {
            logError("Host must be set");
        }
        if (connectionHandler is null) {
            logError("Handler must not be null");
            return this;
        }
        if (hostMissing) {
            connectionHandler(failedResult!(AmqpConnection)(new Exception("Host must be set")));
            return this;
        }

        // The new connection object is handed this client and the handler.
        // NOTE(review): presumably its constructor starts the connection attempt
        // and calls register() on success - confirm in AmqpConnectionImpl.
        new AmqpConnectionImpl(this, options, proton, connectionHandler);
        return this;
    }

    /**
     * Starts a connection attempt and exposes the outcome as a Future.
     *
     * Returns: a future completed with the connection, or failed with an
     *          Exception whose nextInChain is the underlying cause.
     */
    Future!AmqpConnection connectAsync() {
        // dfmt off
        auto promise = new FuturePromise!AmqpConnection();

        connect((ar) {
            if (ar.succeeded()) {
                promise.succeeded(ar.result());
            } else {
                Throwable th = ar.cause();
                if (th !is null) {
                    warning(th.msg);
                    version(HUNT_DEBUG) warning(th);
                }
                // Keep the original cause reachable for callers that inspect the chain.
                promise.failed(new Exception("Unable to connect to the broker.", th));
            }
        });

        // dfmt on
        return promise;
    }

    /**
     * Connects and blocks until the connection is available.
     *
     * The wait is bounded by the idle timeout configured in the client options.
     *
     * Returns: the established connection.
     */
    AmqpConnection connect() {
        Future!AmqpConnection promise = connectAsync();
        version (HUNT_AMQP_DEBUG)
            warning("try to get a result");
        return promise.get(options.getIdleTimeout());
    }

// dfmt off
    /**
     * Closes every registered connection and then notifies handler.
     *
     * The connections are closed even when handler is null; in that case the
     * outcome is only visible in the log. The list of registered connections
     * is emptied once all close operations have finished.
     *
     * Params:
     *   handler = optional callback; receives a succeeded result when every
     *             connection closed cleanly, a failed result otherwise.
     */
    void close(AsyncResultHandler!Void handler) {
        // Copy under the same lock register() uses, so the list is not
        // iterated while another thread is adding to it.
        AmqpConnection[] snapshot;
        synchronized (this) {
            foreach (AmqpConnection connection; connections) {
                snapshot ~= connection;
            }
        }

        Future!(void)[] actions;
        foreach (AmqpConnection connection; snapshot) {
            actions ~= closeConnection(connection);
        }

        // Wait for all close operations off the calling thread.
        CompletableFuture!bool cf = supplyAsync!(bool)(() {
            foreach (Future!void f; actions) {
                try {
                    f.get();
                } catch (Throwable th) {
                    warning(th.msg);
                    version(HUNT_DEBUG) warning(th);
                    return false;
                }
            }

            return true;
        });

        cf.thenAccept((done) {
            version(HUNT_DEBUG) info("All connections closed.");
            // Empty the list instead of dropping it, so register() and a
            // repeated close() keep working afterwards.
            synchronized (this) {
                connections.clear();
            }

            if (handler !is null) {
                if (done) {
                    handler(succeededResult!(Void)(null));
                } else {
                    Exception ex = new Exception("Failed to close the connections. See the log for more details.");
                    handler(failedResult!(Void)(ex));
                }
            }
        });
    }

    /**
     * Closes one connection and returns a future tracking that close.
     *
     * Kept as a separate method so that each connection's callback captures
     * its own promise.
     */
    private Future!void closeConnection(AmqpConnection connection) {
        FuturePromise!(void) future = new FuturePromise!void();
        connection.close((ar) {
            if (ar.succeeded()) {
                future.succeeded();
            } else {
                Throwable cause = ar.cause();
                Exception ex = cast(Exception) cause;
                if (ex is null) {
                    // The cause is absent or not an Exception; wrap it so the
                    // promise is never failed with null.
                    ex = new Exception(cause is null ? "Failed to close the connection." : cause.msg, cause);
                }
                future.failed(ex);
            }
        });
        return future;
    }

    /**
     * Opens a new connection and creates a receiver on address.
     *
     * Params:
     *   address = address to receive from.
     *   completionHandler = receives the receiver, or null when the
     *                       connection could not be established.
     *
     * Returns: this client.
     */
    AmqpClient createReceiver(string address, Handler!AmqpReceiver completionHandler) {
        return connect((res) {
            if (res.succeeded()) {
                res.result().createReceiver(address, completionHandler);
            } else {
                Throwable th = res.cause();
                if (th !is null) {
                    warning(th.msg);
                    version(HUNT_DEBUG) warning(th);
                }
                completionHandler.handle(null);
            }
        });
    }

    /**
     * Opens a new connection and creates a receiver on address with the
     * given receiver options.
     *
     * Params:
     *   address = address to receive from.
     *   receiverOptions = options forwarded to the connection's createReceiver.
     *   completionHandler = receives the receiver, or null when the
     *                       connection could not be established.
     *
     * Returns: this client.
     */
    AmqpClient createReceiver(string address, AmqpReceiverOptions receiverOptions,
            Handler!AmqpReceiver completionHandler) {
        return connect((res) {
            if (res.succeeded()) {
                res.result().createReceiver(address, receiverOptions, completionHandler);
            } else {
                Throwable th = res.cause();
                if (th !is null) {
                    warning(th.msg);
                    version(HUNT_DEBUG) warning(th);
                }
                completionHandler.handle(null);
            }
        });
    }

    /**
     * Opens a new connection and creates a sender to address.
     *
     * Params:
     *   address = address to send to.
     *   completionHandler = receives the sender, or null when the
     *                       connection could not be established.
     *
     * Returns: this client.
     */
    AmqpClient createSender(string address, Handler!AmqpSender completionHandler) {
        return connect((ar) {
            if (ar.succeeded()) {
                auto conn = ar.result();
                conn.createSender(address, completionHandler);
            } else {
                Throwable th = ar.cause();
                if (th !is null) {
                    warning(th.msg);
                    version(HUNT_DEBUG) warning(th);
                }
                completionHandler.handle(null);
            }
        });
    }

    /**
     * Opens a new connection and creates a sender to address with the given
     * sender options.
     *
     * Params:
     *   address = address to send to.
     *   options = sender options forwarded to the connection's createSender
     *             (this parameter shadows the client-options field inside
     *             this method).
     *   completionHandler = receives the sender, or null when the
     *                       connection could not be established.
     *
     * Returns: this client.
     */
    AmqpClient createSender(string address, AmqpSenderOptions options,
            Handler!AmqpSender completionHandler) {

        return connect((ar) {
            if (ar.succeeded()) {
                auto conn = ar.result();
                conn.createSender(address, options, completionHandler);
            } else {
                Throwable th = ar.cause();
                if (th !is null) {
                    warning(th.msg);
                    version(HUNT_DEBUG) warning(th);
                }
                completionHandler.handle(null);
            }
        });
    }

// dfmt on

    /**
     * Adds connection to the set of connections that close() shuts down.
     *
     * Params:
     *   connection = the connection to track.
     */
    void register(AmqpConnectionImpl connection) {
        synchronized (this) {
            connections.add(connection);
        }
    }
}