001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.command;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.IOException;
022 import java.util.Collections;
023 import java.util.HashMap;
024 import java.util.Map;
025 import javax.jms.JMSException;
026 import org.apache.activemq.ActiveMQConnection;
027 import org.apache.activemq.advisory.AdvisorySupport;
028 import org.apache.activemq.broker.region.Destination;
029 import org.apache.activemq.broker.region.MessageReference;
030 import org.apache.activemq.broker.region.RegionBroker;
031 import org.apache.activemq.usage.MemoryUsage;
032 import org.apache.activemq.util.ByteArrayInputStream;
033 import org.apache.activemq.util.ByteArrayOutputStream;
034 import org.apache.activemq.util.ByteSequence;
035 import org.apache.activemq.util.MarshallingSupport;
036 import org.apache.activemq.wireformat.WireFormat;
037
038 /**
039 * Represents an ActiveMQ message
040 *
041 * @openwire:marshaller
042 *
043 */
044 public abstract class Message extends BaseCommand implements MarshallAware, MessageReference {
045
046 /**
047 * The default minimum amount of memory a message is assumed to use
048 */
049 public static final int DEFAULT_MINIMUM_MESSAGE_SIZE = 1024;
050
051 protected MessageId messageId;
052 protected ActiveMQDestination originalDestination;
053 protected TransactionId originalTransactionId;
054
055 protected ProducerId producerId;
056 protected ActiveMQDestination destination;
057 protected TransactionId transactionId;
058
059 protected long expiration;
060 protected long timestamp;
061 protected long arrival;
062 protected long brokerInTime;
063 protected long brokerOutTime;
064 protected String correlationId;
065 protected ActiveMQDestination replyTo;
066 protected boolean persistent;
067 protected String type;
068 protected byte priority;
069 protected String groupID;
070 protected int groupSequence;
071 protected ConsumerId targetConsumerId;
072 protected boolean compressed;
073 protected String userID;
074
075 protected ByteSequence content;
076 protected ByteSequence marshalledProperties;
077 protected DataStructure dataStructure;
078 protected int redeliveryCounter;
079
080 protected int size;
081 protected Map<String, Object> properties;
082 protected boolean readOnlyProperties;
083 protected boolean readOnlyBody;
084 protected transient boolean recievedByDFBridge;
085 protected boolean droppable;
086
087 private transient short referenceCount;
088 private transient ActiveMQConnection connection;
089 private transient org.apache.activemq.broker.region.Destination regionDestination;
090 private transient MemoryUsage memoryUsage;
091
092 private BrokerId[] brokerPath;
093 private BrokerId[] cluster;
094
095 public abstract Message copy();
096 public abstract void clearBody() throws JMSException;
097
098 // useful to reduce the memory footprint of a persisted message
099 public void clearMarshalledState() throws JMSException {
100 properties = null;
101 }
102
103 protected void copy(Message copy) {
104 super.copy(copy);
105 copy.producerId = producerId;
106 copy.transactionId = transactionId;
107 copy.destination = destination;
108 copy.messageId = messageId != null ? messageId.copy() : null;
109 copy.originalDestination = originalDestination;
110 copy.originalTransactionId = originalTransactionId;
111 copy.expiration = expiration;
112 copy.timestamp = timestamp;
113 copy.correlationId = correlationId;
114 copy.replyTo = replyTo;
115 copy.persistent = persistent;
116 copy.redeliveryCounter = redeliveryCounter;
117 copy.type = type;
118 copy.priority = priority;
119 copy.size = size;
120 copy.groupID = groupID;
121 copy.userID = userID;
122 copy.groupSequence = groupSequence;
123
124 if (properties != null) {
125 copy.properties = new HashMap<String, Object>(properties);
126
127 // The new message hasn't expired, so remove this feild.
128 copy.properties.remove(RegionBroker.ORIGINAL_EXPIRATION);
129 } else {
130 copy.properties = properties;
131 }
132
133 copy.content = content;
134 copy.marshalledProperties = marshalledProperties;
135 copy.dataStructure = dataStructure;
136 copy.readOnlyProperties = readOnlyProperties;
137 copy.readOnlyBody = readOnlyBody;
138 copy.compressed = compressed;
139 copy.recievedByDFBridge = recievedByDFBridge;
140
141 copy.arrival = arrival;
142 copy.connection = connection;
143 copy.regionDestination = regionDestination;
144 copy.brokerInTime = brokerInTime;
145 copy.brokerOutTime = brokerOutTime;
146 copy.memoryUsage=this.memoryUsage;
147 copy.brokerPath = brokerPath;
148
149 // lets not copy the following fields
150 // copy.targetConsumerId = targetConsumerId;
151 // copy.referenceCount = referenceCount;
152 }
153
154 public Object getProperty(String name) throws IOException {
155 if (properties == null) {
156 if (marshalledProperties == null) {
157 return null;
158 }
159 properties = unmarsallProperties(marshalledProperties);
160 }
161 return properties.get(name);
162 }
163
164 @SuppressWarnings("unchecked")
165 public Map<String, Object> getProperties() throws IOException {
166 if (properties == null) {
167 if (marshalledProperties == null) {
168 return Collections.EMPTY_MAP;
169 }
170 properties = unmarsallProperties(marshalledProperties);
171 }
172 return Collections.unmodifiableMap(properties);
173 }
174
175 public void clearProperties() {
176 marshalledProperties = null;
177 properties = null;
178 }
179
180 public void setProperty(String name, Object value) throws IOException {
181 lazyCreateProperties();
182 properties.put(name, value);
183 }
184
185 public void removeProperty(String name) throws IOException {
186 lazyCreateProperties();
187 properties.remove(name);
188 }
189
190 protected void lazyCreateProperties() throws IOException {
191 if (properties == null) {
192 if (marshalledProperties == null) {
193 properties = new HashMap<String, Object>();
194 } else {
195 properties = unmarsallProperties(marshalledProperties);
196 marshalledProperties = null;
197 }
198 }
199 }
200
201 private Map<String, Object> unmarsallProperties(ByteSequence marshalledProperties) throws IOException {
202 return MarshallingSupport.unmarshalPrimitiveMap(new DataInputStream(new ByteArrayInputStream(marshalledProperties)));
203 }
204
205 public void beforeMarshall(WireFormat wireFormat) throws IOException {
206 // Need to marshal the properties.
207 if (marshalledProperties == null && properties != null) {
208 ByteArrayOutputStream baos = new ByteArrayOutputStream();
209 DataOutputStream os = new DataOutputStream(baos);
210 MarshallingSupport.marshalPrimitiveMap(properties, os);
211 os.close();
212 marshalledProperties = baos.toByteSequence();
213 }
214 }
215
216 public void afterMarshall(WireFormat wireFormat) throws IOException {
217 }
218
219 public void beforeUnmarshall(WireFormat wireFormat) throws IOException {
220 }
221
222 public void afterUnmarshall(WireFormat wireFormat) throws IOException {
223 }
224
225 // /////////////////////////////////////////////////////////////////
226 //
227 // Simple Field accessors
228 //
229 // /////////////////////////////////////////////////////////////////
230
231 /**
232 * @openwire:property version=1 cache=true
233 */
234 public ProducerId getProducerId() {
235 return producerId;
236 }
237
238 public void setProducerId(ProducerId producerId) {
239 this.producerId = producerId;
240 }
241
242 /**
243 * @openwire:property version=1 cache=true
244 */
245 public ActiveMQDestination getDestination() {
246 return destination;
247 }
248
249 public void setDestination(ActiveMQDestination destination) {
250 this.destination = destination;
251 }
252
253 /**
254 * @openwire:property version=1 cache=true
255 */
256 public TransactionId getTransactionId() {
257 return transactionId;
258 }
259
260 public void setTransactionId(TransactionId transactionId) {
261 this.transactionId = transactionId;
262 }
263
264 public boolean isInTransaction() {
265 return transactionId != null;
266 }
267
268 /**
269 * @openwire:property version=1 cache=true
270 */
271 public ActiveMQDestination getOriginalDestination() {
272 return originalDestination;
273 }
274
275 public void setOriginalDestination(ActiveMQDestination destination) {
276 this.originalDestination = destination;
277 }
278
279 /**
280 * @openwire:property version=1
281 */
282 public MessageId getMessageId() {
283 return messageId;
284 }
285
286 public void setMessageId(MessageId messageId) {
287 this.messageId = messageId;
288 }
289
290 /**
291 * @openwire:property version=1 cache=true
292 */
293 public TransactionId getOriginalTransactionId() {
294 return originalTransactionId;
295 }
296
297 public void setOriginalTransactionId(TransactionId transactionId) {
298 this.originalTransactionId = transactionId;
299 }
300
301 /**
302 * @openwire:property version=1
303 */
304 public String getGroupID() {
305 return groupID;
306 }
307
308 public void setGroupID(String groupID) {
309 this.groupID = groupID;
310 }
311
312 /**
313 * @openwire:property version=1
314 */
315 public int getGroupSequence() {
316 return groupSequence;
317 }
318
319 public void setGroupSequence(int groupSequence) {
320 this.groupSequence = groupSequence;
321 }
322
323 /**
324 * @openwire:property version=1
325 */
326 public String getCorrelationId() {
327 return correlationId;
328 }
329
330 public void setCorrelationId(String correlationId) {
331 this.correlationId = correlationId;
332 }
333
334 /**
335 * @openwire:property version=1
336 */
337 public boolean isPersistent() {
338 return persistent;
339 }
340
341 public void setPersistent(boolean deliveryMode) {
342 this.persistent = deliveryMode;
343 }
344
345 /**
346 * @openwire:property version=1
347 */
348 public long getExpiration() {
349 return expiration;
350 }
351
352 public void setExpiration(long expiration) {
353 this.expiration = expiration;
354 }
355
356 /**
357 * @openwire:property version=1
358 */
359 public byte getPriority() {
360 return priority;
361 }
362
363 public void setPriority(byte priority) {
364 if (priority < 0) {
365 this.priority = 0;
366 } else if (priority > 9) {
367 this.priority = 9;
368 } else {
369 this.priority = priority;
370 }
371 }
372
373 /**
374 * @openwire:property version=1
375 */
376 public ActiveMQDestination getReplyTo() {
377 return replyTo;
378 }
379
380 public void setReplyTo(ActiveMQDestination replyTo) {
381 this.replyTo = replyTo;
382 }
383
384 /**
385 * @openwire:property version=1
386 */
387 public long getTimestamp() {
388 return timestamp;
389 }
390
391 public void setTimestamp(long timestamp) {
392 this.timestamp = timestamp;
393 }
394
395 /**
396 * @openwire:property version=1
397 */
398 public String getType() {
399 return type;
400 }
401
402 public void setType(String type) {
403 this.type = type;
404 }
405
406 /**
407 * @openwire:property version=1
408 */
409 public ByteSequence getContent() {
410 return content;
411 }
412
413 public void setContent(ByteSequence content) {
414 this.content = content;
415 }
416
417 /**
418 * @openwire:property version=1
419 */
420 public ByteSequence getMarshalledProperties() {
421 return marshalledProperties;
422 }
423
424 public void setMarshalledProperties(ByteSequence marshalledProperties) {
425 this.marshalledProperties = marshalledProperties;
426 }
427
428 /**
429 * @openwire:property version=1
430 */
431 public DataStructure getDataStructure() {
432 return dataStructure;
433 }
434
435 public void setDataStructure(DataStructure data) {
436 this.dataStructure = data;
437 }
438
439 /**
440 * Can be used to route the message to a specific consumer. Should be null
441 * to allow the broker use normal JMS routing semantics. If the target
442 * consumer id is an active consumer on the broker, the message is dropped.
443 * Used by the AdvisoryBroker to replay advisory messages to a specific
444 * consumer.
445 *
446 * @openwire:property version=1 cache=true
447 */
448 public ConsumerId getTargetConsumerId() {
449 return targetConsumerId;
450 }
451
452 public void setTargetConsumerId(ConsumerId targetConsumerId) {
453 this.targetConsumerId = targetConsumerId;
454 }
455
456 public boolean isExpired() {
457 long expireTime = getExpiration();
458 return expireTime > 0 && System.currentTimeMillis() > expireTime;
459 }
460
461 public boolean isAdvisory() {
462 return type != null && type.equals(AdvisorySupport.ADIVSORY_MESSAGE_TYPE);
463 }
464
465 /**
466 * @openwire:property version=1
467 */
468 public boolean isCompressed() {
469 return compressed;
470 }
471
472 public void setCompressed(boolean compressed) {
473 this.compressed = compressed;
474 }
475
476 public boolean isRedelivered() {
477 return redeliveryCounter > 0;
478 }
479
480 public void setRedelivered(boolean redelivered) {
481 if (redelivered) {
482 if (!isRedelivered()) {
483 setRedeliveryCounter(1);
484 }
485 } else {
486 if (isRedelivered()) {
487 setRedeliveryCounter(0);
488 }
489 }
490 }
491
492 public void incrementRedeliveryCounter() {
493 redeliveryCounter++;
494 }
495
496 /**
497 * @openwire:property version=1
498 */
499 public int getRedeliveryCounter() {
500 return redeliveryCounter;
501 }
502
503 public void setRedeliveryCounter(int deliveryCounter) {
504 this.redeliveryCounter = deliveryCounter;
505 }
506
507 /**
508 * The route of brokers the command has moved through.
509 *
510 * @openwire:property version=1 cache=true
511 */
512 public BrokerId[] getBrokerPath() {
513 return brokerPath;
514 }
515
516 public void setBrokerPath(BrokerId[] brokerPath) {
517 this.brokerPath = brokerPath;
518 }
519
520 public boolean isReadOnlyProperties() {
521 return readOnlyProperties;
522 }
523
524 public void setReadOnlyProperties(boolean readOnlyProperties) {
525 this.readOnlyProperties = readOnlyProperties;
526 }
527
528 public boolean isReadOnlyBody() {
529 return readOnlyBody;
530 }
531
532 public void setReadOnlyBody(boolean readOnlyBody) {
533 this.readOnlyBody = readOnlyBody;
534 }
535
536 public ActiveMQConnection getConnection() {
537 return this.connection;
538 }
539
540 public void setConnection(ActiveMQConnection connection) {
541 this.connection = connection;
542 }
543
544 /**
545 * Used to schedule the arrival time of a message to a broker. The broker
546 * will not dispatch a message to a consumer until it's arrival time has
547 * elapsed.
548 *
549 * @openwire:property version=1
550 */
551 public long getArrival() {
552 return arrival;
553 }
554
555 public void setArrival(long arrival) {
556 this.arrival = arrival;
557 }
558
559 /**
560 * Only set by the broker and defines the userID of the producer connection
561 * who sent this message. This is an optional field, it needs to be enabled
562 * on the broker to have this field populated.
563 *
564 * @openwire:property version=1
565 */
566 public String getUserID() {
567 return userID;
568 }
569
570 public void setUserID(String jmsxUserID) {
571 this.userID = jmsxUserID;
572 }
573
574 public int getReferenceCount() {
575 return referenceCount;
576 }
577
578 public Message getMessageHardRef() {
579 return this;
580 }
581
582 public Message getMessage() {
583 return this;
584 }
585
586 public org.apache.activemq.broker.region.Destination getRegionDestination() {
587 return regionDestination;
588 }
589
590 public void setRegionDestination(org.apache.activemq.broker.region.Destination destination) {
591 this.regionDestination = destination;
592 if(this.memoryUsage==null) {
593 this.memoryUsage=regionDestination.getMemoryUsage();
594 }
595 }
596
597 public MemoryUsage getMemoryUsage() {
598 return this.memoryUsage;
599 }
600
601 public void setMemoryUsage(MemoryUsage usage) {
602 this.memoryUsage=usage;
603 }
604
605 @Override
606 public boolean isMarshallAware() {
607 return true;
608 }
609
610 public int incrementReferenceCount() {
611 int rc;
612 int size;
613 synchronized (this) {
614 rc = ++referenceCount;
615 size = getSize();
616 }
617
618 if (rc == 1 && getMemoryUsage() != null) {
619 getMemoryUsage().increaseUsage(size);
620 //System.err.println("INCREASE USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
621
622 }
623
624 //System.out.println(" + "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
625 return rc;
626 }
627
628 public int decrementReferenceCount() {
629 int rc;
630 int size;
631 synchronized (this) {
632 rc = --referenceCount;
633 size = getSize();
634 }
635
636 if (rc == 0 && getMemoryUsage() != null) {
637 getMemoryUsage().decreaseUsage(size);
638 //Thread.dumpStack();
639 //System.err.println("DECREADED USAGE " + System.identityHashCode(getMemoryUsage()) + " PERCENT = " + getMemoryUsage().getPercentUsage());
640 }
641
642 //System.out.println(" - "+getMemoryUsage().getName()+" :::: "+getMessageId()+"rc="+rc);
643
644 return rc;
645 }
646
647 public int getSize() {
648 int minimumMessageSize = getMinimumMessageSize();
649 if (size < minimumMessageSize || size == 0) {
650 size = minimumMessageSize;
651 if (marshalledProperties != null) {
652 size += marshalledProperties.getLength();
653 }
654 if (content != null) {
655 size += content.getLength();
656 }
657 }
658 return size;
659 }
660
661 protected int getMinimumMessageSize() {
662 int result = DEFAULT_MINIMUM_MESSAGE_SIZE;
663 //let destination override
664 Destination dest = regionDestination;
665 if (dest != null) {
666 result=dest.getMinimumMessageSize();
667 }
668 return result;
669 }
670
671 /**
672 * @openwire:property version=1
673 * @return Returns the recievedByDFBridge.
674 */
675 public boolean isRecievedByDFBridge() {
676 return recievedByDFBridge;
677 }
678
679 /**
680 * @param recievedByDFBridge The recievedByDFBridge to set.
681 */
682 public void setRecievedByDFBridge(boolean recievedByDFBridge) {
683 this.recievedByDFBridge = recievedByDFBridge;
684 }
685
686 public void onMessageRolledBack() {
687 incrementRedeliveryCounter();
688 }
689
690 /**
691 * @openwire:property version=2 cache=true
692 */
693 public boolean isDroppable() {
694 return droppable;
695 }
696
697 public void setDroppable(boolean droppable) {
698 this.droppable = droppable;
699 }
700
701 /**
702 * If a message is stored in multiple nodes on a cluster, all the cluster
703 * members will be listed here. Otherwise, it will be null.
704 *
705 * @openwire:property version=3 cache=true
706 */
707 public BrokerId[] getCluster() {
708 return cluster;
709 }
710
711 public void setCluster(BrokerId[] cluster) {
712 this.cluster = cluster;
713 }
714
715 @Override
716 public boolean isMessage() {
717 return true;
718 }
719
720 /**
721 * @openwire:property version=3
722 */
723 public long getBrokerInTime() {
724 return this.brokerInTime;
725 }
726
727 public void setBrokerInTime(long brokerInTime) {
728 this.brokerInTime = brokerInTime;
729 }
730
731 /**
732 * @openwire:property version=3
733 */
734 public long getBrokerOutTime() {
735 return this.brokerOutTime;
736 }
737
738 public void setBrokerOutTime(long brokerOutTime) {
739 this.brokerOutTime = brokerOutTime;
740 }
741
742 public boolean isDropped() {
743 return false;
744 }
745
746 @Override
747 public String toString() {
748 return toString(null);
749 }
750
751 @Override
752 public String toString(Map<String, Object>overrideFields) {
753 try {
754 getProperties();
755 } catch (IOException e) {
756 }
757 return super.toString(overrideFields);
758 }
759 }