001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.store.jdbc;
018
019 import java.io.IOException;
020 import java.sql.Connection;
021 import java.sql.PreparedStatement;
022 import java.sql.SQLException;
023 import java.sql.SQLFeatureNotSupportedException;
024
025 import javax.sql.DataSource;
026
027 import org.apache.activemq.util.Handler;
028 import org.slf4j.Logger;
029 import org.slf4j.LoggerFactory;
030
031 /**
032 * Represents an exclusive lock on a database to avoid multiple brokers running
033 * against the same logical database.
034 *
035 * @org.apache.xbean.XBean element="database-locker"
036 *
037 */
038 public class DefaultDatabaseLocker implements DatabaseLocker {
039 public static final long DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL = 1000;
040 private static final Logger LOG = LoggerFactory.getLogger(DefaultDatabaseLocker.class);
041 protected DataSource dataSource;
042 protected Statements statements;
043 protected long lockAcquireSleepInterval = DEFAULT_LOCK_ACQUIRE_SLEEP_INTERVAL;
044
045 protected PreparedStatement lockCreateStatement;
046 protected PreparedStatement lockUpdateStatement;
047 protected Connection connection;
048 protected boolean stopping;
049 protected Handler<Exception> exceptionHandler;
050 protected int queryTimeout = 10;
051
052 public DefaultDatabaseLocker() {
053 }
054
055 public DefaultDatabaseLocker(JDBCPersistenceAdapter persistenceAdapter) throws IOException {
056 setPersistenceAdapter(persistenceAdapter);
057 }
058
059 public void setPersistenceAdapter(JDBCPersistenceAdapter adapter) throws IOException {
060 this.dataSource = adapter.getLockDataSource();
061 this.statements = adapter.getStatements();
062 }
063
064 public void start() throws Exception {
065 stopping = false;
066
067 LOG.info("Attempting to acquire the exclusive lock to become the Master broker");
068 String sql = statements.getLockCreateStatement();
069 LOG.debug("Locking Query is "+sql);
070
071 while (true) {
072 try {
073 connection = dataSource.getConnection();
074 connection.setAutoCommit(false);
075 lockCreateStatement = connection.prepareStatement(sql);
076 lockCreateStatement.execute();
077 break;
078 } catch (Exception e) {
079 try {
080 if (stopping) {
081 throw new Exception(
082 "Cannot start broker as being asked to shut down. "
083 + "Interrupted attempt to acquire lock: "
084 + e, e);
085 }
086 if (exceptionHandler != null) {
087 try {
088 exceptionHandler.handle(e);
089 } catch (Throwable handlerException) {
090 LOG.error( "The exception handler "
091 + exceptionHandler.getClass().getCanonicalName()
092 + " threw this exception: "
093 + handlerException
094 + " while trying to handle this exception: "
095 + e, handlerException);
096 }
097
098 } else {
099 LOG.debug("Lock failure: "+ e, e);
100 }
101 } finally {
102 // Let's make sure the database connection is properly
103 // closed when an error occurs so that we're not leaking
104 // connections
105 if (null != connection) {
106 try {
107 connection.close();
108 } catch (SQLException e1) {
109 LOG.error("Caught exception while closing connection: " + e1, e1);
110 }
111
112 connection = null;
113 }
114 }
115 } finally {
116 if (null != lockCreateStatement) {
117 try {
118 lockCreateStatement.close();
119 } catch (SQLException e1) {
120 LOG.debug("Caught while closing statement: " + e1, e1);
121 }
122 lockCreateStatement = null;
123 }
124 }
125
126 LOG.info("Failed to acquire lock. Sleeping for " + lockAcquireSleepInterval + " milli(s) before trying again...");
127 try {
128 Thread.sleep(lockAcquireSleepInterval);
129 } catch (InterruptedException ie) {
130 LOG.warn("Master lock retry sleep interrupted", ie);
131 }
132 }
133
134 LOG.info("Becoming the master on dataSource: " + dataSource);
135 }
136
137 public void stop() throws Exception {
138 stopping = true;
139 try {
140 if (lockCreateStatement != null) {
141 lockCreateStatement.cancel();
142 }
143 } catch (SQLFeatureNotSupportedException e) {
144 LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);
145 }
146 try {
147 if (lockUpdateStatement != null) {
148 lockUpdateStatement.cancel();
149 }
150 } catch (SQLFeatureNotSupportedException e) {
151 LOG.warn("Failed to cancel locking query on dataSource" + dataSource, e);
152 }
153 try {
154 if (connection != null && !connection.isClosed()) {
155 try {
156 connection.rollback();
157 } catch (SQLException sqle) {
158 LOG.warn("Exception while rollbacking the connection on shutdown", sqle);
159 } finally {
160 try {
161 connection.close();
162 } catch (SQLException ignored) {
163 LOG.debug("Exception while closing connection on shutdown", ignored);
164 }
165 lockCreateStatement = null;
166 }
167 }
168 } catch (SQLException sqle) {
169 LOG.warn("Exception while checking close status of connection on shutdown", sqle);
170 }
171 }
172
173 public boolean keepAlive() {
174 boolean result = false;
175 try {
176 lockUpdateStatement = connection.prepareStatement(statements.getLockUpdateStatement());
177 lockUpdateStatement.setLong(1, System.currentTimeMillis());
178 if (queryTimeout > 0) {
179 lockUpdateStatement.setQueryTimeout(queryTimeout);
180 }
181 int rows = lockUpdateStatement.executeUpdate();
182 if (rows == 1) {
183 result=true;
184 }
185 } catch (Exception e) {
186 LOG.error("Failed to update database lock: " + e, e);
187 } finally {
188 if (lockUpdateStatement != null) {
189 try {
190 lockUpdateStatement.close();
191 } catch (SQLException e) {
192 LOG.error("Failed to close statement",e);
193 }
194 lockUpdateStatement = null;
195 }
196 }
197 return result;
198 }
199
200 public long getLockAcquireSleepInterval() {
201 return lockAcquireSleepInterval;
202 }
203
204 public void setLockAcquireSleepInterval(long lockAcquireSleepInterval) {
205 this.lockAcquireSleepInterval = lockAcquireSleepInterval;
206 }
207
208 public Handler getExceptionHandler() {
209 return exceptionHandler;
210 }
211
212 public void setExceptionHandler(Handler exceptionHandler) {
213 this.exceptionHandler = exceptionHandler;
214 }
215
216 public int getQueryTimeout() {
217 return queryTimeout;
218 }
219
220 public void setQueryTimeout(int queryTimeout) {
221 this.queryTimeout = queryTimeout;
222 }
223 }