001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017
018 package org.apache.activemq.store.amq;
019
020 import java.io.DataInput;
021 import java.io.DataOutput;
022 import java.io.IOException;
023 import org.apache.activemq.broker.ConnectionContext;
024 import org.apache.activemq.command.ActiveMQDestination;
025 import org.apache.activemq.command.JournalTopicAck;
026 import org.apache.activemq.command.Message;
027 import org.apache.activemq.command.MessageAck;
028 import org.apache.activemq.kaha.impl.async.Location;
029 import org.apache.activemq.util.ByteSequence;
030 import org.apache.activemq.wireformat.WireFormat;
031
032 /**
033 */
034 public class AMQTxOperation {
035
036 public static final byte ADD_OPERATION_TYPE = 0;
037 public static final byte REMOVE_OPERATION_TYPE = 1;
038 public static final byte ACK_OPERATION_TYPE = 3;
039 private byte operationType;
040 private ActiveMQDestination destination;
041 private Object data;
042 private Location location;
043
044 public AMQTxOperation() {
045 }
046
047 public AMQTxOperation(byte operationType, ActiveMQDestination destination, Object data, Location location) {
048 this.operationType = operationType;
049 this.destination = destination;
050 this.data = data;
051 this.location = location;
052
053 }
054
055 /**
056 * @return the data
057 */
058 public Object getData() {
059 return this.data;
060 }
061
062 /**
063 * @param data the data to set
064 */
065 public void setData(Object data) {
066 this.data = data;
067 }
068
069 /**
070 * @return the location
071 */
072 public Location getLocation() {
073 return this.location;
074 }
075
076 /**
077 * @param location the location to set
078 */
079 public void setLocation(Location location) {
080 this.location = location;
081 }
082
083 /**
084 * @return the operationType
085 */
086 public byte getOperationType() {
087 return this.operationType;
088 }
089
090 /**
091 * @param operationType the operationType to set
092 */
093 public void setOperationType(byte operationType) {
094 this.operationType = operationType;
095 }
096
097 public boolean replay(AMQPersistenceAdapter adapter, ConnectionContext context) throws IOException {
098 boolean result = false;
099 AMQMessageStore store = (AMQMessageStore)adapter.createMessageStore(destination);
100 if (operationType == ADD_OPERATION_TYPE) {
101 result = store.replayAddMessage(context, (Message)data, location);
102 } else if (operationType == REMOVE_OPERATION_TYPE) {
103 result = store.replayRemoveMessage(context, (MessageAck)data);
104 } else {
105 JournalTopicAck ack = (JournalTopicAck)data;
106 result = ((AMQTopicMessageStore)store).replayAcknowledge(context, ack.getClientId(), ack
107 .getSubscritionName(), ack.getMessageId());
108 }
109 return result;
110 }
111
112 public void writeExternal(WireFormat wireFormat, DataOutput dos) throws IOException {
113 location.writeExternal(dos);
114 ByteSequence packet = wireFormat.marshal(getData());
115 dos.writeInt(packet.length);
116 dos.write(packet.data, packet.offset, packet.length);
117 packet = wireFormat.marshal(destination);
118 dos.writeInt(packet.length);
119 dos.write(packet.data, packet.offset, packet.length);
120 }
121
122 public void readExternal(WireFormat wireFormat, DataInput dis) throws IOException {
123 this.location = new Location();
124 this.location.readExternal(dis);
125 int size = dis.readInt();
126 byte[] data = new byte[size];
127 dis.readFully(data);
128 setData(wireFormat.unmarshal(new ByteSequence(data)));
129 size = dis.readInt();
130 data = new byte[size];
131 dis.readFully(data);
132 this.destination = (ActiveMQDestination)wireFormat.unmarshal(new ByteSequence(data));
133 }
134 }