1 /*
2  * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.client.impl.AmqpMessageImpl;
12 
13 import hunt.proton.amqp.messaging.Section;
14 import hunt.amqp.client.AmqpMessage;
15 //import hunt.core.buffer.Buffer;
16 import hunt.collection.ByteBuffer;
17 //import hunt.core.json.JsonArray;
18 //import hunt.core.json.JsonObject;
19 import hunt.amqp.ProtonDelivery;
20 import hunt.amqp.ProtonHelper;
21 import hunt.proton.amqp.Symbol;
22 //import hunt.proton.amqp.messaging.*;
23 import hunt.proton.message.Message;
24 import hunt.proton.amqp.messaging.AmqpValue;
25 import hunt.proton.amqp.messaging.AmqpSequence;
26 import hunt.proton.amqp.messaging.Data;
27 import hunt.proton.amqp.messaging.ApplicationProperties;
28 import hunt.Exceptions;
29 //import hunt.collection.Date;
30 import hunt.collection.List;
31 import hunt.collection.Map;
32 import hunt.Boolean;
33 import hunt.Short;
34 import hunt.Byte;
35 import hunt.Long;
36 import hunt.Float;
37 import hunt.Char;
38 import hunt.Double;
39 import hunt.Integer;
40 import hunt.String;
41 
42 //import hunt.collection.UUID;
43 
/**
 * AmqpMessage implementation that wraps a proton Message and, optionally,
 * the ProtonDelivery it was received on.
 *
 * Header and property accessors delegate to the wrapped message. Optional
 * string properties return null when the wrapped message reports null.
 * Body accessors throw IllegalStateException when the body section or the
 * carried value does not have the requested type. The settlement methods
 * (accepted, rejected, released, modified) require a delivery and throw
 * IllegalStateException when the instance was built without one.
 */
class AmqpMessageImpl : AmqpMessage {
  private Message message;
  private ProtonDelivery delivery;

  /**
   * Wraps a message together with the delivery it belongs to.
   *
   * Params:
   *   message  = the proton message to expose
   *   delivery = the delivery used by the settlement methods; may be null
   */
  this(Message message, ProtonDelivery delivery) {
    this.message = message;
    this.delivery = delivery;
  }

  /**
   * Wraps a message that has no associated delivery. Settlement methods
   * throw on such an instance.
   *
   * Params:
   *   message = the proton message to expose
   */
  this(Message message) {
    this(message, null);
  }

  // ---------------------------------------------------------------------
  // Internal helpers
  // ---------------------------------------------------------------------

  /**
   * Unboxes a nullable wrapper exposing a `value` property.
   *
   * Returns: boxed.value, or null when boxed itself is null.
   */
  private static string textOrNull(T)(T boxed) {
    return boxed is null ? null : boxed.value;
  }

  /**
   * Converts a message/correlation id to text.
   *
   * Returns: null for a null id, the wrapped text for a String id, and
   *          id.toString() for any other id type.
   */
  private static string idToText(Object id) {
    if (id is null) {
      return null;
    }
    String text = cast(String) id;
    // A failed class downcast yields null in D; fall back to toString()
    // rather than dereferencing it.
    return text !is null ? text.value : id.toString();
  }

  /** Returns: a printable runtime type name for error messages ("null" for null). */
  private static string describe(Object value) {
    return value is null ? "null" : typeid(value).name;
  }

  /**
   * Returns: the value carried by an AmqpValue body section (may be null).
   * Throws: IllegalStateException when the body is missing or is not an
   *         AmqpValue section.
   */
  private Object getAmqpValue() {
    Section section = message.getBody();
    if (section is null || section.getType() != Section.SectionType.AmqpValue) {
      throw new IllegalStateException("The body is not an AMQP Value");
    }
    return (cast(AmqpValue) section).getValue();
  }

  /**
   * Fetches the AmqpValue body and downcasts it to the wrapper class T.
   *
   * Throws: IllegalStateException when the body is not an AmqpValue, or
   *         when its value is null or not an instance of T.
   */
  private T amqpValueAs(T)() {
    Object raw = getAmqpValue();
    T typed = cast(T) raw;
    if (typed is null) {
      throw new IllegalStateException(
          "Expected a " ~ T.stringof ~ " body, got " ~ describe(raw));
    }
    return typed;
  }

