001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.udp;
018
019 import java.io.IOException;
020 import java.net.SocketAddress;
021
022 import org.apache.activemq.command.Command;
023 import org.apache.activemq.openwire.OpenWireFormat;
024 import org.apache.activemq.transport.reliable.ReplayBuffer;
025 import org.apache.activemq.util.IntSequenceGenerator;
026
027 /**
028 *
029 *
030 */
031 public abstract class CommandChannelSupport implements CommandChannel {
032
033 protected OpenWireFormat wireFormat;
034 protected int datagramSize = 4 * 1024;
035 protected SocketAddress targetAddress;
036 protected SocketAddress replayAddress;
037 protected final String name;
038 protected final IntSequenceGenerator sequenceGenerator;
039 protected DatagramHeaderMarshaller headerMarshaller;
040 private ReplayBuffer replayBuffer;
041
042 public CommandChannelSupport(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress,
043 DatagramHeaderMarshaller headerMarshaller) {
044 this.wireFormat = wireFormat;
045 this.datagramSize = datagramSize;
046 this.targetAddress = targetAddress;
047 this.headerMarshaller = headerMarshaller;
048 this.name = transport.toString();
049 this.sequenceGenerator = transport.getSequenceGenerator();
050 this.replayAddress = targetAddress;
051 if (sequenceGenerator == null) {
052 throw new IllegalArgumentException("No sequenceGenerator on the given transport: " + transport);
053 }
054 }
055
056 public void write(Command command) throws IOException {
057 write(command, targetAddress);
058 }
059
060
061 // Properties
062 // -------------------------------------------------------------------------
063
064 public int getDatagramSize() {
065 return datagramSize;
066 }
067
068 /**
069 * Sets the default size of a datagram on the network.
070 */
071 public void setDatagramSize(int datagramSize) {
072 this.datagramSize = datagramSize;
073 }
074
075 public SocketAddress getTargetAddress() {
076 return targetAddress;
077 }
078
079 public void setTargetAddress(SocketAddress targetAddress) {
080 this.targetAddress = targetAddress;
081 }
082
083 public SocketAddress getReplayAddress() {
084 return replayAddress;
085 }
086
087 public void setReplayAddress(SocketAddress replayAddress) {
088 this.replayAddress = replayAddress;
089 }
090
091 public String toString() {
092 return "CommandChannel#" + name;
093 }
094
095 public DatagramHeaderMarshaller getHeaderMarshaller() {
096 return headerMarshaller;
097 }
098
099 public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
100 this.headerMarshaller = headerMarshaller;
101 }
102
103 public ReplayBuffer getReplayBuffer() {
104 return replayBuffer;
105 }
106
107 public void setReplayBuffer(ReplayBuffer replayBuffer) {
108 this.replayBuffer = replayBuffer;
109 }
110
111 }