1 module hunt.amqp.client.AmqpFactory;
2 
3 import hunt.amqp.client.AmqpClientOptions;
4 import hunt.amqp.client.AmqpConnection;
5 import hunt.amqp.client.impl.AmqpClientImpl;
6 import hunt.amqp.client.impl.AmqpConnectionImpl;
7 import hunt.amqp.Handler;
8 import hunt.amqp.ProtonClient;
9 
10 import hunt.pool.PooledObject;
11 import hunt.pool.PooledObjectFactory;
12 import hunt.pool.impl.DefaultPooledObject;
13 
14 import hunt.concurrency.FuturePromise;
15 import hunt.Exceptions;
16 import hunt.logging.ConsoleLogger;
17 import hunt.net.util.HttpURI;
18 import hunt.net.AsyncResult;
19 
20 import std.format;
21 import std.string;
22 
/**
 * Pooled-object factory for AmqpConnection instances.
 *
 * Creates connections through AmqpConnectionImpl using the supplied
 * AmqpClientOptions, validates them by checking whether they are closed,
 * and closes them when the pool destroys them.
 */
class AmqpFactory : PooledObjectFactory!(AmqpConnection) {
    private AmqpClientOptions _options;
    private ProtonClient _proton;

    /**
     * Params:
     *   options = client options handed to every connection this factory creates.
     */
    this(AmqpClientOptions options) {
        _options = options;
        _proton = ProtonClient.create();
    }

    /**
     * Called when an object is handed out; nothing needs to be re-initialised.
     */
    void activateObject(IPooledObject obj) {
        version (HUNT_AMQP_DEBUG)
            warning("Do nothing.");
    }

    /**
     * Closes the connection wrapped by `obj`.
     *
     * A wrapper that is not a PooledObject!(AmqpConnection), or that holds no
     * connection, is ignored instead of being dereferenced (the previous
     * assert-only check vanished in release builds).
     */
    void destroyObject(IPooledObject obj) {
        AmqpConnection conn = connectionOf(obj);
        if (conn is null) {
            version (HUNT_AMQP_DEBUG)
                warning("No connection to destroy.");
            return;
        }
        // No completion handler is registered for the close.
        conn.close(null);
    }

    /**
     * Opens a new connection and blocks until the connect handler reports a
     * result or the wait expires.
     *
     * Returns: the connected AmqpConnection wrapped in a DefaultPooledObject.
     *
     * NOTE(review): the wait uses `_options.getIdleTimeout()` as its timeout;
     * confirm that the idle timeout is the intended connect timeout.
     * NOTE(review): failure/timeout behaviour is whatever `FuturePromise.get`
     * does (presumably it throws) -- confirm against hunt.concurrency.
     */
    IPooledObject makeObject() {
        // dfmt off
        auto promise = new FuturePromise!AmqpConnection();

        // The constructor is given the completion handler; the local reference
        // is kept (as in the original code) while this method waits on the promise.
        AmqpConnectionImpl conn = new AmqpConnectionImpl(null, _options, _proton, 
            (ar) {
                if(ar.succeeded()) {
                    promise.succeeded(ar.result());
                } else {
                    Throwable th = ar.cause();
                    if(th !is null) {
                        warning(th.msg);
                        version(HUNT_DEBUG) warning(th);
                    }
                    // Chain the original failure so callers can inspect the root cause.
                    promise.failed(new Exception("Unable to connect to the broker.", th));
                }
            }
        );
        // dfmt on

        version (HUNT_AMQP_DEBUG) tracef("try to get a result in %s", _options.getIdleTimeout());
        AmqpConnection c = promise.get(_options.getIdleTimeout());
        return new DefaultPooledObject!(AmqpConnection)(c);
    }

    /**
     * Called when an object is returned to the pool; a connection needs no
     * clean-up here, so this is a no-op like activateObject.
     */
    void passivateObject(IPooledObject obj) {
        version (HUNT_AMQP_DEBUG)
            warning("Do nothing.");
    }

    /**
     * Reports whether the wrapped connection can still be used.
     *
     * Returns: `true` when `obj` wraps a non-null connection that is not
     * closed; `false` otherwise (including when `obj` holds no connection).
     */
    bool validateObject(IPooledObject obj) {
        version (HUNT_AMQP_DEBUG)
            trace("running here");

        AmqpConnection conn = connectionOf(obj);
        return conn !is null && !conn.isClosed();
    }

    /// Extracts the AmqpConnection from a pool wrapper, or null if there is none.
    private static AmqpConnection connectionOf(IPooledObject obj) {
        auto pooledObj = cast(PooledObject!(AmqpConnection)) obj;
        return pooledObj is null ? null : pooledObj.getObject();
    }
}