  /**
   * Returns: the delivery attached to this message.
   * Throws: IllegalStateException when there is none.
   */
  private ProtonDelivery requireDelivery() {
    if (delivery is null) {
      throw new IllegalStateException("The message is not a received message");
    }
    return delivery;
  }

  // ---------------------------------------------------------------------
  // Header / properties
  // ---------------------------------------------------------------------

  /** Returns: the wrapped message's durable flag. */
  public bool isDurable() {
    return message.isDurable();
  }

  /** Returns: the wrapped message's first-acquirer flag. */
  public bool isFirstAcquirer() {
    return message.isFirstAcquirer();
  }

  /** Returns: the wrapped message's priority. */
  public int priority() {
    return message.getPriority();
  }

  /** Returns: the message id as text, or null when no id is set. */
  public string id() {
    return idToText(message.getMessageId());
  }

  /** Returns: the address, or null when none is set. */
  public string address() {
    return textOrNull(message.getAddress());
  }

  /** Returns: the reply-to address, or null when none is set. */
  public string replyTo() {
    return textOrNull(message.getReplyTo());
  }

  /** Returns: the correlation id as text, or null when none is set. */
  public string correlationId() {
    return idToText(message.getCorrelationId());
  }

  // ---------------------------------------------------------------------
  // Body
  // ---------------------------------------------------------------------

  /**
   * Returns: true when the message has no body section, or when its
   *          AmqpValue section carries a null value.
   * Throws: IllegalStateException when a body is present but is not an
   *         AmqpValue section.
   */
  public bool isBodyNull() {
    return message.getBody() is null || getAmqpValue() is null;
  }

  /**
   * Returns: the AmqpValue body as a bool.
   * Throws: IllegalStateException when the body is not a Boolean AmqpValue.
   */
  public bool bodyAsBoolean() {
    return amqpValueAs!Boolean().booleanValue;
  }

  /**
   * Returns: the AmqpValue body as a byte.
   * Throws: IllegalStateException when the body is not a Byte AmqpValue.
   */
  public byte bodyAsByte() {
    return amqpValueAs!Byte().byteValue;
  }

  /**
   * Returns: the AmqpValue body as a short.
   * Throws: IllegalStateException when the body is not a Short AmqpValue.
   */
  public short bodyAsShort() {
    return amqpValueAs!Short().shortValue;
  }

  /**
   * Returns: the AmqpValue body as an int.
   * Throws: IllegalStateException when the body is not an Integer AmqpValue.
   */
  public int bodyAsInteger() {
    return amqpValueAs!Integer().intValue;
  }

  /**
   * Returns: the AmqpValue body as a long.
   * Throws: IllegalStateException when the body is not a Long AmqpValue.
   */
  public long bodyAsLong() {
    return amqpValueAs!Long().longValue;
  }

  /**
   * Returns: the AmqpValue body as a float.
   * Throws: IllegalStateException when the body is not a Float AmqpValue.
   */
  public float bodyAsFloat() {
    return amqpValueAs!Float().floatValue;
  }

  /**
   * Returns: the AmqpValue body as a double.
   * Throws: IllegalStateException when the body is not a Double AmqpValue.
   */
  public double bodyAsDouble() {
    return amqpValueAs!Double().doubleValue;
  }

  /**
   * Returns: the AmqpValue body as a char.
   * Throws: IllegalStateException when the body is not a Char AmqpValue.
   */
  public char bodyAsChar() {
    return amqpValueAs!Char().value;
  }

  // Not ported from the Java-style originals that used to be commented out
  // in this class: bodyAsTimestamp, bodyAsUUID, bodyAsMap, bodyAsJsonObject
  // and bodyAsJsonArray.

  /**
   * Returns: the array exposed by the Data section's binary value.
   *          NOTE(review): getArray() is returned as-is; whether it can
   *          contain bytes outside the payload window depends on the
   *          binary type - confirm before relying on its length.
   * Throws: IllegalStateException when the body is missing or is not a
   *         Data section.
   */
  public byte[] bodyAsBinary() {
    Section section = message.getBody();
    if (section is null || section.getType() != Section.SectionType.Data) {
      throw new IllegalStateException("The body is not of type 'data'");
    }
    return (cast(Data) section).getValue().getArray();
  }

