/*
 * hunt-amqp-client: AMQP Client Library for D Programming Language. Support for RabbitMQ and other AMQP Server.
 *
 * Copyright (C) 2018-2019 HuntLabs
 *
 * Website: https://www.huntlabs.net
 *
 * Licensed under the Apache-2.0 License.
 *
 */
module hunt.amqp.client.impl.AmqpMessageImpl;

import hunt.proton.amqp.messaging.Section;
import hunt.amqp.client.AmqpMessage;
import hunt.amqp.ProtonDelivery;
import hunt.amqp.ProtonHelper;
import hunt.proton.amqp.Symbol;
import hunt.proton.message.Message;
import hunt.proton.amqp.messaging.AmqpValue;
import hunt.proton.amqp.messaging.AmqpSequence;
import hunt.proton.amqp.messaging.Data;
import hunt.proton.amqp.messaging.ApplicationProperties;
import hunt.Exceptions;
import hunt.collection.List;
import hunt.collection.Map;
import hunt.Boolean;
import hunt.Short;
import hunt.Byte;
import hunt.Long;
import hunt.Float;
import hunt.Char;
import hunt.Double;
import hunt.Integer;
import hunt.String;

//import hunt.collection.UUID;

/**
 * AmqpMessage implementation that wraps a proton Message and, optionally,
 * the ProtonDelivery it was received with.
 *
 * Header and property accessors forward to the wrapped message. Textual
 * properties return null when the underlying property is unset. The
 * disposition methods (accepted, rejected, released, modified) need a
 * delivery and throw IllegalStateException when there is none.
 */
class AmqpMessageImpl : AmqpMessage {
    private Message message;
    private ProtonDelivery delivery;

    /**
     * Wraps a message together with the delivery it is associated with.
     *
     * Params:
     *   message  = the proton message to expose
     *   delivery = the delivery used by the disposition methods; may be null
     */
    this(Message message, ProtonDelivery delivery) {
        this.message = message;
        this.delivery = delivery;
    }

    /**
     * Wraps a message that has no delivery; the disposition methods will
     * throw for such an instance.
     *
     * Params:
     *   message = the proton message to expose
     */
    this(Message message) {
        this.message = message;
        this.delivery = null;
    }

    /// Returns: message.isDurable().
    public bool isDurable() {
        return message.isDurable();
    }

    /// Returns: message.isFirstAcquirer().
    public bool isFirstAcquirer() {
        return message.isFirstAcquirer();
    }

    /// Returns: message.getPriority().
    public int priority() {
        return message.getPriority();
    }

    /**
     * Returns: the message id as text, or null when no id is set.
     */
    public string id() {
        return identifierText(message.getMessageId());
    }

    /**
     * Returns: the address as text, or null when no address is set.
     */
    public string address() {
        //return message.getAddress().value;
        return valueOrNull(message.getAddress());
    }

    /**
     * Returns: the reply-to address as text, or null when it is not set.
     */
    public string replyTo() {
        return valueOrNull(message.getReplyTo());
    }

    /**
     * Returns: the correlation id as text, or null when none is set.
     */
    public string correlationId() {
        return identifierText(message.getCorrelationId());
    }

    /**
     * Tells whether the message carries no body value.
     *
     * Returns: true when the body section is null, or when it is an
     *          AmqpValue section whose value is null.
     * Throws: IllegalStateException when a body is present but is not an
     *         AmqpValue section (raised by getAmqpValue()).
     */
    public bool isBodyNull() {
        return message.getBody() is null || getAmqpValue() is null;
    }

    /**
     * Extracts the value held by an AmqpValue body section.
     *
     * Throws: IllegalStateException when the body is missing or is not an
     *         AmqpValue section.
     */
    private Object getAmqpValue() {
        Section section = message.getBody();
        // A missing body used to be dereferenced here; report it the same
        // way as a body of the wrong section type.
        if (section is null || section.getType() != Section.SectionType.AmqpValue) {
            throw new IllegalStateException("The body is not an AMQP Value");
        }
        return (cast(AmqpValue) section).getValue();
    }

