module hunt.amqp.client.AmqpFactory;

import hunt.amqp.client.AmqpClientOptions;
import hunt.amqp.client.AmqpConnection;
import hunt.amqp.client.impl.AmqpClientImpl;
import hunt.amqp.client.impl.AmqpConnectionImpl;
import hunt.amqp.Handler;
import hunt.amqp.ProtonClient;

import hunt.pool.PooledObject;
import hunt.pool.PooledObjectFactory;
import hunt.pool.impl.DefaultPooledObject;

import hunt.concurrency.FuturePromise;
import hunt.Exceptions;
import hunt.logging.ConsoleLogger;
import hunt.net.util.HttpURI;

import std.format;
import std.string;

/**
 * Pooled-object factory that creates, validates and destroys
 * `AmqpConnection` instances on behalf of an object pool.
 *
 * Every connection made by one factory is built from the same
 * `AmqpClientOptions` and the same `ProtonClient`.
 */
class AmqpFactory : PooledObjectFactory!(AmqpConnection) {
    private AmqpClientOptions _options;
    private ProtonClient _proton;

    /**
     * Params:
     *   options = client options handed to every connection this factory
     *             creates; its idle timeout is also used as the wait budget
     *             in `makeObject`.
     */
    this(AmqpClientOptions options) {
        _options = options;
        _proton = ProtonClient.create();
    }

    /**
     * Hook invoked for a pooled connection being activated.
     * Nothing needs to be done; a warning is logged in debug builds only.
     */
    void activateObject(IPooledObject obj) {
        version (HUNT_AMQP_DEBUG)
            warning("Do nothing.");
    }

    /**
     * Closes the connection wrapped by `obj`.
     *
     * An object that is not a `PooledObject!(AmqpConnection)`, or that wraps
     * no connection, is ignored: there is nothing to close. These cases used
     * to be covered only by `assert`, which is compiled out of release
     * builds and would have left a null dereference behind.
     */
    void destroyObject(IPooledObject obj) {
        auto pooledObj = cast(PooledObject!(AmqpConnection)) obj;
        AmqpConnection conn = (pooledObj is null) ? null : pooledObj.getObject();

        if (conn is null) {
            version (HUNT_AMQP_DEBUG)
                warning("No connection to destroy.");
            return;
        }

        // No completion handler is supplied for the close operation.
        conn.close(null);
    }

    /**
     * Creates a new connection and blocks until it is reported as ready.
     *
     * Returns: a `DefaultPooledObject` wrapping the established connection.
     *
     * NOTE(review): the wait is bounded by the *idle* timeout from the
     * options; presumably `promise.get` throws when that time elapses or
     * when the promise has been failed - confirm against `FuturePromise`.
     */
    IPooledObject makeObject() {
        // dfmt off
        auto promise = new FuturePromise!AmqpConnection();

        // The result arrives through the handler below; presumably the
        // constructor itself starts the connection attempt - confirm
        // against AmqpConnectionImpl.
        AmqpConnectionImpl pending = new AmqpConnectionImpl(null, _options, _proton,
            new class Handler!AmqpConnection {
                void handle(AmqpConnection established) {
                    // A null argument is treated as a failed connection attempt.
                    if (established is null)
                        promise.failed(new Exception("Unable to connect to the broker."));
                    else
                        promise.succeeded(established);
                }
            }
        );
        // dfmt on

        version (HUNT_AMQP_DEBUG) tracef("try to get a result in %s", _options.getIdleTimeout());
        AmqpConnection c = promise.get(_options.getIdleTimeout());
        return new DefaultPooledObject!(AmqpConnection)(c);
    }

    /**
     * Hook invoked for a pooled connection being passivated.
     *
     * NOTE(review): passivation is not implemented; this only calls
     * `implementationMissing(false)`. Presumably that reports the gap
     * without throwing - confirm against hunt.Exceptions.
     */
    void passivateObject(IPooledObject obj) {
        implementationMissing(false);
    }

    /**
     * Tells whether the connection wrapped by `obj` is still usable.
     *
     * Returns: `true` when `obj` wraps a connection that is not closed;
     *          `false` when the connection is closed, or when `obj` is not a
     *          `PooledObject!(AmqpConnection)` or wraps no connection.
     */
    bool validateObject(IPooledObject obj) {
        // Diagnostic only: gated like every other trace in this class so
        // that validation does not log on each call in normal builds.
        version (HUNT_AMQP_DEBUG)
            trace("running here");

        auto pooledObj = cast(PooledObject!(AmqpConnection)) obj;
        if (pooledObj is null)
            return false;

        AmqpConnection conn = pooledObj.getObject();
        if (conn is null)
            return false;

        return !conn.isClosed();
    }
}