001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.store.jdbc;
019
020 import java.sql.Connection;
021 import java.sql.SQLException;
022 import java.util.Map;
023
024 import javax.jms.JMSException;
025 import javax.sql.DataSource;
026
027 import org.apache.commons.logging.Log;
028 import org.apache.commons.logging.LogFactory;
029 import org.activemq.broker.BrokerContainer;
030 import org.activemq.io.WireFormat;
031 import org.activemq.io.impl.StatelessDefaultWireFormat;
032 import org.activemq.store.MessageStore;
033 import org.activemq.store.PersistenceAdapter;
034 import org.activemq.store.TopicMessageStore;
035 import org.activemq.store.TransactionStore;
036 import org.activemq.store.jdbc.adapter.DefaultJDBCAdapter;
037 import org.activemq.store.vm.VMTransactionStore;
038 import org.activemq.util.FactoryFinder;
039 import org.activemq.util.JMSExceptionHelper;
040 import org.activemq.service.DeadLetterPolicy;
041 import org.activemq.service.MessageIdentity;
042 import org.activemq.store.jdbc.JDBCAdapter.ExpiredMessageResultHandler;
043 import org.activemq.message.ActiveMQMessage;
044
045 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
046 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
047
048 /**
049 * A {@link PersistenceAdapter} implementation using JDBC for
050 * persistence storage.
051 *
052 * This persistence adapter will correctly remember prepared XA transactions,
053 * but it will not keep track of local transaction commits so that operations
054 * performed against the Message store are done as a single uow.
055 *
056 * @version $Revision: 1.1 $
057 */
058 public class JDBCPersistenceAdapter implements PersistenceAdapter {
059
060 private static final Log log = LogFactory.getLog(JDBCPersistenceAdapter.class);
061 private static FactoryFinder factoryFinder = new FactoryFinder("META-INF/services/org/activemq/store/jdbc/");
062
063 private WireFormat wireFormat = new StatelessDefaultWireFormat();
064 private DataSource dataSource;
065 private JDBCAdapter adapter;
066 private String adapterClass;
067 private VMTransactionStore transactionStore;
068 private boolean dropTablesOnStartup=false;
069 private ClockDaemon clockDaemon;
070 private Object clockTicket;
071 private DeadLetterPolicy deadLetterPolicy;
072 private BrokerContainer brokerContainer;
073 private boolean autoCleanupExpiredMessages=true;
074 private boolean deleteExpiredMessages=true;
075 private long cleanupRepeatInterval=1000*60*5; // by default, run the cleanup process every 5 minutes
076 private int cleanupPeriod = 1000 * 60 * 5;
077 private String tablePrefix = "";
078
079 public JDBCPersistenceAdapter() {
080 }
081
082 public JDBCPersistenceAdapter(DataSource ds, WireFormat wireFormat) {
083 this.dataSource = ds;
084 this.wireFormat = wireFormat;
085 }
086
087 public Map getInitialDestinations() {
088 return null;
089 }
090
091 public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
092 if (adapter == null) {
093 throw new IllegalStateException("Not started");
094 }
095 MessageStore store = new JDBCMessageStore(this, adapter, wireFormat.copy(), destinationName);
096 if( transactionStore!=null ) {
097 store = transactionStore.proxy(store);
098 }
099 return store;
100 }
101
102 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
103 if (adapter == null) {
104 throw new IllegalStateException("Not started");
105 }
106 TopicMessageStore store = new JDBCTopicMessageStore(this, adapter, wireFormat.copy(), destinationName);
107 if( transactionStore!=null ) {
108 store = transactionStore.proxy(store);
109 }
110 return store;
111 }
112
113 public TransactionStore createTransactionStore() throws JMSException {
114 if (adapter == null) {
115 throw new IllegalStateException("Not started");
116 }
117 if( this.transactionStore == null ) {
118 this.transactionStore = new VMTransactionStore();
119 }
120 return this.transactionStore;
121 }
122
123 public void beginTransaction() throws JMSException {
124 try {
125 Connection c = dataSource.getConnection();
126 c.setAutoCommit(false);
127 TransactionContext.pushConnection(c);
128 }
129 catch (SQLException e) {
130 throw JMSExceptionHelper.newJMSException("Failed to create transaction: " + e, e);
131 }
132 }
133
134 public void commitTransaction() throws JMSException {
135 Connection c = TransactionContext.popConnection();
136 if (c == null) {
137 log.warn("Commit while no transaction in progress");
138 }
139 else {
140 try {
141 c.commit();
142 }
143 catch (SQLException e) {
144 throw JMSExceptionHelper.newJMSException("Failed to commit transaction: " + c + ": " + e, e);
145 }
146 finally {
147 try {
148 c.close();
149 }
150 catch (Throwable e) {
151 }
152 }
153 }
154 }
155
156 public void rollbackTransaction() {
157 Connection c = TransactionContext.popConnection();
158 try {
159 c.rollback();
160 }
161 catch (SQLException e) {
162 log.warn("Cannot rollback transaction due to: " + e, e);
163 }
164 finally {
165 try {
166 c.close();
167 }
168 catch (Throwable e) {
169 }
170 }
171 }
172
173
174 public void start() throws JMSException {
175 beginTransaction();
176 Connection c = null;
177 try {
178 // Load the right adapter for the database
179 adapter = null;
180
181 try {
182 c = getConnection();
183 }
184 catch (SQLException e) {
185 throw JMSExceptionHelper.newJMSException("Could not get a database connection: "+e,e);
186 }
187
188 // If the adapter class is not specified.. try to dectect they right type by getting
189 // info from the database.
190 if( adapterClass == null ) {
191
192 try {
193
194 // Make the filename file system safe.
195 String driverName = c.getMetaData().getDriverName();
196 driverName = driverName.replaceAll("[^a-zA-Z0-9\\-]", "_").toLowerCase();
197
198 try {
199 adapter = (DefaultJDBCAdapter) factoryFinder.newInstance(driverName);
200 log.info("Database driver recognized: [" + driverName + "]");
201 }
202 catch (Throwable e) {
203 log.warn("Database driver NOT recognized: [" + driverName + "]. Will use default JDBC implementation.");
204 }
205
206 }
207 catch (SQLException e) {
208 log.warn("JDBC error occured while trying to detect database type. Will use default JDBC implementation: "+e.getMessage());
209 log.debug("Reason: " + e, e);
210 }
211
212 } else {
213 try {
214 Class clazz = JDBCPersistenceAdapter.class.getClassLoader().loadClass(adapterClass);
215 adapter = (DefaultJDBCAdapter)clazz.newInstance();
216 }
217 catch (Throwable e) {
218 log.warn("Invalid JDBC adapter class class (" + adapterClass + "). Will use default JDBC implementation.");
219 log.debug("Reason: " + e, e);
220 }
221 }
222
223 // Use the default JDBC adapter if the
224 // Database type is not recognized.
225 if (adapter == null) {
226 adapter = new DefaultJDBCAdapter();
227 }
228
229 adapter.getStatementProvider().setTablePrefix(tablePrefix);
230
231 if( dropTablesOnStartup ) {
232 try {
233 adapter.doDropTables(c);
234 }
235 catch (SQLException e) {
236 log.warn("Cannot drop tables due to: " + e, e);
237 }
238 }
239 try {
240 adapter.doCreateTables(c);
241 }
242 catch (SQLException e) {
243 log.warn("Cannot create tables due to: " + e, e);
244 }
245 adapter.initSequenceGenerator(c);
246
247 }
248 finally {
249 commitTransaction();
250 }
251
252 if (isAutoCleanupExpiredMessages()) {
253 // Cleanup the db periodically.
254 clockTicket = getClockDaemon().executePeriodically(getCleanupRepeatInterval(), new Runnable() {
255 public void run() {
256 try {
257 cleanup();
258 } catch (SQLException sqle) {
259 log.error("Error in cleanup due to: " + sqle, sqle);
260 }
261 }
262 }, false);
263 }
264 }
265
266 public void cleanup() throws SQLException {
267 final Connection c = getConnection();
268 try {
269 log.debug("Cleaning up old messages in the database");
270 adapter.doDeleteOldMessages(c);
271 adapter.doGetExpiredMessages(c, new ExpiredMessageResultHandler() {
272 public void onMessage(long seq, String container, String messageID, boolean isSentToDeadLetter) {
273 try {
274 // restore the message from the db
275 MessageStore messageStore = createQueueMessageStore(container);
276 MessageIdentity messageIdentity = new MessageIdentity(messageID, new Long(seq));
277 ActiveMQMessage message = messageStore.getMessage(messageIdentity);
278 if (message != null){
279 log.debug("Cleaning up old message in the database: " + message.toString());
280 if (message.isExpired() && !isSentToDeadLetter) {
281 // send a dead letter
282 sendToDeadLetter(message);
283 }else {
284 log.warn("could not find message from store with identity: " + messageIdentity + " in cleanup");
285 }
286 }
287 // clean up old message, use original identity
288 cleanupOldMessage(c, new MessageIdentity(messageID, new Long(seq)));
289 } catch (JMSException jmse) {
290 log.warn("Cleanup expired message failed due to: " + jmse, jmse);
291 } catch (SQLException sqle) {
292 log.warn("Cleanup expired message failed due to: " + sqle, sqle);
293 }
294 }
295 });
296 } catch (JMSException e) {
297 log.warn("Old message cleanup failed due to: " + e, e);
298 } catch (SQLException e) {
299 log.warn("Old message cleanup failed due to: " + e, e);
300 } finally {
301 if (c!= null) returnConnection(c);
302 log.debug("Cleanup done.");
303 }
304 }
305
306 protected void sendToDeadLetter(ActiveMQMessage message) throws JMSException {
307 // send a dead letter if the dead letter policy is enabled
308 if (getBrokerContainer()!=null) {
309 DeadLetterPolicy deadLetterPolicy = getBrokerContainer().getBroker().getDeadLetterPolicy();
310 if (deadLetterPolicy != null && deadLetterPolicy.isDeadLetterEnabled()) {
311 deadLetterPolicy.sendToDeadLetter(message);
312 }
313 }
314 }
315
316 public void cleanupOldMessage(Connection c, MessageIdentity messageIdentity) throws JMSException, SQLException {
317 if (getDeleteExpiredMessages()==true) {
318 adapter.doDeleteExpiredMessage(c, messageIdentity);
319 }
320 }
321
322 /**
323 * Ensures that no previous dead letter was already sent for this message
324 */
325 public boolean deadLetterAlreadySent(long seq, boolean useDatabaseLocking) {
326 final BooleanWrapper alreadySentToDeadLetter = new BooleanWrapper(true);
327 try {
328 beginTransaction();
329 Connection c = getConnection();
330 // fetch the message from the persistent store
331 getJDBCAdapter().doGetMessageForUpdate(c, seq, useDatabaseLocking, new ExpiredMessageResultHandler() {
332 public void onMessage(long seq, String container, String messageID, boolean isSentToDeadLetter) {
333 if (!isSentToDeadLetter) {
334 alreadySentToDeadLetter.setValue(false);
335 }
336 }
337 });
338 if (!alreadySentToDeadLetter.getValue()) {
339 // if not already sent, set the deadletter flag in the db
340 getJDBCAdapter().doSetDeadLetterFlag(c, seq);
341 }
342 commitTransaction();
343 return alreadySentToDeadLetter.getValue();
344 } catch (Exception e) {
345 log.error("Could not get a database connection due to: " + e, e);
346 rollbackTransaction();
347 return true; // avoid sending a dead letter in case there is a problem
348 }
349 }
350
351 private class BooleanWrapper {
352 boolean value;
353 BooleanWrapper(boolean value) {
354 setValue(value);
355 }
356 boolean getValue() {
357 return value;
358 }
359 void setValue(boolean value) {
360 this.value = value;
361 }
362 }
363
364 public void setClockDaemon(ClockDaemon clockDaemon) {
365 this.clockDaemon = clockDaemon;
366 }
367
368 public ClockDaemon getClockDaemon() {
369 if (clockDaemon == null) {
370 clockDaemon = new ClockDaemon();
371 clockDaemon.setThreadFactory(new ThreadFactory() {
372 public Thread newThread(Runnable runnable) {
373 Thread thread = new Thread(runnable, "Cleanup Timmer");
374 thread.setDaemon(true);
375 return thread;
376 }
377 });
378 }
379 return clockDaemon;
380 }
381
382 public synchronized void stop() throws JMSException {
383 if (clockTicket != null) {
384 // Stop the periodical cleanup.
385 ClockDaemon.cancel(clockTicket);
386 clockTicket=null;
387 clockDaemon.shutDown();
388 }
389 }
390
391 public BrokerContainer getBrokerContainer() {
392 return brokerContainer;
393 }
394
395 public void setBrokerContainer(BrokerContainer brokerContainer) {
396 this.brokerContainer = brokerContainer;
397 }
398
399 public DataSource getDataSource() {
400 return dataSource;
401 }
402
403 public void setDataSource(DataSource dataSource) {
404 this.dataSource = dataSource;
405 }
406
407 public WireFormat getWireFormat() {
408 return wireFormat;
409 }
410
411 public void setWireFormat(WireFormat wireFormat) {
412 this.wireFormat = wireFormat;
413 }
414
415 public Connection getConnection() throws SQLException {
416 Connection answer = TransactionContext.peekConnection();
417 if (answer == null) {
418 answer = dataSource.getConnection();
419 answer.setAutoCommit(true);
420 }
421 return answer;
422 }
423
424 public void returnConnection(Connection connection) {
425 if (connection == null) {
426 return;
427 }
428 Connection peek = TransactionContext.peekConnection();
429 if (peek != connection) {
430 try {
431 connection.close();
432 }
433 catch (SQLException e) {
434 }
435 }
436 }
437
438 /**
439 * @return Returns the adapterClass.
440 */
441 public String getAdapterClass() {
442 return adapterClass;
443 }
444
445 /**
446 * @param adapterClass The adapterClass to set.
447 */
448 public void setAdapterClass(String adapterClass) {
449 this.adapterClass = adapterClass;
450 }
451
452 public JDBCAdapter getJDBCAdapter() {
453 return adapter;
454 }
455
456 /**
457 * @return Returns the dropTablesOnStartup.
458 */
459 public boolean getDropTablesOnStartup() {
460 return dropTablesOnStartup;
461 }
462 /**
463 * @param dropTablesOnStartup The dropTablesOnStartup to set.
464 */
465 public void setDropTablesOnStartup(boolean dropTablesOnStartup) {
466 this.dropTablesOnStartup = dropTablesOnStartup;
467 }
468
469 public DeadLetterPolicy getDeadLetterPolicy() {
470 return this.deadLetterPolicy;
471 }
472
473 public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy) {
474 this.deadLetterPolicy = deadLetterPolicy;
475 }
476
477 public boolean getDeleteExpiredMessages() {
478 return deleteExpiredMessages;
479 }
480
481 public void setDeleteExpiredMessages(boolean deleteExpiredMessages) {
482 this.deleteExpiredMessages = deleteExpiredMessages;
483 }
484 /**
485 * @return Returns the autoCleanupExpiredMessages.
486 */
487 public boolean isAutoCleanupExpiredMessages() {
488 return autoCleanupExpiredMessages;
489 }
490 /**
491 * @param autoCleanupExpiredMessages The autoCleanupExpiredMessages to set.
492 */
493 public void setAutoCleanupExpiredMessages(boolean autoCleanupExpiredMessages) {
494 this.autoCleanupExpiredMessages = autoCleanupExpiredMessages;
495 }
496 /**
497 * @return Returns the cleanupRepeatInterval.
498 */
499 public long getCleanupRepeatInterval() {
500 return cleanupRepeatInterval;
501 }
502 /**
503 * @param cleanupRepeatInterval The cleanupRepeatInterval to set.
504 */
505 public void setCleanupRepeatInterval(long cleanupRepeatInterval) {
506 this.cleanupRepeatInterval = cleanupRepeatInterval;
507 }
508
509 public int getCleanupPeriod() {
510 return cleanupPeriod;
511 }
512
513 public void setCleanupPeriod(int cleanupPeriod) {
514 this.cleanupPeriod = cleanupPeriod;
515 }
516
517 public String getTablePrefix() {
518 return tablePrefix;
519 }
520
521 public void setTablePrefix(String tablePrefix) {
522 this.tablePrefix = tablePrefix;
523 }
524 }