001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 * Copyright 2005 Hiram Chirino
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 *
018 **/
019
020 package org.activemq.service.boundedvm;
021 import org.activemq.broker.BrokerClient;
022 import org.activemq.filter.Filter;
023 import org.activemq.io.util.MemoryBoundedQueue;
024 import org.activemq.io.util.MemoryBoundedQueueManager;
025 import org.activemq.io.util.MemoryManageable;
026 import org.activemq.message.ActiveMQDestination;
027 import org.activemq.message.ActiveMQMessage;
028 import org.activemq.message.ConsumerInfo;
029 import org.activemq.service.DeadLetterPolicy;
030 import org.activemq.service.MessageContainerAdmin;
031 import org.activemq.service.MessageIdentity;
032 import org.activemq.service.QueueListEntry;
033 import org.activemq.service.RedeliveryPolicy;
034 import org.activemq.service.Service;
035 import org.activemq.service.TransactionManager;
036 import org.activemq.service.TransactionTask;
037 import org.activemq.service.impl.DefaultQueueList;
038 import org.activemq.store.MessageStore;
039 import org.activemq.store.RecoveryListener;
040 import org.apache.commons.logging.Log;
041 import org.apache.commons.logging.LogFactory;
042
043 import EDU.oswego.cs.dl.util.concurrent.Executor;
044 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
045
046 import javax.jms.JMSException;
047
048 import java.util.HashMap;
049 import java.util.List;
050 import java.util.Map;
051
052 /**
053 * A MessageContainer for Durable queues
054 *
055 * @version $Revision: 1.1.1.1 $
056 */
057 public class DurableQueueBoundedMessageContainer implements Service, Runnable, MessageContainerAdmin {
058
059 private final MessageStore messageStore;
060 private final MemoryBoundedQueueManager queueManager;
061 private final ActiveMQDestination destination;
062 private final Executor threadPool;
063 private final DeadLetterPolicy deadLetterPolicy;
064 private final Log log;
065 private final MemoryBoundedQueue queue;
066
067 private final DefaultQueueList subscriptions = new DefaultQueueList();
068 private final SynchronizedBoolean started = new SynchronizedBoolean(false);
069 private final SynchronizedBoolean running = new SynchronizedBoolean(false);
070 private final Object dispatchMutex = new Object();
071 private final Object subscriptionsMutex = new Object();
072
073 private long idleTimestamp; //length of time (ms) there have been no active subscribers
074
075 /**
076 * Construct this beast
077 *
078 * @param threadPool
079 * @param queueManager
080 * @param destination
081 * @param redeliveryPolicy
082 * @param deadLetterPolicy
083 */
084 public DurableQueueBoundedMessageContainer(MessageStore messageStore, Executor threadPool, MemoryBoundedQueueManager queueManager,
085 ActiveMQDestination destination,RedeliveryPolicy redeliveryPolicy, DeadLetterPolicy deadLetterPolicy) {
086 this.messageStore = messageStore;
087 this.threadPool = threadPool;
088 this.queueManager = queueManager;
089 this.destination = destination;
090 this.deadLetterPolicy = deadLetterPolicy;
091
092 this.queue = queueManager.getMemoryBoundedQueue("DURABLE_QUEUE:-" + destination.getPhysicalName());
093 this.log = LogFactory.getLog("DurableQueueBoundedMessageContainer:- " + destination);
094 }
095
096
097 /**
098 * @return true if there are subscribers waiting for messages
099 */
100 public boolean isActive(){
101 return !subscriptions.isEmpty();
102 }
103
104 /**
105 * @return true if no messages are enqueued
106 */
107 public boolean isEmpty(){
108 return queue.isEmpty();
109 }
110
111 /**
112 * @return the timestamp (ms) from the when the last active subscriber stopped
113 */
114 public long getIdleTimestamp(){
115 return idleTimestamp;
116 }
117
118
119
120 /**
121 * Add a consumer to dispatch messages to
122 *
123 * @param filter
124 * @param info
125 * @param client
126 * @return DurableQueueSubscription
127 * @throws JMSException
128 */
129 public DurableQueueSubscription addConsumer(Filter filter, ConsumerInfo info, BrokerClient client)
130 throws JMSException {
131 DurableQueueSubscription ts = findMatch(info);
132 if (ts == null) {
133 MemoryBoundedQueue queue = queueManager.getMemoryBoundedQueue("DURABLE_SUB:-"+info.getConsumerId());
134 MemoryBoundedQueue ackQueue = queueManager.getMemoryBoundedQueue("DURABLE_SUB_ACKED:-"+info.getConsumerId());
135 ts = new DurableQueueSubscription(client, queue, ackQueue, filter, info);
136 synchronized (subscriptionsMutex) {
137 idleTimestamp = 0;
138 subscriptions.add(ts);
139 checkRunning();
140 }
141 }
142 return ts;
143 }
144
145 /**
146 * Remove a consumer
147 *
148 * @param info
149 * @throws JMSException
150 */
151 public void removeConsumer(ConsumerInfo info) throws JMSException {
152 DurableQueueSubscription ts = null;
153 synchronized (subscriptionsMutex) {
154 ts = findMatch(info);
155 if (ts != null) {
156
157 subscriptions.remove(ts);
158 if (subscriptions.isEmpty()) {
159 running.commit(true, false);
160 idleTimestamp = System.currentTimeMillis();
161 }
162 }
163 }
164 if (ts != null) {
165
166 // get unacknowledged messages and re-enqueue them
167 List list = ts.getUndeliveredMessages();
168 for (int i = list.size() - 1; i >= 0; i--) {
169 queue.enqueueFirstNoBlock((MemoryManageable) list.get(i));
170 }
171
172 // If it is a queue browser, then re-enqueue the browsed
173 // messages.
174 if (ts.isBrowser()) {
175 list = ts.listAckedMessages();
176 for (int i = list.size() - 1; i >= 0; i--) {
177 queue.enqueueFirstNoBlock((MemoryManageable) list
178 .get(i));
179 }
180 ts.removeAllAckedMessages();
181 }
182
183 ts.close();
184 }
185 }
186
187 /**
188 * start working
189 *
190 * @throws JMSException
191 */
192 public void start() throws JMSException {
193 if (started.commit(false, true)) {
194 messageStore.start();
195
196 // Avoid recovery failing due to memory constraints.
197 this.queueManager.setMemoryLimitEnforced(false);
198 try {
199 messageStore.recover(new RecoveryListener() {
200 public void recoverMessage(MessageIdentity messageIdentity) throws JMSException {
201 recoverMessageToBeDelivered(messageIdentity);
202 }
203 });
204 } finally {
205 this.queueManager.setMemoryLimitEnforced(true);
206 }
207
208 checkRunning();
209 }
210 }
211
212 private void recoverMessageToBeDelivered(MessageIdentity msgId) throws JMSException {
213 DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), messageStore.getMessage(msgId));
214 queue.enqueue(pointer);
215 }
216
217 /**
218 * enqueue a message for dispatching
219 *
220 * @param message
221 * @throws JMSException
222 */
223 public void enqueue(final ActiveMQMessage message) throws JMSException {
224 final DurableMessagePointer pointer = new DurableMessagePointer(messageStore, getDestination(), message);
225 if (message.isAdvisory()) {
226 doAdvisoryDispatchMessage(pointer);
227 }
228 else {
229 messageStore.addMessage(message);
230 // If there is no transaction.. then this executes directly.
231 TransactionManager.getContexTransaction().addPostCommitTask(new TransactionTask(){
232 public void execute() throws Throwable {
233 queue.enqueue(pointer);
234 checkRunning();
235 }
236 });
237 }
238 }
239
240 public void redeliver(DurableMessagePointer message) {
241 queue.enqueueFirstNoBlock(message);
242 checkRunning();
243 }
244
245 public void redeliver(List messages) {
246 queue.enqueueAllFirstNoBlock(messages);
247 checkRunning();
248 }
249
250 /**
251 * stop working
252 */
253 public void stop() {
254 started.set(false);
255 running.set(false);
256 queue.clear();
257 }
258
259 /**
260 * close down this container
261 *
262 * @throws JMSException
263 */
264 public void close() throws JMSException {
265 if (started.get()) {
266 stop();
267 }
268 synchronized(subscriptionsMutex){
269 QueueListEntry entry = subscriptions.getFirstEntry();
270 while (entry != null) {
271 DurableQueueSubscription ts = (DurableQueueSubscription) entry.getElement();
272 ts.close();
273 entry = subscriptions.getNextEntry(entry);
274 }
275 subscriptions.clear();
276 }
277 }
278
279 /**
280 * do some dispatching
281 */
282 public void run() {
283 // Only allow one thread at a time to dispatch.
284 synchronized (dispatchMutex) {
285 boolean dispatched = false;
286 boolean targeted = false;
287 DurableMessagePointer messagePointer = null;
288 int notDispatchedCount = 0;
289 int sleepTime = 250;
290 int iterationsWithoutDispatchingBeforeStopping = 10000 / sleepTime;// ~10
291 // seconds
292 Map messageParts = new HashMap();
293 try {
294 while (started.get() && running.get()) {
295 dispatched = false;
296 targeted = false;
297 synchronized (subscriptionsMutex) {
298 if (!subscriptions.isEmpty()) {
299 messagePointer = (DurableMessagePointer) queue
300 .dequeue(sleepTime);
301 if (messagePointer != null) {
302 ActiveMQMessage message = messagePointer
303 .getMessage();
304 if (!message.isExpired()) {
305
306 QueueListEntry entry = subscriptions
307 .getFirstEntry();
308 while (entry != null) {
309 DurableQueueSubscription ts = (DurableQueueSubscription) entry
310 .getElement();
311 if (ts.isTarget(message)) {
312 targeted = true;
313 if (message.isMessagePart()) {
314 DurableQueueSubscription sameTarget = (DurableQueueSubscription) messageParts
315 .get(message
316 .getParentMessageID());
317 if (sameTarget == null) {
318 sameTarget = ts;
319 messageParts
320 .put(
321 message
322 .getParentMessageID(),
323 sameTarget);
324 }
325 sameTarget
326 .doDispatch(messagePointer);
327 if (message.isLastMessagePart()) {
328 messageParts
329 .remove(message
330 .getParentMessageID());
331 }
332 messagePointer = null;
333 dispatched = true;
334 notDispatchedCount = 0;
335 break;
336 } else if (ts.canAcceptMessages()) {
337 ts.doDispatch(messagePointer);
338 messagePointer = null;
339 dispatched = true;
340 notDispatchedCount = 0;
341 subscriptions.rotate();
342 break;
343 }
344 }
345 entry = subscriptions
346 .getNextEntry(entry);
347 }
348
349 } else {
350 // expire message
351 if (log.isDebugEnabled()) {
352 log.debug("expired message: "
353 + messagePointer);
354 }
355 if (deadLetterPolicy != null) {
356 deadLetterPolicy
357 .sendToDeadLetter(messagePointer
358 .getMessage());
359 }
360 messagePointer = null;
361 }
362 }
363 }
364 }
365 if (!dispatched) {
366 if (messagePointer != null) {
367 if (targeted) {
368 queue.enqueueFirstNoBlock(messagePointer);
369 } else {
370 //no matching subscribers - dump to end and hope one shows up ...
371 queue.enqueueNoBlock(messagePointer);
372
373 }
374 }
375 if (running.get()) {
376 if (notDispatchedCount++ > iterationsWithoutDispatchingBeforeStopping
377 && queue.isEmpty()) {
378 synchronized (running) {
379 running.commit(true, false);
380 }
381 } else {
382 Thread.sleep(sleepTime);
383 }
384 }
385 }
386 }
387 } catch (InterruptedException ie) {
388 //someone is stopping us from another thread
389 } catch (Throwable e) {
390 log.warn("stop dispatching", e);
391 stop();
392 }
393 }
394 }
395
396 private DurableQueueSubscription findMatch(ConsumerInfo info) throws JMSException {
397 DurableQueueSubscription result = null;
398 synchronized (subscriptionsMutex) {
399 QueueListEntry entry = subscriptions.getFirstEntry();
400 while (entry != null) {
401 DurableQueueSubscription ts = (DurableQueueSubscription) entry
402 .getElement();
403 if (ts.getConsumerInfo().equals(info)) {
404 result = ts;
405 break;
406 }
407 entry = subscriptions.getNextEntry(entry);
408 }
409 }
410 return result;
411 }
412
413 /**
414 * @return the destination associated with this container
415 */
416 public ActiveMQDestination getDestination() {
417 return destination;
418 }
419
420 /**
421 * @return the destination name
422 */
423 public String getDestinationName() {
424 return destination.getPhysicalName();
425 }
426
427 protected void clear() {
428 queue.clear();
429 }
430
431 protected void removeExpiredMessages() {
432 long currentTime = System.currentTimeMillis();
433 List list = queue.getContents();
434 for (int i = 0;i < list.size();i++) {
435 DurableMessagePointer msgPointer = (DurableMessagePointer) list.get(i);
436 ActiveMQMessage message = msgPointer.getMessage();
437 if (message.isExpired(currentTime)) {
438 // TODO: remove message from message store.
439 queue.remove(msgPointer);
440 if (log.isDebugEnabled()) {
441 log.debug("expired message: " + msgPointer);
442 }
443 }
444 }
445 }
446
447 protected void checkRunning(){
448 if (!running.get() && started.get() && !subscriptions.isEmpty()) {
449 synchronized (running) {
450 if (running.commit(false, true)) {
451 try {
452 threadPool.execute(this);
453 }
454 catch (InterruptedException e) {
455 log.error(this + " Couldn't start executing ",e);
456 }
457 }
458 }
459 }
460 }
461
462
463 /**
464 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
465 */
466 public MessageContainerAdmin getMessageContainerAdmin() {
467 return this;
468 }
469
470 /**
471 * @see org.activemq.service.MessageContainerAdmin#empty()
472 */
473 public void empty() throws JMSException {
474 if( subscriptions.isEmpty() ) {
475 messageStore.removeAllMessages();
476 queue.clear();
477 } else {
478 throw new JMSException("Cannot empty a queue while it is use.");
479 }
480 }
481
482 /**
483 * Dispatch an Advisory Message
484 * @param messagePointer
485 */
486 private synchronized void doAdvisoryDispatchMessage(DurableMessagePointer messagePointer) {
487 ActiveMQMessage message = messagePointer.getMessage();
488 try {
489
490 if (message.isAdvisory() && !message.isExpired()) {
491 synchronized (subscriptionsMutex) {
492 QueueListEntry entry = subscriptions.getFirstEntry();
493 while (entry != null) {
494 DurableQueueSubscription ts = (DurableQueueSubscription) entry
495 .getElement();
496 if (ts.isTarget(message)) {
497 ts.doDispatch(messagePointer);
498 break;
499 }
500 entry = subscriptions.getNextEntry(entry);
501 }
502 }
503 }
504 } catch (JMSException jmsEx) {
505 log.warn("Failed to dispatch advisory", jmsEx);
506 }
507 }
508
509 }