001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 * Copyright 2004 Protique Ltd
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 *
018 **/
019 package org.activemq.store.journal;
020
021 import java.io.DataInputStream;
022 import java.io.DataOutputStream;
023 import java.io.File;
024 import java.io.IOException;
025 import java.util.ArrayList;
026 import java.util.Iterator;
027 import java.util.Map;
028 import java.sql.SQLException;
029
030 import javax.jms.JMSException;
031 import javax.transaction.xa.XAException;
032
033 import org.activeio.adapter.PacketByteArrayOutputStream;
034 import org.activeio.adapter.PacketInputStream;
035 import org.activeio.journal.InvalidRecordLocationException;
036 import org.activeio.journal.Journal;
037 import org.activeio.journal.JournalEventListener;
038 import org.activeio.journal.RecordLocation;
039 import org.activeio.journal.active.JournalImpl;
040 import org.activeio.journal.howl.HowlJournal;
041 import org.activemq.io.WireFormat;
042 import org.activemq.io.impl.StatelessDefaultWireFormat;
043 import org.activemq.message.ActiveMQMessage;
044 import org.activemq.message.ActiveMQXid;
045 import org.activemq.message.MessageAck;
046 import org.activemq.message.Packet;
047 import org.activemq.service.MessageIdentity;
048 import org.activemq.store.MessageStore;
049 import org.activemq.store.PersistenceAdapter;
050 import org.activemq.store.TopicMessageStore;
051 import org.activemq.store.TransactionStore;
052 import org.activemq.store.jdbc.JDBCPersistenceAdapter;
053 import org.activemq.store.journal.JournalTransactionStore.Tx;
054 import org.activemq.store.journal.JournalTransactionStore.TxOperation;
055 import org.activemq.util.JMSExceptionHelper;
056 import org.apache.commons.logging.Log;
057 import org.apache.commons.logging.LogFactory;
058 import org.objectweb.howl.log.Configuration;
059
060 import EDU.oswego.cs.dl.util.concurrent.Channel;
061 import EDU.oswego.cs.dl.util.concurrent.ClockDaemon;
062 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
063 import EDU.oswego.cs.dl.util.concurrent.Latch;
064 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
065 import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
066 import EDU.oswego.cs.dl.util.concurrent.ThreadFactory;
067
068 /**
069 * An implementation of {@link PersistenceAdapter} designed for
070 * use with a {@link Journal} and then checkpointing asynchronously
071 * on a timeout with some other long term persistent storage.
072 *
073 * @version $Revision: 1.1 $
074 */
075 public class JournalPersistenceAdapter implements PersistenceAdapter, JournalEventListener {
076
077 private static final Log log = LogFactory.getLog(JournalPersistenceAdapter.class);
078 public static final String DEFAULT_JOURNAL_TYPE = "default";
079 public static final String HOWL_JOURNAL_TYPE = "howl";
080
081 private Journal journal;
082 private String journalType = DEFAULT_JOURNAL_TYPE;
083 private PersistenceAdapter longTermPersistence;
084 private File directory = new File("logs");
085 private final StatelessDefaultWireFormat wireFormat = new StatelessDefaultWireFormat();
086 private final ConcurrentHashMap messageStores = new ConcurrentHashMap();
087 private final ConcurrentHashMap topicMessageStores = new ConcurrentHashMap();
088
089 private static final int PACKET_RECORD_TYPE = 0;
090 private static final int COMMAND_RECORD_TYPE = 1;
091 private static final int TX_COMMAND_RECORD_TYPE = 2;
092 private static final int ACK_RECORD_TYPE = 3;
093
094 private Channel checkpointRequests = new LinkedQueue();
095 private QueuedExecutor checkpointExecutor;
096 ClockDaemon clockDaemon;
097 private Object clockTicket;
098 private JournalTransactionStore transactionStore = new JournalTransactionStore(this);
099 private int logFileSize=1024*1024*20;
100 private int logFileCount=2;
101 private long checkpointInterval = 1000 * 60 * 5;
102
103 public JournalPersistenceAdapter() {
104 checkpointExecutor = new QueuedExecutor(new LinkedQueue());
105 checkpointExecutor.setThreadFactory(new ThreadFactory() {
106 public Thread newThread(Runnable runnable) {
107 Thread answer = new Thread(runnable, "Checkpoint Worker");
108 answer.setDaemon(true);
109 answer.setPriority(Thread.MAX_PRIORITY);
110 return answer;
111 }
112 });
113 }
114
115 public JournalPersistenceAdapter(File directory, PersistenceAdapter longTermPersistence) throws IOException {
116 this();
117 this.directory = directory;
118 this.longTermPersistence = longTermPersistence;
119 }
120
121 public Map getInitialDestinations() {
122 return longTermPersistence.getInitialDestinations();
123 }
124
125 private MessageStore createMessageStore(String destination, boolean isQueue) throws JMSException {
126 if(isQueue) {
127 return createQueueMessageStore(destination);
128 } else {
129 return createTopicMessageStore(destination);
130 }
131 }
132
133 public MessageStore createQueueMessageStore(String destinationName) throws JMSException {
134 JournalMessageStore store = (JournalMessageStore) messageStores.get(destinationName);
135 if( store == null ) {
136 MessageStore checkpointStore = longTermPersistence.createQueueMessageStore(destinationName);
137 store = new JournalMessageStore(this, checkpointStore, destinationName);
138 messageStores.put(destinationName, store);
139 }
140 return store;
141 }
142
143 public TopicMessageStore createTopicMessageStore(String destinationName) throws JMSException {
144 JournalTopicMessageStore store = (JournalTopicMessageStore) topicMessageStores.get(destinationName);
145 if( store == null ) {
146 TopicMessageStore checkpointStore = longTermPersistence.createTopicMessageStore(destinationName);
147 store = new JournalTopicMessageStore(this, checkpointStore, destinationName);
148 topicMessageStores.put(destinationName, store);
149 }
150 return store;
151 }
152
153 public TransactionStore createTransactionStore() throws JMSException {
154 return transactionStore;
155 }
156
157 public void beginTransaction() throws JMSException {
158 longTermPersistence.beginTransaction();
159 }
160
161 public void commitTransaction() throws JMSException {
162 longTermPersistence.commitTransaction();
163 }
164
165 public void rollbackTransaction() {
166 longTermPersistence.rollbackTransaction();
167 }
168
169 public synchronized void start() throws JMSException {
170
171 if( longTermPersistence instanceof JDBCPersistenceAdapter ) {
172 // Disabled periodic clean up as it deadlocks with the checkpoint operations.
173 ((JDBCPersistenceAdapter)longTermPersistence).setCleanupPeriod(0);
174 }
175
176 longTermPersistence.start();
177 createTransactionStore();
178 if (journal == null) {
179 try {
180 log.info("Opening journal.");
181 journal = createJournal();
182 log.info("Opened journal: " + journal);
183 journal.setJournalEventListener(this);
184 }
185 catch (Exception e) {
186 throw JMSExceptionHelper.newJMSException("Failed to open transaction journal: " + e, e);
187 }
188 try {
189 recover();
190 }
191 catch (Exception e) {
192 throw JMSExceptionHelper.newJMSException("Failed to recover transactions from journal: " + e, e);
193 }
194 }
195
196 // Do a checkpoint periodically.
197 clockTicket = getClockDaemon().executePeriodically(checkpointInterval, new Runnable() {
198 public void run() {
199 checkpoint(false);
200 }
201 }, false);
202
203 }
204
205 public synchronized void stop() throws JMSException {
206
207 if (clockTicket != null) {
208 // Stop the periodical checkpoint.
209 ClockDaemon.cancel(clockTicket);
210 clockTicket=null;
211 clockDaemon.shutDown();
212 }
213
214 // Take one final checkpoint and stop checkpoint processing.
215 checkpoint(true);
216 checkpointExecutor.shutdownAfterProcessingCurrentlyQueuedTasks();
217
218 JMSException firstException = null;
219 if (journal != null) {
220 try {
221 journal.close();
222 journal = null;
223 }
224 catch (Exception e) {
225 firstException = JMSExceptionHelper.newJMSException("Failed to close journals: " + e, e);
226 }
227 }
228 longTermPersistence.stop();
229
230 if (firstException != null) {
231 throw firstException;
232 }
233 }
234
235 // Properties
236 //-------------------------------------------------------------------------
237 public PersistenceAdapter getLongTermPersistence() {
238 return longTermPersistence;
239 }
240
241 public void setLongTermPersistence(PersistenceAdapter longTermPersistence) {
242 this.longTermPersistence = longTermPersistence;
243 }
244
245 /**
246 * @return Returns the directory.
247 */
248 public File getDirectory() {
249 return directory;
250 }
251
252 /**
253 * @param directory The directory to set.
254 */
255 public void setDirectory(File directory) {
256 this.directory = directory;
257 }
258
259 /**
260 * @return Returns the wireFormat.
261 */
262 public WireFormat getWireFormat() {
263 return wireFormat;
264 }
265
266 public String getJournalType() {
267 return journalType;
268 }
269
270 public void setJournalType(String journalType) {
271 this.journalType = journalType;
272 }
273
274 protected Journal createJournal() throws IOException {
275 if( DEFAULT_JOURNAL_TYPE.equals(journalType) ) {
276 return new JournalImpl(directory,logFileCount,logFileSize);
277 }
278
279 if( HOWL_JOURNAL_TYPE.equals(journalType) ) {
280 try {
281 Configuration config = new Configuration();
282 config.setLogFileDir(directory.getCanonicalPath());
283 return new HowlJournal(config);
284 } catch (IOException e) {
285 throw e;
286 } catch (Exception e) {
287 throw (IOException)new IOException("Could not open HOWL journal: "+e.getMessage()).initCause(e);
288 }
289 }
290
291 throw new IllegalStateException("Unsupported valued for journalType attribute: "+journalType);
292 }
293
294 // Implementation methods
295 //-------------------------------------------------------------------------
296
297 /**
298 * The Journal give us a call back so that we can move old data out of the journal.
299 * Taking a checkpoint does this for us.
300 *
301 * @see org.activemq.journal.JournalEventListener#overflowNotification(org.activemq.journal.RecordLocation)
302 */
303 public void overflowNotification(RecordLocation safeLocation) {
304 checkpoint(false);
305 }
306
307 /**
308 * When we checkpoint we move all the journaled data to long term storage.
309 * @param b
310 */
311 public void checkpoint(boolean sync) {
312 try {
313
314 if( journal == null )
315 throw new IllegalStateException("Journal is closed.");
316
317 // Do the checkpoint asynchronously?
318 Latch latch=null;
319 if( sync ) {
320 latch = new Latch();
321 checkpointRequests.put(latch);
322 } else {
323 checkpointRequests.put(Boolean.TRUE);
324 }
325
326 checkpointExecutor.execute(new Runnable() {
327 public void run() {
328
329 ArrayList listners = new ArrayList();
330
331 try {
332 // Avoid running a checkpoint too many times in a row.
333 // Consume any queued up checkpoint requests.
334 try {
335 boolean requested = false;
336 Object t;
337 while ((t=checkpointRequests.poll(0)) != null) {
338 if( t.getClass()==Latch.class )
339 listners.add(t);
340 requested = true;
341 }
342 if (!requested) {
343 return;
344 }
345 }
346 catch (InterruptedException e1) {
347 return;
348 }
349
350 log.debug("Checkpoint started.");
351 RecordLocation newMark = null;
352
353 Iterator iterator = messageStores.values().iterator();
354 while (iterator.hasNext()) {
355 try {
356 JournalMessageStore ms = (JournalMessageStore) iterator.next();
357 RecordLocation mark = ms.checkpoint();
358 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
359 newMark = mark;
360 }
361 }
362 catch (Exception e) {
363 log.error("Failed to checkpoint a message store: " + e, e);
364 }
365 }
366
367 iterator = topicMessageStores.values().iterator();
368 while (iterator.hasNext()) {
369 try {
370 JournalTopicMessageStore ms = (JournalTopicMessageStore) iterator.next();
371 RecordLocation mark = ms.checkpoint();
372 if (mark != null && (newMark == null || newMark.compareTo(mark) < 0)) {
373 newMark = mark;
374 }
375 }
376 catch (Exception e) {
377 log.error("Failed to checkpoint a message store: " + e, e);
378 }
379 }
380
381 try {
382 if (newMark != null) {
383 if( log.isDebugEnabled() )
384 log.debug("Marking journal: "+newMark);
385 journal.setMark(newMark, true);
386 }
387 }
388 catch (Exception e) {
389 log.error("Failed to mark the Journal: " + e, e);
390 }
391
392 // Clean up the DB if it's a JDBC store.
393 if( longTermPersistence instanceof JDBCPersistenceAdapter ) {
394 // Disabled periodic clean up as it deadlocks with the checkpoint operations.
395 try {
396 ((JDBCPersistenceAdapter)longTermPersistence).cleanup();
397 } catch (SQLException sqle) {
398 log.error("Cleanup failed due to: " + sqle, sqle);
399 }
400 }
401
402 log.debug("Checkpoint done.");
403 } finally {
404 for (Iterator iter = listners.iterator(); iter.hasNext();) {
405 Latch latch = (Latch) iter.next();
406 latch.release();
407 }
408 }
409 }
410 });
411
412 if( sync ) {
413 latch.acquire();
414 }
415 }
416 catch (InterruptedException e) {
417 log.warn("Request to start checkpoint failed: " + e, e);
418 }
419 }
420
421 /**
422 * @param destinationName
423 * @param message
424 * @param sync
425 * @throws JMSException
426 */
427 public RecordLocation writePacket(String destination, Packet packet, boolean sync) throws JMSException {
428 try {
429
430 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
431 DataOutputStream os = new DataOutputStream(pos);
432 os.writeByte(PACKET_RECORD_TYPE);
433 os.writeUTF(destination);
434 os.close();
435 org.activeio.Packet p = wireFormat.writePacket(packet, pos);
436 return journal.write(p, sync);
437 }
438 catch (IOException e) {
439 throw createWriteException(packet, e);
440 }
441 }
442
443 /**
444 * @param destinationName
445 * @param message
446 * @param sync
447 * @throws JMSException
448 */
449 public RecordLocation writeCommand(String command, boolean sync) throws JMSException {
450 try {
451
452 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
453 DataOutputStream os = new DataOutputStream(pos);
454 os.writeByte(COMMAND_RECORD_TYPE);
455 os.writeUTF(command);
456 os.close();
457 return journal.write(pos.getPacket(), sync);
458
459 }
460 catch (IOException e) {
461 throw createWriteException(command, e);
462 }
463 }
464
465 /**
466 * @param location
467 * @return
468 * @throws JMSException
469 */
470 public Packet readPacket(RecordLocation location) throws JMSException {
471 try {
472 org.activeio.Packet data = journal.read(location);
473 DataInputStream is = new DataInputStream(new PacketInputStream(data));
474 byte type = is.readByte();
475 if (type != PACKET_RECORD_TYPE) {
476 throw new IOException("Record is not a packet type.");
477 }
478 String destination = is.readUTF();
479 Packet packet = wireFormat.readPacket(data);
480 is.close();
481 return packet;
482
483 }
484 catch (InvalidRecordLocationException e) {
485 throw createReadException(location, e);
486 }
487 catch (IOException e) {
488 throw createReadException(location, e);
489 }
490 }
491
492
493 /**
494 * Move all the messages that were in the journal into long term storeage. We just replay and do a checkpoint.
495 *
496 * @throws JMSException
497 * @throws IOException
498 * @throws InvalidRecordLocationException
499 * @throws IllegalStateException
500 */
501 private void recover() throws IllegalStateException, InvalidRecordLocationException, IOException, JMSException {
502
503 RecordLocation pos = null;
504 int transactionCounter = 0;
505
506 log.info("Journal Recovery Started.");
507
508 // While we have records in the journal.
509 while ((pos = journal.getNextRecordLocation(pos)) != null) {
510 org.activeio.Packet data = journal.read(pos);
511 DataInputStream is = new DataInputStream(new PacketInputStream(data));
512
513 // Read the destination and packate from the record.
514 String destination = null;
515 Packet packet = null;
516 try {
517 byte type = is.readByte();
518 switch (type) {
519 case PACKET_RECORD_TYPE:
520
521 // Is the current packet part of the destination?
522 destination = is.readUTF();
523 packet = wireFormat.readPacket(data);
524
525 // Try to replay the packet.
526 if (packet instanceof ActiveMQMessage) {
527 ActiveMQMessage msg = (ActiveMQMessage) packet;
528
529 JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, msg.getJMSActiveMQDestination().isQueue());
530 if( msg.getTransactionId()!=null ) {
531 transactionStore.addMessage(store, msg, pos);
532 } else {
533 store.replayAddMessage(msg);
534 transactionCounter++;
535 }
536 }
537 else if (packet instanceof MessageAck) {
538 MessageAck ack = (MessageAck) packet;
539 JournalMessageStore store = (JournalMessageStore) createMessageStore(destination, ack.getDestination().isQueue());
540 if( ack.getTransactionId()!=null ) {
541 transactionStore.removeMessage(store, ack, pos);
542 } else {
543 store.replayRemoveMessage(ack);
544 transactionCounter++;
545 }
546 }
547 else {
548 log.error("Unknown type of packet in transaction log which will be discarded: " + packet);
549 }
550
551 break;
552 case TX_COMMAND_RECORD_TYPE:
553
554 TxCommand command = new TxCommand();
555 command.setType(is.readByte());
556 command.setWasPrepared(is.readBoolean());
557 switch(command.getType()) {
558 case TxCommand.LOCAL_COMMIT:
559 case TxCommand.LOCAL_ROLLBACK:
560 command.setTransactionId(is.readUTF());
561 break;
562 default:
563 command.setTransactionId(ActiveMQXid.read(is));
564 break;
565 }
566
567 // Try to replay the packet.
568 switch(command.getType()) {
569 case TxCommand.XA_PREPARE:
570 transactionStore.replayPrepare(command.getTransactionId());
571 break;
572 case TxCommand.XA_COMMIT:
573 case TxCommand.LOCAL_COMMIT:
574 Tx tx = transactionStore.replayCommit(command.getTransactionId(), command.getWasPrepared());
575 // Replay the committed operations.
576 if( tx!=null) {
577 tx.getOperations();
578 for (Iterator iter = tx.getOperations().iterator(); iter.hasNext();) {
579 TxOperation op = (TxOperation) iter.next();
580 if( op.operationType == TxOperation.ADD_OPERATION_TYPE ) {
581 op.store.replayAddMessage((ActiveMQMessage) op.data);
582 }
583 if( op.operationType == TxOperation.REMOVE_OPERATION_TYPE) {
584 op.store.replayRemoveMessage((MessageAck) op.data);
585 }
586 if( op.operationType == TxOperation.ACK_OPERATION_TYPE) {
587 JournalAck ack = (JournalAck) op.data;
588 ((JournalTopicMessageStore)op.store).replayAcknowledge(ack.getSubscription(), new MessageIdentity(ack.getMessageId()));
589 }
590 }
591 transactionCounter++;
592 }
593 break;
594 case TxCommand.LOCAL_ROLLBACK:
595 case TxCommand.XA_ROLLBACK:
596 transactionStore.replayRollback(command.getTransactionId());
597 break;
598 }
599
600 break;
601
602 case ACK_RECORD_TYPE:
603
604 destination = is.readUTF();
605 String subscription = is.readUTF();
606 String messageId = is.readUTF();
607 Object transactionId=null;
608
609 JournalTopicMessageStore store = (JournalTopicMessageStore) createMessageStore(destination, false);
610 if( transactionId!=null ) {
611 JournalAck ack = new JournalAck(destination, subscription, messageId, transactionId);
612 transactionStore.acknowledge(store, ack, pos);
613 } else {
614 store.replayAcknowledge(subscription, new MessageIdentity(messageId));
615 transactionCounter++;
616 }
617
618 case COMMAND_RECORD_TYPE:
619
620 break;
621 default:
622 log.error("Unknown type of record in transaction log which will be discarded: " + type);
623 break;
624 }
625 }
626 finally {
627 is.close();
628 }
629 }
630
631 RecordLocation location = writeCommand("RECOVERED", true);
632 journal.setMark(location, true);
633
634 log.info("Journal Recovered: " + transactionCounter + " message(s) in transactions recovered.");
635 }
636
637 private JMSException createReadException(RecordLocation location, Exception e) {
638 return JMSExceptionHelper.newJMSException("Failed to read to journal for: " + location + ". Reason: " + e, e);
639 }
640
641 protected JMSException createWriteException(Packet packet, Exception e) {
642 return JMSExceptionHelper.newJMSException("Failed to write to journal for: " + packet + ". Reason: " + e, e);
643 }
644
645 private XAException createWriteException(TxCommand command, Exception e) {
646 return (XAException)new XAException("Failed to write to journal for: " + command + ". Reason: " + e).initCause(e);
647 }
648
649
650 protected JMSException createWriteException(String command, Exception e) {
651 return JMSExceptionHelper.newJMSException("Failed to write to journal for command: " + command + ". Reason: " + e, e);
652 }
653
654 protected JMSException createRecoveryFailedException(Exception e) {
655 return JMSExceptionHelper.newJMSException("Failed to recover from journal. Reason: " + e, e);
656 }
657
658 public ClockDaemon getClockDaemon() {
659 if (clockDaemon == null) {
660 clockDaemon = new ClockDaemon();
661 clockDaemon.setThreadFactory(new ThreadFactory() {
662 public Thread newThread(Runnable runnable) {
663 Thread thread = new Thread(runnable, "Checkpoint Timer");
664 thread.setDaemon(true);
665 return thread;
666 }
667 });
668 }
669 return clockDaemon;
670 }
671
672 public void setClockDaemon(ClockDaemon clockDaemon) {
673 this.clockDaemon = clockDaemon;
674 }
675
676 /**
677 * @param xid
678 * @return
679 */
680 public RecordLocation writeTxCommand(TxCommand command, boolean sync) throws XAException {
681 try {
682
683 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
684 DataOutputStream os = new DataOutputStream(pos);
685 os.writeByte(TX_COMMAND_RECORD_TYPE);
686 os.writeByte(command.getType());
687 os.writeBoolean(command.getWasPrepared());
688 switch(command.getType()) {
689 case TxCommand.LOCAL_COMMIT:
690 case TxCommand.LOCAL_ROLLBACK:
691 os.writeUTF( (String) command.getTransactionId() );
692 break;
693 default:
694 ActiveMQXid xid = (ActiveMQXid) command.getTransactionId();
695 xid.write(os);
696 break;
697 }
698 os.close();
699 return journal.write(pos.getPacket(), sync);
700 }
701 catch (IOException e) {
702 throw createWriteException(command, e);
703 }
704 }
705
706 /**
707 * @param destinationName
708 * @param persistentKey
709 * @param messageIdentity
710 * @param b
711 * @return
712 */
713 public RecordLocation writePacket(String destinationName, String subscription, MessageIdentity messageIdentity, boolean sync) throws JMSException{
714 try {
715
716 PacketByteArrayOutputStream pos = new PacketByteArrayOutputStream();
717 DataOutputStream os = new DataOutputStream(pos);
718 os.writeByte(ACK_RECORD_TYPE);
719 os.writeUTF(destinationName);
720 os.writeUTF(subscription);
721 os.writeUTF(messageIdentity.getMessageID());
722 os.close();
723 return journal.write(pos.getPacket(), sync);
724
725 }
726 catch (IOException e) {
727 throw createWriteException("Ack for message: "+messageIdentity, e);
728 }
729 }
730
731 public JournalTransactionStore getTransactionStore() {
732 return transactionStore;
733 }
734
735 public int getLogFileCount() {
736 return logFileCount;
737 }
738
739 public void setLogFileCount(int logFileCount) {
740 this.logFileCount = logFileCount;
741 }
742
743 public int getLogFileSize() {
744 return logFileSize;
745 }
746
747 public void setLogFileSize(int logFileSize) {
748 this.logFileSize = logFileSize;
749 }
750
751 /**
752 * Verifies if a dead letter has already been sent for a message
753 * @param seq
754 * @param useLocking to prevent concurrency/dups
755 * @return
756 */
757 public boolean deadLetterAlreadySent(long seq, boolean useLocking) {
758 return longTermPersistence.deadLetterAlreadySent(seq, useLocking);
759 }
760
761 public long getCheckpointInterval() {
762 return checkpointInterval;
763 }
764 public void setCheckpointInterval(long checkpointInterval) {
765 this.checkpointInterval = checkpointInterval;
766 }
767 }