1 /*
2  * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.client.impl.AmqpClientImpl;
12 
13 import hunt.amqp.ProtonClient;
14 
15 import hunt.collection.ArrayList;
16 import hunt.collection.List;
17 import hunt.amqp.client.AmqpClient;
18 import hunt.amqp.client.AmqpClientOptions;
19 import hunt.amqp.client.AmqpConnection;
20 import hunt.amqp.Handler;
21 import hunt.amqp.client.impl.AmqpConnectionImpl;
22 import hunt.amqp.client.AmqpReceiver;
23 import hunt.amqp.client.AmqpSender;
24 import hunt.amqp.client.AmqpReceiverOptions;
25 import hunt.amqp.client.AmqpSenderOptions;
26 
27 import hunt.Assert;
28 import hunt.concurrency.Future;
29 import hunt.concurrency.FuturePromise;
30 import hunt.concurrency.Promise;
31 import hunt.logging;
32 import hunt.Object;
33 import hunt.Exceptions;
34 import hunt.String;
35 
/**
 * Default implementation of AmqpClient.
 *
 * Every connect call builds a new AmqpConnectionImpl from the options given
 * at construction time and a ProtonClient created in the constructor.
 * Connections that call register() are kept in an internal list.
 *
 * Failure convention used throughout this class: a handler that receives
 * `null` instead of a connection / receiver / sender is being told that the
 * operation did not succeed.
 */
class AmqpClientImpl : AmqpClient {

    private ProtonClient proton;
    private AmqpClientOptions options;

    // Java original: new CopyOnWriteArrayList<>(). Here a plain ArrayList is
    // used and register() serializes additions with synchronized(this).
    private List!AmqpConnection connections;
    // Name kept from the Vert.x original; stored but not read anywhere in
    // this class while close() is unimplemented.
    private bool mustCloseVertxOnClose;

    /**
     * Creates the client.
     *
     * Params:
     *   options = client options; when null, a default-constructed
     *             AmqpClientOptions is used instead.
     *   mustCloseVertxOnClose = flag stored for use by close().
     */
    this(AmqpClientOptions options, bool mustCloseVertxOnClose) {
        if (options is null) {
            this.options = new AmqpClientOptions();
        } else {
            this.options = options;
        }
        this.proton = ProtonClient.create();
        this.mustCloseVertxOnClose = mustCloseVertxOnClose;
        this.connections = new ArrayList!AmqpConnection;
    }

    /**
     * Starts a connection attempt and reports the outcome to connectionHandler.
     *
     * The AmqpConnectionImpl constructor receives the handler and is
     * responsible for invoking it.
     *
     * Params:
     *   connectionHandler = receives the connection (see the class note on
     *                       the null-means-failure convention).
     *
     * Returns: this client, to allow call chaining.
     *
     * Throws: NullPointerException when the configured host or the handler
     *         is null (same contract as Objects.requireNonNull in the Java
     *         original this code was ported from).
     */
    AmqpClient connect(Handler!AmqpConnection connectionHandler) {
        // Fail fast: previously these conditions were only logged and the
        // connection attempt went ahead with invalid arguments.
        if (options.getHost() is null) {
            logError("Host must be set");
            throw new NullPointerException("Host must be set");
        }
        if (connectionHandler is null) {
            logError("Handler must not be null");
            throw new NullPointerException("Handler must not be null");
        }
        new AmqpConnectionImpl(this, options, proton, connectionHandler);
        return this;
    }

    /**
     * Starts a connection attempt and exposes the outcome as a Future.
     *
     * Returns: a future that succeeds with the connection, or fails with an
     *          Exception("Unable to connect to the broker.") when the handler
     *          is called with a null connection.
     */
    Future!AmqpConnection connectAsync() {
        // dfmt off
        auto promise = new FuturePromise!AmqpConnection();

        connect(new class Handler!AmqpConnection {
            void handle(AmqpConnection conn) {
                if (conn is null) 
                    promise.failed(new Exception("Unable to connect to the broker."));
                else
                    promise.succeeded(conn);
            }
        });

        // dfmt on
        return promise;
    }

