module hunt.amqp.client.AmqpFactory;

import hunt.amqp.client.AmqpClientOptions;
import hunt.amqp.client.AmqpConnection;
import hunt.amqp.client.impl.AmqpClientImpl;
import hunt.amqp.client.impl.AmqpConnectionImpl;
import hunt.amqp.Handler;
import hunt.amqp.ProtonClient;

import hunt.pool.PooledObject;
import hunt.pool.PooledObjectFactory;
import hunt.pool.impl.DefaultPooledObject;

import hunt.concurrency.FuturePromise;
import hunt.Exceptions;
import hunt.logging.ConsoleLogger;
import hunt.net.util.HttpURI;
import hunt.net.AsyncResult;

import std.format;
import std.string;

/**
 * Pooled-object factory for `AmqpConnection` instances.
 *
 * Implements the `PooledObjectFactory` callbacks: it creates connections,
 * checks whether a pooled connection is still open, and closes connections
 * when they are destroyed.
 */
class AmqpFactory : PooledObjectFactory!(AmqpConnection) {
    private AmqpClientOptions _options;
    private ProtonClient _proton;

    /**
     * Params:
     *   options = client options handed to every connection this factory
     *             creates; its idle timeout is also used as the wait limit
     *             in `makeObject`.
     */
    this(AmqpClientOptions options) {
        _options = options;
        _proton = ProtonClient.create();
    }

    /**
     * Activation callback. Nothing needs to be done; a warning is logged
     * only when built with `HUNT_AMQP_DEBUG`.
     */
    void activateObject(IPooledObject obj) {
        version (HUNT_AMQP_DEBUG)
            warning("Do nothing.");
    }

    /**
     * Closes the connection wrapped by `obj`.
     *
     * A wrapper of an unexpected type, or one holding no connection, is
     * ignored. Explicit checks are used instead of `assert` because asserts
     * are compiled out of release builds, where a null here would otherwise
     * be dereferenced.
     */
    void destroyObject(IPooledObject obj) {
        auto pooledObj = cast(PooledObject!(AmqpConnection)) obj;
        if (pooledObj is null) {
            return;
        }

        AmqpConnection conn = pooledObj.getObject();
        if (conn is null) {
            return;
        }

        // `null` is passed as close()'s argument, presumably a completion
        // handler that is not needed here. TODO confirm.
        conn.close(null);
    }

    /**
     * Creates a new connection and blocks until the connection callback
     * reports a result, waiting at most `_options.getIdleTimeout()`.
     *
     * On failure the promise is failed with an `Exception` whose
     * `nextInChain` is the original cause, so the underlying error is not
     * lost. How that failure (or a timeout) surfaces to the caller depends
     * on `FuturePromise.get` and should be verified against its documentation.
     *
     * Returns: a `DefaultPooledObject` wrapping the established connection.
     */
    IPooledObject makeObject() {
        // dfmt off
        auto promise = new FuturePromise!AmqpConnection();

        // The reference is held in a local for the duration of the blocking
        // wait below; the result itself is delivered through the callback.
        AmqpConnectionImpl conn = new AmqpConnectionImpl(null, _options, _proton,
            (ar) {
                if (ar.succeeded()) {
                    promise.succeeded(ar.result());
                } else {
                    Throwable th = ar.cause();
                    if (th !is null) {
                        warning(th.msg);
                        // Was `HUNT_DEWBUG`, a misspelled identifier that could
                        // never be enabled.
                        version (HUNT_DEBUG) warning(th);
                    }
                    // Chain the original cause instead of discarding it.
                    promise.failed(new Exception("Unable to connect to the broker.", th));
                }
            }
        );
        // dfmt on

        version (HUNT_AMQP_DEBUG) tracef("try to get a result in %s", _options.getIdleTimeout());
        AmqpConnection c = promise.get(_options.getIdleTimeout());
        return new DefaultPooledObject!(AmqpConnection)(c);
    }

    /**
     * Passivation callback. Not implemented; it only reports that through
     * `implementationMissing(false)` (the `false` argument presumably
     * suppresses throwing; verify against hunt.Exceptions).
     */
    void passivateObject(IPooledObject obj) {
        implementationMissing(false);
    }

    /**
     * Reports whether the connection wrapped by `obj` is still usable.
     *
     * Returns: `true` when the wrapped connection exists and is not closed;
     *          `false` for a closed connection, a wrapper of an unexpected
     *          type, or a wrapper holding no connection.
     */
    bool validateObject(IPooledObject obj) {
        version (HUNT_AMQP_DEBUG) trace("running here");

        auto pooledObj = cast(PooledObject!(AmqpConnection)) obj;
        if (pooledObj is null) {
            return false;
        }

        AmqpConnection conn = pooledObj.getObject();
        if (conn is null) {
            return false;
        }

        return !conn.isClosed();
    }
}