001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq;
019
020 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
021
022 import java.util.LinkedList;
023
024 import javax.jms.IllegalStateException;
025 import javax.jms.InvalidDestinationException;
026 import javax.jms.JMSException;
027 import javax.jms.Message;
028 import javax.jms.MessageConsumer;
029 import javax.jms.MessageListener;
030
031 import org.activemq.io.util.MemoryBoundedQueue;
032 import org.activemq.management.JMSConsumerStatsImpl;
033 import org.activemq.management.StatsCapable;
034 import org.activemq.management.StatsImpl;
035 import org.activemq.message.ActiveMQDestination;
036 import org.activemq.message.ActiveMQMessage;
037 import org.activemq.selector.SelectorParser;
038 import org.apache.commons.logging.Log;
039 import org.apache.commons.logging.LogFactory;
040
041 /**
042 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages from a destination. A <CODE>
043 * MessageConsumer</CODE> object is created by passing a <CODE>Destination</CODE> object to a message-consumer
044 * creation method supplied by a session.
045 * <P>
046 * <CODE>MessageConsumer</CODE> is the parent interface for all message consumers.
047 * <P>
048 * A message consumer can be created with a message selector. A message selector allows the client to restrict the
049 * messages delivered to the message consumer to those that match the selector.
050 * <P>
051 * A client may either synchronously receive a message consumer's messages or have the consumer asynchronously deliver
052 * them as they arrive.
053 * <P>
054 * For synchronous receipt, a client can request the next message from a message consumer using one of its <CODE>
055 * receive</CODE> methods. There are several variations of <CODE>receive</CODE> that allow a client to poll or wait
056 * for the next message.
057 * <P>
058 * For asynchronous delivery, a client can register a <CODE>MessageListener</CODE> object with a message consumer. As
059 * messages arrive at the message consumer, it delivers them by calling the <CODE>MessageListener</CODE>'s<CODE>
060 * onMessage</CODE> method.
061 * <P>
062 * It is a client programming error for a <CODE>MessageListener</CODE> to throw an exception.
063 *
064 * @version $Revision: 1.1.1.1 $
065 * @see javax.jms.MessageConsumer
066 * @see javax.jms.QueueReceiver
067 * @see javax.jms.TopicSubscriber
068 * @see javax.jms.Session
069 */
070 public class ActiveMQMessageConsumer implements MessageConsumer, StatsCapable, Closeable {
071 private static final Log log = LogFactory.getLog(ActiveMQMessageConsumer.class);
072 protected ActiveMQSession session;
073 protected String consumerIdentifier;
074 protected MemoryBoundedQueue messageQueue;
075 protected String messageSelector;
076 private MessageListener messageListener;
077 protected String consumerName;
078 protected ActiveMQDestination destination;
079 private boolean closed;
080 protected int consumerNumber;
081 protected int prefetchNumber;
082 protected long startTime;
083 protected boolean noLocal;
084 protected boolean browser;
085 private Thread accessThread;
086 private Object messageListenerGuard;
087 private JMSConsumerStatsImpl stats;
088
089 private SynchronizedBoolean running = new SynchronizedBoolean(true);
090 private LinkedList stoppedQueue=new LinkedList();
091 /**
092 * Create a MessageConsumer
093 *
094 * @param theSession
095 * @param dest
096 * @param name
097 * @param selector
098 * @param cnum
099 * @param prefetch
100 * @param noLocalValue
101 * @param browserValue
102 * @throws JMSException
103 */
104 protected ActiveMQMessageConsumer(ActiveMQSession theSession, ActiveMQDestination dest, String name,
105 String selector, int cnum, int prefetch, boolean noLocalValue, boolean browserValue) throws JMSException {
106 if (dest == null) {
107 throw new InvalidDestinationException("Do not understand a null destination");
108 }
109 if (dest.isTemporary() && theSession.connection.isJ2EEcompliant() && !theSession.isInternalSession()) {
110 //validate that the destination comes from this Connection
111 String physicalName = dest.getPhysicalName();
112 if (physicalName == null) {
113 throw new IllegalArgumentException("Physical name of Destination should be valid: " + dest);
114 }
115 String clientID = theSession.connection.getInitializedClientID();
116 if (physicalName.indexOf(clientID) < 0) {
117 throw new InvalidDestinationException("Cannot use a Temporary destination from another Connection");
118 }
119 if (dest.isDeleted()) {
120 throw new InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
121 }
122 }
123 dest.incrementConsumerCounter();
124 if (selector != null) {
125 selector = selector.trim();
126 if (selector.length() > 0) {
127 // Validate that the selector
128 new SelectorParser().parse(selector);
129 }
130 }
131 this.session = theSession;
132 this.destination = dest;
133 this.consumerName = name;
134 this.messageSelector = selector;
135
136 this.consumerNumber = cnum;
137 this.prefetchNumber = prefetch;
138 this.noLocal = noLocalValue;
139 this.browser = browserValue;
140 this.consumerIdentifier = theSession.connection.getClientID() + "." + theSession.getSessionId() + "." + this.consumerNumber;
141 this.startTime = System.currentTimeMillis();
142 this.messageListenerGuard = new Object();
143 this.messageQueue = theSession.connection.getMemoryBoundedQueue(this.consumerIdentifier);
144 this.stats = new JMSConsumerStatsImpl(theSession.getSessionStats(), dest);
145 this.session.addConsumer(this);
146 }
147
148 /**
149 * @return the memory used by the internal queue for this MessageConsumer
150 */
151 public long getLocalMemoryUsage() {
152 return this.messageQueue.getLocalMemoryUsedByThisQueue();
153 }
154
155 /**
156 * @return the number of messages enqueued by this consumer awaiting dispatch
157 */
158 public int size() {
159 return this.messageQueue.size();
160 }
161
162
163 /**
164 * @return Stats for this MessageConsumer
165 */
166 public StatsImpl getStats() {
167 return stats;
168 }
169
170 /**
171 * @return Stats for this MessageConsumer
172 */
173 public JMSConsumerStatsImpl getConsumerStats() {
174 return stats;
175 }
176
177 /**
178 * @return pretty print of this consumer
179 */
180 public String toString() {
181 return "MessageConsumer: " + consumerIdentifier + "[" + consumerNumber + "]";
182 }
183
184 /**
185 * @return Returns the prefetchNumber.
186 */
187 public int getPrefetchNumber() {
188 return prefetchNumber;
189 }
190
191 /**
192 * @param prefetchNumber The prefetchNumber to set.
193 */
194 public void setPrefetchNumber(int prefetchNumber) {
195 this.prefetchNumber = prefetchNumber;
196 }
197
198 /**
199 * Gets this message consumer's message selector expression.
200 *
201 * @return this message consumer's message selector, or null if no message selector exists for the message consumer
202 * (that is, if the message selector was not set or was set to null or the empty string)
203 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
204 */
205 public String getMessageSelector() throws JMSException {
206 checkClosed();
207 return this.messageSelector;
208 }
209
210 /**
211 * Gets the message consumer's <CODE>MessageListener</CODE>.
212 *
213 * @return the listener for the message consumer, or null if no listener is set
214 * @throws JMSException if the JMS provider fails to get the message listener due to some internal error.
215 * @see javax.jms.MessageConsumer#setMessageListener(javax.jms.MessageListener)
216 */
217 public MessageListener getMessageListener() throws JMSException {
218 checkClosed();
219 return this.messageListener;
220 }
221
222 /**
223 * Sets the message consumer's <CODE>MessageListener</CODE>.
224 * <P>
225 * Setting the message listener to null is the equivalent of unsetting the message listener for the message
226 * consumer.
227 * <P>
228 * The effect of calling <CODE>MessageConsumer.setMessageListener</CODE> while messages are being consumed by an
229 * existing listener or the consumer is being used to consume messages synchronously is undefined.
230 *
231 * @param listener the listener to which the messages are to be delivered
232 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
233 * @see javax.jms.MessageConsumer#getMessageListener()
234 */
235 public void setMessageListener(MessageListener listener) throws JMSException {
236 checkClosed();
237 synchronized (messageListenerGuard) {
238 this.messageListener = listener;
239 }
240 if (listener != null) {
241 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_ASYNC);
242 //messages may already be enqueued
243 ActiveMQMessage msg = null;
244 try {
245 while ((msg = (ActiveMQMessage)messageQueue.dequeueNoWait()) != null) {
246 processMessage(msg);
247 }
248 }
249 catch (InterruptedException ex) {
250 JMSException jmsEx = new JMSException("Interrupted setting message listener");
251 jmsEx.setLinkedException(ex);
252 throw jmsEx;
253 }
254 }
255 }
256
257 /**
258 * Receives the next message produced for this message consumer.
259 * <P>
260 * This call blocks indefinitely until a message is produced or until this message consumer is closed.
261 * <P>
262 * If this <CODE>receive</CODE> is done within a transaction, the consumer retains the message until the
263 * transaction commits.
264 *
265 * @return the next message produced for this message consumer, or null if this message consumer is concurrently
266 * closed
267 * @throws JMSException
268 */
269 public Message receive() throws JMSException {
270 checkClosed();
271 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
272 try {
273 this.accessThread = Thread.currentThread();
274 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue();
275 this.accessThread = null;
276 if (message != null) {
277 boolean expired = message.isExpired();
278 messageDelivered(message, true, expired);
279 if (!expired) {
280 message = message.shallowCopy();
281 }
282 else {
283 message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue
284 }
285 }
286 if( message!=null && log.isDebugEnabled() ) {
287 log.debug("Message received: "+message);
288 }
289 return message;
290 }
291 catch (InterruptedException ioe) {
292 return null;
293 }
294 }
295
296 /**
297 * Receives the next message that arrives within the specified timeout interval.
298 * <P>
299 * This call blocks until a message arrives, the timeout expires, or this message consumer is closed. A <CODE>
300 * timeout</CODE> of zero never expires, and the call blocks indefinitely.
301 *
302 * @param timeout the timeout value (in milliseconds)
303 * @return the next message produced for this message consumer, or null if the timeout expires or this message
304 * consumer is concurrently closed
305 * @throws JMSException
306 */
307 public Message receive(long timeout) throws JMSException {
308 checkClosed();
309 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
310 try {
311 if (timeout == 0) {
312 return this.receive();
313 }
314 this.accessThread = Thread.currentThread();
315 ActiveMQMessage message = (ActiveMQMessage) messageQueue.dequeue(timeout);
316 this.accessThread = null;
317 if (message != null) {
318 boolean expired = message.isExpired();
319 messageDelivered(message, true, expired);
320 if (!expired) {
321 message = message.shallowCopy();
322 }
323 else {
324 message = (ActiveMQMessage) receiveNoWait(); //this will remove any other expired messages held in the queue
325 }
326 }
327 if( message!=null && log.isDebugEnabled() ) {
328 log.debug("Message received: "+message);
329 }
330 return message;
331 }
332 catch (InterruptedException ioe) {
333 return null;
334 }
335 }
336
337 /**
338 * Receives the next message if one is immediately available.
339 *
340 * @return the next message produced for this message consumer, or null if one is not available
341 * @throws JMSException if the JMS provider fails to receive the next message due to some internal error.
342 */
343 public Message receiveNoWait() throws JMSException {
344 checkClosed();
345 session.setSessionConsumerDispatchState(ActiveMQSession.CONSUMER_DISPATCH_SYNC);
346 try {
347 ActiveMQMessage message = null;
348 //iterate through an scrub delivered but expired messages
349 while ((message = (ActiveMQMessage) messageQueue.dequeueNoWait()) != null) {
350 boolean expired = message.isExpired();
351 messageDelivered(message, true, expired);
352 if (!expired) {
353 if( message!=null && log.isDebugEnabled() ) {
354 log.debug("Message received: "+message);
355 }
356 return message.shallowCopy();
357 }
358 }
359 }
360 catch (InterruptedException ioe) {
361 throw new JMSException("Queue is interrupted: " + ioe.getMessage());
362 }
363 return null;
364 }
365
366 /**
367 * Closes the message consumer.
368 * <P>
369 * Since a provider may allocate some resources on behalf of a <CODE>MessageConsumer</CODE> outside the Java
370 * virtual machine, clients should close them when they are not needed. Relying on garbage collection to eventually
371 * reclaim these resources may not be timely enough.
372 * <P>
373 * This call blocks until a <CODE>receive</CODE> or message listener in progress has completed. A blocked message
374 * consumer <CODE>receive</CODE> call returns null when this message consumer is closed.
375 *
376 * @throws JMSException if the JMS provider fails to close the consumer due to some internal error.
377 */
378 public void close() throws JMSException {
379 try {
380 this.accessThread.interrupt();
381 }
382 catch (NullPointerException npe) {
383 }
384 catch (SecurityException se) {
385 }
386 if (destination != null) {
387 destination.decrementConsumerCounter();
388 }
389
390 this.session.removeConsumer(this);
391 messageQueue.close();
392 closed = true;
393 }
394
395 /**
396 * @return true if this is a durable topic subscriber
397 */
398 public boolean isDurableSubscriber() {
399 return this instanceof ActiveMQTopicSubscriber && consumerName != null && consumerName.length() > 0;
400 }
401
402 /**
403 * @return true if this is a Transient Topic subscriber
404 */
405 public boolean isTransientSubscriber(){
406 return this.destination != null && destination.isTopic() && (consumerName == null || consumerName.length() ==0);
407 }
408
409 /**
410 * @throws IllegalStateException
411 */
412 protected void checkClosed() throws IllegalStateException {
413 if (closed) {
414 throw new IllegalStateException("The Consumer is closed");
415 }
416 }
417
418 /**
419 * Process a Message - passing either to the queue or message listener
420 *
421 * @param message
422 */
423 protected void processMessage(ActiveMQMessage message) {
424 if( !running.get() ) {
425 stoppedQueue.addLast(message);
426 return;
427 }
428 message.setConsumerIdentifer(this.consumerIdentifier);
429 MessageListener listener = null;
430 synchronized (messageListenerGuard) {
431 listener = this.messageListener;
432 }
433 boolean transacted = session.isTransacted();
434 try {
435 if (!closed) {
436 if (message.getJMSActiveMQDestination() == null) {
437 message.setJMSDestination(getDestination());
438 }
439 if (listener != null) {
440 beforeMessageDelivered(message);
441 boolean expired = message.isExpired();
442 if (transacted) {
443 afterMessageDelivered(message, true, expired, true);
444 }
445 if (!expired) {
446 if( log.isDebugEnabled() ) {
447 log.debug("Message delivered to message listener: "+message);
448 }
449 listener.onMessage(message.shallowCopy());
450 }
451 if (!transacted) {
452 afterMessageDelivered(message, true, expired, true);
453 }
454 }
455 else {
456 this.messageQueue.enqueue(message);
457 }
458 }
459 else {
460 messageDelivered(message, false, false);
461 }
462 }
463 catch (Throwable e) {
464 log.warn("could not process message: " + message + ". Reason: " + e, e);
465 messageDelivered(message, false, false);
466 }
467 }
468
469 /**
470 * @return Returns the consumerId.
471 */
472 protected String getConsumerIdentifier() {
473 return consumerIdentifier;
474 }
475
476 /**
477 * @return the consumer name - used for durable consumers
478 */
479 protected String getConsumerName() {
480 return this.consumerName;
481 }
482
483 /**
484 * Set the name of the Consumer - used for durable subscribers
485 *
486 * @param value
487 */
488 protected void setConsumerName(String value) {
489 this.consumerName = value;
490 }
491
492 /**
493 * @return the locally unique Consumer Number
494 */
495 protected int getConsumerNumber() {
496 return this.consumerNumber;
497 }
498
499 /**
500 * Set the locally unique consumer number
501 *
502 * @param value
503 */
504 protected void setConsumerNumber(int value) {
505 this.consumerNumber = value;
506 }
507
508 /**
509 * @return true if this consumer does not accept locally produced messages
510 */
511 protected boolean isNoLocal() {
512 return this.noLocal;
513 }
514
515 /**
516 * Retrive is a browser
517 *
518 * @return true if a browser
519 */
520 protected boolean isBrowser() {
521 return this.browser;
522 }
523
524 /**
525 * Set true if only a Browser
526 *
527 * @param value
528 * @see ActiveMQQueueBrowser
529 */
530 protected void setBrowser(boolean value) {
531 this.browser = value;
532 }
533
534 /**
535 * @return ActiveMQDestination
536 */
537 protected ActiveMQDestination getDestination() {
538 return this.destination;
539 }
540
541 /**
542 * @return the startTime
543 */
544 protected long getStartTime() {
545 return startTime;
546 }
547
548 protected void clearMessagesInProgress() {
549 messageQueue.clear();
550 stoppedQueue.clear();
551 }
552
553 private void messageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired) {
554 afterMessageDelivered(message, messageRead, messageExpired, false);
555 }
556
557 private void beforeMessageDelivered(ActiveMQMessage message) {
558 if (message == null) {
559 return;
560 }
561 boolean topic = destination != null && destination.isTopic();
562 message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic);
563 this.session.beforeMessageDelivered(message);
564 }
565
566 private void afterMessageDelivered(ActiveMQMessage message, boolean messageRead, boolean messageExpired, boolean beforeCalled) {
567 if (message == null) {
568 return;
569 }
570
571 boolean consumed = browser ? false : messageRead;
572 ActiveMQDestination destination = message.getJMSActiveMQDestination();
573 boolean topic = destination != null && destination.isTopic();
574 message.setTransientConsumed((!isDurableSubscriber() || !message.isPersistent()) && topic);
575 this.session.afterMessageDelivered((isDurableSubscriber() || this.destination.isQueue()), message, consumed, messageExpired, beforeCalled);
576 if (messageRead) {
577 stats.onMessage(message);
578 }
579
580 }
581
582 public void start() {
583 running.set(true);
584 while( !stoppedQueue.isEmpty() ) {
585 ActiveMQMessage m = (ActiveMQMessage)stoppedQueue.removeFirst();
586 processMessage(m);
587 }
588 }
589
590 synchronized public void stop() {
591 running.set(false);
592 }
593 }