    /**
     * Connects and returns the result of the future produced by
     * connectAsync(), obtained through get() with the idle timeout value
     * taken from the client options.
     */
    AmqpConnection connect() {
        Future!AmqpConnection promise = connectAsync();
        version (HUNT_AMQP_DEBUG)
            warning("try to get a result");
        return promise.get(options.getIdleTimeout());

    }

// dfmt off
    /**
     * Not implemented yet: the body only calls implementationMissing(false).
     * Registered connections are NOT closed and `handler` is never invoked.
     *
     * The commented-out code below is the Java original kept as a porting
     * reference.
     */
    void close(Handler!Void handler) {
        //List<Future> actions = new ArrayList<>();
        //foreach (AmqpConnection connection ; connections) {
        //  Promise<Void> future = Promise.promise();
        //  connection.close(future);
        //  actions.add(future.future());
        //}
        //
        //CompositeFuture.join(actions).setHandler(done -> {
        //  connections.clear();
        //  if (mustCloseVertxOnClose) {
        //    vertx.close(x -> {
        //      if (done.succeeded() && x.succeeded()) {
        //        if (handler !is null) {
        //          handler.handle(Future.succeededFuture());
        //        }
        //      } else {
        //        if (handler !is null) {
        //          handler.handle(Future.failedFuture(done.failed() ? done.cause() : x.cause()));
        //        }
        //      }
        //    });
        //  } else if (handler !is null) {
        //    handler.handle(done.mapEmpty());
        //  }
        //});
        implementationMissing(false);
    }

    // Not ported yet (Java original):
    //Future<Void> close() {
    //  Promise<Void> promise = Promise.promise();
    //  close(promise);
    //  return promise.future();
    //}

    /**
     * Opens a new connection and, once it is available, asks it to create a
     * receiver for `address`.
     *
     * Params:
     *   address           = address passed through to the connection.
     *   completionHandler = forwarded to the connection on success; called
     *                       with null when the connection is null.
     *
     * Returns: this client.
     */
    AmqpClient createReceiver(string address, Handler!AmqpReceiver completionHandler) {
        return connect(new class Handler!AmqpConnection {
            void handle(AmqpConnection conn) {
                if (conn !is null) {
                    conn.createReceiver(address, completionHandler);
                } else {
                    // Propagate the connection failure to the caller.
                    completionHandler.handle(null);
                }
            }
        });
    }

    // Not ported yet (Java original):
    //Future<AmqpReceiver> createReceiver(String address) {
    //  Promise<AmqpReceiver> promise = Promise.promise();
    //  createReceiver(address, promise);
    //  return promise.future();
    //}

    /**
     * Same as createReceiver(address, completionHandler), additionally
     * passing `receiverOptions` through to the connection.
     *
     * Returns: this client.
     */
    AmqpClient createReceiver(string address, AmqpReceiverOptions receiverOptions,
            Handler!AmqpReceiver completionHandler) {
        return connect(new class Handler!AmqpConnection {
            void handle(AmqpConnection conn) {
                if (conn !is null) {
                    conn.createReceiver(address, receiverOptions, completionHandler);
                } else {
                    // Propagate the connection failure to the caller.
                    completionHandler.handle(null);
                }
            }
        });
    }

    // Not ported yet (Java original):
    //Future<AmqpReceiver> createReceiver(String address, AmqpReceiverOptions receiverOptions) {
    //  Promise<AmqpReceiver> promise = Promise.promise();
    //  createReceiver(address, receiverOptions, promise);
    //  return promise.future();
    //}

    /**
     * Opens a new connection and, once it is available, asks it to create a
     * sender for `address`.
     *
     * Params:
     *   address           = address passed through to the connection.
     *   completionHandler = forwarded to the connection on success; called
     *                       with null when the connection is null.
     *
     * Returns: this client.
     */
    AmqpClient createSender(string address, Handler!AmqpSender completionHandler) {
        return connect(new class Handler!AmqpConnection {
            void handle(AmqpConnection conn) {
                if (conn !is null) {
                    conn.createSender(address, completionHandler);
                } else {
                    // Propagate the connection failure to the caller.
                    completionHandler.handle(null);
                }
            }
        });
    }

    // Not ported yet (Java original):
    //Future<AmqpSender> createSender(String address) {
    //  Promise<AmqpSender> promise = Promise.promise();
    //  createSender(address, promise);
    //  return promise.future();
    //}

    /**
     * Same as createSender(address, completionHandler), additionally passing
     * sender options through to the connection.
     *
     * Note: the parameter `options` (sender options) shadows the client
     * options field inside this method; the field is still what connect()
     * uses to open the connection.
     *
     * Returns: this client.
     */
    AmqpClient createSender(string address, AmqpSenderOptions options,
            Handler!AmqpSender completionHandler) {
        return connect(new class Handler!AmqpConnection {
            void handle(AmqpConnection conn) {
                if (conn !is null) {
                    conn.createSender(address, options, completionHandler);
                } else {
                    // Propagate the connection failure to the caller.
                    completionHandler.handle(null);
                }
            }
        });
    }

// dfmt on

    // Not ported yet (Java original):
    //Future<AmqpSender> createSender(String address, AmqpSenderOptions options) {
    //  Promise<AmqpSender> promise = Promise.promise();
    //  createSender(address, options, promise);
    //  return promise.future();
    //}
    //

    /**
     * Adds `connection` to the list of connections tracked by this client.
     * The addition is performed while holding this object's monitor.
     */
    void register(AmqpConnectionImpl connection) {
        synchronized(this) {
            connections.add(connection);
        }
    }
}