001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.store.jdbc.adapter;
019
020 import java.sql.Connection;
021 import java.sql.PreparedStatement;
022 import java.sql.ResultSet;
023 import java.sql.SQLException;
024 import java.sql.Statement;
025
026 import javax.jms.JMSException;
027 import javax.transaction.xa.XAException;
028
029 import org.activemq.message.ActiveMQXid;
030 import org.activemq.service.SubscriberEntry;
031 import org.activemq.store.TransactionStore.RecoveryListener;
032 import org.activemq.store.jdbc.JDBCAdapter;
033 import org.activemq.store.jdbc.StatementProvider;
034 import org.activemq.util.LongSequenceGenerator;
035 import org.activemq.service.MessageIdentity;
036 import org.apache.commons.logging.Log;
037 import org.apache.commons.logging.LogFactory;
038
039 /**
040 * Implements all the default JDBC operations that are used
041 * by the JDBCPersistenceAdapter.
042 * <p/>
043 * Subclassing is encouraged to override the default
044 * implementation of methods to account for differences
045 * in JDBC Driver implementations.
046 * <p/>
047 * The JDBCAdapter inserts and extracts BLOB data using the
048 * getBytes()/setBytes() operations.
049 * <p/>
050 * The databases/JDBC drivers that use this adapter are:
051 * <ul>
052 * <li></li>
053 * </ul>
054 *
055 * @version $Revision: 1.1 $
056 */
057 public class DefaultJDBCAdapter implements JDBCAdapter {
058
059 private static final Log log = LogFactory.getLog(DefaultJDBCAdapter.class);
060
061 final protected StatementProvider statementProvider;
062 protected LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
063
064 protected void setBinaryData(PreparedStatement s, int index, byte data[]) throws SQLException {
065 s.setBytes(index, data);
066 }
067
068 protected byte[] getBinaryData(ResultSet rs, int index) throws SQLException {
069 return rs.getBytes(index);
070 }
071
072 /**
073 * @param provider
074 */
075 public DefaultJDBCAdapter(StatementProvider provider) {
076 this.statementProvider = new CachingStatementProvider(provider);
077 }
078
079 public DefaultJDBCAdapter() {
080 this(new DefaultStatementProvider());
081 }
082
083 public LongSequenceGenerator getSequenceGenerator() {
084 return sequenceGenerator;
085 }
086
087 public void doCreateTables(Connection c) throws SQLException {
088 Statement s = null;
089 try {
090 s = c.createStatement();
091 String[] createStatments = statementProvider.getCreateSchemaStatments();
092 for (int i = 0; i < createStatments.length; i++) {
093 // This will fail usually since the tables will be
094 // created allready.
095 try {
096 boolean rc = s.execute(createStatments[i]);
097 }
098 catch (SQLException e) {
099 log.info("Could not create JDBC tables; they could already exist." +
100 " Failure was: " + createStatments[i] + " Message: " + e.getMessage() +
101 " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() );
102 }
103 }
104 c.commit();
105 }
106 finally {
107 try {
108 s.close();
109 }
110 catch (Throwable e) {
111 }
112 }
113 }
114
115 public void doDropTables(Connection c) throws SQLException {
116 Statement s = null;
117 try {
118 s = c.createStatement();
119 String[] dropStatments = statementProvider.getDropSchemaStatments();
120 for (int i = 0; i < dropStatments.length; i++) {
121 // This will fail usually since the tables will be
122 // created allready.
123 try {
124 boolean rc = s.execute(dropStatments[i]);
125 }
126 catch (SQLException e) {
127 log.warn("Could not drop JDBC tables; they may not exist." +
128 " Failure was: " + dropStatments[i] + " Message: " + e.getMessage() +
129 " SQLState: " + e.getSQLState() + " Vendor code: " + e.getErrorCode() );
130 }
131 }
132 c.commit();
133 }
134 finally {
135 try {
136 s.close();
137 }
138 catch (Throwable e) {
139 }
140 }
141 }
142
143 public void initSequenceGenerator(Connection c) {
144 PreparedStatement s = null;
145 ResultSet rs = null;
146 try {
147 s = c.prepareStatement(statementProvider.getFindLastSequenceIdInMsgs());
148 rs = s.executeQuery();
149 long seq1 = 0;
150 if (rs.next()) {
151 seq1 = rs.getLong(1);
152 }
153 rs.close();
154 s.close();
155 s = c.prepareStatement(statementProvider.getFindLastSequenceIdInAcks());
156 rs = s.executeQuery();
157 long seq2 = 0;
158 if (rs.next()) {
159 seq2 = rs.getLong(1);
160 }
161
162 sequenceGenerator.setLastSequenceId(Math.max(seq1, seq2));
163 log.debug("Last sequence id: "+sequenceGenerator.getLastSequenceId());
164 }
165 catch (SQLException e) {
166 log.warn("Failed to find last sequence number: " + e, e);
167 }
168 finally {
169 try {
170 rs.close();
171 }
172 catch (Throwable e) {
173 }
174 try {
175 s.close();
176 }
177 catch (Throwable e) {
178 }
179 }
180 }
181
182 public void doAddMessage(Connection c, long seq, String messageID, String destinationName, byte[] data, long expiration) throws SQLException, JMSException {
183 PreparedStatement s = null;
184 try {
185 s = c.prepareStatement(statementProvider.getAddMessageStatment());
186 s.setLong(1, seq);
187 s.setString(2, destinationName);
188 s.setString(3, messageID);
189 setBinaryData(s, 4, data);
190 s.setLong(5, expiration);
191 if (s.executeUpdate() != 1) {
192 throw new JMSException("Failed to broker message: " + messageID + " in container. ");
193 }
194 }
195 finally {
196 try {
197 s.close();
198 }
199 catch (Throwable e) {
200 }
201 }
202 }
203
204 public Long getMessageSequenceId(Connection c, String messageID) throws SQLException, JMSException {
205 PreparedStatement s = null;
206 ResultSet rs = null;
207 try {
208
209 s = c.prepareStatement(statementProvider.getFindMessageSequenceIdStatment());
210 s.setString(1, messageID);
211 rs = s.executeQuery();
212
213 if (!rs.next()) {
214 return null;
215 }
216 return new Long( rs.getLong(1) );
217
218 }
219 finally {
220 try {
221 rs.close();
222 }
223 catch (Throwable e) {
224 }
225 try {
226 s.close();
227 }
228 catch (Throwable e) {
229 }
230 }
231 }
232
233 public byte[] doGetMessage(Connection c, long seq) throws SQLException {
234 PreparedStatement s = null;
235 ResultSet rs = null;
236 try {
237
238 s = c.prepareStatement(statementProvider.getFindMessageStatment());
239 s.setLong(1, seq);
240 rs = s.executeQuery();
241
242 if (!rs.next()) {
243 return null;
244 }
245 return getBinaryData(rs, 1);
246
247 }
248 finally {
249 try {
250 rs.close();
251 }
252 catch (Throwable e) {
253 }
254 try {
255 s.close();
256 }
257 catch (Throwable e) {
258 }
259 }
260 }
261
262 public void doGetMessageForUpdate(Connection c, long seq, boolean useLocking, ExpiredMessageResultHandler handler) throws SQLException, JMSException {
263 PreparedStatement s = null;
264 ResultSet rs = null;
265 try {
266
267 if (useLocking) {
268 s = c.prepareStatement(statementProvider.getFindMessageAttributesForUpdateStatment());
269 } else {
270 s = c.prepareStatement(statementProvider.getFindMessageAttributesStatment());
271 }
272 s.setLong(1, seq);
273 rs = s.executeQuery();
274
275 if (rs.next()) {
276 String container = rs.getString(1);
277 String msgid = rs.getString(2);
278 boolean isSentToDeadLetter = rs.getString(3)!=null&&rs.getString(3).equals("Y");
279 handler.onMessage(seq, container, msgid, isSentToDeadLetter);
280 }
281 }
282 finally {
283 try {
284 rs.close();
285 }
286 catch (Throwable e) {
287 }
288 try {
289 s.close();
290 }
291 catch (Throwable e) {
292 }
293 }
294 }
295
296 public void doSetDeadLetterFlag(Connection c, long seq) throws SQLException, JMSException {
297 PreparedStatement s = null;
298 ResultSet rs = null;
299 try {
300 // Update the db with the updated blob
301 s = c.prepareStatement(statementProvider.getSetDeadLetterFlagStatement());
302 s.setLong(1, seq);
303 int i = s.executeUpdate();
304 if (i <= 0)
305 throw new JMSException("Failed to broker message: " + seq
306 + " in container.");
307
308 } finally {
309 try {
310 rs.close();
311 } catch (Throwable e) {
312 }
313 try {
314 s.close();
315 } catch (Throwable e) {
316 }
317 }
318 }
319
320 public void doRemoveMessage(Connection c, long seq) throws SQLException {
321 PreparedStatement s = null;
322 try {
323 s = c.prepareStatement(statementProvider.getRemoveMessageStatment());
324 s.setLong(1, seq);
325 if (s.executeUpdate() != 1) {
326 log.error("Could not delete sequenece number for: " + seq);
327 }
328 }
329 finally {
330 try {
331 s.close();
332 }
333 catch (Throwable e) {
334 }
335 }
336 }
337
338 public void doRecover(Connection c, String destinationName, MessageListResultHandler listener) throws SQLException, JMSException {
339 PreparedStatement s = null;
340 ResultSet rs = null;
341 try {
342
343 s = c.prepareStatement(statementProvider.getFindAllMessagesStatment());
344 s.setString(1, destinationName);
345 rs = s.executeQuery();
346
347 while (rs.next()) {
348 long seq = rs.getLong(1);
349 String msgid = rs.getString(2);
350 listener.onMessage(seq, msgid);
351 }
352
353 }
354 finally {
355 try {
356 rs.close();
357 }
358 catch (Throwable e) {
359 }
360 try {
361 s.close();
362 }
363 catch (Throwable e) {
364 }
365 }
366 }
367
368 public void doRemoveXid(Connection c, ActiveMQXid xid) throws SQLException, XAException {
369 PreparedStatement s = null;
370 try {
371 s = c.prepareStatement(statementProvider.getRemoveXidStatment());
372 s.setString(1, xid.toLocalTransactionId());
373 if (s.executeUpdate() != 1) {
374 throw new XAException("Failed to remove prepared transaction: " + xid + ".");
375 }
376 }
377 finally {
378 try {
379 s.close();
380 }
381 catch (Throwable e) {
382 }
383 }
384 }
385
386
387 public void doAddXid(Connection c, ActiveMQXid xid) throws SQLException, XAException {
388 PreparedStatement s = null;
389 try {
390
391 s = c.prepareStatement(statementProvider.getAddXidStatment());
392 s.setString(1, xid.toLocalTransactionId());
393 if (s.executeUpdate() != 1) {
394 throw new XAException("Failed to store prepared transaction: " + xid);
395 }
396
397 }
398 finally {
399 try {
400 s.close();
401 }
402 catch (Throwable e) {
403 }
404 }
405 }
406
407 public void doLoadPreparedTransactions(Connection c, RecoveryListener listener) throws SQLException {
408 PreparedStatement s = null;
409 ResultSet rs = null;
410 try {
411
412 s = c.prepareStatement(statementProvider.getFindAllXidStatment());
413 rs = s.executeQuery();
414
415 while (rs.next()) {
416 String id = rs.getString(1);
417
418
419 /*
420 byte data[] = this.getBinaryData(rs, 2);
421 try {
422 ActiveMQXid xid = new ActiveMQXid(id);
423 Transaction transaction = XATransactionCommand.fromBytes(data);
424 transactionManager.loadTransaction(xid, transaction);
425 }
426 catch (Exception e) {
427 log.error("Failed to recover prepared transaction due to invalid xid: " + id, e);
428 }
429 */
430 }
431 }
432 finally {
433 try {
434 rs.close();
435 }
436 catch (Throwable e) {
437 }
438 try {
439 s.close();
440 }
441 catch (Throwable e) {
442 }
443 }
444 }
445
446 /**
447 * @throws JMSException
448 * @see org.activemq.store.jdbc.JDBCAdapter#doSetLastAck(java.sql.Connection, java.lang.String, java.lang.String, long)
449 */
450 public void doSetLastAck(Connection c, String destinationName, String subscriptionID, long seq) throws SQLException, JMSException {
451 PreparedStatement s = null;
452 try {
453 s = c.prepareStatement(statementProvider.getUpdateLastAckOfDurableSub());
454 s.setLong(1, seq);
455 s.setString(2, subscriptionID);
456 s.setString(3, destinationName);
457
458 if (s.executeUpdate() != 1) {
459 throw new JMSException("Failed to acknowlege message with sequence id: " + seq + " for client: " + subscriptionID);
460 }
461 }
462 finally {
463 try {
464 s.close();
465 }
466 catch (Throwable e) {
467 }
468 }
469 }
470
471 /**
472 * @throws JMSException
473 * @see org.activemq.store.jdbc.JDBCAdapter#doRecoverSubscription(java.sql.Connection, java.lang.String, java.lang.String, org.activemq.store.jdbc.JDBCAdapter.MessageListResultHandler)
474 */
475 public void doRecoverSubscription(Connection c, String destinationName, String subscriptionID, MessageListResultHandler listener) throws SQLException, JMSException {
476 // dumpTables(c, destinationName, subscriptionID);
477
478 PreparedStatement s = null;
479 ResultSet rs = null;
480 try {
481
482 // System.out.println(statementProvider.getFindAllDurableSubMessagesStatment());
483 s = c.prepareStatement(statementProvider.getFindAllDurableSubMessagesStatment());
484 s.setString(1, destinationName);
485 s.setString(2, subscriptionID);
486 rs = s.executeQuery();
487
488 while (rs.next()) {
489 long seq = rs.getLong(1);
490 String msgid = rs.getString(2);
491 listener.onMessage(seq, msgid);
492 }
493
494 }
495 finally {
496 try {
497 rs.close();
498 }
499 catch (Throwable e) {
500 }
501 try {
502 s.close();
503 }
504 catch (Throwable e) {
505 }
506 }
507 }
508
509 /**
510 * @see org.activemq.store.jdbc.JDBCAdapter#doSetSubscriberEntry(java.sql.Connection, java.lang.Object, org.activemq.service.SubscriberEntry)
511 */
512 public void doSetSubscriberEntry(Connection c, String destinationName, String sub, SubscriberEntry subscriberEntry) throws SQLException {
513
514 PreparedStatement s = null;
515 try {
516 s = c.prepareStatement(statementProvider.getUpdateDurableSubStatment());
517 s.setInt(1, subscriberEntry.getSubscriberID());
518 s.setString(2, subscriberEntry.getClientID());
519 s.setString(3, subscriberEntry.getConsumerName());
520 s.setString(4, subscriberEntry.getSelector());
521 s.setString(5, sub);
522 s.setString(6, destinationName);
523
524 // If the sub was not there then we need to create it.
525 if (s.executeUpdate() != 1) {
526 s.close();
527
528 long id=0;
529 ResultSet rs=null;
530 s = c.prepareStatement(statementProvider.getFindLastSequenceIdInMsgs());
531 try {
532 rs = s.executeQuery();
533 if (rs.next()) {
534 id = rs.getLong(1);
535 }
536 } finally {
537 try {
538 rs.close();
539 } catch (Throwable e) {
540 }
541 }
542 s.close();
543
544 s = c.prepareStatement(statementProvider.getCreateDurableSubStatment());
545 s.setInt(1, subscriberEntry.getSubscriberID());
546 s.setString(2, subscriberEntry.getClientID());
547 s.setString(3, subscriberEntry.getConsumerName());
548 s.setString(4, subscriberEntry.getSelector());
549 s.setString(5, sub);
550 s.setString(6, destinationName);
551
552 s.setLong(7, id);
553
554 if (s.executeUpdate() != 1) {
555 log.error("Failed to store durable subscription for: " + sub);
556 }
557 }
558 }
559 finally {
560 try {
561 s.close();
562 }
563 catch (Throwable e) {
564 }
565 }
566 }
567
568 /**
569 * @see org.activemq.store.jdbc.JDBCAdapter#doGetSubscriberEntry(java.sql.Connection, java.lang.Object)
570 */
571 public SubscriberEntry doGetSubscriberEntry(Connection c, String destinationName, String sub) throws SQLException {
572 PreparedStatement s = null;
573 ResultSet rs = null;
574 try {
575
576 s = c.prepareStatement(statementProvider.getFindDurableSubStatment());
577 s.setString(1, sub);
578 s.setString(2, destinationName);
579 rs = s.executeQuery();
580
581 if (!rs.next()) {
582 return null;
583 }
584
585 SubscriberEntry answer = new SubscriberEntry();
586 answer.setSubscriberID(rs.getInt(1));
587 answer.setClientID(rs.getString(2));
588 answer.setConsumerName(rs.getString(3));
589 answer.setDestination(rs.getString(4));
590
591 return answer;
592
593 }
594 finally {
595 try {
596 rs.close();
597 }
598 catch (Throwable e) {
599 }
600 try {
601 s.close();
602 }
603 catch (Throwable e) {
604 }
605 }
606 }
607
608 public void doRemoveAllMessages(Connection c, String destinationName) throws SQLException, JMSException {
609 PreparedStatement s = null;
610 try {
611 s = c.prepareStatement(statementProvider.getRemoveAllMessagesStatment());
612 s.setString(1, destinationName);
613 s.executeUpdate();
614 s.close();
615
616 s = c.prepareStatement(statementProvider.getRemoveAllSubscriptionsStatment());
617 s.setString(1, destinationName);
618 s.executeUpdate();
619
620 }
621 finally {
622 try {
623 s.close();
624 }
625 catch (Throwable e) {
626 }
627 }
628 }
629
630 public void doDeleteSubscription(Connection c, String destinationName, String subscription) throws SQLException, JMSException {
631 PreparedStatement s = null;
632 try {
633 s = c.prepareStatement(statementProvider.getDeleteSubscriptionStatment());
634 s.setString(1, subscription);
635 s.setString(2, destinationName);
636
637 s.executeUpdate();
638 }
639 finally {
640 try {
641 s.close();
642 }
643 catch (Throwable e) {
644 }
645 }
646 }
647
648 public void doDeleteOldMessages(Connection c) throws SQLException, JMSException {
649 PreparedStatement s = null;
650 try {
651 s = c.prepareStatement(statementProvider.getDeleteOldMessagesStatment());
652 //s.setLong(1, System.currentTimeMillis());
653 int i = s.executeUpdate();
654 log.debug("Deleted "+i+" old message(s).");
655 }
656 finally {
657 try {
658 s.close();
659 }
660 catch (Throwable e) {
661 }
662 }
663 }
664
665 public void doGetExpiredMessages(Connection c, ExpiredMessageResultHandler handler) throws SQLException, JMSException {
666 PreparedStatement s = null;
667 ResultSet rs = null;
668 try {
669 s = c.prepareStatement(statementProvider.getFindExpiredMessagesStatment());
670 s.setLong(1, System.currentTimeMillis());
671 rs = s.executeQuery();
672 while(rs.next()) {
673 long seq = rs.getLong(1);
674 String container = rs.getString(2);
675 String msgid = rs.getString(3);
676 boolean isSentToDeadLetter = rs.getString(4)!=null&&rs.getString(4).equals("Y");
677 handler.onMessage(seq, container, msgid, isSentToDeadLetter);
678 }
679 }
680 finally {
681 try {
682 s.close();
683 }
684 catch (Throwable e) {
685 }
686 }
687 }
688
689 public void doDeleteExpiredMessage(Connection c, MessageIdentity messageIdentity) throws SQLException, JMSException {
690 PreparedStatement s = null;
691 ResultSet rs = null;
692 try {
693 s = c.prepareStatement(statementProvider.getDeleteMessageStatement());
694 Long seq = (Long)messageIdentity.getSequenceNumber();
695 s.setLong(1, seq.longValue());
696 s.setString(2, messageIdentity.getMessageID());
697 int i = s.executeUpdate();
698 log.debug("Deleted "+i+" old message.");
699 }
700 finally {
701 try {
702 s.close();
703 }
704 catch (Throwable e) {
705 }
706 }
707 }
708
709 public StatementProvider getStatementProvider() {
710 return statementProvider;
711 }
712
713 /*
714 * Usefull for debuging.
715 *
716 public void dumpTables(Connection c, String destinationName, String subscriptionID) throws SQLException {
717 printQuery(c, "Select * from ACTIVEMQ_MSGS", System.out);
718 printQuery(c, "Select * from ACTIVEMQ_ACKS", System.out);
719 PreparedStatement s = c.prepareStatement("SELECT M.ID, M.MSGID " +
720 "FROM ACTIVEMQ_MSGS M, ACTIVEMQ_ACKS D " +
721 "WHERE D.CONTAINER=? AND D.SUB=? " +
722 "AND M.CONTAINER=D.CONTAINER " +
723 "AND M.ID > D.LAST_ACKED_ID " +
724 "ORDER BY M.ID");
725 s.setString(1,destinationName);
726 s.setString(2,subscriptionID);
727 printQuery(s,System.out);
728 }
729
730 private void printQuery(Connection c, String query, PrintStream out) throws SQLException {
731 printQuery(c.prepareStatement(query), out);
732 }
733
734 private void printQuery(PreparedStatement s, PrintStream out) throws SQLException {
735
736 ResultSet set=null;
737 try {
738 set = s.executeQuery();
739 ResultSetMetaData metaData = set.getMetaData();
740 for( int i=1; i<= metaData.getColumnCount(); i++ ) {
741 if(i==1)
742 out.print("||");
743 out.print(metaData.getColumnName(i)+"||");
744 }
745 out.println();
746 while(set.next()) {
747 for( int i=1; i<= metaData.getColumnCount(); i++ ) {
748 if(i==1)
749 out.print("|");
750 out.print(set.getString(i)+"|");
751 }
752 out.println();
753 }
754 } finally {
755 try { set.close(); } catch (Throwable ignore) {}
756 try { s.close(); } catch (Throwable ignore) {}
757 }
758 }
759 */
760 }