001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc.adapter;
018
019 import java.io.IOException;
020 import java.io.InputStream;
021 import java.io.OutputStream;
022 import java.sql.Blob;
023 import java.sql.Connection;
024 import java.sql.PreparedStatement;
025 import java.sql.ResultSet;
026 import java.sql.SQLException;
027
028 import javax.jms.JMSException;
029 import javax.sql.rowset.serial.SerialBlob;
030
031 import org.apache.activemq.command.ActiveMQDestination;
032 import org.apache.activemq.command.MessageId;
033 import org.apache.activemq.store.jdbc.TransactionContext;
034 import org.apache.activemq.util.ByteArrayOutputStream;
035
036 /**
037 * This JDBCAdapter inserts and extracts BLOB data using the getBlob()/setBlob()
038 * operations. This is a little more involved since to insert a blob you have
039 * to:
040 *
041 * 1: insert empty blob. 2: select the blob 3: finally update the blob with data
042 * value.
043 *
044 * The databases/JDBC drivers that use this adapter are:
045 * <ul>
046 * <li></li>
047 * </ul>
048 *
049 * @org.apache.xbean.XBean element="blobJDBCAdapter"
050 *
051 *
052 */
053 public class BlobJDBCAdapter extends DefaultJDBCAdapter {
054
055 @Override
056 public void doAddMessage(TransactionContext c, long sequence, MessageId messageID, ActiveMQDestination destination, byte[] data,
057 long expiration, byte priority) throws SQLException, IOException {
058 PreparedStatement s = null;
059 ResultSet rs = null;
060 cleanupExclusiveLock.readLock().lock();
061 try {
062 // Add the Blob record.
063 s = c.getConnection().prepareStatement(statements.getAddMessageStatement());
064 s.setLong(1, sequence);
065 s.setString(2, messageID.getProducerId().toString());
066 s.setLong(3, messageID.getProducerSequenceId());
067 s.setString(4, destination.getQualifiedName());
068 s.setLong(5, expiration);
069 s.setLong(6, priority);
070
071 if (s.executeUpdate() != 1) {
072 throw new IOException("Failed to add broker message: " + messageID + " in container.");
073 }
074 s.close();
075
076 // Select the blob record so that we can update it.
077 s = c.getConnection().prepareStatement(statements.getFindMessageByIdStatement(),
078 ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_UPDATABLE);
079 s.setLong(1, sequence);
080 rs = s.executeQuery();
081 if (!rs.next()) {
082 throw new IOException("Failed select blob for message: " + messageID + " in container.");
083 }
084
085 // Update the blob
086 Blob blob = rs.getBlob(1);
087 blob.truncate(0);
088 blob.setBytes(1, data);
089 rs.updateBlob(1, blob);
090 rs.updateRow(); // Update the row with the updated blob
091
092 } finally {
093 cleanupExclusiveLock.readLock().unlock();
094 close(rs);
095 close(s);
096 }
097 }
098
099 @Override
100 public byte[] doGetMessage(TransactionContext c, MessageId id) throws SQLException, IOException {
101 PreparedStatement s = null;
102 ResultSet rs = null;
103 cleanupExclusiveLock.readLock().lock();
104 try {
105
106 s = c.getConnection().prepareStatement(statements.getFindMessageStatement());
107 s.setString(1, id.getProducerId().toString());
108 s.setLong(2, id.getProducerSequenceId());
109 rs = s.executeQuery();
110
111 if (!rs.next()) {
112 return null;
113 }
114 Blob blob = rs.getBlob(1);
115 InputStream is = blob.getBinaryStream();
116
117 ByteArrayOutputStream os = new ByteArrayOutputStream((int)blob.length());
118 int ch;
119 while ((ch = is.read()) >= 0) {
120 os.write(ch);
121 }
122 is.close();
123 os.close();
124
125 return os.toByteArray();
126
127 } finally {
128 cleanupExclusiveLock.readLock().unlock();
129 close(rs);
130 close(s);
131 }
132 }
133
134 }