1 /* 2 * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server. 3 * 4 * Copyright (C) 2018-2019 HuntLabs 5 * 6 * Website: https://www.huntlabs.net 7 * 8 * Licensed under the Apache-2.0 License. 9 * 10 */ 11 module hunt.amqp.client.impl.AmqpMessageImpl; 12 13 import hunt.proton.amqp.messaging.Section; 14 import hunt.amqp.client.AmqpMessage; 15 //import hunt.core.buffer.Buffer; 16 import hunt.collection.ByteBuffer; 17 //import hunt.core.json.JsonArray; 18 //import hunt.core.json.JsonObject; 19 import hunt.amqp.ProtonDelivery; 20 import hunt.amqp.ProtonHelper; 21 import hunt.proton.amqp.Symbol; 22 //import hunt.proton.amqp.messaging.*; 23 import hunt.proton.message.Message; 24 import hunt.proton.amqp.messaging.AmqpValue; 25 import hunt.proton.amqp.messaging.AmqpSequence; 26 import hunt.proton.amqp.messaging.Data; 27 import hunt.proton.amqp.messaging.ApplicationProperties; 28 import hunt.Exceptions; 29 //import hunt.collection.Date; 30 import hunt.collection.List; 31 import hunt.collection.Map; 32 import hunt.Boolean; 33 import hunt.Short; 34 import hunt.Byte; 35 import hunt.Long; 36 import hunt.Float; 37 import hunt.Char; 38 import hunt.Double; 39 import hunt.Integer; 40 import hunt.String; 41 42 //import hunt.collection.UUID; 43 44 class AmqpMessageImpl : AmqpMessage { 45 private Message message; 46 private ProtonDelivery delivery; 47 48 this(Message message, ProtonDelivery delivery) { 49 this.message = message; 50 this.delivery = delivery; 51 } 52 53 this(Message message) { 54 this.message = message; 55 this.delivery = null; 56 } 57 58 59 public bool isDurable() { 60 return message.isDurable(); 61 } 62 63 64 public bool isFirstAcquirer() { 65 return message.isFirstAcquirer(); 66 } 67 68 69 public int priority() { 70 return message.getPriority(); 71 } 72 73 74 public string id() { 75 Object id = message.getMessageId(); 76 if (id !is null) { 77 return (cast(String)id).value; 78 } 79 return null; 80 } 81 82 83 public string address() { 84 //return message.getAddress().value; 85 return message.getAddress() is null ? null :message.getAddress().value; 86 } 87 88 89 public string replyTo() { 90 return message.getReplyTo().value; 91 } 92 93 94 public string correlationId() { 95 Object id = message.getCorrelationId(); 96 if (id !is null) { 97 return (cast(String)id).value; 98 } 99 return null; 100 } 101 102 103 public bool isBodyNull() { 104 return message.getBody() is null || getAmqpValue() is null; 105 } 106 107 private Object getAmqpValue() { 108 if (message.getBody().getType() != Section.SectionType.AmqpValue) { 109 throw new IllegalStateException("The body is not an AMQP Value"); 110 } 111 return (cast(AmqpValue) message.getBody()).getValue(); 112 } 113 114 115 public bool bodyAsBoolean() { 116 return (cast(Boolean) getAmqpValue()).booleanValue; 117 } 118 119 120 public byte bodyAsByte() { 121 return (cast(Byte) getAmqpValue()).byteValue; 122 } 123 124 125 public short bodyAsShort() { 126 return (cast(Short) getAmqpValue()).shortValue; 127 } 128 129 130 public int bodyAsInteger() { 131 return (cast(Integer) getAmqpValue()).intValue; 132 } 133 134 135 public long bodyAsLong() { 136 return (cast(Long) getAmqpValue()).longValue; 137 } 138 139 140 public float bodyAsFloat() { 141 return (cast(Float) getAmqpValue()).floatValue; 142 } 143 144 145 public double bodyAsDouble() { 146 return (cast(Double) getAmqpValue()).doubleValue; 147 } 148 149 150 public char bodyAsChar() { 151 return (cast(Char) getAmqpValue()).value; 152 } 153 154 155 //public Instant bodyAsTimestamp() { 156 // Object value = getAmqpValue(); 157 // if (!(value instanceof Date)) { 158 // throw new IllegalStateException("Expecting a Date object, got a " + value); 159 // } 160 // return ((Date) value).toInstant(); 161 //} 162 // 163 // 164 //public UUID bodyAsUUID() { 165 // return (UUID) getAmqpValue(); 166 //} 167 168 169 public byte[] bodyAsBinary() { 170 Section bd = message.getBody(); 171 if (bd.getType() != Section.SectionType.Data) { 172 throw new IllegalStateException("The body is not of type 'data'"); 173 } 174 byte[] bytes = (cast(Data) message.getBody()).getValue().getArray(); 175 // return Buffer.buffer(bytes); 176 return bytes; 177 } 178 179 180 public string bodyAsString() { 181 return (cast(String) getAmqpValue()).value; 182 } 183 184 185 public string bodyAsSymbol() { 186 Object value = getAmqpValue(); 187 if (cast(Symbol)value !is null) { 188 return (cast(Symbol) value).toString(); 189 } 190 throw new IllegalStateException("Expected a Symbol, got a "); 191 } 192 193 /** 194 * @noinspection unchecked 195 */ 196 197 public List!Object bodyAsList() { 198 Section bd = message.getBody(); 199 if (bd.getType() == Section.SectionType.AmqpSequence) { 200 return (cast(AmqpSequence) message.getBody()).getValue(); 201 } else { 202 //Object value = getAmqpValue(); 203 //if (value instanceof List) { 204 // return (List<T>) value; 205 //} 206 throw new IllegalStateException("Cannot extract a list from the message body"); 207 } 208 } 209 210 /** 211 * @noinspection unchecked 212 */ 213 214 //public <K, V> Map<K, V> bodyAsMap() { 215 // Object value = getAmqpValue(); 216 // if (value instanceof Map) { 217 // return (Map<K, V>) value; 218 // } 219 // throw new IllegalStateException("Cannot extract a map from the message body"); 220 //} 221 222 223 //public JsonObject bodyAsJsonObject() { 224 // return bodyAsBinary().toJsonObject(); 225 //} 226 227 228 //public JsonArray bodyAsJsonArray() { 229 // return bodyAsBinary().toJsonArray(); 230 //} 231 232 233 public string subject() { 234 return message.getSubject().value; 235 } 236 237 238 public string contentType() { 239 return message.getContentType().value; 240 } 241 242 243 public string contentEncoding() { 244 return message.getContentEncoding().value; 245 } 246 247 248 public long expiryTime() { 249 return message.getExpiryTime(); 250 } 251 252 253 public long creationTime() { 254 return message.getCreationTime(); 255 } 256 257 258 public long ttl() { 259 return message.getTtl(); 260 } 261 262 263 public int deliveryCount() { 264 return cast(int) message.getDeliveryCount(); 265 } 266 267 268 public string groupId() { 269 return message.getGroupId().value; 270 } 271 272 273 public string replyToGroupId() { 274 return message.getReplyToGroupId().value; 275 } 276 277 278 public long groupSequence() { 279 return message.getGroupSequence(); 280 } 281 282 283 public ApplicationProperties applicationProperties() { 284 ApplicationProperties properties = message.getApplicationProperties(); 285 if (properties is null) { 286 return null; 287 } 288 // return JsonObject.mapFrom(properties.getValue()); 289 return properties; 290 } 291 292 293 public Message unwrap() { 294 return message; 295 } 296 297 298 public AmqpMessage accepted() { 299 if (delivery !is null) { 300 ProtonHelper.accepted(delivery, true); 301 } else { 302 throw new IllegalStateException("The message is not a received message"); 303 } 304 return this; 305 } 306 307 308 public AmqpMessage rejected() { 309 if (delivery !is null) { 310 ProtonHelper.rejected(delivery, true); 311 } else { 312 throw new IllegalStateException("The message is not a received message"); 313 } 314 return this; 315 } 316 317 318 public AmqpMessage released() { 319 if (delivery !is null) { 320 ProtonHelper.released(delivery, true); 321 } else { 322 throw new IllegalStateException("The message is not a received message"); 323 } 324 return this; 325 } 326 327 328 public AmqpMessage modified(bool didItFail, bool wasItDeliveredHere) { 329 if (delivery !is null) { 330 ProtonHelper.modified(delivery, true, didItFail, wasItDeliveredHere); 331 } else { 332 throw new IllegalStateException("The message is not a received message"); 333 } 334 return this; 335 } 336 }