001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.Connection;
021 import java.sql.PreparedStatement;
022 import java.sql.SQLException;
023 import java.sql.Statement;
024
025 import javax.sql.DataSource;
026
027 import org.apache.activemq.util.IOExceptionSupport;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030
031 /**
032 * Helps keep track of the current transaction/JDBC connection.
033 *
034 *
035 */
036 public class TransactionContext {
037
038 private static final Logger LOG = LoggerFactory.getLogger(TransactionContext.class);
039
040 private final DataSource dataSource;
041 private final JDBCPersistenceAdapter persistenceAdapter;
042 private Connection connection;
043 private boolean inTx;
044 private PreparedStatement addMessageStatement;
045 private PreparedStatement removedMessageStatement;
046 private PreparedStatement updateLastAckStatement;
047 // a cheap dirty level that we can live with
048 private int transactionIsolation = Connection.TRANSACTION_READ_UNCOMMITTED;
049
050 public TransactionContext(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
051 this.persistenceAdapter = persistenceAdapter;
052 this.dataSource = persistenceAdapter.getDataSource();
053 }
054
055 public Connection getConnection() throws IOException {
056 if (connection == null) {
057 try {
058 connection = dataSource.getConnection();
059 boolean autoCommit = !inTx;
060 if (connection.getAutoCommit() != autoCommit) {
061 connection.setAutoCommit(autoCommit);
062 }
063 } catch (SQLException e) {
064 JDBCPersistenceAdapter.log("Could not get JDBC connection: ", e);
065 IOException ioe = IOExceptionSupport.create(e);
066 persistenceAdapter.getBrokerService().handleIOException(ioe);
067 throw ioe;
068
069 }
070
071 try {
072 connection.setTransactionIsolation(transactionIsolation);
073 } catch (Throwable e) {
074 }
075 }
076 return connection;
077 }
078
079 public void executeBatch() throws SQLException {
080 try {
081 executeBatch(addMessageStatement, "Failed add a message");
082 } finally {
083 addMessageStatement = null;
084 try {
085 executeBatch(removedMessageStatement, "Failed to remove a message");
086 } finally {
087 removedMessageStatement = null;
088 try {
089 executeBatch(updateLastAckStatement, "Failed to ack a message");
090 } finally {
091 updateLastAckStatement = null;
092 }
093 }
094 }
095 }
096
097 private void executeBatch(PreparedStatement p, String message) throws SQLException {
098 if (p == null) {
099 return;
100 }
101
102 try {
103 int[] rc = p.executeBatch();
104 for (int i = 0; i < rc.length; i++) {
105 int code = rc[i];
106 if (code < 0 && code != Statement.SUCCESS_NO_INFO) {
107 throw new SQLException(message + ". Response code: " + code);
108 }
109 }
110 } finally {
111 try {
112 p.close();
113 } catch (Throwable e) {
114 }
115 }
116 }
117
118 public void close() throws IOException {
119 if (!inTx) {
120 try {
121
122 /**
123 * we are not in a transaction so should not be committing ??
124 * This was previously commented out - but had adverse affects
125 * on testing - so it's back!
126 *
127 */
128 try {
129 executeBatch();
130 } finally {
131 if (connection != null && !connection.getAutoCommit()) {
132 connection.commit();
133 }
134 }
135
136 } catch (SQLException e) {
137 JDBCPersistenceAdapter.log("Error while closing connection: ", e);
138 throw IOExceptionSupport.create(e);
139 } finally {
140 try {
141 if (connection != null) {
142 connection.close();
143 }
144 } catch (Throwable e) {
145 LOG.warn("Close failed: " + e.getMessage(), e);
146 } finally {
147 connection = null;
148 }
149 }
150 }
151 }
152
153 public void begin() throws IOException {
154 if (inTx) {
155 throw new IOException("Already started.");
156 }
157 inTx = true;
158 connection = getConnection();
159 }
160
161 public void commit() throws IOException {
162 if (!inTx) {
163 throw new IOException("Not started.");
164 }
165 try {
166 executeBatch();
167 if (!connection.getAutoCommit()) {
168 connection.commit();
169 }
170 } catch (SQLException e) {
171 JDBCPersistenceAdapter.log("Commit failed: ", e);
172
173 this.rollback();
174
175 throw IOExceptionSupport.create(e);
176 } finally {
177 inTx = false;
178 close();
179 }
180 }
181
182 public void rollback() throws IOException {
183 if (!inTx) {
184 throw new IOException("Not started.");
185 }
186 try {
187 if (addMessageStatement != null) {
188 addMessageStatement.close();
189 addMessageStatement = null;
190 }
191 if (removedMessageStatement != null) {
192 removedMessageStatement.close();
193 removedMessageStatement = null;
194 }
195 if (updateLastAckStatement != null) {
196 updateLastAckStatement.close();
197 updateLastAckStatement = null;
198 }
199 connection.rollback();
200
201 } catch (SQLException e) {
202 JDBCPersistenceAdapter.log("Rollback failed: ", e);
203 throw IOExceptionSupport.create(e);
204 } finally {
205 inTx = false;
206 close();
207 }
208 }
209
210 public PreparedStatement getAddMessageStatement() {
211 return addMessageStatement;
212 }
213
214 public void setAddMessageStatement(PreparedStatement addMessageStatement) {
215 this.addMessageStatement = addMessageStatement;
216 }
217
218 public PreparedStatement getUpdateLastAckStatement() {
219 return updateLastAckStatement;
220 }
221
222 public void setUpdateLastAckStatement(PreparedStatement ackMessageStatement) {
223 this.updateLastAckStatement = ackMessageStatement;
224 }
225
226 public PreparedStatement getRemovedMessageStatement() {
227 return removedMessageStatement;
228 }
229
230 public void setRemovedMessageStatement(PreparedStatement removedMessageStatement) {
231 this.removedMessageStatement = removedMessageStatement;
232 }
233
234 public void setTransactionIsolation(int transactionIsolation) {
235 this.transactionIsolation = transactionIsolation;
236 }
237
238 }