001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq;
020 import java.io.IOException;
021 import java.io.Serializable;
022 import java.util.Iterator;
023 import java.util.LinkedList;
024 import java.util.List;
025 import java.util.ListIterator;
026
027 import javax.jms.BytesMessage;
028 import javax.jms.DeliveryMode;
029 import javax.jms.Destination;
030 import javax.jms.IllegalStateException;
031 import javax.jms.InvalidDestinationException;
032 import javax.jms.InvalidSelectorException;
033 import javax.jms.JMSException;
034 import javax.jms.MapMessage;
035 import javax.jms.Message;
036 import javax.jms.MessageConsumer;
037 import javax.jms.MessageListener;
038 import javax.jms.MessageProducer;
039 import javax.jms.ObjectMessage;
040 import javax.jms.Queue;
041 import javax.jms.QueueBrowser;
042 import javax.jms.QueueReceiver;
043 import javax.jms.QueueSender;
044 import javax.jms.QueueSession;
045 import javax.jms.Session;
046 import javax.jms.StreamMessage;
047 import javax.jms.TemporaryQueue;
048 import javax.jms.TemporaryTopic;
049 import javax.jms.TextMessage;
050 import javax.jms.Topic;
051 import javax.jms.TopicPublisher;
052 import javax.jms.TopicSession;
053 import javax.jms.TopicSubscriber;
054 import javax.jms.TransactionRolledBackException;
055
056 import org.activemq.io.util.ByteArray;
057 import org.activemq.io.util.ByteArrayCompression;
058 import org.activemq.io.util.ByteArrayFragmentation;
059 import org.activemq.management.JMSSessionStatsImpl;
060 import org.activemq.management.StatsCapable;
061 import org.activemq.management.StatsImpl;
062 import org.activemq.message.ActiveMQBytesMessage;
063 import org.activemq.message.ActiveMQDestination;
064 import org.activemq.message.ActiveMQMapMessage;
065 import org.activemq.message.ActiveMQMessage;
066 import org.activemq.message.ActiveMQObjectMessage;
067 import org.activemq.message.ActiveMQQueue;
068 import org.activemq.message.ActiveMQStreamMessage;
069 import org.activemq.message.ActiveMQTemporaryQueue;
070 import org.activemq.message.ActiveMQTemporaryTopic;
071 import org.activemq.message.ActiveMQTextMessage;
072 import org.activemq.message.ActiveMQTopic;
073 import org.activemq.message.ConsumerInfo;
074 import org.activemq.message.DurableUnsubscribe;
075 import org.activemq.message.MessageAck;
076 import org.activemq.message.MessageAcknowledge;
077 import org.activemq.message.ProducerInfo;
078 import org.activemq.service.impl.DefaultQueueList;
079 import org.activemq.util.IdGenerator;
080 import org.apache.commons.logging.Log;
081 import org.apache.commons.logging.LogFactory;
082
083 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
084 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
085
086 /**
087 * <P>
088 * A <CODE>Session</CODE> object is a single-threaded context for producing and consuming messages. Although it may
089 * allocate provider resources outside the Java virtual machine (JVM), it is considered a lightweight JMS object.
090 * <P>
091 * A session serves several purposes:
092 * <UL>
093 * <LI>It is a factory for its message producers and consumers.
094 * <LI>It supplies provider-optimized message factories.
095 * <LI>It is a factory for <CODE>TemporaryTopics</CODE> and <CODE>TemporaryQueues</CODE>.
096 * <LI>It provides a way to create <CODE>Queue</CODE> or <CODE>Topic</CODE> objects for those clients that need to
097 * dynamically manipulate provider-specific destination names.
098 * <LI>It supports a single series of transactions that combine work spanning its producers and consumers into atomic
099 * units.
100 * <LI>It defines a serial order for the messages it consumes and the messages it produces.
101 * <LI>It retains messages it consumes until they have been acknowledged.
102 * <LI>It serializes execution of message listeners registered with its message consumers.
103 * <LI>It is a factory for <CODE>QueueBrowsers</CODE>.
104 * </UL>
105 * <P>
106 * A session can create and service multiple message producers and consumers.
107 * <P>
108 * One typical use is to have a thread block on a synchronous <CODE>MessageConsumer</CODE> until a message arrives.
109 * The thread may then use one or more of the <CODE>Session</CODE>'s<CODE>MessageProducer</CODE>s.
110 * <P>
111 * If a client desires to have one thread produce messages while others consume them, the client should use a separate
112 * session for its producing thread.
113 * <P>
114 * Once a connection has been started, any session with one or more registered message listeners is dedicated to the
115 * thread of control that delivers messages to it. It is erroneous for client code to use this session or any of its
116 * constituent objects from another thread of control. The only exception to this rule is the use of the session or
117 * connection <CODE>close</CODE> method.
118 * <P>
119 * It should be easy for most clients to partition their work naturally into sessions. This model allows clients to
120 * start simply and incrementally add message processing complexity as their need for concurrency grows.
121 * <P>
122 * The <CODE>close</CODE> method is the only session method that can be called while some other session method is
123 * being executed in another thread.
124 * <P>
125 * A session may be specified as transacted. Each transacted session supports a single series of transactions. Each
126 * transaction groups a set of message sends and a set of message receives into an atomic unit of work. In effect,
127 * transactions organize a session's input message stream and output message stream into series of atomic units. When a
128 * transaction commits, its atomic unit of input is acknowledged and its associated atomic unit of output is sent. If a
129 * transaction rollback is done, the transaction's sent messages are destroyed and the session's input is automatically
130 * recovered.
131 * <P>
132 * The content of a transaction's input and output units is simply those messages that have been produced and consumed
133 * within the session's current transaction.
134 * <P>
135 * A transaction is completed using either its session's <CODE>commit</CODE> method or its session's <CODE>rollback
136 * </CODE> method. The completion of a session's current transaction automatically begins the next. The result is that a
137 * transacted session always has a current transaction within which its work is done.
138 * <P>
139 * The Java Transaction Service (JTS) or some other transaction monitor may be used to combine a session's transaction
140 * with transactions on other resources (databases, other JMS sessions, etc.). Since Java distributed transactions are
141 * controlled via the Java Transaction API (JTA), use of the session's <CODE>commit</CODE> and <CODE>rollback</CODE>
142 * methods in this context is prohibited.
143 * <P>
144 * The JMS API does not require support for JTA; however, it does define how a provider supplies this support.
145 * <P>
146 * Although it is also possible for a JMS client to handle distributed transactions directly, it is unlikely that many
147 * JMS clients will do this. Support for JTA in the JMS API is targeted at systems vendors who will be integrating the
148 * JMS API into their application server products.
149 *
150 * @version $Revision: 1.1.1.1 $
151 * @see javax.jms.Session
152 * @see javax.jms.QueueSession
153 * @see javax.jms.TopicSession
154 * @see javax.jms.XASession
155 */
156 public class ActiveMQSession
157 implements
158 Session,
159 QueueSession,
160 TopicSession,
161 ActiveMQMessageDispatcher,
162 MessageAcknowledge,
163 StatsCapable {
164
165 public static interface DeliveryListener {
166 public void beforeDelivery(ActiveMQSession session, Message msg);
167 public void afterDelivery(ActiveMQSession session, Message msg);
168 }
169
170 protected static final int CONSUMER_DISPATCH_UNSET = 1;
171 protected static final int CONSUMER_DISPATCH_ASYNC = 2;
172 protected static final int CONSUMER_DISPATCH_SYNC = 3;
173 private static final Log log = LogFactory.getLog(ActiveMQSession.class);
174 protected ActiveMQConnection connection;
175 protected int acknowledgeMode;
176 protected CopyOnWriteArrayList consumers;
177 protected CopyOnWriteArrayList producers;
178 private IdGenerator temporaryDestinationGenerator;
179 private MessageListener messageListener;
180 protected boolean closed;
181 private SynchronizedBoolean started;
182 private short sessionId;
183 private long startTime;
184 private DefaultQueueList deliveredMessages;
185 private ActiveMQSessionExecutor messageExecutor;
186 private JMSSessionStatsImpl stats;
187 private int consumerDispatchState;
188 private ByteArrayCompression compression;
189 private TransactionContext transactionContext;
190 private boolean internalSession;
191 private DeliveryListener deliveryListener;
192
193 /**
194 * Construct the Session
195 *
196 * @param theConnection
197 * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
198 * @throws JMSException on internal error
199 */
200 protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode) throws JMSException {
201 this(theConnection, theAcknowledgeMode,theConnection.isOptimizedMessageDispatch());
202 }
203
204 /**
205 * Construct the Session
206 *
207 * @param theConnection
208 * @param theAcknowledgeMode n.b if transacted - the acknowledgeMode == Session.SESSION_TRANSACTED
209 * @param optimizedDispatch
210 * @throws JMSException on internal error
211 */
212 protected ActiveMQSession(ActiveMQConnection theConnection, int theAcknowledgeMode,boolean optimizedDispatch) throws JMSException {
213 this.connection = theConnection;
214 this.acknowledgeMode = theAcknowledgeMode;
215 setTransactionContext(new TransactionContext(theConnection));
216 this.consumers = new CopyOnWriteArrayList();
217 this.producers = new CopyOnWriteArrayList();
218 this.temporaryDestinationGenerator = new IdGenerator();
219 this.started = new SynchronizedBoolean(false);
220 this.sessionId = connection.generateSessionId();
221 this.startTime = System.currentTimeMillis();
222 this.deliveredMessages = new DefaultQueueList();
223 this.messageExecutor = new ActiveMQSessionExecutor(this, connection.getMemoryBoundedQueue("Session("
224 + sessionId + ")"));
225 this.messageExecutor.setOptimizedMessageDispatch(optimizedDispatch);
226 connection.addSession(this);
227 stats = new JMSSessionStatsImpl(producers, consumers);
228 this.consumerDispatchState = CONSUMER_DISPATCH_UNSET;
229 this.compression = new ByteArrayCompression();
230 this.compression.setCompressionLevel(theConnection.getMessageCompressionLevel());
231 this.compression.setCompressionStrategy(theConnection.getMessageCompressionStrategy());
232 this.compression.setCompressionLimit(theConnection.getMessageCompressionLimit());
233
234 this.internalSession = theConnection.isInternalConnection();
235 }
236
237 public void setTransactionContext(TransactionContext transactionContext) {
238 if( this.transactionContext!=null ) {
239 this.transactionContext.removeSession(this);
240 }
241 this.transactionContext = transactionContext;
242 this.transactionContext.addSession(this);
243 }
244
245 public TransactionContext getTransactionContext() {
246 return transactionContext;
247 }
248
249 public StatsImpl getStats() {
250 return stats;
251 }
252
253 public JMSSessionStatsImpl getSessionStats() {
254 return stats;
255 }
256
257 /**
258 * Creates a <CODE>BytesMessage</CODE> object. A <CODE>BytesMessage</CODE> object is used to send a message
259 * containing a stream of uninterpreted bytes.
260 *
261 * @return the an ActiveMQBytesMessage
262 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
263 */
264 public BytesMessage createBytesMessage() throws JMSException {
265 checkClosed();
266 return new ActiveMQBytesMessage();
267 }
268
269 /**
270 * Creates a <CODE>MapMessage</CODE> object. A <CODE>MapMessage</CODE> object is used to send a self-defining
271 * set of name-value pairs, where names are <CODE>String</CODE> objects and values are primitive values in the
272 * Java programming language.
273 *
274 * @return an ActiveMQMapMessage
275 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
276 */
277 public MapMessage createMapMessage() throws JMSException {
278 checkClosed();
279 return new ActiveMQMapMessage();
280 }
281
282 /**
283 * Creates a <CODE>Message</CODE> object. The <CODE>Message</CODE> interface is the root interface of all JMS
284 * messages. A <CODE>Message</CODE> object holds all the standard message header information. It can be sent when
285 * a message containing only header information is sufficient.
286 *
287 * @return an ActiveMQMessage
288 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
289 */
290 public Message createMessage() throws JMSException {
291 checkClosed();
292 return new ActiveMQMessage();
293 }
294
295 /**
296 * Creates an <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to send a message
297 * that contains a serializable Java object.
298 *
299 * @return an ActiveMQObjectMessage
300 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
301 */
302 public ObjectMessage createObjectMessage() throws JMSException {
303 checkClosed();
304 return new ActiveMQObjectMessage();
305 }
306
307 /**
308 * Creates an initialized <CODE>ObjectMessage</CODE> object. An <CODE>ObjectMessage</CODE> object is used to
309 * send a message that contains a serializable Java object.
310 *
311 * @param object the object to use to initialize this message
312 * @return an ActiveMQObjectMessage
313 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
314 */
315 public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
316 checkClosed();
317 ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
318 msg.setObject(object);
319 return msg;
320 }
321
322 /**
323 * Creates a <CODE>StreamMessage</CODE> object. A <CODE>StreamMessage</CODE> object is used to send a
324 * self-defining stream of primitive values in the Java programming language.
325 *
326 * @return an ActiveMQStreamMessage
327 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
328 */
329 public StreamMessage createStreamMessage() throws JMSException {
330 checkClosed();
331 return new ActiveMQStreamMessage();
332 }
333
334 /**
335 * Creates a <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a message
336 * containing a <CODE>String</CODE> object.
337 *
338 * @return an ActiveMQTextMessage
339 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
340 */
341 public TextMessage createTextMessage() throws JMSException {
342 checkClosed();
343 return new ActiveMQTextMessage();
344 }
345
346 /**
347 * Creates an initialized <CODE>TextMessage</CODE> object. A <CODE>TextMessage</CODE> object is used to send a
348 * message containing a <CODE>String</CODE>.
349 *
350 * @param text the string used to initialize this message
351 * @return an ActiveMQTextMessage
352 * @throws JMSException if the JMS provider fails to create this message due to some internal error.
353 */
354 public TextMessage createTextMessage(String text) throws JMSException {
355 checkClosed();
356 ActiveMQTextMessage msg = new ActiveMQTextMessage();
357 msg.setText(text);
358 return msg;
359 }
360
361 /**
362 * Indicates whether the session is in transacted mode.
363 *
364 * @return true if the session is in transacted mode
365 * @throws JMSException if there is some internal error.
366 */
367 public boolean getTransacted() throws JMSException {
368 checkClosed();
369 return this.acknowledgeMode == Session.SESSION_TRANSACTED || transactionContext.isInXATransaction();
370 }
371
372 /**
373 * Returns the acknowledgement mode of the session. The acknowledgement mode is set at the time that the session is
374 * created. If the session is transacted, the acknowledgement mode is ignored.
375 *
376 * @return If the session is not transacted, returns the current acknowledgement mode for the session. If the
377 * session is transacted, returns SESSION_TRANSACTED.
378 * @throws JMSException
379 * @see javax.jms.Connection#createSession(boolean,int)
380 * @since 1.1 exception JMSException if there is some internal error.
381 */
382 public int getAcknowledgeMode() throws JMSException {
383 checkClosed();
384 return this.acknowledgeMode;
385 }
386
387 /**
388 * Commits all messages done in this transaction and releases any locks currently held.
389 *
390 * @throws JMSException if the JMS provider fails to commit the transaction due to some internal error.
391 * @throws TransactionRolledBackException if the transaction is rolled back due to some internal error during
392 * commit.
393 * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
394 */
395 public void commit() throws JMSException {
396 checkClosed();
397 if (!getTransacted()) {
398 throw new javax.jms.IllegalStateException("Not a transacted session");
399 }
400 transactionContext.commit();
401 }
402
403 /**
404 * Rolls back any messages done in this transaction and releases any locks currently held.
405 *
406 * @throws JMSException if the JMS provider fails to roll back the transaction due to some internal error.
407 * @throws javax.jms.IllegalStateException if the method is not called by a transacted session.
408 */
409 public void rollback() throws JMSException {
410 checkClosed();
411 if (!getTransacted()) {
412 throw new javax.jms.IllegalStateException("Not a transacted session");
413 }
414 transactionContext.rollback();
415 }
416
417 public void clearDeliveredMessages() {
418 deliveredMessages.clear();
419 }
420
421 /**
422 * Closes the session.
423 * <P>
424 * Since a provider may allocate some resources on behalf of a session outside the JVM, clients should close the
425 * resources when they are not needed. Relying on garbage collection to eventually reclaim these resources may not
426 * be timely enough.
427 * <P>
428 * There is no need to close the producers and consumers of a closed session.
429 * <P>
430 * This call will block until a <CODE>receive</CODE> call or message listener in progress has completed. A blocked
431 * message consumer <CODE>receive</CODE> call returns <CODE>null</CODE> when this session is closed.
432 * <P>
433 * Closing a transacted session must roll back the transaction in progress.
434 * <P>
435 * This method is the only <CODE>Session</CODE> method that can be called concurrently.
436 * <P>
437 * Invoking any other <CODE>Session</CODE> method on a closed session must throw a <CODE>
438 * JMSException.IllegalStateException</CODE>. Closing a closed session must <I>not </I> throw an exception.
439 *
440 * @throws JMSException if the JMS provider fails to close the session due to some internal error.
441 */
442 public void close() throws JMSException {
443 if (!this.closed) {
444 if (getTransactionContext().isInLocalTransaction()) {
445 rollback();
446 }
447 doClose();
448 closed = true;
449 }
450 }
451
452 protected void doClose() throws JMSException {
453 doAcknowledge(true);
454 deliveredMessages.clear();
455 for (Iterator i = consumers.iterator();i.hasNext();) {
456 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
457 consumer.close();
458 }
459 for (Iterator i = producers.iterator();i.hasNext();) {
460 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) i.next();
461 producer.close();
462 }
463 consumers.clear();
464 producers.clear();
465 this.connection.removeSession(this);
466 this.transactionContext.removeSession(this);
467 messageExecutor.close();
468 }
469
470 /**
471 * @throws IllegalStateException if the Session is closed
472 */
473 protected void checkClosed() throws IllegalStateException {
474 if (this.closed) {
475 throw new IllegalStateException("The Session is closed");
476 }
477 }
478
479 /**
480 * Stops message delivery in this session, and restarts message delivery with the oldest unacknowledged message.
481 * <P>
482 * All consumers deliver messages in a serial order. Acknowledging a received message automatically acknowledges all
483 * messages that have been delivered to the client.
484 * <P>
485 * Restarting a session causes it to take the following actions:
486 * <UL>
487 * <LI>Stop message delivery
488 * <LI>Mark all messages that might have been delivered but not acknowledged as "redelivered"
489 * <LI>Restart the delivery sequence including all unacknowledged messages that had been previously delivered.
490 * Redelivered messages do not have to be delivered in exactly their original delivery order.
491 * </UL>
492 *
493 * @throws JMSException if the JMS provider fails to stop and restart message delivery due to some internal error.
494 * @throws IllegalStateException if the method is called by a transacted session.
495 */
496 public void recover() throws JMSException {
497 checkClosed();
498 if (getTransacted()) {
499 throw new IllegalStateException("This session is transacted");
500 }
501 redeliverUnacknowledgedMessages();
502 }
503
504 /**
505 * Returns the session's distinguished message listener (optional).
506 *
507 * @return the message listener associated with this session
508 * @throws JMSException if the JMS provider fails to get the message listener due to an internal error.
509 * @see javax.jms.Session#setMessageListener(javax.jms.MessageListener)
510 * @see javax.jms.ServerSessionPool
511 * @see javax.jms.ServerSession
512 */
513 public MessageListener getMessageListener() throws JMSException {
514 checkClosed();
515 return this.messageListener;
516 }
517
518 /**
519 * Sets the session's distinguished message listener (optional).
520 * <P>
521 * When the distinguished message listener is set, no other form of message receipt in the session can be used;
522 * however, all forms of sending messages are still supported.
523 * <P>
524 * This is an expert facility not used by regular JMS clients.
525 *
526 * @param listener the message listener to associate with this session
527 * @throws JMSException if the JMS provider fails to set the message listener due to an internal error.
528 * @see javax.jms.Session#getMessageListener()
529 * @see javax.jms.ServerSessionPool
530 * @see javax.jms.ServerSession
531 */
532 public void setMessageListener(MessageListener listener) throws JMSException {
533 checkClosed();
534 this.messageListener = listener;
535 if (listener != null) {
536 messageExecutor.setDispatchedBySessionPool(true);
537 }
538 }
539
540 /**
541 * Optional operation, intended to be used only by Application Servers, not by ordinary JMS clients.
542 *
543 * @see javax.jms.ServerSession
544 */
545 public void run() {
546 ActiveMQMessage message;
547 while ((message = messageExecutor.dequeueNoWait()) != null) {
548 if( deliveryListener!=null )
549 deliveryListener.beforeDelivery(this, message);
550 beforeMessageDelivered(message);
551 deliver(message);
552 if( deliveryListener!=null )
553 deliveryListener.afterDelivery(this, message);
554 }
555 }
556
557 /**
558 * Delivers a message to the messageListern
559 * @param message The message to deliver
560 */
561 private void deliver(ActiveMQMessage message) {
562 if (!message.isExpired() && this.messageListener != null) {
563 try {
564
565 if( log.isDebugEnabled() ) {
566 log.debug("Message delivered to session message listener: "+message);
567 }
568
569 this.messageListener.onMessage(message);
570 this.afterMessageDelivered(true, message, true, false, true);
571 }
572 catch (Throwable t) {
573 log.info("Caught :" + t, t);
574 this.afterMessageDelivered(true, message, false, false, true);
575 }
576 }
577 else {
578 this.afterMessageDelivered(true, message, false, message.isExpired(), true);
579 }
580 }
581
582 /**
583 * Creates a <CODE>MessageProducer</CODE> to send messages to the specified destination.
584 * <P>
585 * A client uses a <CODE>MessageProducer</CODE> object to send messages to a destination. Since <CODE>Queue
586 * </CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
587 * destination parameter to create a <CODE>MessageProducer</CODE> object.
588 *
589 * @param destination the <CODE>Destination</CODE> to send to, or null if this is a producer which does not have a
590 * specified destination.
591 * @return the MessageProducer
592 * @throws JMSException if the session fails to create a MessageProducer due to some internal error.
593 * @throws InvalidDestinationException if an invalid destination is specified.
594 * @since 1.1
595 */
596 public MessageProducer createProducer(Destination destination) throws JMSException {
597 checkClosed();
598 return new ActiveMQMessageProducer(this, ActiveMQMessageTransformation.transformDestination(destination));
599 }
600
601 /**
602 * Creates a <CODE>MessageConsumer</CODE> for the specified destination. Since <CODE>Queue</CODE> and <CODE>
603 * Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the destination parameter to
604 * create a <CODE>MessageConsumer</CODE>.
605 *
606 * @param destination the <CODE>Destination</CODE> to access.
607 * @return the MessageConsumer
608 * @throws JMSException if the session fails to create a consumer due to some internal error.
609 * @throws InvalidDestinationException if an invalid destination is specified.
610 * @since 1.1
611 */
612 public MessageConsumer createConsumer(Destination destination) throws JMSException {
613 checkClosed();
614 int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
615 .getPrefetchPolicy().getQueuePrefetch();
616 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
617 "", this.connection.getNextConsumerNumber(), prefetch, false, false);
618 }
619
620 /**
621 * Creates a <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. Since <CODE>
622 * Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be used in the
623 * destination parameter to create a <CODE>MessageConsumer</CODE>.
624 * <P>
625 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been sent to a destination.
626 *
627 * @param destination the <CODE>Destination</CODE> to access
628 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
629 * value of null or an empty string indicates that there is no message selector for the message consumer.
630 * @return the MessageConsumer
631 * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
632 * @throws InvalidDestinationException if an invalid destination is specified.
633 * @throws InvalidSelectorException if the message selector is invalid.
634 * @since 1.1
635 */
636 public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
637 checkClosed();
638 int prefetch = destination instanceof Topic ? connection.getPrefetchPolicy().getTopicPrefetch() : connection
639 .getPrefetchPolicy().getQueuePrefetch();
640 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
641 messageSelector, this.connection.getNextConsumerNumber(), prefetch, false, false);
642 }
643
644 /**
645 * Creates <CODE>MessageConsumer</CODE> for the specified destination, using a message selector. This method can
646 * specify whether messages published by its own connection should be delivered to it, if the destination is a
647 * topic.
648 * <P>
649 * Since <CODE>Queue</CODE> and <CODE>Topic</CODE> both inherit from <CODE>Destination</CODE>, they can be
650 * used in the destination parameter to create a <CODE>MessageConsumer</CODE>.
651 * <P>
652 * A client uses a <CODE>MessageConsumer</CODE> object to receive messages that have been published to a
653 * destination.
654 * <P>
655 * In some cases, a connection may both publish and subscribe to a topic. The consumer <CODE>NoLocal</CODE>
656 * attribute allows a consumer to inhibit the delivery of messages published by its own connection. The default
657 * value for this attribute is False. The <CODE>noLocal</CODE> value must be supported by destinations that are
658 * topics.
659 *
660 * @param destination the <CODE>Destination</CODE> to access
661 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
662 * value of null or an empty string indicates that there is no message selector for the message consumer.
663 * @param NoLocal - if true, and the destination is a topic, inhibits the delivery of messages published by its own
664 * connection. The behavior for <CODE>NoLocal</CODE> is not specified if the destination is a queue.
665 * @return the MessageConsumer
666 * @throws JMSException if the session fails to create a MessageConsumer due to some internal error.
667 * @throws InvalidDestinationException if an invalid destination is specified.
668 * @throws InvalidSelectorException if the message selector is invalid.
669 * @since 1.1
670 */
671 public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal)
672 throws JMSException {
673 checkClosed();
674 int prefetch = connection.getPrefetchPolicy().getTopicPrefetch();
675 return new ActiveMQMessageConsumer(this, ActiveMQMessageTransformation.transformDestination(destination), "",
676 messageSelector, this.connection.getNextConsumerNumber(), prefetch, NoLocal, false);
677 }
678
679 /**
680 * Creates a queue identity given a <CODE>Queue</CODE> name.
681 * <P>
682 * This facility is provided for the rare cases where clients need to dynamically manipulate queue identity. It
683 * allows the creation of a queue identity with a provider-specific name. Clients that depend on this ability are
684 * not portable.
685 * <P>
686 * Note that this method is not for creating the physical queue. The physical creation of queues is an
687 * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
688 * queues, which is accomplished with the <CODE>createTemporaryQueue</CODE> method.
689 *
690 * @param queueName the name of this <CODE>Queue</CODE>
691 * @return a <CODE>Queue</CODE> with the given name
692 * @throws JMSException if the session fails to create a queue due to some internal error.
693 * @since 1.1
694 */
695 public Queue createQueue(String queueName) throws JMSException {
696 checkClosed();
697 return new ActiveMQQueue(queueName);
698 }
699
700 /**
701 * Creates a topic identity given a <CODE>Topic</CODE> name.
702 * <P>
703 * This facility is provided for the rare cases where clients need to dynamically manipulate topic identity. This
704 * allows the creation of a topic identity with a provider-specific name. Clients that depend on this ability are
705 * not portable.
706 * <P>
707 * Note that this method is not for creating the physical topic. The physical creation of topics is an
708 * administrative task and is not to be initiated by the JMS API. The one exception is the creation of temporary
709 * topics, which is accomplished with the <CODE>createTemporaryTopic</CODE> method.
710 *
711 * @param topicName the name of this <CODE>Topic</CODE>
712 * @return a <CODE>Topic</CODE> with the given name
713 * @throws JMSException if the session fails to create a topic due to some internal error.
714 * @since 1.1
715 */
716 public Topic createTopic(String topicName) throws JMSException {
717 checkClosed();
718 return new ActiveMQTopic(topicName);
719 }
720
721 /**
722 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
723 *
724 * @param queue the <CODE>queue</CODE> to access
725 * @exception InvalidDestinationException if an invalid destination is specified
726 * @since 1.1
727 */
728 /**
729 * Creates a durable subscriber to the specified topic.
730 * <P>
731 * If a client needs to receive all the messages published on a topic, including the ones published while the
732 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
733 * this durable subscription and insures that all messages from the topic's publishers are retained until they are
734 * acknowledged by this durable subscriber or they have expired.
735 * <P>
736 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
737 * specify a name that uniquely identifies (within client identifier) each durable subscription it creates. Only one
738 * session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription.
739 * <P>
740 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
741 * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
742 * unsubscribing (deleting) the old one and creating a new one.
743 * <P>
744 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
745 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
746 * value for this attribute is false.
747 *
748 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
749 * @param name the name used to identify this subscription
750 * @return the TopicSubscriber
751 * @throws JMSException if the session fails to create a subscriber due to some internal error.
752 * @throws InvalidDestinationException if an invalid topic is specified.
753 * @since 1.1
754 */
755 public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
756 checkClosed();
757 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name, "",
758 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getDurableTopicPrefetch(),
759 false, false);
760 }
761
762 /**
763 * Creates a durable subscriber to the specified topic, using a message selector and specifying whether messages
764 * published by its own connection should be delivered to it.
765 * <P>
766 * If a client needs to receive all the messages published on a topic, including the ones published while the
767 * subscriber is inactive, it uses a durable <CODE>TopicSubscriber</CODE>. The JMS provider retains a record of
768 * this durable subscription and insures that all messages from the topic's publishers are retained until they are
769 * acknowledged by this durable subscriber or they have expired.
770 * <P>
771 * Sessions with durable subscribers must always provide the same client identifier. In addition, each client must
772 * specify a name which uniquely identifies (within client identifier) each durable subscription it creates. Only
773 * one session at a time can have a <CODE>TopicSubscriber</CODE> for a particular durable subscription. An
774 * inactive durable subscriber is one that exists but does not currently have a message consumer associated with it.
775 * <P>
776 * A client can change an existing durable subscription by creating a durable <CODE>TopicSubscriber</CODE> with
777 * the same name and a new topic and/or message selector. Changing a durable subscriber is equivalent to
778 * unsubscribing (deleting) the old one and creating a new one.
779 *
780 * @param topic the non-temporary <CODE>Topic</CODE> to subscribe to
781 * @param name the name used to identify this subscription
782 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
783 * value of null or an empty string indicates that there is no message selector for the message consumer.
784 * @param noLocal if set, inhibits the delivery of messages published by its own connection
785 * @return the Queue Browser
786 * @throws JMSException if the session fails to create a subscriber due to some internal error.
787 * @throws InvalidDestinationException if an invalid topic is specified.
788 * @throws InvalidSelectorException if the message selector is invalid.
789 * @since 1.1
790 */
791 public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal)
792 throws JMSException {
793 checkClosed();
794 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), name,
795 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
796 .getDurableTopicPrefetch(), noLocal, false);
797 }
798
799 /**
800 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue.
801 *
802 * @param queue the <CODE>queue</CODE> to access
803 * @return the Queue Browser
804 * @throws JMSException if the session fails to create a browser due to some internal error.
805 * @throws InvalidDestinationException if an invalid destination is specified
806 * @since 1.1
807 */
808 public QueueBrowser createBrowser(Queue queue) throws JMSException {
809 checkClosed();
810 return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue), "",
811 this.connection.getNextConsumerNumber());
812 }
813
814 /**
815 * Creates a <CODE>QueueBrowser</CODE> object to peek at the messages on the specified queue using a message
816 * selector.
817 *
818 * @param queue the <CODE>queue</CODE> to access
819 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
820 * value of null or an empty string indicates that there is no message selector for the message consumer.
821 * @return the Queue Browser
822 * @throws JMSException if the session fails to create a browser due to some internal error.
823 * @throws InvalidDestinationException if an invalid destination is specified
824 * @throws InvalidSelectorException if the message selector is invalid.
825 * @since 1.1
826 */
827 public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
828 checkClosed();
829 return new ActiveMQQueueBrowser(this, (ActiveMQQueue) ActiveMQMessageTransformation.transformDestination(queue),
830 messageSelector, this.connection.getNextConsumerNumber());
831 }
832
833 /**
834 * Creates a <CODE>TemporaryQueue</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
835 * it is deleted earlier.
836 *
837 * @return a temporary queue identity
838 * @throws JMSException if the session fails to create a temporary queue due to some internal error.
839 * @since 1.1
840 */
841 public TemporaryQueue createTemporaryQueue() throws JMSException {
842 checkClosed();
843 String tempQueueName = "TemporaryQueue-"
844 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
845 tempQueueName += this.temporaryDestinationGenerator.generateId();
846 ActiveMQTemporaryQueue tempQueue = new ActiveMQTemporaryQueue(tempQueueName);
847 tempQueue.setSessionCreatedBy(this);
848 this.connection.startTemporaryDestination(tempQueue);
849 return tempQueue;
850 }
851
852 /**
853 * Creates a <CODE>TemporaryTopic</CODE> object. Its lifetime will be that of the <CODE>Connection</CODE> unless
854 * it is deleted earlier.
855 *
856 * @return a temporary topic identity
857 * @throws JMSException if the session fails to create a temporary topic due to some internal error.
858 * @since 1.1
859 */
860 public TemporaryTopic createTemporaryTopic() throws JMSException {
861 checkClosed();
862 String tempTopicName = "TemporaryTopic-"
863 + ActiveMQDestination.createTemporaryName(this.connection.getInitializedClientID());
864 tempTopicName += this.temporaryDestinationGenerator.generateId();
865 ActiveMQTemporaryTopic tempTopic = new ActiveMQTemporaryTopic(tempTopicName);
866 tempTopic.setSessionCreatedBy(this);
867 this.connection.startTemporaryDestination(tempTopic);
868 return tempTopic;
869 }
870
871 /**
872 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue.
873 *
874 * @param queue the <CODE>Queue</CODE> to access
875 * @return @throws JMSException if the session fails to create a receiver due to some internal error.
876 * @throws JMSException
877 * @throws InvalidDestinationException if an invalid queue is specified.
878 */
879 public QueueReceiver createReceiver(Queue queue) throws JMSException {
880 checkClosed();
881 return new ActiveMQQueueReceiver(this, ActiveMQDestination.transformDestination(queue), "", this.connection
882 .getNextConsumerNumber(), this.connection.getPrefetchPolicy().getQueuePrefetch());
883 }
884
885 /**
886 * Creates a <CODE>QueueReceiver</CODE> object to receive messages from the specified queue using a message
887 * selector.
888 *
889 * @param queue the <CODE>Queue</CODE> to access
890 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
891 * value of null or an empty string indicates that there is no message selector for the message consumer.
892 * @return QueueReceiver
893 * @throws JMSException if the session fails to create a receiver due to some internal error.
894 * @throws InvalidDestinationException if an invalid queue is specified.
895 * @throws InvalidSelectorException if the message selector is invalid.
896 */
897 public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
898 checkClosed();
899 return new ActiveMQQueueReceiver(this, ActiveMQMessageTransformation.transformDestination(queue),
900 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
901 .getQueuePrefetch());
902 }
903
904 /**
905 * Creates a <CODE>QueueSender</CODE> object to send messages to the specified queue.
906 *
907 * @param queue the <CODE>Queue</CODE> to access, or null if this is an unidentified producer
908 * @return QueueSender
909 * @throws JMSException if the session fails to create a sender due to some internal error.
910 * @throws InvalidDestinationException if an invalid queue is specified.
911 */
912 public QueueSender createSender(Queue queue) throws JMSException {
913 checkClosed();
914 return new ActiveMQQueueSender(this, ActiveMQMessageTransformation.transformDestination(queue));
915 }
916
917 /**
918 * Creates a nondurable subscriber to the specified topic. <p/>
919 * <P>
920 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
921 * <p/>
922 * <P>
923 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
924 * while they are active. <p/>
925 * <P>
926 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
927 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
928 * value for this attribute is false.
929 *
930 * @param topic the <CODE>Topic</CODE> to subscribe to
931 * @return TopicSubscriber
932 * @throws JMSException if the session fails to create a subscriber due to some internal error.
933 * @throws InvalidDestinationException if an invalid topic is specified.
934 */
935 public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
936 checkClosed();
937 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null, null,
938 this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy().getTopicPrefetch(), false,
939 false);
940 }
941
942 /**
943 * Creates a nondurable subscriber to the specified topic, using a message selector or specifying whether messages
944 * published by its own connection should be delivered to it. <p/>
945 * <P>
946 * A client uses a <CODE>TopicSubscriber</CODE> object to receive messages that have been published to a topic.
947 * <p/>
948 * <P>
949 * Regular <CODE>TopicSubscriber</CODE> objects are not durable. They receive only messages that are published
950 * while they are active. <p/>
951 * <P>
952 * Messages filtered out by a subscriber's message selector will never be delivered to the subscriber. From the
953 * subscriber's perspective, they do not exist. <p/>
954 * <P>
955 * In some cases, a connection may both publish and subscribe to a topic. The subscriber <CODE>NoLocal</CODE>
956 * attribute allows a subscriber to inhibit the delivery of messages published by its own connection. The default
957 * value for this attribute is false.
958 *
959 * @param topic the <CODE>Topic</CODE> to subscribe to
960 * @param messageSelector only messages with properties matching the message selector expression are delivered. A
961 * value of null or an empty string indicates that there is no message selector for the message consumer.
962 * @param noLocal if set, inhibits the delivery of messages published by its own connection
963 * @return TopicSubscriber
964 * @throws JMSException if the session fails to create a subscriber due to some internal error.
965 * @throws InvalidDestinationException if an invalid topic is specified.
966 * @throws InvalidSelectorException if the message selector is invalid.
967 */
968 public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
969 checkClosed();
970 return new ActiveMQTopicSubscriber(this, ActiveMQMessageTransformation.transformDestination(topic), null,
971 messageSelector, this.connection.getNextConsumerNumber(), this.connection.getPrefetchPolicy()
972 .getTopicPrefetch(), noLocal, false);
973 }
974
975 /**
976 * Creates a publisher for the specified topic. <p/>
977 * <P>
978 * A client uses a <CODE>TopicPublisher</CODE> object to publish messages on a topic. Each time a client creates a
979 * <CODE>TopicPublisher</CODE> on a topic, it defines a new sequence of messages that have no ordering
980 * relationship with the messages it has previously sent.
981 *
982 * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified producer
983 * @return TopicPublisher
984 * @throws JMSException if the session fails to create a publisher due to some internal error.
985 * @throws InvalidDestinationException if an invalid topic is specified.
986 */
987 public TopicPublisher createPublisher(Topic topic) throws JMSException {
988 checkClosed();
989 return new ActiveMQTopicPublisher(this, ActiveMQMessageTransformation.transformDestination(topic));
990 }
991
992 /**
993 * Unsubscribes a durable subscription that has been created by a client.
994 * <P>
995 * This method deletes the state being maintained on behalf of the subscriber by its provider.
996 * <P>
997 * It is erroneous for a client to delete a durable subscription while there is an active <CODE>MessageConsumer
998 * </CODE> or <CODE>TopicSubscriber</CODE> for the subscription, or while a consumed message is part of a pending
999 * transaction or has not been acknowledged in the session.
1000 *
1001 * @param name the name used to identify this subscription
1002 * @throws JMSException if the session fails to unsubscribe to the durable subscription due to some internal error.
1003 * @throws InvalidDestinationException if an invalid subscription name is specified.
1004 * @since 1.1
1005 */
1006 public void unsubscribe(String name) throws JMSException {
1007 checkClosed();
1008 DurableUnsubscribe ds = new DurableUnsubscribe();
1009 ds.setClientId(this.connection.getClientID());
1010 ds.setSubscriberName(name);
1011 this.connection.syncSendPacket(ds);
1012 }
1013
1014 /**
1015 * Tests to see if the Message Dispatcher is a target for this message
1016 *
1017 * @param message the message to test
1018 * @return true if the Message Dispatcher can dispatch the message
1019 */
1020 public boolean isTarget(ActiveMQMessage message) {
1021 for (Iterator i = this.consumers.iterator();i.hasNext();) {
1022 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1023 if (message.isConsumerTarget(consumer.getConsumerNumber())) {
1024 return true;
1025 }
1026 }
1027 return false;
1028 }
1029
1030 /**
1031 * Dispatch an ActiveMQMessage
1032 *
1033 * @param message
1034 */
1035 public void dispatch(ActiveMQMessage message) {
1036 message.setMessageAcknowledge(this);
1037 messageExecutor.execute(message);
1038 }
1039
1040 /**
1041 * Acknowledges all consumed messages of the session of this consumed message.
1042 * <P>
1043 * All consumed JMS messages support the <CODE>acknowledge</CODE> method for use when a client has specified that
1044 * its JMS session's consumed messages are to be explicitly acknowledged. By invoking <CODE>acknowledge</CODE> on
1045 * a consumed message, a client acknowledges all messages consumed by the session that the message was delivered to.
1046 * <P>
1047 * Calls to <CODE>acknowledge</CODE> are ignored for both transacted sessions and sessions specified to use
1048 * implicit acknowledgement modes.
1049 * <P>
1050 * A client may individually acknowledge each message as it is consumed, or it may choose to acknowledge messages as
1051 * an application-defined group (which is done by calling acknowledge on the last received message of the group,
1052 * thereby acknowledging all messages consumed by the session.)
1053 * <P>
1054 * Messages that have been received but not acknowledged may be redelivered.
1055 * @param caller - the message calling acknowledge on the session
1056 *
1057 * @throws JMSException if the JMS provider fails to acknowledge the messages due to some internal error.
1058 * @throws javax.jms.IllegalStateException if this method is called on a closed session.
1059 * @see javax.jms.Session#CLIENT_ACKNOWLEDGE
1060 */
1061 public void acknowledge(ActiveMQMessage caller) throws JMSException {
1062 checkClosed();
1063 /**
1064 * Find the caller and ensure it is marked as consumed
1065 * This is to ensure acknowledge called by a
1066 * MessageListener works correctly
1067 */
1068 ActiveMQMessage msg = (ActiveMQMessage)deliveredMessages.get(caller);
1069 if (msg != null){
1070 msg.setMessageConsumed(true);
1071 }
1072
1073 doAcknowledge(false);
1074 }
1075
1076 protected void doAcknowledge(boolean isClosing) throws JMSException {
1077 if (!closed) {
1078 if (this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE) {
1079 ActiveMQMessage msg = null;
1080 while((msg = (ActiveMQMessage)deliveredMessages.removeFirst())!=null){
1081 boolean messageConsumed = isClosing ? false : msg.isMessageConsumed();
1082 if (!msg.isTransientConsumed()){
1083 sendMessageAck(msg, messageConsumed, false);
1084 }else {
1085 if (!messageConsumed){
1086 connection.addToTransientConsumedRedeliverCache(msg);
1087 }
1088 }
1089 }
1090 deliveredMessages.clear();
1091 }
1092 }
1093 }
1094
1095 protected void beforeMessageDelivered(ActiveMQMessage message) {
1096 if (message != null && !closed) {
1097 deliveredMessages.add(message);
1098 }
1099 }
1100
1101 protected void afterMessageDelivered(boolean sendAcknowledge, ActiveMQMessage message, boolean messageConsumed,
1102 boolean messageExpired, boolean beforeCalled) {
1103 if (message != null && !closed) {
1104 if ((isClientAcknowledge() && !messageExpired) || (isTransacted() && message.isTransientConsumed())) {
1105 message.setMessageConsumed(messageConsumed);
1106 if (!beforeCalled) {
1107 deliveredMessages.add(message);
1108 }
1109 }
1110 else {
1111 if (beforeCalled) {
1112 deliveredMessages.remove(message);
1113 }
1114 }
1115 //don't send acks for expired messages unless sendAcknowledge is set
1116 //the sendAcknowledge flag is set for all messages expect those destined
1117 //for transient Topic subscribers
1118 if (sendAcknowledge && !isClientAcknowledge()) {
1119 try {
1120 doStartTransaction();
1121 sendMessageAck(message,messageConsumed,messageExpired);
1122 }
1123 catch (JMSException e) {
1124 log.warn("failed to notify Broker that message is delivered", e);
1125 }
1126 }
1127 }
1128 }
1129
1130 /**
1131 * remove a temporary destination
1132 * @param destination
1133 * @throws JMSException if active subscribers already exist
1134 */
1135 public void removeTemporaryDestination(ActiveMQDestination destination) throws JMSException{
1136 this.connection.stopTemporaryDestination(destination);
1137 }
1138
1139 private void sendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired)
1140 throws JMSException {
1141 if (message.isMessagePart()) {
1142 ActiveMQMessage[] parts = (ActiveMQMessage[]) connection.getAssemblies().remove(message.getParentMessageID());
1143 if (parts != null) {
1144 for (int i = 0;i < parts.length;i++) {
1145 parts[i].setConsumerIdentifer(message.getConsumerIdentifer());
1146 doSendMessageAck(parts[i], messageConsumed, messageExpired);
1147 }
1148 }
1149 else {
1150 JMSException jmsEx = new JMSException("Could not find parts for fragemented message: " + message);
1151 connection.onException(jmsEx);
1152 }
1153 }
1154 else {
1155 doSendMessageAck(message, messageConsumed, messageExpired);
1156 }
1157 }
1158
1159 private void doSendMessageAck(ActiveMQMessage message, boolean messageConsumed, boolean messageExpired)
1160 throws JMSException {
1161 if (message != null && !message.isAdvisory()) {
1162 MessageAck ack = new MessageAck();
1163 ack.setConsumerId(message.getConsumerIdentifer());
1164 ack.setTransactionId(transactionContext.getTransactionId());
1165 ack.setExternalMessageId(message.isExternalMessageId());
1166 ack.setMessageID(message.getJMSMessageID());
1167 ack.setSequenceNumber(message.getSequenceNumber());
1168 ack.setProducerKey(message.getProducerKey());
1169 ack.setMessageRead(messageConsumed);
1170 ack.setDestination(message.getJMSActiveMQDestination());
1171 ack.setPersistent(message.getJMSDeliveryMode() == DeliveryMode.PERSISTENT);
1172 ack.setExpired(messageExpired);
1173 ack.setSessionId(getSessionId());
1174 this.connection.asyncSendPacket(ack);
1175 }
1176 }
1177
1178 /**
1179 * @param consumer
1180 * @throws JMSException
1181 */
1182 protected void addConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1183 // ensure that the connection info is sent to the broker
1184 connection.sendConnectionInfoToBroker();
1185 // lets add the stat
1186 if (consumer.isDurableSubscriber()) {
1187 stats.onCreateDurableSubscriber();
1188 }
1189 ConsumerInfo info = createConsumerInfo(consumer);
1190 info.setStarted(true);
1191 //we add before notifying the server - as messages could
1192 //start to be dispatched before receipt from syncSend()
1193 //is returned
1194 this.consumers.add(consumer);
1195 if (started.get()){
1196 connection.replayTransientConsumedRedeliveredMessages(this,consumer);
1197 }
1198 try {
1199 this.connection.syncSendPacket(info);
1200 }
1201 catch (JMSException jmsEx) {
1202 this.consumers.remove(consumer);
1203 throw jmsEx;
1204 }
1205 }
1206
1207 /**
1208 * @param consumer
1209 * @throws JMSException
1210 */
1211 protected void removeConsumer(ActiveMQMessageConsumer consumer) throws JMSException {
1212 this.consumers.remove(consumer);
1213 // lets remove the stat
1214 if (consumer.isDurableSubscriber()) {
1215 stats.onRemoveDurableSubscriber();
1216 }
1217 if (!closed) {
1218 ConsumerInfo info = createConsumerInfo(consumer);
1219 info.setStarted(false);
1220 this.connection.asyncSendPacket(info, false);
1221 }
1222 }
1223
1224 protected ConsumerInfo createConsumerInfo(ActiveMQMessageConsumer consumer) throws JMSException {
1225 ConsumerInfo info = new ConsumerInfo();
1226 info.setConsumerId(consumer.consumerIdentifier);
1227 info.setClientId(connection.clientID);
1228 info.setSessionId(this.sessionId);
1229 info.setConsumerNo(consumer.consumerNumber);
1230 info.setPrefetchNumber(consumer.prefetchNumber);
1231 info.setDestination(consumer.destination);
1232 info.setNoLocal(consumer.noLocal);
1233 info.setBrowser(consumer.browser);
1234 info.setSelector(consumer.messageSelector);
1235 info.setStartTime(consumer.startTime);
1236 info.setConsumerName(consumer.consumerName);
1237 return info;
1238 }
1239
1240 /**
1241 * @param producer
1242 * @throws JMSException
1243 */
1244 protected void addProducer(ActiveMQMessageProducer producer) throws JMSException {
1245 // ensure that the connection info is sent to the broker
1246 connection.sendConnectionInfoToBroker();
1247 //start listening for advisories if the destination is temporary
1248 this.connection.startAdvisoryForTempDestination(producer.defaultDestination);
1249 producer.setProducerId(connection.handleIdGenerator.getNextShortSequence());
1250 ProducerInfo info = createProducerInfo(producer);
1251 info.setStarted(true);
1252 this.connection.asyncSendPacket(info);
1253 this.producers.add(producer);
1254 }
1255
1256 /**
1257 * @param producer
1258 * @throws JMSException
1259 */
1260 protected void removeProducer(ActiveMQMessageProducer producer) throws JMSException {
1261 this.producers.remove(producer);
1262 if (!closed) {
1263 this.connection.stopAdvisoryForTempDestination(producer.defaultDestination);
1264 ProducerInfo info = createProducerInfo(producer);
1265 info.setStarted(false);
1266 this.connection.asyncSendPacket(info, false);
1267 }
1268 }
1269
1270 protected ProducerInfo createProducerInfo(ActiveMQMessageProducer producer) throws JMSException {
1271 ProducerInfo info = new ProducerInfo();
1272 info.setProducerId(producer.getProducerId());
1273 info.setClientId(connection.clientID);
1274 info.setSessionId(this.sessionId);
1275 info.setDestination(producer.defaultDestination);
1276 info.setStartTime(producer.getStartTime());
1277 return info;
1278 }
1279
1280 /**
1281 * Start this Session
1282 * @throws JMSException
1283 */
1284 protected void start() throws JMSException {
1285 started.set(true);
1286 for (Iterator i = consumers.iterator(); i.hasNext();){
1287 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer)i.next();
1288 connection.replayTransientConsumedRedeliveredMessages(this,consumer);
1289 }
1290 messageExecutor.start();
1291 }
1292
1293 /**
1294 * Stop this Session
1295 */
1296 protected void stop() {
1297 started.set(false);
1298 messageExecutor.stop();
1299 }
1300
1301 /**
1302 * @return Returns the sessionId.
1303 */
1304 protected short getSessionId() {
1305 return sessionId;
1306 }
1307
1308 /**
1309 * @param sessionId The sessionId to set.
1310 */
1311 protected void setSessionId(short sessionId) {
1312 this.sessionId = sessionId;
1313 }
1314
1315 /**
1316 * @return Returns the startTime.
1317 */
1318 protected long getStartTime() {
1319 return startTime;
1320 }
1321
1322 /**
1323 * @param startTime The startTime to set.
1324 */
1325 protected void setStartTime(long startTime) {
1326 this.startTime = startTime;
1327 }
1328
1329 /**
1330 * send the message for dispatch by the broker
1331 *
1332 * @param producer
1333 * @param destination
1334 * @param message
1335 * @param deliveryMode
1336 * @param priority
1337 * @param timeToLive
1338 * @throws JMSException
1339 */
1340 protected void send(ActiveMQMessageProducer producer, Destination destination, Message message, int deliveryMode,
1341 int priority, long timeToLive, boolean reuseMessageId) throws JMSException {
1342 checkClosed();
1343 // ensure that the connection info is sent to the broker
1344 connection.sendConnectionInfoToBroker();
1345 // tell the Broker we are about to start a new transaction
1346 doStartTransaction();
1347 message.setJMSDestination(destination);
1348 message.setJMSDeliveryMode(deliveryMode);
1349 message.setJMSPriority(priority);
1350 long expiration = 0L;
1351 if (!producer.getDisableMessageTimestamp()) {
1352 long timeStamp = System.currentTimeMillis();
1353 message.setJMSTimestamp(timeStamp);
1354 if (timeToLive > 0) {
1355 expiration = timeToLive + timeStamp;
1356 }
1357 }
1358 message.setJMSExpiration(expiration);
1359 String id = message.getJMSMessageID();
1360 String producerKey = producer.getProducerMessageKey();
1361 long sequenceNumber = producer.getIdGenerator().getNextSequence();
1362
1363 if ((id == null || id.length() == 0) || !producer.getDisableMessageID() && !reuseMessageId) {
1364 message.setJMSMessageID(producerKey + sequenceNumber);
1365 }
1366 //transform to our own message format here
1367 ActiveMQMessage msg = ActiveMQMessageTransformation.transformMessage(message);
1368 if (connection.isCopyMessageOnSend()){
1369 msg = msg.shallowCopy();
1370 }
1371 //clear identity - incase forwared on
1372 msg.setJMSMessageIdentity(null);
1373 msg.setExternalMessageId(id != null);
1374 msg.setSequenceNumber(sequenceNumber);
1375 msg.setProducerKey(producerKey);
1376 msg.setTransactionId(transactionContext.getTransactionId());
1377 msg.setJMSClientID(this.connection.clientID);
1378 msg.setMesssageHandle(producer.getProducerId());
1379 //reset state as could be forwarded on
1380 msg.setJMSRedelivered(false);
1381 if (!connection.isInternalConnection()){
1382 msg.clearBrokersVisited();
1383 connection.validateDestination(msg.getJMSActiveMQDestination());
1384 }
1385
1386 if (this.connection.isPrepareMessageBodyOnSend()){
1387 msg.prepareMessageBody();
1388 }
1389 //do message payload compression
1390 if (connection.isDoMessageCompression()){
1391 try {
1392 msg.getBodyAsBytes(compression);
1393 }
1394 catch (IOException e) {
1395 JMSException jmsEx = new JMSException("Failed to compress message payload");
1396 jmsEx.setLinkedException(e);
1397 throw jmsEx;
1398 }
1399 }
1400 boolean fragmentedMessage = connection.isDoMessageFragmentation();
1401 if (fragmentedMessage && !msg.isMessagePart()){
1402 try {
1403 ByteArrayFragmentation fragmentation = connection.getFragmentation();
1404 fragmentedMessage = fragmentation.doFragmentation(msg.getBodyAsBytes());
1405 if (fragmentedMessage){
1406 ByteArray[] array = fragmentation.fragment(msg.getBodyAsBytes());
1407 String parentMessageId = msg.getJMSMessageID();
1408 for (int i = 0; i < array.length; i++){
1409 ActiveMQMessage fragment = msg.shallowCopy();
1410 fragment.setJMSMessageID(null);
1411 fragment.setMessagePart(true);
1412 fragment.setParentMessageID(parentMessageId);
1413 fragment.setNumberOfParts((short)array.length);
1414 fragment.setPartNumber((short)i);
1415 if (i != 0){
1416 fragment.setSequenceNumber(producer.getIdGenerator().getNextSequence());
1417 }
1418 fragment.setBodyAsBytes(array[i]);
1419 if (this.connection.isUseAsyncSend()) {
1420 this.connection.asyncSendPacket(fragment);
1421 }
1422 else {
1423 this.connection.syncSendPacket(fragment);
1424 }
1425
1426 }
1427 }
1428 }catch (IOException e) {
1429 JMSException jmsEx = new JMSException("Failed to fragment message payload");
1430 jmsEx.setLinkedException(e);
1431 throw jmsEx;
1432 }
1433 }
1434 if (log.isDebugEnabled()) {
1435 log.debug("Sending message: " + msg);
1436 }
1437
1438 if (!fragmentedMessage){
1439 if (this.connection.isUseAsyncSend()) {
1440 this.connection.asyncSendPacket(msg);
1441 }
1442 else {
1443 this.connection.syncSendPacket(msg);
1444 }
1445 }
1446 }
1447
1448 /**
1449 * Send TransactionInfo to indicate transaction has started
1450 *
1451 * @throws JMSException if some internal error occurs
1452 */
1453 protected void doStartTransaction() throws JMSException {
1454 if (getTransacted() && !transactionContext.isInXATransaction()) {
1455 transactionContext.begin();
1456 }
1457 }
1458
1459 protected void setSessionConsumerDispatchState(int value) throws JMSException {
1460 if (consumerDispatchState != ActiveMQSession.CONSUMER_DISPATCH_UNSET && value != consumerDispatchState) {
1461 String errorStr = "Cannot mix consumer dispatching on a session - already: ";
1462 if (value == ActiveMQSession.CONSUMER_DISPATCH_SYNC) {
1463 errorStr += "synchronous";
1464 }
1465 else {
1466 errorStr += "asynchronous";
1467 }
1468 throw new IllegalStateException(errorStr);
1469 }
1470 consumerDispatchState = value;
1471 }
1472
1473 protected void redeliverUnacknowledgedMessages() {
1474 redeliverUnacknowledgedMessages(false);
1475 }
1476
1477 protected void redeliverUnacknowledgedMessages(boolean onlyDeliverTransientConsumed) {
1478 messageExecutor.stop();
1479 LinkedList replay = new LinkedList();
1480 Object obj = null;
1481 while ((obj = deliveredMessages.removeFirst()) != null) {
1482 replay.add(obj);
1483 }
1484
1485 deliveredMessages.clear();
1486 if (!replay.isEmpty()) {
1487 for (ListIterator i = replay.listIterator(replay.size());i.hasPrevious();) {
1488 ActiveMQMessage msg = (ActiveMQMessage) i.previous();
1489 if (!onlyDeliverTransientConsumed || msg.isTransientConsumed()) {
1490 msg.setJMSRedelivered(true);
1491 msg.incrementDeliveryCount();
1492 messageExecutor.executeFirst(msg);
1493 }
1494 }
1495 }
1496 replay.clear();
1497 messageExecutor.start();
1498 }
1499
1500 protected void clearMessagesInProgress() {
1501 messageExecutor.clearMessagesInProgress();
1502 for (Iterator i = consumers.iterator();i.hasNext();) {
1503 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
1504 consumer.clearMessagesInProgress();
1505 }
1506 }
1507
1508 public boolean hasUncomsumedMessages() {
1509 return messageExecutor.hasUncomsumedMessages();
1510 }
1511
1512 public List getUnconsumedMessages() {
1513 return messageExecutor.getUnconsumedMessages();
1514 }
1515
1516 public boolean isTransacted() {
1517 return this.acknowledgeMode == Session.SESSION_TRANSACTED;
1518 }
1519
1520 protected boolean isClientAcknowledge() {
1521 return this.acknowledgeMode == Session.CLIENT_ACKNOWLEDGE;
1522 }
1523
1524 /**
1525 * @return Returns the internalSession.
1526 */
1527 public boolean isInternalSession() {
1528 return internalSession;
1529 }
1530 /**
1531 * @param internalSession The internalSession to set.
1532 */
1533 public void setInternalSession(boolean internalSession) {
1534 this.internalSession = internalSession;
1535 }
1536
1537 public DeliveryListener getDeliveryListener() {
1538 return deliveryListener;
1539 }
1540
1541
1542 public void setDeliveryListener(DeliveryListener deliveryListener) {
1543 this.deliveryListener = deliveryListener;
1544 }
1545
1546 }