001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq;
018
019 import java.io.IOException;
020 import java.io.OutputStream;
021 import java.util.HashMap;
022 import java.util.Iterator;
023 import java.util.Map;
024
025 import javax.jms.InvalidDestinationException;
026 import javax.jms.JMSException;
027
028 import org.apache.activemq.command.ActiveMQBytesMessage;
029 import org.apache.activemq.command.ActiveMQDestination;
030 import org.apache.activemq.command.ActiveMQMessage;
031 import org.apache.activemq.command.MessageId;
032 import org.apache.activemq.command.ProducerId;
033 import org.apache.activemq.command.ProducerInfo;
034 import org.apache.activemq.util.IOExceptionSupport;
035 import org.apache.activemq.util.IntrospectionSupport;
036
037 /**
038 *
039 */
040 public class ActiveMQOutputStream extends OutputStream implements Disposable {
041
042 protected int count;
043
044 final byte buffer[];
045
046 private final ActiveMQConnection connection;
047 private final Map<String, Object> properties;
048 private final ProducerInfo info;
049
050 private long messageSequence;
051 private boolean closed;
052 private final int deliveryMode;
053 private final int priority;
054 private final long timeToLive;
055 private boolean alwaysSyncSend = false;
056
057 /**
058 * JMS Property which is used to specify the size (in kb) which is used as chunk size when splitting the stream. Default is 64kb
059 */
060 public final static String AMQ_STREAM_CHUNK_SIZE = "AMQ_STREAM_CHUNK_SIZE";
061
062 public ActiveMQOutputStream(ActiveMQConnection connection, ProducerId producerId, ActiveMQDestination destination, Map<String, Object> properties, int deliveryMode, int priority,
063 long timeToLive) throws JMSException {
064 this.connection = connection;
065 this.deliveryMode = deliveryMode;
066 this.priority = priority;
067 this.timeToLive = timeToLive;
068 this.properties = properties == null ? null : new HashMap<String, Object>(properties);
069
070 Integer chunkSize = this.properties == null ? null : (Integer) this.properties.get(AMQ_STREAM_CHUNK_SIZE);
071 if (chunkSize == null) {
072 chunkSize = 64 * 1024;
073 } else {
074 if (chunkSize < 1) {
075 throw new IllegalArgumentException("Chunk size must be greater then 0");
076 } else {
077 chunkSize *= 1024;
078 }
079 }
080
081 buffer = new byte[chunkSize];
082
083 if (destination == null) {
084 throw new InvalidDestinationException("Don't understand null destinations");
085 }
086
087 this.info = new ProducerInfo(producerId);
088
089 // Allows the options on the destination to configure the stream
090 if (destination.getOptions() != null) {
091 Map<String, String> options = new HashMap<String, String>(destination.getOptions());
092 IntrospectionSupport.setProperties(this, options, "producer.");
093 IntrospectionSupport.setProperties(this.info, options, "producer.");
094 }
095
096 this.info.setDestination(destination);
097
098 this.connection.addOutputStream(this);
099 this.connection.asyncSendPacket(info);
100 }
101
102 public void close() throws IOException {
103 if (!closed) {
104 flushBuffer();
105 try {
106 // Send an EOS style empty message to signal EOS.
107 send(new ActiveMQMessage(), true);
108 dispose();
109 this.connection.asyncSendPacket(info.createRemoveCommand());
110 } catch (JMSException e) {
111 IOExceptionSupport.create(e);
112 }
113 }
114 }
115
116 public void dispose() {
117 if (!closed) {
118 this.connection.removeOutputStream(this);
119 closed = true;
120 }
121 }
122
123 public synchronized void write(int b) throws IOException {
124 buffer[count++] = (byte) b;
125 if (count == buffer.length) {
126 flushBuffer();
127 }
128 }
129
130 public synchronized void write(byte b[], int off, int len) throws IOException {
131 while (len > 0) {
132 int max = Math.min(len, buffer.length - count);
133 System.arraycopy(b, off, buffer, count, max);
134
135 len -= max;
136 count += max;
137 off += max;
138
139 if (count == buffer.length) {
140 flushBuffer();
141 }
142 }
143 }
144
145 public synchronized void flush() throws IOException {
146 flushBuffer();
147 }
148
149 private void flushBuffer() throws IOException {
150 if (count != 0) {
151 try {
152 ActiveMQBytesMessage msg = new ActiveMQBytesMessage();
153 msg.writeBytes(buffer, 0, count);
154 send(msg, false);
155 } catch (JMSException e) {
156 throw IOExceptionSupport.create(e);
157 }
158 count = 0;
159 }
160 }
161
162 /**
163 * @param msg
164 * @throws JMSException
165 */
166 private void send(ActiveMQMessage msg, boolean eosMessage) throws JMSException {
167 if (properties != null) {
168 for (Iterator<String> iter = properties.keySet().iterator(); iter.hasNext();) {
169 String key = iter.next();
170 Object value = properties.get(key);
171 msg.setObjectProperty(key, value);
172 }
173 }
174 msg.setType("org.apache.activemq.Stream");
175 msg.setGroupID(info.getProducerId().toString());
176 if (eosMessage) {
177 msg.setGroupSequence(-1);
178 } else {
179 msg.setGroupSequence((int) messageSequence);
180 }
181 MessageId id = new MessageId(info.getProducerId(), messageSequence++);
182 connection.send(info.getDestination(), msg, id, deliveryMode, priority, timeToLive, !eosMessage && !isAlwaysSyncSend());
183 }
184
185 public String toString() {
186 return "ActiveMQOutputStream { producerId=" + info.getProducerId() + " }";
187 }
188
189 public boolean isAlwaysSyncSend() {
190 return alwaysSyncSend;
191 }
192
193 public void setAlwaysSyncSend(boolean alwaysSyncSend) {
194 this.alwaysSyncSend = alwaysSyncSend;
195 }
196
197 }