001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.kaha.impl.data;
018
019 import java.io.IOException;
020 import java.io.RandomAccessFile;
021
022 import org.apache.activemq.kaha.Marshaller;
023 import org.apache.activemq.util.DataByteArrayOutputStream;
024
025 /**
026 * Optimized Store writer. Synchronously marshalls and writes to the data file.
027 * Simple but may introduce a bit of contention when put under load.
028 *
029 *
030 */
031 public final class SyncDataFileWriter {
032
033 private DataByteArrayOutputStream buffer;
034 private DataManagerImpl dataManager;
035
036 /**
037 * Construct a Store writer
038 *
039 * @param fileId
040 */
041 SyncDataFileWriter(DataManagerImpl fileManager) {
042 this.dataManager = fileManager;
043 this.buffer = new DataByteArrayOutputStream();
044 }
045
046 /*
047 * (non-Javadoc)
048 *
049 * @see org.apache.activemq.kaha.impl.data.DataFileWriter#storeItem(org.apache.activemq.kaha.Marshaller,
050 * java.lang.Object, byte)
051 */
052 public synchronized DataItem storeItem(Marshaller marshaller, Object payload, byte type)
053 throws IOException {
054
055 // Write the packet our internal buffer.
056 buffer.reset();
057 buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
058 marshaller.writePayload(payload, buffer);
059 int size = buffer.size();
060 int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
061 buffer.reset();
062 buffer.writeByte(type);
063 buffer.writeInt(payloadSize);
064
065 // Find the position where this item will land at.
066 DataItem item = new DataItem();
067 item.setSize(payloadSize);
068 DataFile dataFile = dataManager.findSpaceForData(item);
069
070 // Now splat the buffer to the file.
071 dataFile.getRandomAccessFile().seek(item.getOffset());
072 dataFile.getRandomAccessFile().write(buffer.getData(), 0, size);
073 dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
074
075 dataManager.addInterestInFile(dataFile);
076 return item;
077 }
078
079 /*
080 * (non-Javadoc)
081 *
082 * @see org.apache.activemq.kaha.impl.data.DataFileWriter#updateItem(org.apache.activemq.kaha.StoreLocation,
083 * org.apache.activemq.kaha.Marshaller, java.lang.Object, byte)
084 */
085 public synchronized void updateItem(DataItem item, Marshaller marshaller, Object payload, byte type)
086 throws IOException {
087 // Write the packet our internal buffer.
088 buffer.reset();
089 buffer.position(DataManagerImpl.ITEM_HEAD_SIZE);
090 marshaller.writePayload(payload, buffer);
091 int size = buffer.size();
092 int payloadSize = size - DataManagerImpl.ITEM_HEAD_SIZE;
093 buffer.reset();
094 buffer.writeByte(type);
095 buffer.writeInt(payloadSize);
096 item.setSize(payloadSize);
097 DataFile dataFile = dataManager.getDataFile(item);
098 RandomAccessFile file = dataFile.getRandomAccessFile();
099 file.seek(item.getOffset());
100 file.write(buffer.getData(), 0, size);
101 dataFile.setWriterData(Boolean.TRUE); // Use as dirty marker..
102 }
103
104 public synchronized void force(DataFile dataFile) throws IOException {
105 // If our dirty marker was set.. then we need to sync
106 if (dataFile.getWriterData() != null && dataFile.isDirty()) {
107 dataFile.getRandomAccessFile().getFD().sync();
108 dataFile.setWriterData(null);
109 dataFile.setDirty(false);
110 }
111 }
112
113 public void close() throws IOException {
114 }
115 }