1 /*
2  * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
3  *
4  * Copyright (C) 2018-2019 HuntLabs
5  *
6  * Website: https://www.huntlabs.net
7  *
8  * Licensed under the Apache-2.0 License.
9  *
10  */
11 module hunt.amqp.client.impl.AmqpMessageImpl;
12 
13 import hunt.proton.amqp.messaging.Section;
14 import hunt.amqp.client.AmqpMessage;
15 import hunt.amqp.ProtonDelivery;
16 import hunt.amqp.ProtonHelper;
17 import hunt.proton.amqp.Symbol;
18 import hunt.proton.message.Message;
19 import hunt.proton.amqp.messaging.AmqpValue;
20 import hunt.proton.amqp.messaging.AmqpSequence;
21 import hunt.proton.amqp.messaging.Data;
22 import hunt.proton.amqp.messaging.ApplicationProperties;
23 import hunt.Exceptions;
24 import hunt.collection.List;
25 import hunt.collection.Map;
26 import hunt.Boolean;
27 import hunt.Short;
28 import hunt.Byte;
29 import hunt.Long;
30 import hunt.Float;
31 import hunt.Char;
32 import hunt.Double;
33 import hunt.Integer;
34 import hunt.String;
35 
36 //import hunt.collection.UUID;
37 
/**
 * AmqpMessage implementation that wraps a proton Message and, for received
 * messages, the ProtonDelivery used to settle it.
 *
 * Header and property getters delegate to the wrapped message. String-valued
 * getters return null when the underlying property is not set. Body accessors
 * throw IllegalStateException when the body section or the contained value is
 * not of the requested kind.
 */
class AmqpMessageImpl : AmqpMessage {
  private Message message;
  private ProtonDelivery delivery;

  /**
   * Wraps a message together with its delivery.
   *
   * Params:
   *   message  = the proton message to expose
   *   delivery = the delivery used by the disposition methods; may be null
   */
  this(Message message, ProtonDelivery delivery) {
    this.message = message;
    this.delivery = delivery;
  }

  /**
   * Wraps a message that has no delivery attached. The disposition methods
   * (accepted, rejected, released, modified) throw on such an instance.
   */
  this(Message message) {
    this(message, null);
  }

  // ---------------------------------------------------------------------
  // Internal helpers
  // ---------------------------------------------------------------------

  /**
   * Unwraps a nullable string wrapper, mapping a null wrapper to a null string.
   * Templated so it accepts whatever wrapper type the message getter returns,
   * as long as it is a nullable reference exposing `.value`.
   */
  private static string valueOrNull(T)(T wrapped) {
    return wrapped is null ? null : wrapped.value;
  }

  /**
   * Converts a message/correlation id to a string. String ids are unwrapped;
   * any other id object is rendered through toString() instead of being
   * dereferenced through a failed (null) cast.
   */
  private static string idToString(Object id) {
    if (id is null) {
      return null;
    }
    String text = cast(String) id;
    return text !is null ? text.value : id.toString();
  }

  /** Names the runtime class of an object for error messages; null-safe. */
  private static string describe(Object value) {
    return value is null ? "null" : typeid(value).name;
  }

  /**
   * Returns the value held by an AmqpValue body section.
   *
   * Throws: IllegalStateException if the body is absent or is not an
   *         AmqpValue section.
   */
  private Object getAmqpValue() {
    Section bd = message.getBody();
    if (bd is null || bd.getType() != Section.SectionType.AmqpValue) {
      throw new IllegalStateException("The body is not an AMQP Value");
    }
    return (cast(AmqpValue) bd).getValue();
  }

  /**
   * Returns the AmqpValue body cast to the wrapper class T.
   *
   * A failed class cast yields null in D, so the result is checked here
   * rather than dereferenced blindly by each accessor.
   *
   * Params:
   *   expected = human-readable name of T used in the error message
   * Throws: IllegalStateException if the body is not an AmqpValue section or
   *         its value is null / not an instance of T.
   */
  private T amqpValueAs(T)(string expected) {
    Object value = getAmqpValue();
    T typed = cast(T) value;
    if (typed is null) {
      throw new IllegalStateException("Expected a " ~ expected ~ ", got a " ~ describe(value));
    }
    return typed;
  }

  /**
   * Returns the delivery of a received message.
   *
   * Throws: IllegalStateException if this message has no delivery.
   */
  private ProtonDelivery requireDelivery() {
    if (delivery is null) {
      throw new IllegalStateException("The message is not a received message");
    }
    return delivery;
  }