  /**
   * Returns: the AmqpValue body as a string.
   * Throws: IllegalStateException when the body is not a String AmqpValue.
   */
  public string bodyAsString() {
    return amqpValueAs!String().value;
  }

  /**
   * Returns: the textual form of a Symbol AmqpValue body.
   * Throws: IllegalStateException when the body is not an AmqpValue or
   *         its value is not a Symbol.
   */
  public string bodyAsSymbol() {
    Object value = getAmqpValue();
    Symbol symbol = cast(Symbol) value;
    if (symbol !is null) {
      return symbol.toString();
    }
    throw new IllegalStateException("Expected a Symbol, got a " ~ describe(value));
  }

  /**
   * Returns: the list carried by an AmqpSequence body section.
   * Throws: IllegalStateException when the body is missing or is not an
   *         AmqpSequence section (a list inside an AmqpValue is not
   *         supported here).
   */
  public List!Object bodyAsList() {
    Section section = message.getBody();
    if (section !is null && section.getType() == Section.SectionType.AmqpSequence) {
      return (cast(AmqpSequence) section).getValue();
    }
    throw new IllegalStateException("Cannot extract a list from the message body");
  }

  // ---------------------------------------------------------------------
  // Remaining properties
  // ---------------------------------------------------------------------

  /** Returns: the subject, or null when none is set. */
  public string subject() {
    return textOrNull(message.getSubject());
  }

  /** Returns: the content type, or null when none is set. */
  public string contentType() {
    return textOrNull(message.getContentType());
  }

  /** Returns: the content encoding, or null when none is set. */
  public string contentEncoding() {
    return textOrNull(message.getContentEncoding());
  }

  /** Returns: the wrapped message's expiry time. */
  public long expiryTime() {
    return message.getExpiryTime();
  }

  /** Returns: the wrapped message's creation time. */
  public long creationTime() {
    return message.getCreationTime();
  }

  /** Returns: the wrapped message's time-to-live. */
  public long ttl() {
    return message.getTtl();
  }

  /** Returns: the wrapped message's delivery count, narrowed to int. */
  public int deliveryCount() {
    return cast(int) message.getDeliveryCount();
  }

  /** Returns: the group id, or null when none is set. */
  public string groupId() {
    return textOrNull(message.getGroupId());
  }

  /** Returns: the reply-to group id, or null when none is set. */
  public string replyToGroupId() {
    return textOrNull(message.getReplyToGroupId());
  }

  /** Returns: the wrapped message's group sequence. */
  public long groupSequence() {
    return message.getGroupSequence();
  }

  /** Returns: the application-properties section, or null when absent. */
  public ApplicationProperties applicationProperties() {
    return message.getApplicationProperties();
  }

  /** Returns: the underlying proton message. */
  public Message unwrap() {
    return message;
  }

  // ---------------------------------------------------------------------
  // Settlement
  // ---------------------------------------------------------------------

  /**
   * Applies the "accepted" disposition to the delivery via ProtonHelper.
   *
   * Returns: this message.
   * Throws: IllegalStateException when the message has no delivery.
   */
  public AmqpMessage accepted() {
    ProtonHelper.accepted(requireDelivery(), true);
    return this;
  }

  /**
   * Applies the "rejected" disposition to the delivery via ProtonHelper.
   *
   * Returns: this message.
   * Throws: IllegalStateException when the message has no delivery.
   */
  public AmqpMessage rejected() {
    ProtonHelper.rejected(requireDelivery(), true);
    return this;
  }

  /**
   * Applies the "released" disposition to the delivery via ProtonHelper.
   *
   * Returns: this message.
   * Throws: IllegalStateException when the message has no delivery.
   */
  public AmqpMessage released() {
    ProtonHelper.released(requireDelivery(), true);
    return this;
  }

  /**
   * Applies the "modified" disposition to the delivery via ProtonHelper.
   *
   * Params:
   *   didItFail          = forwarded to ProtonHelper.modified
   *   wasItDeliveredHere = forwarded to ProtonHelper.modified
   * Returns: this message.
   * Throws: IllegalStateException when the message has no delivery.
   */
  public AmqpMessage modified(bool didItFail, bool wasItDeliveredHere) {
    ProtonHelper.modified(requireDelivery(), true, didItFail, wasItDeliveredHere);
    return this;
  }
}