001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.udp;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.IOException;
022 import java.net.DatagramPacket;
023 import java.net.DatagramSocket;
024 import java.net.SocketAddress;
025
026 import org.apache.activemq.command.Command;
027 import org.apache.activemq.command.Endpoint;
028 import org.apache.activemq.command.LastPartialCommand;
029 import org.apache.activemq.command.PartialCommand;
030 import org.apache.activemq.openwire.BooleanStream;
031 import org.apache.activemq.openwire.OpenWireFormat;
032 import org.apache.activemq.transport.reliable.ReplayBuffer;
033 import org.apache.activemq.util.ByteArrayInputStream;
034 import org.apache.activemq.util.ByteArrayOutputStream;
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037
038 /**
039 * A strategy for reading datagrams and de-fragmenting them together.
040 *
041 *
042 */
043 public class CommandDatagramSocket extends CommandChannelSupport {
044
045 private static final Logger LOG = LoggerFactory.getLogger(CommandDatagramSocket.class);
046
047 private DatagramSocket channel;
048 private Object readLock = new Object();
049 private Object writeLock = new Object();
050
051 private volatile int receiveCounter;
052
053 public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
054 DatagramSocket channel) {
055 super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
056 this.channel = channel;
057 }
058
059 public void start() throws Exception {
060 }
061
062 public void stop() throws Exception {
063 }
064
065 public Command read() throws IOException {
066 Command answer = null;
067 Endpoint from = null;
068 synchronized (readLock) {
069 while (true) {
070 DatagramPacket datagram = createDatagramPacket();
071 channel.receive(datagram);
072
073 // TODO could use a DataInput implementation that talks direct
074 // to the byte[] to avoid object allocation
075 receiveCounter++;
076 DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(datagram.getData(), 0, datagram.getLength()));
077
078 from = headerMarshaller.createEndpoint(datagram, dataIn);
079 answer = (Command)wireFormat.unmarshal(dataIn);
080 break;
081 }
082 }
083 if (answer != null) {
084 answer.setFrom(from);
085
086 if (LOG.isDebugEnabled()) {
087 LOG.debug("Channel: " + name + " about to process: " + answer);
088 }
089 }
090 return answer;
091 }
092
093 public void write(Command command, SocketAddress address) throws IOException {
094 synchronized (writeLock) {
095
096 ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
097 DataOutputStream dataOut = new DataOutputStream(writeBuffer);
098 headerMarshaller.writeHeader(command, dataOut);
099
100 int offset = writeBuffer.size();
101
102 wireFormat.marshal(command, dataOut);
103
104 if (remaining(writeBuffer) >= 0) {
105 sendWriteBuffer(address, writeBuffer, command.getCommandId());
106 } else {
107 // lets split the command up into chunks
108 byte[] data = writeBuffer.toByteArray();
109 boolean lastFragment = false;
110 int length = data.length;
111 for (int fragment = 0; !lastFragment; fragment++) {
112 writeBuffer = createByteArrayOutputStream();
113 headerMarshaller.writeHeader(command, dataOut);
114
115 int chunkSize = remaining(writeBuffer);
116
117 // we need to remove the amount of overhead to write the
118 // partial command
119
120 // lets write the flags in there
121 BooleanStream bs = null;
122 if (wireFormat.isTightEncodingEnabled()) {
123 bs = new BooleanStream();
124 bs.writeBoolean(true); // the partial data byte[] is
125 // never null
126 }
127
128 // lets remove the header of the partial command
129 // which is the byte for the type and an int for the size of
130 // the byte[]
131
132 // data type + the command ID + size of the partial data
133 chunkSize -= 1 + 4 + 4;
134
135 // the boolean flags
136 if (bs != null) {
137 chunkSize -= bs.marshalledSize();
138 } else {
139 chunkSize -= 1;
140 }
141
142 if (!wireFormat.isSizePrefixDisabled()) {
143 // lets write the size of the command buffer
144 dataOut.writeInt(chunkSize);
145 chunkSize -= 4;
146 }
147
148 lastFragment = offset + chunkSize >= length;
149 if (chunkSize + offset > length) {
150 chunkSize = length - offset;
151 }
152
153 if (lastFragment) {
154 dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE);
155 } else {
156 dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
157 }
158
159 if (bs != null) {
160 bs.marshal(dataOut);
161 }
162
163 int commandId = command.getCommandId();
164 if (fragment > 0) {
165 commandId = sequenceGenerator.getNextSequenceId();
166 }
167 dataOut.writeInt(commandId);
168 if (bs == null) {
169 dataOut.write((byte)1);
170 }
171
172 // size of byte array
173 dataOut.writeInt(chunkSize);
174
175 // now the data
176 dataOut.write(data, offset, chunkSize);
177
178 offset += chunkSize;
179 sendWriteBuffer(address, writeBuffer, commandId);
180 }
181 }
182 }
183 }
184
185 public int getDatagramSize() {
186 return datagramSize;
187 }
188
189 public void setDatagramSize(int datagramSize) {
190 this.datagramSize = datagramSize;
191 }
192
193 // Implementation methods
194 // -------------------------------------------------------------------------
195 protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
196 byte[] data = writeBuffer.toByteArray();
197 sendWriteBuffer(commandId, address, data, false);
198 }
199
200 protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery) throws IOException {
201 // lets put the datagram into the replay buffer first to prevent timing
202 // issues
203 ReplayBuffer bufferCache = getReplayBuffer();
204 if (bufferCache != null && !redelivery) {
205 bufferCache.addBuffer(commandId, data);
206 }
207
208 if (LOG.isDebugEnabled()) {
209 String text = redelivery ? "REDELIVERING" : "sending";
210 LOG.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
211 }
212 DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
213 channel.send(packet);
214 }
215
216 public void sendBuffer(int commandId, Object buffer) throws IOException {
217 if (buffer != null) {
218 byte[] data = (byte[])buffer;
219 sendWriteBuffer(commandId, replayAddress, data, true);
220 } else {
221 if (LOG.isWarnEnabled()) {
222 LOG.warn("Request for buffer: " + commandId + " is no longer present");
223 }
224 }
225 }
226
227 protected DatagramPacket createDatagramPacket() {
228 return new DatagramPacket(new byte[datagramSize], datagramSize);
229 }
230
231 protected int remaining(ByteArrayOutputStream buffer) {
232 return datagramSize - buffer.size();
233 }
234
235 protected ByteArrayOutputStream createByteArrayOutputStream() {
236 return new ByteArrayOutputStream(datagramSize);
237 }
238
239 public int getReceiveCounter() {
240 return receiveCounter;
241 }
242 }