    /**
     * Maps a possibly-null value holder to its wrapped text.
     *
     * Written as a template so it accepts whatever nullable reference type
     * the Message getters return, as long as it exposes `value`.
     */
    private static string valueOrNull(T)(T holder) {
        return holder is null ? null : holder.value;
    }

    /**
     * Converts a message/correlation identifier object to text.
     *
     * String identifiers yield their wrapped value. Any other object falls
     * back to toString() instead of dereferencing a failed cast.
     * NOTE(review): the toString() form of non-String ids depends on the
     * id type's own override - confirm it is what callers expect.
     */
    private static string identifierText(Object identifier) {
        if (identifier is null) {
            return null;
        }
        String text = cast(String) identifier;
        return text !is null ? text.value : identifier.toString();
    }

    // The typed accessors below cast the AmqpValue payload to the matching
    // wrapper class. A cast to the wrong class yields null in D, so callers
    // are expected to know the body type beforehand.

    /// Returns: the AmqpValue body read as a Boolean.
    public bool bodyAsBoolean() {
        return (cast(Boolean) getAmqpValue()).booleanValue;
    }

    /// Returns: the AmqpValue body read as a Byte.
    public byte bodyAsByte() {
        return (cast(Byte) getAmqpValue()).byteValue;
    }

    /// Returns: the AmqpValue body read as a Short.
    public short bodyAsShort() {
        return (cast(Short) getAmqpValue()).shortValue;
    }

    /// Returns: the AmqpValue body read as an Integer.
    public int bodyAsInteger() {
        return (cast(Integer) getAmqpValue()).intValue;
    }

    /// Returns: the AmqpValue body read as a Long.
    public long bodyAsLong() {
        return (cast(Long) getAmqpValue()).longValue;
    }

    /// Returns: the AmqpValue body read as a Float.
    public float bodyAsFloat() {
        return (cast(Float) getAmqpValue()).floatValue;
    }

    /// Returns: the AmqpValue body read as a Double.
    public double bodyAsDouble() {
        return (cast(Double) getAmqpValue()).doubleValue;
    }

    /// Returns: the AmqpValue body read as a Char.
    public char bodyAsChar() {
        return (cast(Char) getAmqpValue()).value;
    }

    //public Instant bodyAsTimestamp() {
    //  Object value = getAmqpValue();
    //  if (!(value instanceof Date)) {
    //    throw new IllegalStateException("Expecting a Date object, got a " + value);
    //  }
    //  return ((Date) value).toInstant();
    //}
    //
    //
    //public UUID bodyAsUUID() {
    //  return (UUID) getAmqpValue();
    //}

    /**
     * Returns: the byte array held by a Data body section.
     * Throws: IllegalStateException when the body is missing or is not a
     *         Data section.
     */
    public byte[] bodyAsBinary() {
        Section section = message.getBody();
        if (section is null || section.getType() != Section.SectionType.Data) {
            throw new IllegalStateException("The body is not of type 'data'");
        }
        byte[] bytes = (cast(Data) section).getValue().getArray();
        // return Buffer.buffer(bytes);
        return bytes;
    }

    /// Returns: the AmqpValue body read as a String.
    public string bodyAsString() {
        return (cast(String) getAmqpValue()).value;
    }

    /**
     * Returns: the toString() form of a Symbol held in the AmqpValue body.
     * Throws: IllegalStateException when the body is not an AmqpValue
     *         section or its value is not a Symbol.
     */
    public string bodyAsSymbol() {
        Object value = getAmqpValue();
        Symbol symbol = cast(Symbol) value;
        if (symbol !is null) {
            return symbol.toString();
        }
        // Name the actual runtime class; the message used to end after "got a ".
        throw new IllegalStateException("Expected a Symbol, got a "
                ~ (value is null ? "null" : typeid(value).name));
    }

    /**
     * Returns: the list held by an AmqpSequence body section.
     * Throws: IllegalStateException when the body is missing or is not an
     *         AmqpSequence section.
     */
    public List!Object bodyAsList() {
        Section section = message.getBody();
        if (section !is null && section.getType() == Section.SectionType.AmqpSequence) {
            return (cast(AmqpSequence) section).getValue();
        }
        //Object value = getAmqpValue();
        //if (value instanceof List) {
        //  return (List<T>) value;
        //}
        throw new IllegalStateException("Cannot extract a list from the message body");
    }

