1 /* 2 * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.amqp.client.impl.AmqpClientImpl; 12 13 import hunt.amqp.ProtonClient; 14 15 import hunt.collection.ArrayList; 16 import hunt.collection.List; 17 import hunt.amqp.client.AmqpClient; 18 import hunt.amqp.client.AmqpClientOptions; 19 import hunt.amqp.client.AmqpConnection; 20 import hunt.amqp.Handler; 21 import hunt.amqp.client.impl.AmqpConnectionImpl; 22 import hunt.amqp.client.AmqpReceiver; 23 import hunt.amqp.client.AmqpSender; 24 import hunt.amqp.client.AmqpReceiverOptions; 25 import hunt.amqp.client.AmqpSenderOptions; 26 27 import hunt.Assert; 28 import hunt.concurrency.Future; 29 import hunt.concurrency.FuturePromise; 30 import hunt.concurrency.Promise; 31 import hunt.logging; 32 import hunt.Object; 33 import hunt.Exceptions; 34 import hunt.String; 35 36 /** 37 * 38 */ 39 class AmqpClientImpl : AmqpClient { 40 41 private ProtonClient proton; 42 private AmqpClientOptions options; 43 44 private List!AmqpConnection connections; //= new CopyOnWriteArrayList<>(); 45 private bool mustCloseVertxOnClose; 46 47 this(AmqpClientOptions options, bool mustCloseVertxOnClose) { 48 if (options is null) { 49 this.options = new AmqpClientOptions(); 50 } else { 51 this.options = options; 52 } 53 this.proton = ProtonClient.create(); 54 this.mustCloseVertxOnClose = mustCloseVertxOnClose; 55 this.connections = new ArrayList!AmqpConnection; 56 } 57 58 AmqpClient connect(Handler!AmqpConnection connectionHandler) { 59 if (options.getHost() is null) { 60 logError("Host must be set"); 61 } 62 if (connectionHandler is null) { 63 logError("Handler must not be null"); 64 } 65 // options.getHost(), "Host must be set"); 66 // Objects.requireNonNull(connectionHandler, "Handler must not be null"); 67 new AmqpConnectionImpl(this, options, proton, connectionHandler); 68 return this; 69 } 70 71 Future!AmqpConnection connectAsync() { 72 // dfmt off 73 auto promise = new FuturePromise!AmqpConnection(); 74 75 connect(new class Handler!AmqpConnection { 76 void handle(AmqpConnection conn) { 77 if (conn is null) 78 promise.failed(new Exception("Unable to connect to the broker.")); 79 else 80 promise.succeeded(conn); 81 } 82 }); 83 84 // dfmt on 85 return promise; 86 } 87 88 AmqpConnection connect() { 89 Future!AmqpConnection promise = connectAsync(); 90 version (HUNT_AMQP_DEBUG) 91 warning("try to get a result"); 92 return promise.get(options.getIdleTimeout()); 93 94 } 95 96 // dfmt off 97 void close(Handler!Void handler) { 98 //List<Future> actions = new ArrayList<>(); 99 //foreach (AmqpConnection connection ; connections) { 100 // Promise<Void> future = Promise.promise(); 101 // connection.close(future); 102 // actions.add(future.future()); 103 //} 104 // 105 //CompositeFuture.join(actions).setHandler(done -> { 106 // connections.clear(); 107 // if (mustCloseVertxOnClose) { 108 // vertx.close(x -> { 109 // if (done.succeeded() && x.succeeded()) { 110 // if (handler !is null) { 111 // handler.handle(Future.succeededFuture()); 112 // } 113 // } else { 114 // if (handler !is null) { 115 // handler.handle(Future.failedFuture(done.failed() ? done.cause() : x.cause())); 116 // } 117 // } 118 // }); 119 // } else if (handler !is null) { 120 // handler.handle(done.mapEmpty()); 121 // } 122 //}); 123 implementationMissing(false); 124 } 125 126 //Future<Void> close() { 127 // Promise<Void> promise = Promise.promise(); 128 // close(promise); 129 // return promise.future(); 130 //} 131 132 AmqpClient createReceiver(string address, Handler!AmqpReceiver completionHandler) { 133 134 //return connect(res -> { 135 // if (res.failed()) { 136 // completionHandler.handle(res.mapEmpty()); 137 // } else { 138 // res.result().createReceiver(address, completionHandler); 139 // } 140 //}); 141 return connect(new class Handler!AmqpConnection { 142 void handle(AmqpConnection conn) { 143 if (conn !is null) { 144 conn.createReceiver(address, completionHandler); 145 } else { 146 completionHandler.handle(null); 147 } 148 } 149 }); 150 } 151 152 //Future<AmqpReceiver> createReceiver(String address) { 153 // Promise<AmqpReceiver> promise = Promise.promise(); 154 // createReceiver(address, promise); 155 // return promise.future(); 156 //} 157 158 AmqpClient createReceiver(string address, AmqpReceiverOptions receiverOptions, 159 Handler!AmqpReceiver completionHandler) { 160 return connect(new class Handler!AmqpConnection { 161 void handle(AmqpConnection conn) { 162 if (conn !is null) { 163 conn.createReceiver(address, receiverOptions, completionHandler); 164 } else { 165 completionHandler.handle(null); 166 } 167 } 168 }); 169 } 170 171 //Future<AmqpReceiver> createReceiver(String address, AmqpReceiverOptions receiverOptions) { 172 // Promise<AmqpReceiver> promise = Promise.promise(); 173 // createReceiver(address, receiverOptions, promise); 174 // return promise.future(); 175 //} 176 177 AmqpClient createSender(string address, Handler!AmqpSender completionHandler) { 178 return connect(new class Handler!AmqpConnection { 179 void handle(AmqpConnection conn) { 180 if (conn !is null) { 181 conn.createSender(address, completionHandler); 182 } else { 183 completionHandler.handle(null); 184 } 185 } 186 }); 187 } 188 189 //Future<AmqpSender> createSender(String address) { 190 // Promise<AmqpSender> promise = Promise.promise(); 191 // createSender(address, promise); 192 // return promise.future(); 193 //} 194 195 AmqpClient createSender(string address, AmqpSenderOptions options, 196 Handler!AmqpSender completionHandler) { 197 return connect(new class Handler!AmqpConnection { 198 void handle(AmqpConnection conn) { 199 if (conn !is null) { 200 conn.createSender(address, options, completionHandler); 201 } else { 202 completionHandler.handle(null); 203 } 204 } 205 }); 206 } 207 208 // dfmt on 209 210 //Future<AmqpSender> createSender(String address, AmqpSenderOptions options) { 211 // Promise<AmqpSender> promise = Promise.promise(); 212 // createSender(address, options, promise); 213 // return promise.future(); 214 //} 215 // 216 void register(AmqpConnectionImpl connection) { 217 synchronized(this) { 218 connections.add(connection); 219 } 220 } 221 }