  // ---------------------------------------------------------------------
  // Header / identity
  // ---------------------------------------------------------------------

  /** Returns the durable flag of the wrapped message. */
  public bool isDurable() {
    return message.isDurable();
  }

  /** Returns the first-acquirer flag of the wrapped message. */
  public bool isFirstAcquirer() {
    return message.isFirstAcquirer();
  }

  /** Returns the priority of the wrapped message. */
  public int priority() {
    return message.getPriority();
  }

  /** Returns the message id as a string, or null if no id is set. */
  public string id() {
    return idToString(message.getMessageId());
  }

  /** Returns the address, or null if it is not set. */
  public string address() {
    return valueOrNull(message.getAddress());
  }

  /** Returns the reply-to address, or null if it is not set. */
  public string replyTo() {
    return valueOrNull(message.getReplyTo());
  }

  /** Returns the correlation id as a string, or null if none is set. */
  public string correlationId() {
    return idToString(message.getCorrelationId());
  }

  // ---------------------------------------------------------------------
  // Body accessors
  // ---------------------------------------------------------------------

  /**
   * Tells whether the message has no body, or an AmqpValue body holding null.
   *
   * Throws: IllegalStateException if a body is present but is not an
   *         AmqpValue section.
   */
  public bool isBodyNull() {
    return message.getBody() is null || getAmqpValue() is null;
  }

  /** Returns the AmqpValue body as a bool. Throws IllegalStateException on mismatch. */
  public bool bodyAsBoolean() {
    return amqpValueAs!Boolean("Boolean").booleanValue;
  }

  /** Returns the AmqpValue body as a byte. Throws IllegalStateException on mismatch. */
  public byte bodyAsByte() {
    return amqpValueAs!Byte("Byte").byteValue;
  }

  /** Returns the AmqpValue body as a short. Throws IllegalStateException on mismatch. */
  public short bodyAsShort() {
    return amqpValueAs!Short("Short").shortValue;
  }

  /** Returns the AmqpValue body as an int. Throws IllegalStateException on mismatch. */
  public int bodyAsInteger() {
    return amqpValueAs!Integer("Integer").intValue;
  }

  /** Returns the AmqpValue body as a long. Throws IllegalStateException on mismatch. */
  public long bodyAsLong() {
    return amqpValueAs!Long("Long").longValue;
  }

  /** Returns the AmqpValue body as a float. Throws IllegalStateException on mismatch. */
  public float bodyAsFloat() {
    return amqpValueAs!Float("Float").floatValue;
  }

  /** Returns the AmqpValue body as a double. Throws IllegalStateException on mismatch. */
  public double bodyAsDouble() {
    return amqpValueAs!Double("Double").doubleValue;
  }

  /** Returns the AmqpValue body as a char. Throws IllegalStateException on mismatch. */
  public char bodyAsChar() {
    return amqpValueAs!Char("Char").value;
  }

  // Not ported from the Java implementation:
  //public Instant bodyAsTimestamp() {
  //  Object value = getAmqpValue();
  //  if (!(value instanceof Date)) {
  //    throw new IllegalStateException("Expecting a Date object, got a " + value);
  //  }
  //  return ((Date) value).toInstant();
  //}
  //
  //
  //public UUID bodyAsUUID() {
  //  return (UUID) getAmqpValue();
  //}

  /**
   * Returns the bytes of a Data body section.
   *
   * Throws: IllegalStateException if the body is absent or is not a Data
   *         section.
   */
  public byte[] bodyAsBinary() {
    Section bd = message.getBody();
    if (bd is null || bd.getType() != Section.SectionType.Data) {
      throw new IllegalStateException("The body is not of type 'data'");
    }
    // return Buffer.buffer(bytes);
    return (cast(Data) bd).getValue().getArray();
  }

  /** Returns the AmqpValue body as a string. Throws IllegalStateException on mismatch. */
  public string bodyAsString() {
    return amqpValueAs!String("String").value;
  }

  /**
   * Returns the textual form of a Symbol held in the AmqpValue body.
   *
   * Throws: IllegalStateException if the body is not an AmqpValue section or
   *         its value is not a Symbol.
   */
  public string bodyAsSymbol() {
    return amqpValueAs!Symbol("Symbol").toString();
  }

