001 /**
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.activemq.util;
018
019 import java.io.IOException;
020 import java.sql.SQLException;
021 import java.util.concurrent.TimeUnit;
022 import java.util.concurrent.atomic.AtomicBoolean;
023
024 import org.apache.activemq.broker.BrokerService;
025 import org.slf4j.Logger;
026 import org.slf4j.LoggerFactory;
027
028 /**
029 * @org.apache.xbean.XBean
030 */
031 public class DefaultIOExceptionHandler implements IOExceptionHandler {
032
033 private static final Logger LOG = LoggerFactory
034 .getLogger(DefaultIOExceptionHandler.class);
035 private BrokerService broker;
036 private boolean ignoreAllErrors = false;
037 private boolean ignoreNoSpaceErrors = true;
038 private boolean ignoreSQLExceptions = true;
039 private boolean stopStartConnectors = false;
040 private String noSpaceMessage = "space";
041 private String sqlExceptionMessage = ""; // match all
042 private long resumeCheckSleepPeriod = 5*1000;
043 private AtomicBoolean stopStartInProgress = new AtomicBoolean(false);
044
045 public void handle(IOException exception) {
046 if (ignoreAllErrors) {
047 LOG.info("Ignoring IO exception, " + exception, exception);
048 return;
049 }
050
051 if (ignoreNoSpaceErrors) {
052 Throwable cause = exception;
053 while (cause != null && cause instanceof IOException) {
054 String message = cause.getMessage();
055 if (message != null && message.contains(noSpaceMessage)) {
056 LOG.info("Ignoring no space left exception, " + exception, exception);
057 return;
058 }
059 cause = cause.getCause();
060 }
061 }
062
063 if (ignoreSQLExceptions) {
064 Throwable cause = exception;
065 while (cause != null) {
066 String message = cause.getMessage();
067 if (cause instanceof SQLException && message.contains(sqlExceptionMessage)) {
068 LOG.info("Ignoring SQLException, " + exception, cause);
069 return;
070 }
071 cause = cause.getCause();
072 }
073 }
074
075 if (stopStartConnectors) {
076 if (!stopStartInProgress.compareAndSet(false, true)) {
077 // we are already working on it
078 return;
079 }
080 LOG.info("Initiating stop/restart of broker transport due to IO exception, " + exception, exception);
081
082 new Thread("stop transport connectors on IO exception") {
083 public void run() {
084 try {
085 ServiceStopper stopper = new ServiceStopper();
086 broker.stopAllConnectors(stopper);
087 } catch (Exception e) {
088 LOG.warn("Failure occurred while stopping broker connectors", e);
089 }
090 }
091 }.start();
092
093 // resume again
094 new Thread("restart transport connectors post IO exception") {
095 public void run() {
096 try {
097 while (isPersistenceAdapterDown()) {
098 LOG.info("waiting for broker persistence adapter checkpoint to succeed before restarting transports");
099 TimeUnit.MILLISECONDS.sleep(resumeCheckSleepPeriod);
100 }
101 broker.startAllConnectors();
102 } catch (Exception e) {
103 LOG.warn("Failure occurred while restarting broker connectors", e);
104 } finally {
105 stopStartInProgress.compareAndSet(true, false);
106 }
107 }
108
109 private boolean isPersistenceAdapterDown() {
110 boolean checkpointSuccess = false;
111 try {
112 broker.getPersistenceAdapter().checkpoint(true);
113 checkpointSuccess = true;
114 } catch (Throwable ignored) {}
115 return !checkpointSuccess;
116 }
117 }.start();
118
119 return;
120 }
121
122 LOG.info("Stopping the broker due to IO exception, " + exception, exception);
123 new Thread("Stopping the broker due to IO exception") {
124 public void run() {
125 try {
126 broker.stop();
127 } catch (Exception e) {
128 LOG.warn("Failure occurred while stopping broker", e);
129 }
130 }
131 }.start();
132 }
133
134 public void setBrokerService(BrokerService broker) {
135 this.broker = broker;
136 }
137
138 public boolean isIgnoreAllErrors() {
139 return ignoreAllErrors;
140 }
141
142 public void setIgnoreAllErrors(boolean ignoreAllErrors) {
143 this.ignoreAllErrors = ignoreAllErrors;
144 }
145
146 public boolean isIgnoreNoSpaceErrors() {
147 return ignoreNoSpaceErrors;
148 }
149
150 public void setIgnoreNoSpaceErrors(boolean ignoreNoSpaceErrors) {
151 this.ignoreNoSpaceErrors = ignoreNoSpaceErrors;
152 }
153
154 public String getNoSpaceMessage() {
155 return noSpaceMessage;
156 }
157
158 public void setNoSpaceMessage(String noSpaceMessage) {
159 this.noSpaceMessage = noSpaceMessage;
160 }
161
162 public boolean isIgnoreSQLExceptions() {
163 return ignoreSQLExceptions;
164 }
165
166 public void setIgnoreSQLExceptions(boolean ignoreSQLExceptions) {
167 this.ignoreSQLExceptions = ignoreSQLExceptions;
168 }
169
170 public String getSqlExceptionMessage() {
171 return sqlExceptionMessage;
172 }
173
174 public void setSqlExceptionMessage(String sqlExceptionMessage) {
175 this.sqlExceptionMessage = sqlExceptionMessage;
176 }
177
178 public boolean isStopStartConnectors() {
179 return stopStartConnectors;
180 }
181
182 public void setStopStartConnectors(boolean stopStartConnectors) {
183 this.stopStartConnectors = stopStartConnectors;
184 }
185
186 public long getResumeCheckSleepPeriod() {
187 return resumeCheckSleepPeriod;
188 }
189
190 public void setResumeCheckSleepPeriod(long resumeCheckSleepPeriod) {
191 this.resumeCheckSleepPeriod = resumeCheckSleepPeriod;
192 }
193 }