001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.openwire;
018
019 import java.io.DataInput;
020 import java.io.DataOutput;
021 import java.io.IOException;
022 import java.lang.reflect.Method;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import org.apache.activemq.command.CommandTypes;
027 import org.apache.activemq.command.DataStructure;
028 import org.apache.activemq.command.WireFormatInfo;
029 import org.apache.activemq.util.ByteSequence;
030 import org.apache.activemq.util.ByteSequenceData;
031 import org.apache.activemq.util.DataByteArrayInputStream;
032 import org.apache.activemq.util.DataByteArrayOutputStream;
033 import org.apache.activemq.wireformat.WireFormat;
034
035 /**
036 *
037 *
038 */
039 public final class OpenWireFormat implements WireFormat {
040
041 public static final int DEFAULT_VERSION = CommandTypes.PROTOCOL_STORE_VERSION;
042 public static final int DEFAULT_WIRE_VERSION = CommandTypes.PROTOCOL_VERSION;
043 public static final int DEFAULT_MAX_FRAME_SIZE = 100 * 1024 * 1024; //100 MB
044
045 static final byte NULL_TYPE = CommandTypes.NULL;
046 private static final int MARSHAL_CACHE_SIZE = Short.MAX_VALUE / 2;
047 private static final int MARSHAL_CACHE_FREE_SPACE = 100;
048
049 private DataStreamMarshaller dataMarshallers[];
050 private int version;
051 private boolean stackTraceEnabled;
052 private boolean tcpNoDelayEnabled;
053 private boolean cacheEnabled;
054 private boolean tightEncodingEnabled;
055 private boolean sizePrefixDisabled;
056 private long maxFrameSize = DEFAULT_MAX_FRAME_SIZE;
057
058 // The following fields are used for value caching
059 private short nextMarshallCacheIndex;
060 private short nextMarshallCacheEvictionIndex;
061 private Map<DataStructure, Short> marshallCacheMap = new HashMap<DataStructure, Short>();
062 private DataStructure marshallCache[] = null;
063 private DataStructure unmarshallCache[] = null;
064 private DataByteArrayOutputStream bytesOut = new DataByteArrayOutputStream();
065 private DataByteArrayInputStream bytesIn = new DataByteArrayInputStream();
066 private WireFormatInfo preferedWireFormatInfo;
067
068 public OpenWireFormat() {
069 this(DEFAULT_VERSION);
070 }
071
072 public OpenWireFormat(int i) {
073 setVersion(i);
074 }
075
076 public int hashCode() {
077 return version ^ (cacheEnabled ? 0x10000000 : 0x20000000)
078 ^ (stackTraceEnabled ? 0x01000000 : 0x02000000)
079 ^ (tightEncodingEnabled ? 0x00100000 : 0x00200000)
080 ^ (sizePrefixDisabled ? 0x00010000 : 0x00020000);
081 }
082
083 public OpenWireFormat copy() {
084 OpenWireFormat answer = new OpenWireFormat(version);
085 answer.stackTraceEnabled = stackTraceEnabled;
086 answer.tcpNoDelayEnabled = tcpNoDelayEnabled;
087 answer.cacheEnabled = cacheEnabled;
088 answer.tightEncodingEnabled = tightEncodingEnabled;
089 answer.sizePrefixDisabled = sizePrefixDisabled;
090 answer.preferedWireFormatInfo = preferedWireFormatInfo;
091 return answer;
092 }
093
094 public boolean equals(Object object) {
095 if (object == null) {
096 return false;
097 }
098 OpenWireFormat o = (OpenWireFormat)object;
099 return o.stackTraceEnabled == stackTraceEnabled && o.cacheEnabled == cacheEnabled
100 && o.version == version && o.tightEncodingEnabled == tightEncodingEnabled
101 && o.sizePrefixDisabled == sizePrefixDisabled;
102 }
103
104
105 public String toString() {
106 return "OpenWireFormat{version=" + version + ", cacheEnabled=" + cacheEnabled + ", stackTraceEnabled=" + stackTraceEnabled + ", tightEncodingEnabled="
107 + tightEncodingEnabled + ", sizePrefixDisabled=" + sizePrefixDisabled + ", maxFrameSize=" + maxFrameSize + "}";
108 // return "OpenWireFormat{id="+id+",
109 // tightEncodingEnabled="+tightEncodingEnabled+"}";
110 }
111
112 public int getVersion() {
113 return version;
114 }
115
116 public synchronized ByteSequence marshal(Object command) throws IOException {
117
118 if (cacheEnabled) {
119 runMarshallCacheEvictionSweep();
120 }
121
122 // MarshallAware ma = null;
123 // // If not using value caching, then the marshaled form is always the
124 // // same
125 // if (!cacheEnabled && ((DataStructure)command).isMarshallAware()) {
126 // ma = (MarshallAware)command;
127 // }
128
129 ByteSequence sequence = null;
130 // if( ma!=null ) {
131 // sequence = ma.getCachedMarshalledForm(this);
132 // }
133
134 if (sequence == null) {
135
136 int size = 1;
137 if (command != null) {
138
139 DataStructure c = (DataStructure)command;
140 byte type = c.getDataStructureType();
141 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
142 if (dsm == null) {
143 throw new IOException("Unknown data type: " + type);
144 }
145 if (tightEncodingEnabled) {
146
147 BooleanStream bs = new BooleanStream();
148 size += dsm.tightMarshal1(this, c, bs);
149 size += bs.marshalledSize();
150
151 bytesOut.restart(size);
152 if (!sizePrefixDisabled) {
153 bytesOut.writeInt(size);
154 }
155 bytesOut.writeByte(type);
156 bs.marshal(bytesOut);
157 dsm.tightMarshal2(this, c, bytesOut, bs);
158 sequence = bytesOut.toByteSequence();
159
160 } else {
161 bytesOut.restart();
162 if (!sizePrefixDisabled) {
163 bytesOut.writeInt(0); // we don't know the final size
164 // yet but write this here for
165 // now.
166 }
167 bytesOut.writeByte(type);
168 dsm.looseMarshal(this, c, bytesOut);
169 sequence = bytesOut.toByteSequence();
170
171 if (!sizePrefixDisabled) {
172 size = sequence.getLength() - 4;
173 int pos = sequence.offset;
174 ByteSequenceData.writeIntBig(sequence, size);
175 sequence.offset = pos;
176 }
177 }
178
179 } else {
180 bytesOut.restart(5);
181 bytesOut.writeInt(size);
182 bytesOut.writeByte(NULL_TYPE);
183 sequence = bytesOut.toByteSequence();
184 }
185
186 // if( ma!=null ) {
187 // ma.setCachedMarshalledForm(this, sequence);
188 // }
189 }
190 return sequence;
191 }
192
193 public synchronized Object unmarshal(ByteSequence sequence) throws IOException {
194 bytesIn.restart(sequence);
195 // DataInputStream dis = new DataInputStream(new
196 // ByteArrayInputStream(sequence));
197
198 if (!sizePrefixDisabled) {
199 int size = bytesIn.readInt();
200 if (sequence.getLength() - 4 != size) {
201 // throw new IOException("Packet size does not match marshaled
202 // size");
203 }
204
205 if (size > maxFrameSize) {
206 throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
207 }
208 }
209
210 Object command = doUnmarshal(bytesIn);
211 // if( !cacheEnabled && ((DataStructure)command).isMarshallAware() ) {
212 // ((MarshallAware) command).setCachedMarshalledForm(this, sequence);
213 // }
214 return command;
215 }
216
217 public synchronized void marshal(Object o, DataOutput dataOut) throws IOException {
218
219 if (cacheEnabled) {
220 runMarshallCacheEvictionSweep();
221 }
222
223 int size = 1;
224 if (o != null) {
225
226 DataStructure c = (DataStructure)o;
227 byte type = c.getDataStructureType();
228 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
229 if (dsm == null) {
230 throw new IOException("Unknown data type: " + type);
231 }
232 if (tightEncodingEnabled) {
233 BooleanStream bs = new BooleanStream();
234 size += dsm.tightMarshal1(this, c, bs);
235 size += bs.marshalledSize();
236
237 if (!sizePrefixDisabled) {
238 dataOut.writeInt(size);
239 }
240
241 dataOut.writeByte(type);
242 bs.marshal(dataOut);
243 dsm.tightMarshal2(this, c, dataOut, bs);
244
245 } else {
246 DataOutput looseOut = dataOut;
247
248 if (!sizePrefixDisabled) {
249 bytesOut.restart();
250 looseOut = bytesOut;
251 }
252
253 looseOut.writeByte(type);
254 dsm.looseMarshal(this, c, looseOut);
255
256 if (!sizePrefixDisabled) {
257 ByteSequence sequence = bytesOut.toByteSequence();
258 dataOut.writeInt(sequence.getLength());
259 dataOut.write(sequence.getData(), sequence.getOffset(), sequence.getLength());
260 }
261
262 }
263
264 } else {
265 if (!sizePrefixDisabled) {
266 dataOut.writeInt(size);
267 }
268 dataOut.writeByte(NULL_TYPE);
269 }
270 }
271
272 public Object unmarshal(DataInput dis) throws IOException {
273 DataInput dataIn = dis;
274 if (!sizePrefixDisabled) {
275 int size = dis.readInt();
276 if (size > maxFrameSize) {
277 throw new IOException("Frame size of " + (size / (1024 * 1024)) + " MB larger than max allowed " + (maxFrameSize / (1024 * 1024)) + " MB");
278 }
279 // int size = dis.readInt();
280 // byte[] data = new byte[size];
281 // dis.readFully(data);
282 // bytesIn.restart(data);
283 // dataIn = bytesIn;
284 }
285 return doUnmarshal(dataIn);
286 }
287
288 /**
289 * Used by NIO or AIO transports
290 */
291 public int tightMarshal1(Object o, BooleanStream bs) throws IOException {
292 int size = 1;
293 if (o != null) {
294 DataStructure c = (DataStructure)o;
295 byte type = c.getDataStructureType();
296 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
297 if (dsm == null) {
298 throw new IOException("Unknown data type: " + type);
299 }
300
301 size += dsm.tightMarshal1(this, c, bs);
302 size += bs.marshalledSize();
303 }
304 return size;
305 }
306
307 /**
308 * Used by NIO or AIO transports; note that the size is not written as part
309 * of this method.
310 */
311 public void tightMarshal2(Object o, DataOutput ds, BooleanStream bs) throws IOException {
312 if (cacheEnabled) {
313 runMarshallCacheEvictionSweep();
314 }
315
316 if (o != null) {
317 DataStructure c = (DataStructure)o;
318 byte type = c.getDataStructureType();
319 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
320 if (dsm == null) {
321 throw new IOException("Unknown data type: " + type);
322 }
323 ds.writeByte(type);
324 bs.marshal(ds);
325 dsm.tightMarshal2(this, c, ds, bs);
326 }
327 }
328
329 /**
330 * Allows you to dynamically switch the version of the openwire protocol
331 * being used.
332 *
333 * @param version
334 */
335 public void setVersion(int version) {
336 String mfName = "org.apache.activemq.openwire.v" + version + ".MarshallerFactory";
337 Class mfClass;
338 try {
339 mfClass = Class.forName(mfName, false, getClass().getClassLoader());
340 } catch (ClassNotFoundException e) {
341 throw (IllegalArgumentException)new IllegalArgumentException("Invalid version: " + version
342 + ", could not load " + mfName)
343 .initCause(e);
344 }
345 try {
346 Method method = mfClass.getMethod("createMarshallerMap", new Class[] {OpenWireFormat.class});
347 dataMarshallers = (DataStreamMarshaller[])method.invoke(null, new Object[] {this});
348 } catch (Throwable e) {
349 throw (IllegalArgumentException)new IllegalArgumentException(
350 "Invalid version: "
351 + version
352 + ", "
353 + mfName
354 + " does not properly implement the createMarshallerMap method.")
355 .initCause(e);
356 }
357 this.version = version;
358 }
359
360 public Object doUnmarshal(DataInput dis) throws IOException {
361 byte dataType = dis.readByte();
362 if (dataType != NULL_TYPE) {
363 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
364 if (dsm == null) {
365 throw new IOException("Unknown data type: " + dataType);
366 }
367 Object data = dsm.createObject();
368 if (this.tightEncodingEnabled) {
369 BooleanStream bs = new BooleanStream();
370 bs.unmarshal(dis);
371 dsm.tightUnmarshal(this, data, dis, bs);
372 } else {
373 dsm.looseUnmarshal(this, data, dis);
374 }
375 return data;
376 } else {
377 return null;
378 }
379 }
380
381 // public void debug(String msg) {
382 // String t = (Thread.currentThread().getName()+" ").substring(0, 40);
383 // System.out.println(t+": "+msg);
384 // }
385 public int tightMarshalNestedObject1(DataStructure o, BooleanStream bs) throws IOException {
386 bs.writeBoolean(o != null);
387 if (o == null) {
388 return 0;
389 }
390
391 if (o.isMarshallAware()) {
392 // MarshallAware ma = (MarshallAware)o;
393 ByteSequence sequence = null;
394 // sequence=ma.getCachedMarshalledForm(this);
395 bs.writeBoolean(sequence != null);
396 if (sequence != null) {
397 return 1 + sequence.getLength();
398 }
399 }
400
401 byte type = o.getDataStructureType();
402 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
403 if (dsm == null) {
404 throw new IOException("Unknown data type: " + type);
405 }
406 return 1 + dsm.tightMarshal1(this, o, bs);
407 }
408
409 public void tightMarshalNestedObject2(DataStructure o, DataOutput ds, BooleanStream bs)
410 throws IOException {
411 if (!bs.readBoolean()) {
412 return;
413 }
414
415 byte type = o.getDataStructureType();
416 ds.writeByte(type);
417
418 if (o.isMarshallAware() && bs.readBoolean()) {
419
420 // We should not be doing any caching
421 throw new IOException("Corrupted stream");
422 // MarshallAware ma = (MarshallAware) o;
423 // ByteSequence sequence=ma.getCachedMarshalledForm(this);
424 // ds.write(sequence.getData(), sequence.getOffset(),
425 // sequence.getLength());
426
427 } else {
428
429 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
430 if (dsm == null) {
431 throw new IOException("Unknown data type: " + type);
432 }
433 dsm.tightMarshal2(this, o, ds, bs);
434
435 }
436 }
437
438 public DataStructure tightUnmarshalNestedObject(DataInput dis, BooleanStream bs) throws IOException {
439 if (bs.readBoolean()) {
440
441 byte dataType = dis.readByte();
442 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
443 if (dsm == null) {
444 throw new IOException("Unknown data type: " + dataType);
445 }
446 DataStructure data = dsm.createObject();
447
448 if (data.isMarshallAware() && bs.readBoolean()) {
449
450 dis.readInt();
451 dis.readByte();
452
453 BooleanStream bs2 = new BooleanStream();
454 bs2.unmarshal(dis);
455 dsm.tightUnmarshal(this, data, dis, bs2);
456
457 // TODO: extract the sequence from the dis and associate it.
458 // MarshallAware ma = (MarshallAware)data
459 // ma.setCachedMarshalledForm(this, sequence);
460
461 } else {
462 dsm.tightUnmarshal(this, data, dis, bs);
463 }
464
465 return data;
466 } else {
467 return null;
468 }
469 }
470
471 public DataStructure looseUnmarshalNestedObject(DataInput dis) throws IOException {
472 if (dis.readBoolean()) {
473
474 byte dataType = dis.readByte();
475 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[dataType & 0xFF];
476 if (dsm == null) {
477 throw new IOException("Unknown data type: " + dataType);
478 }
479 DataStructure data = dsm.createObject();
480 dsm.looseUnmarshal(this, data, dis);
481 return data;
482
483 } else {
484 return null;
485 }
486 }
487
488 public void looseMarshalNestedObject(DataStructure o, DataOutput dataOut) throws IOException {
489 dataOut.writeBoolean(o != null);
490 if (o != null) {
491 byte type = o.getDataStructureType();
492 dataOut.writeByte(type);
493 DataStreamMarshaller dsm = (DataStreamMarshaller)dataMarshallers[type & 0xFF];
494 if (dsm == null) {
495 throw new IOException("Unknown data type: " + type);
496 }
497 dsm.looseMarshal(this, o, dataOut);
498 }
499 }
500
501 public void runMarshallCacheEvictionSweep() {
502 // Do we need to start evicting??
503 while (marshallCacheMap.size() > marshallCache.length - MARSHAL_CACHE_FREE_SPACE) {
504
505 marshallCacheMap.remove(marshallCache[nextMarshallCacheEvictionIndex]);
506 marshallCache[nextMarshallCacheEvictionIndex] = null;
507
508 nextMarshallCacheEvictionIndex++;
509 if (nextMarshallCacheEvictionIndex >= marshallCache.length) {
510 nextMarshallCacheEvictionIndex = 0;
511 }
512
513 }
514 }
515
516 public Short getMarshallCacheIndex(DataStructure o) {
517 return marshallCacheMap.get(o);
518 }
519
520 public Short addToMarshallCache(DataStructure o) {
521 short i = nextMarshallCacheIndex++;
522 if (nextMarshallCacheIndex >= marshallCache.length) {
523 nextMarshallCacheIndex = 0;
524 }
525
526 // We can only cache that item if there is space left.
527 if (marshallCacheMap.size() < marshallCache.length) {
528 marshallCache[i] = o;
529 Short index = new Short(i);
530 marshallCacheMap.put(o, index);
531 return index;
532 } else {
533 // Use -1 to indicate that the value was not cached due to cache
534 // being full.
535 return new Short((short)-1);
536 }
537 }
538
539 public void setInUnmarshallCache(short index, DataStructure o) {
540
541 // There was no space left in the cache, so we can't
542 // put this in the cache.
543 if (index == -1) {
544 return;
545 }
546
547 unmarshallCache[index] = o;
548 }
549
550 public DataStructure getFromUnmarshallCache(short index) {
551 return unmarshallCache[index];
552 }
553
554 public void setStackTraceEnabled(boolean b) {
555 stackTraceEnabled = b;
556 }
557
558 public boolean isStackTraceEnabled() {
559 return stackTraceEnabled;
560 }
561
562 public boolean isTcpNoDelayEnabled() {
563 return tcpNoDelayEnabled;
564 }
565
566 public void setTcpNoDelayEnabled(boolean tcpNoDelayEnabled) {
567 this.tcpNoDelayEnabled = tcpNoDelayEnabled;
568 }
569
570 public boolean isCacheEnabled() {
571 return cacheEnabled;
572 }
573
574 public void setCacheEnabled(boolean cacheEnabled) {
575 if(cacheEnabled){
576 marshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
577 unmarshallCache = new DataStructure[MARSHAL_CACHE_SIZE];
578 }
579 this.cacheEnabled = cacheEnabled;
580 }
581
582 public boolean isTightEncodingEnabled() {
583 return tightEncodingEnabled;
584 }
585
586 public void setTightEncodingEnabled(boolean tightEncodingEnabled) {
587 this.tightEncodingEnabled = tightEncodingEnabled;
588 }
589
590 public boolean isSizePrefixDisabled() {
591 return sizePrefixDisabled;
592 }
593
594 public void setSizePrefixDisabled(boolean prefixPacketSize) {
595 this.sizePrefixDisabled = prefixPacketSize;
596 }
597
598 public void setPreferedWireFormatInfo(WireFormatInfo info) {
599 this.preferedWireFormatInfo = info;
600 }
601
602 public WireFormatInfo getPreferedWireFormatInfo() {
603 return preferedWireFormatInfo;
604 }
605
606 public long getMaxFrameSize() {
607 return maxFrameSize;
608 }
609
610 public void setMaxFrameSize(long maxFrameSize) {
611 this.maxFrameSize = maxFrameSize;
612 }
613
614 public void renegotiateWireFormat(WireFormatInfo info) throws IOException {
615
616 if (preferedWireFormatInfo == null) {
617 throw new IllegalStateException("Wireformat cannot not be renegotiated.");
618 }
619
620 this.setVersion(min(preferedWireFormatInfo.getVersion(), info.getVersion()));
621 info.setVersion(this.getVersion());
622
623 this.setMaxFrameSize(min(preferedWireFormatInfo.getMaxFrameSize(), info.getMaxFrameSize()));
624 info.setMaxFrameSize(this.getMaxFrameSize());
625
626 this.stackTraceEnabled = info.isStackTraceEnabled() && preferedWireFormatInfo.isStackTraceEnabled();
627 info.setStackTraceEnabled(this.stackTraceEnabled);
628
629 this.tcpNoDelayEnabled = info.isTcpNoDelayEnabled() && preferedWireFormatInfo.isTcpNoDelayEnabled();
630 info.setTcpNoDelayEnabled(this.tcpNoDelayEnabled);
631
632 this.cacheEnabled = info.isCacheEnabled() && preferedWireFormatInfo.isCacheEnabled();
633 info.setCacheEnabled(this.cacheEnabled);
634
635 this.tightEncodingEnabled = info.isTightEncodingEnabled()
636 && preferedWireFormatInfo.isTightEncodingEnabled();
637 info.setTightEncodingEnabled(this.tightEncodingEnabled);
638
639 this.sizePrefixDisabled = info.isSizePrefixDisabled()
640 && preferedWireFormatInfo.isSizePrefixDisabled();
641 info.setSizePrefixDisabled(this.sizePrefixDisabled);
642
643 if (cacheEnabled) {
644
645 int size = Math.min(preferedWireFormatInfo.getCacheSize(), info.getCacheSize());
646 info.setCacheSize(size);
647
648 if (size == 0) {
649 size = MARSHAL_CACHE_SIZE;
650 }
651
652 marshallCache = new DataStructure[size];
653 unmarshallCache = new DataStructure[size];
654 nextMarshallCacheIndex = 0;
655 nextMarshallCacheEvictionIndex = 0;
656 marshallCacheMap = new HashMap<DataStructure, Short>();
657 } else {
658 marshallCache = null;
659 unmarshallCache = null;
660 nextMarshallCacheIndex = 0;
661 nextMarshallCacheEvictionIndex = 0;
662 marshallCacheMap = null;
663 }
664
665 }
666
667 protected int min(int version1, int version2) {
668 if (version1 < version2 && version1 > 0 || version2 <= 0) {
669 return version1;
670 }
671 return version2;
672 }
673
674 protected long min(long version1, long version2) {
675 if (version1 < version2 && version1 > 0 || version2 <= 0) {
676 return version1;
677 }
678 return version2;
679 }
680 }