  /**
   * Returns the elements of an AmqpSequence body section.
   *
   * Throws: IllegalStateException if the body is absent or is not an
   *         AmqpSequence section.
   */
  public List!Object bodyAsList() {
    Section bd = message.getBody();
    if (bd is null || bd.getType() != Section.SectionType.AmqpSequence) {
      // Not ported from the Java implementation: a List held in an AmqpValue.
      //Object value = getAmqpValue();
      //if (value instanceof List) {
      //  return (List<T>) value;
      //}
      throw new IllegalStateException("Cannot extract a list from the message body");
    }
    return (cast(AmqpSequence) bd).getValue();
  }

  // Not ported from the Java implementation:
  //public <K, V> Map<K, V> bodyAsMap() {
  //  Object value = getAmqpValue();
  //  if (value instanceof Map) {
  //    return (Map<K, V>) value;
  //  }
  //  throw new IllegalStateException("Cannot extract a map from the message body");
  //}


  //public JsonObject bodyAsJsonObject() {
  //  return bodyAsBinary().toJsonObject();
  //}


  //public JsonArray bodyAsJsonArray() {
  //  return bodyAsBinary().toJsonArray();
  //}

  // ---------------------------------------------------------------------
  // Properties
  // ---------------------------------------------------------------------

  /** Returns the subject, or null if it is not set. */
  public string subject() {
    return valueOrNull(message.getSubject());
  }

  /** Returns the content type, or null if it is not set. */
  public string contentType() {
    return valueOrNull(message.getContentType());
  }

  /** Returns the content encoding, or null if it is not set. */
  public string contentEncoding() {
    return valueOrNull(message.getContentEncoding());
  }

  /** Returns the expiry time reported by the wrapped message. */
  public long expiryTime() {
    return message.getExpiryTime();
  }

  /** Returns the creation time reported by the wrapped message. */
  public long creationTime() {
    return message.getCreationTime();
  }

  /** Returns the time-to-live reported by the wrapped message. */
  public long ttl() {
    return message.getTtl();
  }

  /** Returns the delivery count of the wrapped message, narrowed to int. */
  public int deliveryCount() {
    return cast(int) message.getDeliveryCount();
  }

  /** Returns the group id, or null if it is not set. */
  public string groupId() {
    return valueOrNull(message.getGroupId());
  }

  /** Returns the reply-to group id, or null if it is not set. */
  public string replyToGroupId() {
    return valueOrNull(message.getReplyToGroupId());
  }

  /** Returns the group sequence reported by the wrapped message. */
  public long groupSequence() {
    return message.getGroupSequence();
  }

  /** Returns the application properties section, or null if there is none. */
  public ApplicationProperties applicationProperties() {
    //  return JsonObject.mapFrom(properties.getValue());
    return message.getApplicationProperties();
  }

  /** Returns the wrapped proton message. */
  public Message unwrap() {
    return message;
  }

  // ---------------------------------------------------------------------
  // Dispositions (received messages only)
  // ---------------------------------------------------------------------

  /**
   * Applies the "accepted" disposition to the delivery via ProtonHelper.
   *
   * Returns: this message.
   * Throws: IllegalStateException if the message has no delivery.
   */
  public AmqpMessage accepted() {
    ProtonHelper.accepted(requireDelivery(), true);
    return this;
  }

  /**
   * Applies the "rejected" disposition to the delivery via ProtonHelper.
   *
   * Returns: this message.
   * Throws: IllegalStateException if the message has no delivery.
   */
  public AmqpMessage rejected() {
    ProtonHelper.rejected(requireDelivery(), true);
    return this;
  }

  /**
   * Applies the "released" disposition to the delivery via ProtonHelper.
   *
   * Returns: this message.
   * Throws: IllegalStateException if the message has no delivery.
   */
  public AmqpMessage released() {
    ProtonHelper.released(requireDelivery(), true);
    return this;
  }

  /**
   * Applies the "modified" disposition to the delivery via ProtonHelper.
   *
   * Params:
   *   didItFail          = forwarded unchanged to ProtonHelper.modified
   *   wasItDeliveredHere = forwarded unchanged to ProtonHelper.modified
   * Returns: this message.
   * Throws: IllegalStateException if the message has no delivery.
   */
  public AmqpMessage modified(bool didItFail, bool wasItDeliveredHere) {
    ProtonHelper.modified(requireDelivery(), true, didItFail, wasItDeliveredHere);
    return this;
  }
}