1 module hunt.amqp.client.AmqpFactory;
2 
3 import hunt.amqp.client.AmqpClientOptions;
4 import hunt.amqp.client.AmqpConnection;
5 import hunt.amqp.client.impl.AmqpClientImpl;
6 import hunt.amqp.client.impl.AmqpConnectionImpl;
7 import hunt.amqp.Handler;
8 import hunt.amqp.ProtonClient;
9 
10 import hunt.pool.PooledObject;
11 import hunt.pool.PooledObjectFactory;
12 import hunt.pool.impl.DefaultPooledObject;
13 
14 import hunt.concurrency.FuturePromise;
15 import hunt.Exceptions;
16 import hunt.logging.ConsoleLogger;
17 import hunt.net.util.HttpURI;
18 
19 import std.format;
20 import std.string;
21 
/**
 * Pooled-object factory for AmqpConnection instances.
 *
 * Connections are built from the AmqpClientOptions given to the constructor
 * and from a single ProtonClient created once per factory.
 */
class AmqpFactory : PooledObjectFactory!(AmqpConnection) {
    private AmqpClientOptions _options;
    private ProtonClient _proton;

    /**
     * Params:
     *   options = client options passed to every connection this factory makes;
     *             its idle timeout is also used as the wait limit in makeObject().
     */
    this(AmqpClientOptions options) {
        _options = options;
        _proton = ProtonClient.create();
    }

    /**
     * Activation hook. Nothing needs to be done for a connection, so this
     * only emits a message in HUNT_AMQP_DEBUG builds.
     */
    void activateObject(IPooledObject obj) {
        version (HUNT_AMQP_DEBUG)
            warning("Do nothing.");
    }

    /**
     * Closes the connection wrapped by `obj`.
     *
     * A null or foreign wrapper, or a wrapper holding no connection, is
     * reported with a warning and otherwise ignored, so that release builds
     * (where asserts are stripped) do not dereference null.
     */
    void destroyObject(IPooledObject obj) {
        AmqpConnection conn = connectionOf(obj);
        if (conn is null) {
            warning("No AMQP connection to destroy.");
            return;
        }
        conn.close(null);
    }

    /**
     * Creates a new connection and wraps it in a DefaultPooledObject.
     *
     * Blocks on a FuturePromise until the connection handler reports a result
     * or the configured idle timeout elapses. A null connection passed to the
     * handler fails the promise with "Unable to connect to the broker.";
     * whatever promise.get() throws in that case, or on timeout, propagates
     * to the caller.
     */
    IPooledObject makeObject() {
        // dfmt off
        auto promise = new FuturePromise!AmqpConnection();

        // The instance itself is not used here; the result is delivered
        // through the handler. NOTE(review): presumably the constructor
        // starts the connect attempt -- confirm in AmqpConnectionImpl.
        AmqpConnectionImpl pending = new AmqpConnectionImpl(null, _options, _proton, 
            new class Handler!AmqpConnection {
                void handle(AmqpConnection conn) {
                    if (conn is null) 
                        promise.failed(new Exception("Unable to connect to the broker."));
                    else
                        promise.succeeded(conn);
                }
            }
        );
        // dfmt on

        version (HUNT_AMQP_DEBUG) tracef("try to get a result in %s", _options.getIdleTimeout());
        AmqpConnection c = promise.get(_options.getIdleTimeout());
        return new DefaultPooledObject!(AmqpConnection)(c);
    }

    /**
     * Passivation hook. Not implemented: it only calls
     * implementationMissing(false).
     * NOTE(review): the `false` argument presumably means "report, do not
     * throw" -- confirm against hunt.Exceptions.
     */
    void passivateObject(IPooledObject obj) {
        implementationMissing(false);
    }

    /**
     * Tells whether the connection wrapped by `obj` can still be used.
     *
     * Returns: true when `obj` wraps a non-null connection whose isClosed()
     *          is false; false otherwise, including for a null or foreign
     *          wrapper.
     */
    bool validateObject(IPooledObject obj) {
        // Gated like the other diagnostics in this class so that normal
        // builds do not log on every validation.
        version (HUNT_AMQP_DEBUG)
            trace("running here");

        AmqpConnection conn = connectionOf(obj);
        return conn !is null && !conn.isClosed();
    }

    /// Extracts the wrapped connection, or null when `obj` is null or is not a PooledObject!(AmqpConnection).
    private static AmqpConnection connectionOf(IPooledObject obj) {
        auto pooledObj = cast(PooledObject!(AmqpConnection)) obj;
        return pooledObj is null ? null : pooledObj.getObject();
    }
}