001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.udp;
018
019 import java.io.DataInputStream;
020 import java.io.DataOutputStream;
021 import java.io.IOException;
022 import java.net.SocketAddress;
023 import java.nio.ByteBuffer;
024 import java.nio.channels.DatagramChannel;
025
026 import org.apache.activemq.command.Command;
027 import org.apache.activemq.command.Endpoint;
028 import org.apache.activemq.command.LastPartialCommand;
029 import org.apache.activemq.command.PartialCommand;
030 import org.apache.activemq.openwire.BooleanStream;
031 import org.apache.activemq.openwire.OpenWireFormat;
032 import org.apache.activemq.transport.reliable.ReplayBuffer;
033 import org.apache.activemq.util.ByteArrayInputStream;
034 import org.apache.activemq.util.ByteArrayOutputStream;
035 import org.slf4j.Logger;
036 import org.slf4j.LoggerFactory;
037
038 /**
039 * A strategy for reading datagrams and de-fragmenting them together.
040 *
041 *
042 */
043 public class CommandDatagramChannel extends CommandChannelSupport {
044
045 private static final Logger LOG = LoggerFactory.getLogger(CommandDatagramChannel.class);
046
047 private DatagramChannel channel;
048 private ByteBufferPool bufferPool;
049
050 // reading
051 private Object readLock = new Object();
052 private ByteBuffer readBuffer;
053
054 // writing
055 private Object writeLock = new Object();
056 private int defaultMarshalBufferSize = 64 * 1024;
057 private volatile int receiveCounter;
058
059 public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller,
060 DatagramChannel channel, ByteBufferPool bufferPool) {
061 super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
062 this.channel = channel;
063 this.bufferPool = bufferPool;
064 }
065
066 public void start() throws Exception {
067 bufferPool.setDefaultSize(datagramSize);
068 bufferPool.start();
069 readBuffer = bufferPool.borrowBuffer();
070 }
071
072 public void stop() throws Exception {
073 bufferPool.stop();
074 }
075
076 public Command read() throws IOException {
077 Command answer = null;
078 Endpoint from = null;
079 synchronized (readLock) {
080 while (true) {
081 readBuffer.clear();
082 SocketAddress address = channel.receive(readBuffer);
083
084 readBuffer.flip();
085
086 if (readBuffer.limit() == 0) {
087 continue;
088 }
089
090 receiveCounter++;
091 from = headerMarshaller.createEndpoint(readBuffer, address);
092
093 int remaining = readBuffer.remaining();
094 byte[] data = new byte[remaining];
095 readBuffer.get(data);
096
097 // TODO could use a DataInput implementation that talks direct
098 // to
099 // the ByteBuffer to avoid object allocation and unnecessary
100 // buffering?
101 DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
102 answer = (Command)wireFormat.unmarshal(dataIn);
103 break;
104 }
105 }
106 if (answer != null) {
107 answer.setFrom(from);
108
109 if (LOG.isDebugEnabled()) {
110 LOG.debug("Channel: " + name + " received from: " + from + " about to process: " + answer);
111 }
112 }
113 return answer;
114 }
115
116 public void write(Command command, SocketAddress address) throws IOException {
117 synchronized (writeLock) {
118
119 ByteArrayOutputStream largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
120 wireFormat.marshal(command, new DataOutputStream(largeBuffer));
121 byte[] data = largeBuffer.toByteArray();
122 int size = data.length;
123
124 ByteBuffer writeBuffer = bufferPool.borrowBuffer();
125 writeBuffer.clear();
126 headerMarshaller.writeHeader(command, writeBuffer);
127
128 if (size > writeBuffer.remaining()) {
129 // lets split the command up into chunks
130 int offset = 0;
131 boolean lastFragment = false;
132 int length = data.length;
133 for (int fragment = 0; !lastFragment; fragment++) {
134 // write the header
135 if (fragment > 0) {
136 writeBuffer = bufferPool.borrowBuffer();
137 writeBuffer.clear();
138 headerMarshaller.writeHeader(command, writeBuffer);
139 }
140
141 int chunkSize = writeBuffer.remaining();
142
143 // we need to remove the amount of overhead to write the
144 // partial command
145
146 // lets write the flags in there
147 BooleanStream bs = null;
148 if (wireFormat.isTightEncodingEnabled()) {
149 bs = new BooleanStream();
150 bs.writeBoolean(true); // the partial data byte[] is
151 // never null
152 }
153
154 // lets remove the header of the partial command
155 // which is the byte for the type and an int for the size of
156 // the byte[]
157
158 // data type + the command ID + size of the partial data
159 chunkSize -= 1 + 4 + 4;
160
161 // the boolean flags
162 if (bs != null) {
163 chunkSize -= bs.marshalledSize();
164 } else {
165 chunkSize -= 1;
166 }
167
168 if (!wireFormat.isSizePrefixDisabled()) {
169 // lets write the size of the command buffer
170 writeBuffer.putInt(chunkSize);
171 chunkSize -= 4;
172 }
173
174 lastFragment = offset + chunkSize >= length;
175 if (chunkSize + offset > length) {
176 chunkSize = length - offset;
177 }
178
179 if (lastFragment) {
180 writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE);
181 } else {
182 writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
183 }
184
185 if (bs != null) {
186 bs.marshal(writeBuffer);
187 }
188
189 int commandId = command.getCommandId();
190 if (fragment > 0) {
191 commandId = sequenceGenerator.getNextSequenceId();
192 }
193 writeBuffer.putInt(commandId);
194 if (bs == null) {
195 writeBuffer.put((byte)1);
196 }
197
198 // size of byte array
199 writeBuffer.putInt(chunkSize);
200
201 // now the data
202 writeBuffer.put(data, offset, chunkSize);
203
204 offset += chunkSize;
205 sendWriteBuffer(commandId, address, writeBuffer, false);
206 }
207 } else {
208 writeBuffer.put(data);
209 sendWriteBuffer(command.getCommandId(), address, writeBuffer, false);
210 }
211 }
212 }
213
214 // Properties
215 // -------------------------------------------------------------------------
216
217 public ByteBufferPool getBufferPool() {
218 return bufferPool;
219 }
220
221 /**
222 * Sets the implementation of the byte buffer pool to use
223 */
224 public void setBufferPool(ByteBufferPool bufferPool) {
225 this.bufferPool = bufferPool;
226 }
227
228 // Implementation methods
229 // -------------------------------------------------------------------------
230 protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery) throws IOException {
231 // lets put the datagram into the replay buffer first to prevent timing
232 // issues
233 ReplayBuffer bufferCache = getReplayBuffer();
234 if (bufferCache != null && !redelivery) {
235 bufferCache.addBuffer(commandId, writeBuffer);
236 }
237
238 writeBuffer.flip();
239
240 if (LOG.isDebugEnabled()) {
241 String text = redelivery ? "REDELIVERING" : "sending";
242 LOG.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
243 }
244 channel.send(writeBuffer, address);
245 }
246
247 public void sendBuffer(int commandId, Object buffer) throws IOException {
248 if (buffer != null) {
249 ByteBuffer writeBuffer = (ByteBuffer)buffer;
250 sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true);
251 } else {
252 if (LOG.isWarnEnabled()) {
253 LOG.warn("Request for buffer: " + commandId + " is no longer present");
254 }
255 }
256 }
257
258 public int getReceiveCounter() {
259 return receiveCounter;
260 }
261
262 }