    //public <K, V> Map<K, V> bodyAsMap() {
    //  Object value = getAmqpValue();
    //  if (value instanceof Map) {
    //    return (Map<K, V>) value;
    //  }
    //  throw new IllegalStateException("Cannot extract a map from the message body");
    //}


    //public JsonObject bodyAsJsonObject() {
    //  return bodyAsBinary().toJsonObject();
    //}


    //public JsonArray bodyAsJsonArray() {
    //  return bodyAsBinary().toJsonArray();
    //}

    /// Returns: the subject as text, or null when it is not set.
    public string subject() {
        return valueOrNull(message.getSubject());
    }

    /// Returns: the content type as text, or null when it is not set.
    public string contentType() {
        return valueOrNull(message.getContentType());
    }

    /// Returns: the content encoding as text, or null when it is not set.
    public string contentEncoding() {
        return valueOrNull(message.getContentEncoding());
    }

    /// Returns: message.getExpiryTime().
    public long expiryTime() {
        return message.getExpiryTime();
    }

    /// Returns: message.getCreationTime().
    public long creationTime() {
        return message.getCreationTime();
    }

    /// Returns: message.getTtl().
    public long ttl() {
        return message.getTtl();
    }

    /// Returns: message.getDeliveryCount() converted to int.
    public int deliveryCount() {
        return cast(int) message.getDeliveryCount();
    }

    /// Returns: the group id as text, or null when it is not set.
    public string groupId() {
        return valueOrNull(message.getGroupId());
    }

    /// Returns: the reply-to group id as text, or null when it is not set.
    public string replyToGroupId() {
        return valueOrNull(message.getReplyToGroupId());
    }

    /// Returns: message.getGroupSequence().
    public long groupSequence() {
        return message.getGroupSequence();
    }

    /**
     * Returns: the message's ApplicationProperties section, or null when
     *          the message has none.
     */
    public ApplicationProperties applicationProperties() {
        ApplicationProperties properties = message.getApplicationProperties();
        if (properties is null) {
            return null;
        }
        // return JsonObject.mapFrom(properties.getValue());
        return properties;
    }

    /// Returns: the wrapped proton message.
    public Message unwrap() {
        return message;
    }

    /**
     * Returns the delivery, failing when this message was built without one.
     *
     * Throws: IllegalStateException when there is no delivery.
     */
    private ProtonDelivery requireDelivery() {
        if (delivery is null) {
            throw new IllegalStateException("The message is not a received message");
        }
        return delivery;
    }

    /**
     * Forwards to ProtonHelper.accepted(delivery, true).
     *
     * Returns: this message.
     * Throws: IllegalStateException when the message has no delivery.
     */
    public AmqpMessage accepted() {
        ProtonHelper.accepted(requireDelivery(), true);
        return this;
    }

    /**
     * Forwards to ProtonHelper.rejected(delivery, true).
     *
     * Returns: this message.
     * Throws: IllegalStateException when the message has no delivery.
     */
    public AmqpMessage rejected() {
        ProtonHelper.rejected(requireDelivery(), true);
        return this;
    }

    /**
     * Forwards to ProtonHelper.released(delivery, true).
     *
     * Returns: this message.
     * Throws: IllegalStateException when the message has no delivery.
     */
    public AmqpMessage released() {
        ProtonHelper.released(requireDelivery(), true);
        return this;
    }

    /**
     * Forwards to ProtonHelper.modified(delivery, true, didItFail, wasItDeliveredHere).
     *
     * Params:
     *   didItFail          = passed through to ProtonHelper.modified
     *   wasItDeliveredHere = passed through to ProtonHelper.modified
     * Returns: this message.
     * Throws: IllegalStateException when the message has no delivery.
     */
    public AmqpMessage modified(bool didItFail, bool wasItDeliveredHere) {
        ProtonHelper.modified(requireDelivery(), true, didItFail, wasItDeliveredHere);
        return this;
    }
}