001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.transport;
018
019 import java.io.ByteArrayOutputStream;
020 import java.io.DataInputStream;
021 import java.io.IOException;
022
023 import org.apache.activemq.command.Command;
024 import org.apache.activemq.command.LastPartialCommand;
025 import org.apache.activemq.command.PartialCommand;
026 import org.apache.activemq.openwire.OpenWireFormat;
027 import org.apache.activemq.util.ByteArrayInputStream;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030
031 /**
032 * Joins together of partial commands which were split into individual chunks of
033 * data.
034 *
035 *
036 */
037 public class CommandJoiner extends TransportFilter {
038 private static final Logger LOG = LoggerFactory.getLogger(CommandJoiner.class);
039
040 private ByteArrayOutputStream out = new ByteArrayOutputStream();
041 private OpenWireFormat wireFormat;
042
043 public CommandJoiner(Transport next, OpenWireFormat wireFormat) {
044 super(next);
045 this.wireFormat = wireFormat;
046 }
047
048 public void onCommand(Object o) {
049 Command command = (Command)o;
050 byte type = command.getDataStructureType();
051 if (type == PartialCommand.DATA_STRUCTURE_TYPE || type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
052 PartialCommand header = (PartialCommand)command;
053 byte[] partialData = header.getData();
054 try {
055 out.write(partialData);
056 } catch (IOException e) {
057 getTransportListener().onException(e);
058 }
059 if (type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
060 try {
061 byte[] fullData = out.toByteArray();
062 out.reset();
063 DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(fullData));
064 Command completeCommand = (Command)wireFormat.unmarshal(dataIn);
065
066 LastPartialCommand lastCommand = (LastPartialCommand)command;
067 lastCommand.configure(completeCommand);
068
069 getTransportListener().onCommand(completeCommand);
070 } catch (IOException e) {
071 LOG.warn("Failed to unmarshal partial command: " + command);
072 getTransportListener().onException(e);
073 }
074 }
075 } else {
076 getTransportListener().onCommand(command);
077 }
078 }
079
080 public void stop() throws Exception {
081 super.stop();
082 out = null;
083 }
084
085 public String toString() {
086 return next.toString();
087 }
088 }