001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport.udp;
018
019
020 import java.io.DataInputStream;
021 import java.io.DataOutputStream;
022 import java.net.DatagramPacket;
023 import java.net.SocketAddress;
024 import java.nio.ByteBuffer;
025 import java.util.HashMap;
026 import java.util.Map;
027
028 import org.apache.activemq.command.Command;
029 import org.apache.activemq.command.Endpoint;
030
031 /**
032 *
033 *
034 */
035 public class DatagramHeaderMarshaller {
036
037 // TODO for large dynamic networks
038 // we may want to evict endpoints that disconnect
039 // from a transport - e.g. for multicast
040 private Map<SocketAddress, Endpoint> endpoints = new HashMap<SocketAddress, Endpoint>();
041
042 /**
043 * Reads any header if applicable and then creates an endpoint object
044 */
045 public Endpoint createEndpoint(ByteBuffer readBuffer, SocketAddress address) {
046 return getEndpoint(address);
047 }
048
049 public Endpoint createEndpoint(DatagramPacket datagram, DataInputStream dataIn) {
050 return getEndpoint(datagram.getSocketAddress());
051 }
052
053 public void writeHeader(Command command, ByteBuffer writeBuffer) {
054 /*
055 writeBuffer.putLong(command.getCounter());
056 writeBuffer.putInt(command.getDataSize());
057 byte flags = command.getFlags();
058 //System.out.println("Writing header with counter: " + header.getCounter() + " size: " + header.getDataSize() + " with flags: " + flags);
059 writeBuffer.put(flags);
060 */
061 }
062
063 public void writeHeader(Command command, DataOutputStream dataOut) {
064 }
065
066 /**
067 * Gets the current endpoint object for this address or creates one if not available.
068 *
069 * Note that this method does not need to be synchronized as its only ever going to be
070 * used by the already-synchronized read() method of a CommandChannel
071 *
072 */
073 protected Endpoint getEndpoint(SocketAddress address) {
074 Endpoint endpoint = endpoints.get(address);
075 if (endpoint == null) {
076 endpoint = createEndpoint(address);
077 endpoints.put(address, endpoint);
078 }
079 return endpoint;
080 }
081
082 protected Endpoint createEndpoint(SocketAddress address) {
083 return new DatagramEndpoint(address.toString(), address);
084 }
085 }