001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq;
020
021 import java.io.IOException;
022 import java.util.Iterator;
023 import java.util.Map;
024
025 import javax.jms.Connection;
026 import javax.jms.ConnectionConsumer;
027 import javax.jms.ConnectionMetaData;
028 import javax.jms.DeliveryMode;
029 import javax.jms.Destination;
030 import javax.jms.ExceptionListener;
031 import javax.jms.IllegalStateException;
032 import javax.jms.JMSException;
033 import javax.jms.Queue;
034 import javax.jms.QueueConnection;
035 import javax.jms.QueueSession;
036 import javax.jms.ServerSessionPool;
037 import javax.jms.Session;
038 import javax.jms.Topic;
039 import javax.jms.TopicConnection;
040 import javax.jms.TopicSession;
041 import javax.jms.XAConnection;
042
043 import org.activemq.advisories.TempDestinationAdvisor;
044 import org.activemq.advisories.TempDestinationAdvisoryEvent;
045 import org.activemq.capacity.CapacityMonitorEvent;
046 import org.activemq.capacity.CapacityMonitorEventListener;
047 import org.activemq.filter.AndFilter;
048 import org.activemq.filter.Filter;
049 import org.activemq.filter.FilterFactory;
050 import org.activemq.filter.FilterFactoryImpl;
051 import org.activemq.filter.NoLocalFilter;
052 import org.activemq.io.util.ByteArray;
053 import org.activemq.io.util.ByteArrayCompression;
054 import org.activemq.io.util.ByteArrayFragmentation;
055 import org.activemq.io.util.MemoryBoundedObjectManager;
056 import org.activemq.io.util.MemoryBoundedQueue;
057 import org.activemq.io.util.MemoryBoundedQueueManager;
058 import org.activemq.management.JMSConnectionStatsImpl;
059 import org.activemq.management.JMSStatsImpl;
060 import org.activemq.management.StatsCapable;
061 import org.activemq.management.StatsImpl;
062 import org.activemq.message.ActiveMQDestination;
063 import org.activemq.message.ActiveMQMessage;
064 import org.activemq.message.ActiveMQObjectMessage;
065 import org.activemq.message.BrokerAdminCommand;
066 import org.activemq.message.CapacityInfo;
067 import org.activemq.message.CleanupConnectionInfo;
068 import org.activemq.message.ConnectionInfo;
069 import org.activemq.message.ConsumerInfo;
070 import org.activemq.message.Packet;
071 import org.activemq.message.PacketListener;
072 import org.activemq.message.ProducerInfo;
073 import org.activemq.message.Receipt;
074 import org.activemq.message.ResponseReceipt;
075 import org.activemq.message.SessionInfo;
076 import org.activemq.message.TransactionInfo;
077 import org.activemq.message.WireFormatInfo;
078 import org.activemq.message.XATransactionInfo;
079 import org.activemq.transport.TransportChannel;
080 import org.activemq.transport.TransportStatusEvent;
081 import org.activemq.transport.TransportStatusEventListener;
082 import org.activemq.util.IdGenerator;
083 import org.activemq.util.JMSExceptionHelper;
084 import org.apache.commons.logging.Log;
085 import org.apache.commons.logging.LogFactory;
086
087 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
088 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
089 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
090 import EDU.oswego.cs.dl.util.concurrent.SynchronizedInt;
091
092 /**
093 * A <CODE>Connection</CODE> object is a client's active connection to its JMS
094 * provider. It typically allocates provider resources outside the Java virtual
095 * machine (JVM).
096 * <P>
097 * Connections support concurrent use.
098 * <P>
099 * A connection serves several purposes:
100 * <UL>
101 * <LI>It encapsulates an open connection with a JMS provider. It typically
102 * represents an open TCP/IP socket between a client and the service provider
103 * software.
104 * <LI>Its creation is where client authentication takes place.
105 * <LI>It can specify a unique client identifier.
106 * <LI>It provides a <CODE>ConnectionMetaData</CODE> object.
107 * <LI>It supports an optional <CODE>ExceptionListener</CODE> object.
108 * </UL>
109 * <P>
110 * Because the creation of a connection involves setting up authentication and
111 * communication, a connection is a relatively heavyweight object. Most clients
112 * will do all their messaging with a single connection. Other more advanced
113 * applications may use several connections. The JMS API does not architect a
114 * reason for using multiple connections; however, there may be operational
115 * reasons for doing so.
116 * <P>
117 * A JMS client typically creates a connection, one or more sessions, and a
118 * number of message producers and consumers. When a connection is created, it
119 * is in stopped mode. That means that no messages are being delivered.
120 * <P>
121 * It is typical to leave the connection in stopped mode until setup is complete
122 * (that is, until all message consumers have been created). At that point, the
123 * client calls the connection's <CODE>start</CODE> method, and messages begin
124 * arriving at the connection's consumers. This setup convention minimizes any
125 * client confusion that may result from asynchronous message delivery while the
126 * client is still in the process of setting itself up.
127 * <P>
128 * A connection can be started immediately, and the setup can be done
129 * afterwards. Clients that do this must be prepared to handle asynchronous
130 * message delivery while they are still in the process of setting up.
131 * <P>
132 * A message producer can send messages while a connection is stopped. <p/>This
133 * class is also a <CODE>TopicConnection </CODE>. A <CODE>TopicConnection</CODE>
134 * object is an active connection to a publish/subscribe JMS provider. A client
135 * uses a <CODE>TopicConnection</CODE> object to create one or more <CODE>TopicSession</CODE>
136 * objects for producing and consuming messages.
137 * <P>
138 * A <CODE>TopicConnection</CODE> can be used to create a <CODE>TopicSession</CODE>,
139 * from which specialized topic-related objects can be created. A more general,
140 * and recommended approach is to use the <CODE>Connection </CODE> object.
141 * <P>
* <p/><p/>This class is also a <CODE>QueueConnection</CODE>. A <CODE>QueueConnection</CODE>
143 * object is an active connection to a point-to-point JMS provider. A client
144 * uses a <CODE>QueueConnection</CODE> object to create one or more <CODE>QueueSession</CODE>
145 * objects for producing and consuming messages.
146 * <P>
147 * A <CODE>QueueConnection</CODE> can be used to create a <CODE>QueueSession</CODE>,
148 * from which specialized queue-related objects can be created. A more general,
149 * and recommended, approach is to use the <CODE>Connection </CODE> object.
150 * <P>
151 * A <CODE>QueueConnection</CODE> cannot be used to create objects specific to
152 * the publish/subscribe domain. The <CODE>createDurableConnectionConsumer</CODE>
153 * method inherits from <CODE>Connection</CODE>, but must throw an <CODE>IllegalStateException</CODE>
* if used from <CODE>QueueConnection</CODE>.
155 *
156 * @version $Revision: 1.1.1.1 $
157 * @see javax.jms.Connection
158 * @see javax.jms.ConnectionFactory
159 * @see javax.jms.QueueConnection
160 * @see javax.jms.TopicConnection
161 * @see javax.jms.TopicConnectionFactory
162 * @see javax.jms.QueueConnection
163 * @see javax.jms.QueueConnectionFactory
164 */
165 public class ActiveMQConnection implements Connection, PacketListener,
166 ExceptionListener, TopicConnection, QueueConnection, StatsCapable,
167 CapacityMonitorEventListener, TransportStatusEventListener, Closeable {
168
/**
 * Default user name for the connection.
 */
public static final String DEFAULT_USER = "defaultUser";

/**
 * Default URL of the ActiveMQ broker.
 */
public static final String DEFAULT_BROKER_URL = "tcp://localhost:61616";

/**
 * Default client URL. If using a message broker in a hub(s)/spoke
 * architecture, use {@link #DEFAULT_BROKER_URL} instead.
 *
 * @see ActiveMQConnection#DEFAULT_BROKER_URL
 */
public static final String DEFAULT_URL = "peer://development";

/**
 * Default password for the connection.
 */
public static final String DEFAULT_PASSWORD = "defaultPassword";

/** Logger shared by all instances of this class. */
private static final Log log = LogFactory.getLog(ActiveMQConnection.class);

/**
 * Limit handed to the <CODE>MemoryBoundedObjectManager</CODE> that the
 * constructor creates (10 * 1024 * 1024; presumably a byte count, i.e.
 * 10 MiB - TODO confirm the unit against MemoryBoundedObjectManager).
 */
private static final int DEFAULT_CONNECTION_MEMORY_LIMIT = 10 * 1024 * 1024;
195
// properties

// factory that created this connection; notified on create and on close
private ActiveMQConnectionFactory factory;

private String userName;

private String password;

protected String clientID;

// timeout (millis) used by close() when sending the final connection info
private int sendCloseTimeout = 2000;

private TransportChannel transportChannel;

private ExceptionListener exceptionListener;

private ActiveMQPrefetchPolicy prefetchPolicy;

// statistics of the owning factory; this connection registers itself there
private JMSStatsImpl factoryStats;

private MemoryBoundedObjectManager memoryManager;

private MemoryBoundedQueueManager boundedQueueManager;

protected IdGenerator handleIdGenerator;

private IdGenerator clientIdGenerator;

protected IdGenerator packetIdGenerator;

private IdGenerator sessionIdGenerator;

private JMSConnectionStatsImpl stats;

// internal state

private CopyOnWriteArrayList sessions;

private CopyOnWriteArrayList messageDispatchers;

private CopyOnWriteArrayList connectionConsumers;

private SynchronizedInt consumerNumberGenerator;

private ActiveMQConnectionMetaData connectionMetaData;

// set to true at the end of a successful close(); read by checkClosed()
private boolean closed;

// flipped by start() and stop() via SynchronizedBoolean.commit()
private SynchronizedBoolean started;

private boolean clientIDSet;

private boolean isConnectionInfoSentToBroker;

// set to true by the constructor that is handed a TransportChannel
private boolean isTransportOK;

// set by checkClosed() the first time it starts the transport channel
private boolean startedTransport;

// System.currentTimeMillis() captured at construction
private long startTime;

private long flowControlSleepTime = 0;
private Object flowControlMutex = new Object();

private boolean quickClose;

// used for notifying that the connection is used for networks etc.;
// forwarded to TransportChannel.setUsedInternally() by checkClosed()
private boolean internalConnection;

// set to true when setClientID() is called by the application
private boolean userSpecifiedClientID;

/**
 * Should we use an async send for persistent non transacted messages ?
 * When this is false, checkClosed() also enables no-delay on the transport.
 */
protected boolean useAsyncSend = true;

// timeout used by start() when (re)sending the connection info
private int sendConnectionInfoTimeout = 30000;

private boolean disableTimeStampsByDefault = false;

private boolean J2EEcompliant = true;

private boolean prepareMessageBodyOnSend = true;

private boolean copyMessageOnSend = true;

// compression and fragmentation variables

private boolean doMessageCompression = true;

// data size above which compression will be used
private int messageCompressionLimit = ByteArrayCompression.DEFAULT_COMPRESSION_LIMIT;

private int messageCompressionLevel = ByteArrayCompression.DEFAULT_COMPRESSION_LEVEL;

// default compression strategy
private int messageCompressionStrategy = ByteArrayCompression.DEFAULT_COMPRESSION_STRATEGY;

private boolean doMessageFragmentation = false;

private int messageFragmentationLimit = ByteArrayFragmentation.DEFAULT_FRAGMENTATION_LIMIT;

private boolean cachingEnabled = true;

private boolean optimizedMessageDispatch = false;

private CopyOnWriteArrayList transientConsumedRedeliverCache;

private FilterFactory filterFactory;

private Map tempDestinationMap;

private Map validDestinationsMap;

private String resourceManagerId;
// used for assembling message fragments
private final ConcurrentHashMap assemblies= new ConcurrentHashMap();
private final ByteArrayFragmentation fragmentation = new ByteArrayFragmentation();
318
319 /**
320 * A static helper method to create a new connection
321 *
322 * @return an ActiveMQConnection
323 * @throws JMSException
324 */
325 public static ActiveMQConnection makeConnection() throws JMSException {
326 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
327 return (ActiveMQConnection) factory.createConnection();
328 }
329
330 /**
331 * A static helper method to create a new connection
332 *
333 * @param uri
334 * @return and ActiveMQConnection
335 * @throws JMSException
336 */
337 public static ActiveMQConnection makeConnection(String uri)
338 throws JMSException {
339 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(uri);
340 return (ActiveMQConnection) factory.createConnection();
341 }
342
343 /**
344 * A static helper method to create a new connection
345 *
346 * @param user
347 * @param password
348 * @param uri
349 * @return an ActiveMQConnection
350 * @throws JMSException
351 */
352 public static ActiveMQConnection makeConnection(String user,
353 String password, String uri) throws JMSException {
354 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(user,
355 password, uri);
356 return (ActiveMQConnection) factory.createConnection();
357 }
358
/**
 * Builds a connection on top of an existing transport channel.
 * <P>
 * After the common initialisation, this connection registers itself on the
 * channel as packet listener, exception listener and transport status
 * listener, and marks the transport as OK.
 *
 * @param factory
 *            the factory creating this connection
 * @param theUserName
 *            the user's name
 * @param thePassword
 *            the password
 * @param transportChannel
 *            the transport channel used to communicate with the server
 * @throws JMSException
 *             declared for callers; see the invoked channel methods
 */
public ActiveMQConnection(ActiveMQConnectionFactory factory,
        String theUserName, String thePassword,
        TransportChannel transportChannel) throws JMSException {
    this(factory, theUserName, thePassword);
    this.transportChannel = transportChannel;
    // wire this connection into the channel as its single callback target
    transportChannel.setPacketListener(this);
    transportChannel.setExceptionListener(this);
    transportChannel.addTransportStatusEventListener(this);
    isTransportOK = true;
}
382
/**
 * Common initialisation shared by all constructors: stores the credentials,
 * creates the id generators, the session/dispatcher/consumer lists, the
 * memory managers and the statistics objects, and registers this connection
 * with its factory. No transport channel is assigned here.
 * <P>
 * Note that <CODE>this</CODE> is handed to
 * <CODE>factoryStats.addConnection</CODE> and
 * <CODE>factory.onConnectionCreate</CODE> before construction has finished,
 * so the order of the statements below matters.
 *
 * @param factory
 *            the factory creating this connection
 * @param theUserName
 *            the user's name
 * @param thePassword
 *            the password
 */
protected ActiveMQConnection(ActiveMQConnectionFactory factory,
        String theUserName, String thePassword) {
    this.factory = factory;
    this.userName = theUserName;
    this.password = thePassword;
    this.clientIdGenerator = new IdGenerator();
    this.packetIdGenerator = new IdGenerator();
    this.handleIdGenerator = new IdGenerator();
    this.sessionIdGenerator = new IdGenerator();
    this.consumerNumberGenerator = new SynchronizedInt(0);
    this.sessions = new CopyOnWriteArrayList();
    this.messageDispatchers = new CopyOnWriteArrayList();
    this.connectionConsumers = new CopyOnWriteArrayList();
    this.connectionMetaData = new ActiveMQConnectionMetaData();
    // a new connection starts out in stopped mode
    this.started = new SynchronizedBoolean(false);
    this.startTime = System.currentTimeMillis();
    this.prefetchPolicy = new ActiveMQPrefetchPolicy();
    // NOTE(review): clientID has not been assigned yet at this point, so
    // null is passed as the first argument - confirm this is intended
    this.memoryManager = new MemoryBoundedObjectManager(clientID,
            DEFAULT_CONNECTION_MEMORY_LIMIT);
    this.boundedQueueManager = new MemoryBoundedQueueManager(memoryManager);
    // this connection listens to capacity events of its own memory manager
    this.memoryManager.addCapacityEventListener(this);
    // XA connections are reported as transactional in the statistics
    boolean transactional = this instanceof XAConnection;
    factoryStats = factory.getFactoryStats();
    // 'this' escapes here while stats and the fields below are still unset
    factoryStats.addConnection(this);
    stats = new JMSConnectionStatsImpl(sessions, transactional);
    this.transientConsumedRedeliverCache = new CopyOnWriteArrayList();
    this.tempDestinationMap = new ConcurrentHashMap();
    this.validDestinationsMap = new ConcurrentHashMap();
    // last step: let the factory know that the connection exists
    factory.onConnectionCreate(this);
}
413
414 /**
415 * @return statistics for this Connection
416 */
417 public StatsImpl getStats() {
418 return stats;
419 }
420
421 /**
422 * @return a number unique for this connection
423 */
424 public JMSConnectionStatsImpl getConnectionStats() {
425 return stats;
426 }
427
428 /**
429 * Creates a <CODE>Session</CODE> object.
430 *
431 * @param transacted
432 * indicates whether the session is transacted
433 * @param acknowledgeMode
434 * indicates whether the consumer or the client will acknowledge
435 * any messages it receives; ignored if the session is
436 * transacted. Legal values are
437 * <code>Session.AUTO_ACKNOWLEDGE</code>,
438 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
439 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
440 * @return a newly created session
441 * @throws JMSException
442 * if the <CODE>Connection</CODE> object fails to create a
443 * session due to some internal error or lack of support for the
444 * specific transaction and acknowledgement mode.
445 * @see Session#AUTO_ACKNOWLEDGE
446 * @see Session#CLIENT_ACKNOWLEDGE
447 * @see Session#DUPS_OK_ACKNOWLEDGE
448 * @since 1.1
449 */
450 public Session createSession(boolean transacted, int acknowledgeMode)
451 throws JMSException {
452 checkClosed();
453 sendConnectionInfoToBroker();
454 return new ActiveMQSession(
455 this,
456 (transacted ? Session.SESSION_TRANSACTED
457 : (acknowledgeMode == Session.SESSION_TRANSACTED ? Session.AUTO_ACKNOWLEDGE
458 : acknowledgeMode)));
459 }
460
461 /**
462 * Creates a <CODE>Session</CODE> object.
463 *
464 * @param transacted
465 * indicates whether the session is transacted
466 * @param acknowledgeMode
467 * indicates whether the consumer or the client will acknowledge
468 * any messages it receives; ignored if the session is
469 * transacted. Legal values are
470 * <code>Session.AUTO_ACKNOWLEDGE</code>,
471 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
472 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
473 * @param optimizedDispatch
474 * @return a newly created session
475 * @throws JMSException
476 * if the <CODE>Connection</CODE> object fails to create a
477 * session due to some internal error or lack of support for the
478 * specific transaction and acknowledgement mode.
479 * @see Session#AUTO_ACKNOWLEDGE
480 * @see Session#CLIENT_ACKNOWLEDGE
481 * @see Session#DUPS_OK_ACKNOWLEDGE
482 * @since 1.1
483 */
484 public Session createSession(boolean transacted, int acknowledgeMode,
485 boolean optimizedDispatch) throws JMSException {
486 checkClosed();
487 sendConnectionInfoToBroker();
488 return new ActiveMQSession(this,
489 (transacted ? Session.SESSION_TRANSACTED : acknowledgeMode),
490 optimizedDispatch);
491 }
492
493 /**
494 * Gets the client identifier for this connection.
495 * <P>
496 * This value is specific to the JMS provider. It is either preconfigured by
497 * an administrator in a <CODE> ConnectionFactory</CODE> object or assigned
498 * dynamically by the application by calling the <code>setClientID</code>
499 * method.
500 *
501 * @return the unique client identifier
502 * @throws JMSException
503 * if the JMS provider fails to return the client ID for this
504 * connection due to some internal error.
505 */
506 public String getClientID() throws JMSException {
507 checkClosed();
508 return this.clientID;
509 }
510
511 /**
512 * Sets the client identifier for this connection.
513 * <P>
514 * The preferred way to assign a JMS client's client identifier is for it to
515 * be configured in a client-specific <CODE>ConnectionFactory</CODE>
516 * object and transparently assigned to the <CODE>Connection</CODE> object
517 * it creates.
518 * <P>
519 * Alternatively, a client can set a connection's client identifier using a
520 * provider-specific value. The facility to set a connection's client
521 * identifier explicitly is not a mechanism for overriding the identifier
522 * that has been administratively configured. It is provided for the case
523 * where no administratively specified identifier exists. If one does exist,
524 * an attempt to change it by setting it must throw an <CODE>IllegalStateException</CODE>.
525 * If a client sets the client identifier explicitly, it must do so
526 * immediately after it creates the connection and before any other action
527 * on the connection is taken. After this point, setting the client
528 * identifier is a programming error that should throw an <CODE>IllegalStateException</CODE>.
529 * <P>
530 * The purpose of the client identifier is to associate a connection and its
531 * objects with a state maintained on behalf of the client by a provider.
532 * The only such state identified by the JMS API is that required to support
533 * durable subscriptions.
534 * <P>
535 * If another connection with the same <code>clientID</code> is already
536 * running when this method is called, the JMS provider should detect the
537 * duplicate ID and throw an <CODE>InvalidClientIDException</CODE>.
538 *
539 * @param newClientID
540 * the unique client identifier
541 * @throws JMSException
542 * if the JMS provider fails to set the client ID for this
543 * connection due to some internal error.
544 * @throws javax.jms.InvalidClientIDException
545 * if the JMS client specifies an invalid or duplicate client
546 * ID.
547 * @throws javax.jms.IllegalStateException
548 * if the JMS client attempts to set a connection's client ID at
549 * the wrong time or when it has been administratively
550 * configured.
551 */
552 public void setClientID(String newClientID) throws JMSException {
553 if (this.clientIDSet) {
554 throw new IllegalStateException("The clientID has already been set");
555 }
556 if (this.isConnectionInfoSentToBroker) {
557 throw new IllegalStateException(
558 "Setting clientID on a used Connection is not allowed");
559 }
560 checkClosed();
561 this.clientID = newClientID;
562 this.userSpecifiedClientID = true;
563 ensureClientIDInitialised();
564 }
565
566 /**
567 * Gets the metadata for this connection.
568 *
569 * @return the connection metadata
570 * @throws JMSException
571 * if the JMS provider fails to get the connection metadata for
572 * this connection.
573 * @see javax.jms.ConnectionMetaData
574 */
575 public ConnectionMetaData getMetaData() throws JMSException {
576 checkClosed();
577 return this.connectionMetaData;
578 }
579
580 /**
581 * Gets the <CODE>ExceptionListener</CODE> object for this connection. Not
582 * every <CODE>Connection</CODE> has an <CODE>ExceptionListener</CODE>
583 * associated with it.
584 *
585 * @return the <CODE>ExceptionListener</CODE> for this connection, or
586 * null. if no <CODE>ExceptionListener</CODE> is associated with
587 * this connection.
588 * @throws JMSException
589 * if the JMS provider fails to get the <CODE>ExceptionListener</CODE>
590 * for this connection.
591 * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
592 */
593 public ExceptionListener getExceptionListener() throws JMSException {
594 checkClosed();
595 return this.exceptionListener;
596 }
597
598 /**
599 * Sets an exception listener for this connection.
600 * <P>
601 * If a JMS provider detects a serious problem with a connection, it informs
602 * the connection's <CODE> ExceptionListener</CODE>, if one has been
603 * registered. It does this by calling the listener's <CODE>onException
604 * </CODE> method, passing it a <CODE>JMSException</CODE> object
605 * describing the problem.
606 * <P>
607 * An exception listener allows a client to be notified of a problem
608 * asynchronously. Some connections only consume messages, so they would
609 * have no other way to learn their connection has failed.
610 * <P>
611 * A connection serializes execution of its <CODE>ExceptionListener</CODE>.
612 * <P>
613 * A JMS provider should attempt to resolve connection problems itself
614 * before it notifies the client of them.
615 *
616 * @param listener
617 * the exception listener
618 * @throws JMSException
619 * if the JMS provider fails to set the exception listener for
620 * this connection.
621 */
622 public void setExceptionListener(ExceptionListener listener)
623 throws JMSException {
624 checkClosed();
625 this.exceptionListener = listener;
626 this.transportChannel.setExceptionListener(listener);
627 }
628
629 /**
630 * Starts (or restarts) a connection's delivery of incoming messages. A call
631 * to <CODE>start</CODE> on a connection that has already been started is
632 * ignored.
633 *
634 * @throws JMSException
635 * if the JMS provider fails to start message delivery due to
636 * some internal error.
637 * @see javax.jms.Connection#stop()
638 */
639 public void start() throws JMSException {
640 checkClosed();
641 if (started.commit(false, true)) {
642 // We have a change in connection info to send.
643 // send the Connection info again
644 sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
645 for (Iterator i = sessions.iterator(); i.hasNext();) {
646 ActiveMQSession s = (ActiveMQSession) i.next();
647 s.start();
648 }
649 }
650 }
651
652 /**
653 * @return true if this Connection is started
654 */
655 protected boolean isStarted() {
656 return started.get();
657 }
658
659 /**
660 * Temporarily stops a connection's delivery of incoming messages. Delivery
661 * can be restarted using the connection's <CODE>start</CODE> method. When
662 * the connection is stopped, delivery to all the connection's message
663 * consumers is inhibited: synchronous receives block, and messages are not
664 * delivered to message listeners.
665 * <P>
666 * This call blocks until receives and/or message listeners in progress have
667 * completed.
668 * <P>
669 * Stopping a connection has no effect on its ability to send messages. A
670 * call to <CODE>stop</CODE> on a connection that has already been stopped
671 * is ignored.
672 * <P>
673 * A call to <CODE>stop</CODE> must not return until delivery of messages
674 * has paused. This means that a client can rely on the fact that none of
675 * its message listeners will be called and that all threads of control
676 * waiting for <CODE>receive</CODE> calls to return will not return with a
677 * message until the connection is restarted. The receive timers for a
678 * stopped connection continue to advance, so receives may time out while
679 * the connection is stopped.
680 * <P>
681 * If message listeners are running when <CODE>stop</CODE> is invoked, the
682 * <CODE>stop</CODE> call must wait until all of them have returned before
683 * it may return. While these message listeners are completing, they must
684 * have the full services of the connection available to them.
685 *
686 * @throws JMSException
687 * if the JMS provider fails to stop message delivery due to
688 * some internal error.
689 * @see javax.jms.Connection#start()
690 */
691 public void stop() throws JMSException {
692 checkClosed();
693 if (started.commit(true, false)) {
694 for (Iterator i = sessions.iterator(); i.hasNext();) {
695 ActiveMQSession s = (ActiveMQSession) i.next();
696 s.stop();
697 }
698 sendConnectionInfoToBroker(2000, true, false);
699 }
700 }
701
/**
 * Closes the connection.
 * <P>
 * Since a provider typically allocates significant resources outside the
 * JVM on behalf of a connection, clients should close these resources when
 * they are not needed. Relying on garbage collection to eventually reclaim
 * these resources may not be timely enough.
 * <P>
 * There is no need to close the sessions, producers, and consumers of a
 * closed connection.
 * <P>
 * Closing a connection causes all temporary destinations to be deleted.
 * <P>
 * When this method is invoked, it should not return until message
 * processing has been shut down in an orderly fashion. This means that all
 * message listeners that may have been running have returned, and that all
 * pending receives have returned. A close terminates all pending message
 * receives on the connection's sessions' consumers. The receives may return
 * with a message or with null, depending on whether there was a message
 * available at the time of the close. If one or more of the connection's
 * sessions' message listeners is processing a message at the time when
 * connection <CODE>close</CODE> is invoked, all the facilities of the
 * connection and its sessions must remain available to those listeners
 * until they return control to the JMS provider.
 * <P>
 * Closing a connection causes any of its sessions' transactions in progress
 * to be rolled back. In the case where a session's work is coordinated by
 * an external transaction manager, a session's <CODE>commit</CODE> and
 * <CODE>rollback</CODE> methods are not used and the result of a closed
 * session's work is determined later by the transaction manager. Closing a
 * connection does NOT force an acknowledgment of client-acknowledged
 * sessions.
 * <P>
 * Invoking the <CODE>acknowledge</CODE> method of a received message from
 * a closed connection's session must throw an <CODE>IllegalStateException</CODE>.
 * Closing a closed connection must NOT throw an exception.
 * <P>
 * Implementation notes: the transport channel is marked as pending-stop
 * before the lock on this connection is taken. The teardown itself runs at
 * most once per successful close; the <CODE>closed</CODE> flag is only set
 * after the teardown completed without an exception, so a close that fails
 * part-way leaves the connection un-closed (with its session list already
 * cleared and the factory already notified).
 *
 * @throws JMSException
 *             if the JMS provider fails to close the connection due to some
 *             internal error. For example, a failure to release resources
 *             or to close a socket connection can cause this exception to
 *             be thrown.
 */
public void close() throws JMSException {
    // done outside the lock and on every call, even when already closed
    this.transportChannel.setPendingStop(true);
    synchronized (this) {
        if (!closed) {
            memoryManager.removeCapacityEventListener(this);
            try {
                closeTemporaryDestinations();
                // close child resources first: sessions, then connection consumers
                for (Iterator i = this.sessions.iterator(); i.hasNext();) {
                    ActiveMQSession s = (ActiveMQSession) i.next();
                    s.close();
                }
                for (Iterator i = this.connectionConsumers.iterator(); i
                        .hasNext();) {
                    ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
                            .next();
                    c.close();
                }
                try {
                    // presumably the final 'true' marks this info as the close
                    // notification - confirm against sendConnectionInfoToBroker
                    sendConnectionInfoToBroker(sendCloseTimeout, true, true);
                } catch (TimeoutExpiredException e) {
                    // best effort: a timeout is logged and the close carries on
                    log
                            .warn("Failed to send close to broker, timeout expired of: "
                                    + sendCloseTimeout + " millis");
                }
                this.connectionConsumers.clear();
                this.messageDispatchers.clear();
                this.transportChannel.stop();
            } finally {
                // runs even when the teardown above threw
                this.sessions.clear();
                started.set(false);
                factory.onConnectionClose(this);
            }
            // only reached when the try block completed normally
            closed = true;
            transientConsumedRedeliverCache.clear();
            validDestinationsMap.clear();
            factoryStats.removeConnection(this);
        }
    }

}
785
786 /**
787 * Tells the broker to terminate its VM. This can be used to cleanly terminate a broker running in
788 * a standalone java process. Server must have property enable.vm.shutdown=true defined
789 * to allow this to work.
790 */
791 public void terminateBrokerVM() throws JMSException {
792 BrokerAdminCommand command = new BrokerAdminCommand();
793 command.setCommand(BrokerAdminCommand.SHUTDOWN_SERVER_VM);
794 asyncSendPacket(command);
795 }
796
797 /**
798 * simply throws an exception if the Connection is already closed
799 *
800 * @throws JMSException
801 */
802 protected synchronized void checkClosed() throws JMSException {
803 if (!startedTransport) {
804 startedTransport = true;
805 this.transportChannel.setCachingEnabled(isCachingEnabled());
806 if (useAsyncSend == false) {
807 this.transportChannel.setNoDelay(true);
808 }
809
810 this.transportChannel.setUsedInternally(internalConnection);
811 this.transportChannel.start();
812 if (transportChannel.doesSupportWireFormatVersioning()) {
813 WireFormatInfo info = new WireFormatInfo();
814 info.setVersion(transportChannel.getCurrentWireFormatVersion());
815 this.asyncSendPacket(info);
816 }
817 }
818 if (this.closed) {
819 throw new ConnectionClosedException();
820 }
821 }
822
823 /**
824 * Creates a connection consumer for this connection (optional operation).
825 * This is an expert facility not used by regular JMS clients.
826 *
827 * @param destination
828 * the destination to access
829 * @param messageSelector
830 * only messages with properties matching the message selector
831 * expression are delivered. A value of null or an empty string
832 * indicates that there is no message selector for the message
833 * consumer.
834 * @param sessionPool
835 * the server session pool to associate with this connection
836 * consumer
837 * @param maxMessages
838 * the maximum number of messages that can be assigned to a
839 * server session at one time
840 * @return the connection consumer
841 * @throws JMSException
842 * if the <CODE>Connection</CODE> object fails to create a
843 * connection consumer due to some internal error or invalid
844 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
845 * @throws javax.jms.InvalidDestinationException
846 * if an invalid destination is specified.
847 * @throws javax.jms.InvalidSelectorException
848 * if the message selector is invalid.
849 * @see javax.jms.ConnectionConsumer
850 * @since 1.1
851 */
852 public ConnectionConsumer createConnectionConsumer(Destination destination,
853 String messageSelector, ServerSessionPool sessionPool,
854 int maxMessages) throws JMSException {
855 checkClosed();
856 ensureClientIDInitialised();
857 ConsumerInfo info = new ConsumerInfo();
858 info.setConsumerId(handleIdGenerator.generateId());
859 info.setDestination(ActiveMQMessageTransformation
860 .transformDestination(destination));
861 info.setSelector(messageSelector);
862 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
863 info.setClientId(clientID);
864 return new ActiveMQConnectionConsumer(this, sessionPool, info,
865 maxMessages);
866 }
867
868 /**
869 * Creates a connection consumer for this connection (optional operation).
870 * This is an expert facility not used by regular JMS clients.
871 *
872 * @param destination
873 * the destination to access
874 * @param messageSelector
875 * only messages with properties matching the message selector
876 * expression are delivered. A value of null or an empty string
877 * indicates that there is no message selector for the message
878 * consumer.
879 * @param sessionPool
880 * the server session pool to associate with this connection
881 * consumer
882 * @param maxMessages
883 * the maximum number of messages that can be assigned to a
884 * server session at one time
885 * @param noLocal
886 * set true if you want to filter out messages published locally
887 *
888 * @return the connection consumer
889 * @throws JMSException
890 * if the <CODE>Connection</CODE> object fails to create a
891 * connection consumer due to some internal error or invalid
892 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
893 * @throws javax.jms.InvalidDestinationException
894 * if an invalid destination is specified.
895 * @throws javax.jms.InvalidSelectorException
896 * if the message selector is invalid.
897 * @see javax.jms.ConnectionConsumer
898 * @since 1.1
899 */
900 public ConnectionConsumer createConnectionConsumer(Destination destination,
901 String messageSelector, ServerSessionPool sessionPool,
902 int maxMessages, boolean noLocal) throws JMSException {
903
904 checkClosed();
905 ensureClientIDInitialised();
906 ConsumerInfo info = new ConsumerInfo();
907 info.setConsumerId(handleIdGenerator.generateId());
908 info.setDestination(ActiveMQMessageTransformation
909 .transformDestination(destination));
910 info.setSelector(messageSelector);
911 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
912 info.setNoLocal(noLocal);
913 info.setClientId(clientID);
914 return new ActiveMQConnectionConsumer(this, sessionPool, info,
915 maxMessages);
916 }
917
918
919
920 /**
921 * Create a durable connection consumer for this connection (optional
922 * operation). This is an expert facility not used by regular JMS clients.
923 *
924 * @param topic
925 * topic to access
926 * @param subscriptionName
927 * durable subscription name
928 * @param messageSelector
929 * only messages with properties matching the message selector
930 * expression are delivered. A value of null or an empty string
931 * indicates that there is no message selector for the message
932 * consumer.
933 * @param sessionPool
934 * the server session pool to associate with this durable
935 * connection consumer
936 * @param maxMessages
937 * the maximum number of messages that can be assigned to a
938 * server session at one time
939 * @return the durable connection consumer
940 * @throws JMSException
941 * if the <CODE>Connection</CODE> object fails to create a
942 * connection consumer due to some internal error or invalid
943 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
944 * @throws javax.jms.InvalidDestinationException
945 * if an invalid destination is specified.
946 * @throws javax.jms.InvalidSelectorException
947 * if the message selector is invalid.
948 * @see javax.jms.ConnectionConsumer
949 * @since 1.1
950 */
951 public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
952 String subscriptionName, String messageSelector,
953 ServerSessionPool sessionPool, int maxMessages) throws JMSException {
954 checkClosed();
955 ensureClientIDInitialised();
956 ConsumerInfo info = new ConsumerInfo();
957 info.setConsumerId(this.handleIdGenerator.generateId());
958 info.setDestination(ActiveMQMessageTransformation
959 .transformDestination(topic));
960 info.setSelector(messageSelector);
961 info.setConsumerName(subscriptionName);
962 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
963 info.setClientId(clientID);
964 return new ActiveMQConnectionConsumer(this, sessionPool, info,
965 maxMessages);
966 }
967
968 /**
969 * Create a durable connection consumer for this connection (optional
970 * operation). This is an expert facility not used by regular JMS clients.
971 *
972 * @param topic
973 * topic to access
974 * @param subscriptionName
975 * durable subscription name
976 * @param messageSelector
977 * only messages with properties matching the message selector
978 * expression are delivered. A value of null or an empty string
979 * indicates that there is no message selector for the message
980 * consumer.
981 * @param sessionPool
982 * the server session pool to associate with this durable
983 * connection consumer
984 * @param maxMessages
985 * the maximum number of messages that can be assigned to a
986 * server session at one time
987 * @param noLocal
988 * set true if you want to filter out messages published locally
989 *
990 * @return the durable connection consumer
991 * @throws JMSException
992 * if the <CODE>Connection</CODE> object fails to create a
993 * connection consumer due to some internal error or invalid
994 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
995 * @throws javax.jms.InvalidDestinationException
996 * if an invalid destination is specified.
997 * @throws javax.jms.InvalidSelectorException
998 * if the message selector is invalid.
999 * @see javax.jms.ConnectionConsumer
1000 * @since 1.1
1001 */
1002 public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
1003 String subscriptionName, String messageSelector,
1004 ServerSessionPool sessionPool, int maxMessages, boolean noLocal) throws JMSException {
1005 checkClosed();
1006 ensureClientIDInitialised();
1007 ConsumerInfo info = new ConsumerInfo();
1008 info.setConsumerId(this.handleIdGenerator.generateId());
1009 info.setDestination(ActiveMQMessageTransformation
1010 .transformDestination(topic));
1011 info.setSelector(messageSelector);
1012 info.setConsumerName(subscriptionName);
1013 info.setNoLocal(noLocal);
1014 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1015 info.setClientId(clientID);
1016 return new ActiveMQConnectionConsumer(this, sessionPool, info,
1017 maxMessages);
1018 }
1019
1020 /**
1021 * Implementation of the PacketListener interface - consume a packet
1022 *
1023 * @param packet -
1024 * the Packet to consume
1025 * @see org.activemq.message.PacketListener#consume(org.activemq.message.Packet)
1026 */
1027 public void consume(Packet packet) {
1028 if (!closed && packet != null) {
1029 if (packet.isJMSMessage()) {
1030 ActiveMQMessage message = (ActiveMQMessage) packet;
1031 message.setReadOnly(true);
1032 message.setConsumerIdentifer(clientID);
1033
1034 // lets check for expired messages which is only relevant for
1035 // multicast based stuff
1036 // as a pointcast based network should filter out this stuff
1037 if (transportChannel.isMulticast()) {
1038 long expiration = message.getJMSExpiration();
1039 if (expiration > 0) {
1040 long timeStamp = System.currentTimeMillis();
1041 if (timeStamp > expiration) {
1042 if (log.isDebugEnabled()) {
1043 log.debug("Discarding expired message: " + message);
1044 }
1045 return;
1046 }
1047 }
1048 }
1049
1050 try {
1051 message = assembleMessage(message);
1052 if( message !=null ) {
1053 int count = 0;
1054 for (Iterator i = this.messageDispatchers.iterator(); i.hasNext();) {
1055 ActiveMQMessageDispatcher dispatcher = (ActiveMQMessageDispatcher) i.next();
1056 if (dispatcher.isTarget(message)) {
1057 if (count > 0) {
1058 // separate message for each Session etc.
1059 message = message.deepCopy();
1060 }
1061 dispatcher.dispatch(message);
1062 count++;
1063 }
1064 }
1065 }
1066 } catch (JMSException jmsEx) {
1067 handleAsyncException(jmsEx);
1068 }
1069 } else if (packet.getPacketType() == Packet.CAPACITY_INFO) {
1070 CapacityInfo info = (CapacityInfo) packet;
1071 synchronized(flowControlMutex) {
1072 flowControlSleepTime = info.getFlowControlTimeout();
1073 }
1074 // System.out.println("SET FLOW TIMEOUT = " +
1075 // flowControlSleepTime + " FOR " + info);
1076 } else if (packet.getPacketType() == Packet.KEEP_ALIVE && packet.isReceiptRequired()) {
1077 Receipt receipt = new Receipt();
1078 receipt.setCorrelationId(packet.getId());
1079 receipt.setReceiptRequired(false);
1080 try {
1081 asyncSendPacket(receipt);
1082 } catch (JMSException jmsEx) {
1083 handleAsyncException(jmsEx);
1084 }
1085 }
1086 }
1087 }
1088
1089 private final ActiveMQMessage assembleMessage(ActiveMQMessage message) {
1090 ActiveMQMessage result = message;
1091 if (message != null && !isInternalConnection() && message.isMessagePart()) {
1092 if (message.getNumberOfParts() == 1) {
1093 //passed though from another session - i.e.
1094 //a network or remote connection and now assembled
1095 message.resetMessagePart();
1096 result = message;
1097 }
1098 else {
1099 result = null;
1100 String parentId = message.getParentMessageID();
1101 ActiveMQMessage[] array = (ActiveMQMessage[]) assemblies.get(parentId);
1102 if (array == null) {
1103 array = new ActiveMQMessage[message.getNumberOfParts()];
1104 assemblies.put(parentId, array);
1105 }
1106 array[message.getPartNumber()] = message;
1107 boolean complete = true;
1108 for (int i = 0;i < array.length;i++) {
1109 complete &= array[i] != null;
1110 }
1111 if (complete) {
1112 result = array[0];
1113 ByteArray[] bas = new ByteArray[array.length];
1114 try {
1115 for (int i = 0;i < bas.length;i++) {
1116 bas[i] = array[i].getBodyAsBytes();
1117 if (i >= 1){
1118 array[i].clearBody();
1119 }
1120 }
1121 ByteArray ba = fragmentation.assemble(bas);
1122 result.setBodyAsBytes(ba);
1123 }
1124 catch (IOException ioe) {
1125 JMSException jmsEx = new JMSException("Failed to assemble fragment message: " + parentId);
1126 jmsEx.setLinkedException(ioe);
1127 onException(jmsEx);
1128 }catch(JMSException jmsEx){
1129 onException(jmsEx);
1130 }
1131 }
1132 }
1133 }
1134 return result;
1135 }
1136
1137 /**
1138 * @see javax.jms.ExceptionListener#onException(javax.jms.JMSException)
1139 */
1140 public void onException(JMSException jmsEx) {
1141 // Got an exception propagated up from the transport channel
1142 handleAsyncException(jmsEx);
1143 isTransportOK = false;
1144 try {
1145 close();
1146 } catch (JMSException ex) {
1147 log.debug("Exception closing the connection", ex);
1148 }
1149 }
1150
1151 /**
1152 * Creates a <CODE>TopicSession</CODE> object.
1153 *
1154 * @param transacted
1155 * indicates whether the session is transacted
1156 * @param acknowledgeMode
1157 * indicates whether the consumer or the client will acknowledge
1158 * any messages it receives; ignored if the session is
1159 * transacted. Legal values are
1160 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1161 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1162 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1163 * @return a newly created topic session
1164 * @throws JMSException
1165 * if the <CODE>TopicConnection</CODE> object fails to create
1166 * a session due to some internal error or lack of support for
1167 * the specific transaction and acknowledgement mode.
1168 * @see Session#AUTO_ACKNOWLEDGE
1169 * @see Session#CLIENT_ACKNOWLEDGE
1170 * @see Session#DUPS_OK_ACKNOWLEDGE
1171 */
1172 public TopicSession createTopicSession(boolean transacted,
1173 int acknowledgeMode) throws JMSException {
1174 checkClosed();
1175 sendConnectionInfoToBroker();
1176 return new ActiveMQTopicSession((ActiveMQSession) createSession(
1177 transacted, acknowledgeMode));
1178 }
1179
1180 /**
1181 * Creates a connection consumer for this connection (optional operation).
1182 * This is an expert facility not used by regular JMS clients.
1183 *
1184 * @param topic
1185 * the topic to access
1186 * @param messageSelector
1187 * only messages with properties matching the message selector
1188 * expression are delivered. A value of null or an empty string
1189 * indicates that there is no message selector for the message
1190 * consumer.
1191 * @param sessionPool
1192 * the server session pool to associate with this connection
1193 * consumer
1194 * @param maxMessages
1195 * the maximum number of messages that can be assigned to a
1196 * server session at one time
1197 * @return the connection consumer
1198 * @throws JMSException
1199 * if the <CODE>TopicConnection</CODE> object fails to create
1200 * a connection consumer due to some internal error or invalid
1201 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1202 * @throws InvalidDestinationException
1203 * if an invalid topic is specified.
1204 * @throws InvalidSelectorException
1205 * if the message selector is invalid.
1206 * @see javax.jms.ConnectionConsumer
1207 */
1208 public ConnectionConsumer createConnectionConsumer(Topic topic,
1209 String messageSelector, ServerSessionPool sessionPool,
1210 int maxMessages) throws JMSException {
1211 checkClosed();
1212 ensureClientIDInitialised();
1213 ConsumerInfo info = new ConsumerInfo();
1214 info.setConsumerId(this.handleIdGenerator.generateId());
1215 info.setDestination(ActiveMQMessageTransformation
1216 .transformDestination(topic));
1217 info.setSelector(messageSelector);
1218 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1219 info.setClientId(clientID);
1220 return new ActiveMQConnectionConsumer(this, sessionPool, info,
1221 maxMessages);
1222 }
1223
1224 /**
1225 * Creates a <CODE>QueueSession</CODE> object.
1226 *
1227 * @param transacted
1228 * indicates whether the session is transacted
1229 * @param acknowledgeMode
1230 * indicates whether the consumer or the client will acknowledge
1231 * any messages it receives; ignored if the session is
1232 * transacted. Legal values are
1233 * <code>Session.AUTO_ACKNOWLEDGE</code>,
1234 * <code>Session.CLIENT_ACKNOWLEDGE</code>, and
1235 * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
1236 * @return a newly created queue session
1237 * @throws JMSException
1238 * if the <CODE>QueueConnection</CODE> object fails to create
1239 * a session due to some internal error or lack of support for
1240 * the specific transaction and acknowledgement mode.
1241 * @see Session#AUTO_ACKNOWLEDGE
1242 * @see Session#CLIENT_ACKNOWLEDGE
1243 * @see Session#DUPS_OK_ACKNOWLEDGE
1244 */
1245 public QueueSession createQueueSession(boolean transacted,
1246 int acknowledgeMode) throws JMSException {
1247 checkClosed();
1248 sendConnectionInfoToBroker();
1249 return new ActiveMQQueueSession((ActiveMQSession) createSession(
1250 transacted, acknowledgeMode));
1251 }
1252
1253 /**
1254 * Creates a connection consumer for this connection (optional operation).
1255 * This is an expert facility not used by regular JMS clients.
1256 *
1257 * @param queue
1258 * the queue to access
1259 * @param messageSelector
1260 * only messages with properties matching the message selector
1261 * expression are delivered. A value of null or an empty string
1262 * indicates that there is no message selector for the message
1263 * consumer.
1264 * @param sessionPool
1265 * the server session pool to associate with this connection
1266 * consumer
1267 * @param maxMessages
1268 * the maximum number of messages that can be assigned to a
1269 * server session at one time
1270 * @return the connection consumer
1271 * @throws JMSException
1272 * if the <CODE>QueueConnection</CODE> object fails to create
1273 * a connection consumer due to some internal error or invalid
1274 * arguments for <CODE>sessionPool</CODE> and <CODE>messageSelector</CODE>.
1275 * @throws InvalidDestinationException
1276 * if an invalid queue is specified.
1277 * @throws InvalidSelectorException
1278 * if the message selector is invalid.
1279 * @see javax.jms.ConnectionConsumer
1280 */
1281 public ConnectionConsumer createConnectionConsumer(Queue queue,
1282 String messageSelector, ServerSessionPool sessionPool,
1283 int maxMessages) throws JMSException {
1284 checkClosed();
1285 ensureClientIDInitialised();
1286 ConsumerInfo info = new ConsumerInfo();
1287 info.setConsumerId(this.handleIdGenerator.generateId());
1288 info.setDestination(ActiveMQMessageTransformation
1289 .transformDestination(queue));
1290 info.setSelector(messageSelector);
1291 info.setConsumerNo(handleIdGenerator.getNextShortSequence());
1292 info.setClientId(clientID);
1293 return new ActiveMQConnectionConsumer(this, sessionPool, info,
1294 maxMessages);
1295 }
1296
1297 /**
1298 * Ensures that the clientID was manually specified and not auto-generated.
1299 * If the clientID was not specified this method will throw an exception.
1300 * This method is used to ensure that the clientID + durableSubscriber name
1301 * are used correctly.
1302 *
1303 * @throws JMSException
1304 */
1305 public void checkClientIDWasManuallySpecified() throws JMSException {
1306 if (!userSpecifiedClientID) {
1307 throw new JMSException(
1308 "You cannot create a durable subscriber without specifying a unique clientID on a Connection");
1309 }
1310 }
1311
1312 /**
1313 * handle disconnect/reconnect events
1314 *
1315 * @param event
1316 */
1317 public void statusChanged(TransportStatusEvent event) {
1318 log.info("channel status changed: " + event);
1319 if (event.getChannelStatus() == TransportStatusEvent.RECONNECTED) {
1320 isTransportOK = true;
1321 doReconnect();
1322
1323 } else if (event.getChannelStatus() == TransportStatusEvent.DISCONNECTED) {
1324 isTransportOK = false;
1325 clearMessagesInProgress();
1326 }
1327 }
1328
1329 /**
1330 * send a Packet through the Connection - for internal use only
1331 *
1332 * @param packet
1333 * @throws JMSException
1334 */
1335 public void asyncSendPacket(Packet packet) throws JMSException {
1336 asyncSendPacket(packet, true);
1337 }
1338
1339 /**
1340 * send a Packet through the Connection - for internal use only
1341 *
1342 * @param packet
1343 * @param doSendWhileReconnecting
1344 * @throws JMSException
1345 */
1346 public void asyncSendPacket(Packet packet, boolean doSendWhileReconnecting)
1347 throws JMSException {
1348 if (isTransportOK
1349 && !closed
1350 && (doSendWhileReconnecting || transportChannel
1351 .isTransportConnected())) {
1352 packet.setId(packetIdGenerator.getNextShortSequence());
1353 packet.setReceiptRequired(false);
1354 synchronized(flowControlMutex) {
1355 if (packet.isJMSMessage() && flowControlSleepTime > 0) {
1356 try {
1357 Thread.sleep(flowControlSleepTime);
1358 } catch (InterruptedException e) {
1359 }
1360 }
1361 }
1362 this.transportChannel.asyncSend(packet);
1363 }
1364 }
1365
1366 /**
1367 * send a Packet through a Connection - for internal use only
1368 *
1369 * @param packet
1370 * @throws JMSException
1371 */
1372 public void syncSendPacket(Packet packet) throws JMSException {
1373 syncSendPacket(packet, 0);
1374 }
1375
1376 /**
1377 * Send a packet through a Connection - for internal use only
1378 *
1379 * @param packet
1380 * @param timeout
1381 * @throws JMSException
1382 */
1383 public void syncSendPacket(Packet packet, int timeout) throws JMSException {
1384 if (isTransportOK && !closed) {
1385 Receipt receipt;
1386 packet.setId(packetIdGenerator.getNextShortSequence());
1387 packet.setReceiptRequired(true);
1388 receipt = this.transportChannel.send(packet, timeout);
1389 if (receipt != null) {
1390 if (receipt.isFailed()) {
1391 Throwable e = receipt.getException();
1392 if (e != null) {
1393 throw JMSExceptionHelper.newJMSException(e);
1394 }
1395 throw new JMSException(
1396 "syncSendPacket failed with unknown exception");
1397 }
1398 }
1399 } else {
1400 if (closed) {
1401 throw new ConnectionClosedException();
1402 } else {
1403 throw new JMSException(
1404 "syncSendTimedOut: connection no longer OK");
1405 }
1406 }
1407 }
1408
1409 public Receipt syncSendRequest(Packet packet) throws JMSException {
1410 checkClosed();
1411 if (isTransportOK && !closed) {
1412 Receipt receipt;
1413 packet.setReceiptRequired(true);
1414 packet.setId(this.packetIdGenerator.getNextShortSequence());
1415
1416 receipt = this.transportChannel.send(packet);
1417 if (receipt != null && receipt.isFailed()) {
1418 Throwable e = receipt.getException();
1419 if (e != null) {
1420 throw (JMSException) new JMSException(e.getMessage())
1421 .initCause(e);
1422 }
1423 throw new JMSException(
1424 "syncSendPacket failed with unknown exception");
1425 }
1426 return receipt;
1427 } else {
1428 if (closed) {
1429 throw new ConnectionClosedException();
1430 } else {
1431 throw new JMSException(
1432 "syncSendTimedOut: connection no longer OK");
1433 }
1434 }
1435 }
1436
1437 // Properties
1438 // -------------------------------------------------------------------------
1439
1440 /**
1441 * @return Returns the prefetchPolicy.
1442 */
1443 public ActiveMQPrefetchPolicy getPrefetchPolicy() {
1444 return prefetchPolicy;
1445 }
1446
1447 /**
1448 * @param prefetchPolicy
1449 * The prefetchPolicy to set.
1450 */
1451 public void setPrefetchPolicy(ActiveMQPrefetchPolicy prefetchPolicy) {
1452 this.prefetchPolicy = prefetchPolicy;
1453 }
1454
1455 public int getSendCloseTimeout() {
1456 return sendCloseTimeout;
1457 }
1458
1459 public void setSendCloseTimeout(int sendCloseTimeout) {
1460 this.sendCloseTimeout = sendCloseTimeout;
1461 }
1462
1463 public int getSendConnectionInfoTimeout() {
1464 return sendConnectionInfoTimeout;
1465 }
1466
1467 public void setSendConnectionInfoTimeout(int sendConnectionInfoTimeout) {
1468 this.sendConnectionInfoTimeout = sendConnectionInfoTimeout;
1469 }
1470
1471 public TransportChannel getTransportChannel() {
1472 return transportChannel;
1473 }
1474
1475 /**
1476 * Returns the clientID of the connection, forcing one to be generated if
1477 * one has not yet been configured
1478 */
1479 public String getInitializedClientID() throws JMSException {
1480 ensureClientIDInitialised();
1481 return this.clientID;
1482 }
1483
1484 // Implementation methods
1485 // -------------------------------------------------------------------------
1486
1487 /**
1488 * Used internally for adding Sessions to the Connection
1489 *
1490 * @param session
1491 * @throws JMSException
1492 */
1493 protected void addSession(ActiveMQSession session) throws JMSException {
1494 this.sessions.add(session);
1495 addMessageDispatcher(session);
1496 if (started.get()) {
1497 session.start();
1498 }
1499 SessionInfo info = createSessionInfo(session);
1500 info.setStarted(true);
1501 asyncSendPacket(info);
1502 }
1503
1504 /**
1505 * Used interanlly for removing Sessions from a Connection
1506 *
1507 * @param session
1508 * @throws JMSException
1509 */
1510 protected void removeSession(ActiveMQSession session) throws JMSException {
1511 this.sessions.remove(session);
1512 removeMessageDispatcher(session);
1513 SessionInfo info = createSessionInfo(session);
1514 info.setStarted(false);
1515 asyncSendPacket(info, false);
1516 }
1517
1518 private SessionInfo createSessionInfo(ActiveMQSession session) {
1519 SessionInfo info = new SessionInfo();
1520 info.setClientId(clientID);
1521 info.setSessionId(session.getSessionId());
1522 info.setStartTime(session.getStartTime());
1523 return info;
1524 }
1525
1526 /**
1527 * Add a ConnectionConsumer
1528 *
1529 * @param connectionConsumer
1530 * @throws JMSException
1531 */
1532 protected void addConnectionConsumer(
1533 ActiveMQConnectionConsumer connectionConsumer) throws JMSException {
1534 this.connectionConsumers.add(connectionConsumer);
1535 addMessageDispatcher(connectionConsumer);
1536 }
1537
1538 /**
1539 * Remove a ConnectionConsumer
1540 *
1541 * @param connectionConsumer
1542 */
1543 protected void removeConnectionConsumer(
1544 ActiveMQConnectionConsumer connectionConsumer) {
1545 this.connectionConsumers.add(connectionConsumer);
1546 removeMessageDispatcher(connectionConsumer);
1547 }
1548
1549 /**
1550 * Add a Message dispatcher to receive messages from the Broker
1551 *
1552 * @param messageDispatch
1553 * @throws JMSException
1554 * if an internal error
1555 */
1556 protected void addMessageDispatcher(
1557 ActiveMQMessageDispatcher messageDispatch) throws JMSException {
1558 this.messageDispatchers.add(messageDispatch);
1559 }
1560
1561 /**
1562 * Remove a Message dispatcher
1563 *
1564 * @param messageDispatcher
1565 */
1566 protected void removeMessageDispatcher(
1567 ActiveMQMessageDispatcher messageDispatcher) {
1568 this.messageDispatchers.remove(messageDispatcher);
1569 }
1570
1571 /**
1572 * Used for handling async exceptions
1573 *
1574 * @param jmsEx
1575 */
1576 protected void handleAsyncException(JMSException jmsEx) {
1577 if (!closed) {
1578 if (this.exceptionListener != null) {
1579 this.exceptionListener.onException(jmsEx);
1580 } else {
1581 log.warn(
1582 "Async exception with no exception listener: " + jmsEx,
1583 jmsEx);
1584 }
1585 }
1586 }
1587
1588 protected void sendConnectionInfoToBroker() throws JMSException {
1589 sendConnectionInfoToBroker(sendConnectionInfoTimeout, closed, false);
1590 }
1591
1592 /**
1593 * Send the ConnectionInfo to the Broker
1594 *
1595 * @param timeout
1596 * @param isClosed
1597 * @throws JMSException
1598 */
1599 protected void sendConnectionInfoToBroker(int timeout, boolean forceResend,
1600 boolean closing) throws JMSException {
1601 // Can we skip sending the ConnectionInfo packet??
1602 if (isConnectionInfoSentToBroker && !forceResend) {
1603 return;
1604 }
1605
1606 fragmentation.setFragmentationLimit(getMessageFragmentationLimit());
1607
1608 this.isConnectionInfoSentToBroker = true;
1609 ensureClientIDInitialised();
1610 ConnectionInfo info = new ConnectionInfo();
1611 info.setClientId(this.clientID);
1612 info.setHostName(IdGenerator.getHostName());
1613 info.setUserName(userName);
1614 info.setPassword(password);
1615 info.setStartTime(startTime);
1616 info.setStarted(started.get());
1617 info.setClosed(closed || closing);
1618 info.setClientVersion(connectionMetaData.getProviderVersion());
1619 info.setWireFormatVersion(transportChannel
1620 .getCurrentWireFormatVersion());
1621 if (info.getProperties() != null) {
1622 info.getProperties().setProperty(ConnectionInfo.NO_DELAY_PROPERTY,
1623 new Boolean(!useAsyncSend).toString());
1624 }
1625 if (quickClose && info.isClosed()) {
1626 asyncSendPacket(info);
1627 } else {
1628 syncSendPacket(info, timeout);
1629 }
1630 }
1631
1632 /**
1633 * Set the maximum amount of memory this Connection should use for buffered
1634 * inbound messages
1635 *
1636 * @param newMemoryLimit
1637 * the new memory limit in bytes
1638 */
1639 public void setConnectionMemoryLimit(int newMemoryLimit) {
1640 memoryManager.setValueLimit(newMemoryLimit);
1641 }
1642
1643 /**
1644 * Get the current value for the maximum amount of memory this Connection
1645 * should use for buffered inbound messages
1646 *
1647 * @return the current limit in bytes
1648 */
1649 public int getConnectionMemoryLimit() {
1650 return (int) memoryManager.getValueLimit();
1651 }
1652
1653 /**
1654 * CapacityMonitorEventListener implementation called when the capacity of a
1655 * CapacityService changes
1656 *
1657 * @param event
1658 */
1659 public void capacityChanged(CapacityMonitorEvent event) {
1660 // send the event to broker ...
1661 CapacityInfo info = new CapacityInfo();
1662 info.setResourceName(event.getMonitorName());
1663 info.setCapacity(event.getCapacity());
1664 // System.out.println("Cap changed: " + event);
1665 try {
1666 asyncSendPacket(info, false);
1667 } catch (JMSException e) {
1668 JMSException jmsEx = new JMSException(
1669 "failed to send change in capacity");
1670 jmsEx.setLinkedException(e);
1671 handleAsyncException(jmsEx);
1672 }
1673 }
1674
1675 /**
1676 * @return a number unique for this connection
1677 */
1678 protected int getNextConsumerNumber() {
1679 return this.consumerNumberGenerator.increment();
1680 }
1681
1682 protected short generateSessionId() {
1683 return this.sessionIdGenerator.getNextShortSequence();
1684 }
1685
1686 private synchronized void ensureClientIDInitialised() {
1687 if (this.clientID == null || this.clientID.trim().equals("")) {
1688 this.clientID = this.clientIdGenerator.generateId();
1689 }
1690 transportChannel.setClientID(clientID);
1691 this.clientIDSet = true;
1692 }
1693
1694 protected MemoryBoundedQueue getMemoryBoundedQueue(String name) {
1695 return boundedQueueManager.getMemoryBoundedQueue(name);
1696 }
1697
1698 protected void doReconnect() {
1699 try {
1700 // send the Connection info again
1701 sendConnectionInfoToBroker(sendConnectionInfoTimeout, true, false);
1702 for (Iterator iter = sessions.iterator(); iter.hasNext();) {
1703 ActiveMQSession session = (ActiveMQSession) iter.next();
1704 SessionInfo sessionInfo = createSessionInfo(session);
1705 sessionInfo.setStarted(true);
1706 asyncSendPacket(sessionInfo, false);
1707 // send consumers
1708 for (Iterator consumersIterator = session.consumers.iterator(); consumersIterator
1709 .hasNext();) {
1710 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) consumersIterator
1711 .next();
1712 ConsumerInfo consumerInfo = session
1713 .createConsumerInfo(consumer);
1714 consumerInfo.setStarted(true);
1715 asyncSendPacket(consumerInfo, false);
1716 }
1717 // send producers
1718 for (Iterator producersIterator = session.producers.iterator(); producersIterator
1719 .hasNext();) {
1720 ActiveMQMessageProducer producer = (ActiveMQMessageProducer) producersIterator
1721 .next();
1722 ProducerInfo producerInfo = session
1723 .createProducerInfo(producer);
1724 producerInfo.setStarted(true);
1725 asyncSendPacket(producerInfo, false);
1726 }
1727 // send the current capacity
1728 CapacityMonitorEvent event = memoryManager
1729 .generateCapacityMonitorEvent();
1730 if (event != null) {
1731 capacityChanged(event);
1732 }
1733 }
1734 } catch (JMSException jmsEx) {
1735 log.error("Failed to do reconnection");
1736 handleAsyncException(jmsEx);
1737 isTransportOK = false;
1738 }
1739 }
1740
1741 /**
1742 * @return Returns the useAsyncSend.
1743 */
1744 public boolean isUseAsyncSend() {
1745 return useAsyncSend;
1746 }
1747
1748 /**
1749 * @param useAsyncSend
1750 * The useAsyncSend to set.
1751 */
1752 public void setUseAsyncSend(boolean useAsyncSend) {
1753 this.useAsyncSend = useAsyncSend;
1754 }
1755
1756 /**
1757 * @return Returns the cachingEnabled.
1758 */
1759 public boolean isCachingEnabled() {
1760 return cachingEnabled;
1761 }
1762
1763 /**
1764 * @param cachingEnabled
1765 * The cachingEnabled to set.
1766 */
1767 public void setCachingEnabled(boolean cachingEnabled) {
1768 this.cachingEnabled = cachingEnabled;
1769 }
1770
1771 /**
1772 * @return Returns the j2EEcompliant.
1773 */
1774 public boolean isJ2EEcompliant() {
1775 return J2EEcompliant;
1776 }
1777
1778 /**
1779 * @param ecompliant
1780 * The j2EEcompliant to set.
1781 */
1782 public void setJ2EEcompliant(boolean ecompliant) {
1783 J2EEcompliant = ecompliant;
1784 }
1785
1786 /**
1787 * @return Returns the internalConnection.
1788 */
1789 public boolean isInternalConnection() {
1790 return internalConnection;
1791 }
1792
1793 /**
1794 * @param internalConnection
1795 * The internalConnection to set.
1796 */
1797 public void setInternalConnection(boolean internalConnection) {
1798 this.internalConnection = internalConnection;
1799 }
1800
1801 /**
1802 * @return Returns the doMessageCompression.
1803 */
1804 public boolean isDoMessageCompression() {
1805 return doMessageCompression
1806 && transportChannel.doesSupportMessageCompression();
1807 }
1808
1809 /**
1810 * @param doMessageCompression
1811 * The doMessageCompression to set.
1812 */
1813 public void setDoMessageCompression(boolean doMessageCompression) {
1814 this.doMessageCompression = doMessageCompression
1815 && transportChannel.doesSupportMessageCompression();
1816 }
1817
1818 /**
1819 * @return Returns the doMessageFragmentation.
1820 */
1821 public boolean isDoMessageFragmentation() {
1822 return doMessageFragmentation
1823 && transportChannel.doesSupportMessageFragmentation();
1824 }
1825
1826 /**
1827 * @param doMessageFragmentation
1828 * The doMessageFragmentation to set.
1829 */
1830 public void setDoMessageFragmentation(boolean doMessageFragmentation) {
1831 this.doMessageFragmentation = doMessageFragmentation
1832 && transportChannel.doesSupportMessageFragmentation();
1833 }
1834
1835 /**
1836 * @return Returns the messageCompressionLevel.
1837 */
1838 public int getMessageCompressionLevel() {
1839 return messageCompressionLevel;
1840 }
1841
1842 /**
1843 * @param messageCompressionLevel
1844 * The messageCompressionLevel to set.
1845 */
1846 public void setMessageCompressionLevel(int messageCompressionLevel) {
1847 this.messageCompressionLevel = messageCompressionLevel;
1848 }
1849
1850 /**
1851 * @return Returns the messageCompressionLimit.
1852 */
1853 public int getMessageCompressionLimit() {
1854 return messageCompressionLimit;
1855 }
1856
1857 /**
1858 * @param messageCompressionLimit
1859 * The messageCompressionLimit to set.
1860 */
1861 public void setMessageCompressionLimit(int messageCompressionLimit) {
1862 this.messageCompressionLimit = messageCompressionLimit;
1863 }
1864
1865 /**
1866 * @return Returns the messageCompressionStrategy.
1867 */
1868 public int getMessageCompressionStrategy() {
1869 return messageCompressionStrategy;
1870 }
1871
1872 /**
1873 * @param messageCompressionStrategy
1874 * The messageCompressionStrategy to set.
1875 */
1876 public void setMessageCompressionStrategy(int messageCompressionStrategy) {
1877 this.messageCompressionStrategy = messageCompressionStrategy;
1878 }
1879
1880 /**
1881 * @return Returns the messageFragmentationLimit.
1882 */
1883 public int getMessageFragmentationLimit() {
1884 return messageFragmentationLimit;
1885 }
1886
1887 /**
1888 * @param messageFragmentationLimit
1889 * The messageFragmentationLimit to set.
1890 */
1891 public void setMessageFragmentationLimit(int messageFragmentationLimit) {
1892 this.messageFragmentationLimit = messageFragmentationLimit;
1893 }
1894
1895 /**
1896 * @return Returns the disableTimeStampsByDefault.
1897 */
1898 public boolean isDisableTimeStampsByDefault() {
1899 return disableTimeStampsByDefault;
1900 }
1901
1902 /**
1903 * @param disableTimeStampsByDefault
1904 * The disableTimeStampsByDefault to set.
1905 */
1906 public void setDisableTimeStampsByDefault(boolean disableTimeStampsByDefault) {
1907 this.disableTimeStampsByDefault = disableTimeStampsByDefault;
1908 }
1909
1910 /**
1911 * Causes pre-serialization of messages before send By default this is on
1912 *
1913 * @return Returns the prePrepareMessageOnSend.
1914 */
1915 public boolean isPrepareMessageBodyOnSend() {
1916 return prepareMessageBodyOnSend;
1917 }
1918
1919 /**
1920 * Causes pre-serialization of messages before send By default this is on
1921 *
1922 * @param prePrepareMessageOnSend
1923 * The prePrepareMessageOnSend to set.
1924 */
1925 public void setPrepareMessageBodyOnSend(boolean prePrepareMessageOnSend) {
1926 this.prepareMessageBodyOnSend = prePrepareMessageOnSend;
1927 }
1928
1929 /**
1930 * @return Returns the copyMessageOnSend.
1931 */
1932 public boolean isCopyMessageOnSend() {
1933 return copyMessageOnSend;
1934 }
1935
1936 /**
1937 * @param copyMessageOnSend
1938 * The copyMessageOnSend to set.
1939 */
1940 public void setCopyMessageOnSend(boolean copyMessageOnSend) {
1941 this.copyMessageOnSend = copyMessageOnSend;
1942 }
1943
1944 /**
1945 * @return Returns the quickClose.
1946 */
1947 public boolean isQuickClose() {
1948 return quickClose;
1949 }
1950
1951 /**
1952 * @param quickClose
1953 * The quickClose to set.
1954 */
1955 public void setQuickClose(boolean quickClose) {
1956 this.quickClose = quickClose;
1957 }
1958
1959 /**
1960 * @return Returns the optimizedMessageDispatch.
1961 */
1962 public boolean isOptimizedMessageDispatch() {
1963 return optimizedMessageDispatch;
1964 }
1965
1966 /**
1967 * @param optimizedMessageDispatch
1968 * The optimizedMessageDispatch to set.
1969 */
1970 public void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
1971 this.optimizedMessageDispatch = optimizedMessageDispatch;
1972 }
1973
1974 protected void clearMessagesInProgress() {
1975 for (Iterator i = sessions.iterator(); i.hasNext();) {
1976 ActiveMQSession session = (ActiveMQSession) i.next();
1977 session.clearMessagesInProgress();
1978 }
1979 }
1980
1981 /**
1982 * Tells the broker to destroy a destination.
1983 *
1984 * @param destination
1985 */
1986 public void destroyDestination(ActiveMQDestination destination)
1987 throws JMSException {
1988 BrokerAdminCommand command = new BrokerAdminCommand();
1989 command.setCommand(BrokerAdminCommand.DESTROY_DESTINATION);
1990 command.setDestination(destination);
1991 syncSendPacket(command);
1992 }
1993
1994 /**
1995 * Cleans up this connection so that it's state is as if the connection was
1996 * just created. This allows the Resource Adapter to clean up a connection
1997 * so that it can be reused without having to close and recreate the
1998 * connection.
1999 *
2000 * @param sessionId
2001 */
2002 public void cleanup() throws JMSException {
2003
2004 try {
2005 for (Iterator i = this.sessions.iterator(); i.hasNext();) {
2006 ActiveMQSession s = (ActiveMQSession) i.next();
2007 s.close();
2008 }
2009 for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
2010 ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i
2011 .next();
2012 c.close();
2013 }
2014 this.connectionConsumers.clear();
2015 this.messageDispatchers.clear();
2016 } finally {
2017 this.sessions.clear();
2018 started.set(false);
2019 }
2020
2021 setExceptionListener(null);
2022 clientIDSet = false;
2023 isConnectionInfoSentToBroker = false;
2024
2025 CleanupConnectionInfo cleanupInfo = new CleanupConnectionInfo();
2026 cleanupInfo.setClientId(getClientID());
2027 asyncSendPacket(cleanupInfo);
2028 }
2029
2030 /**
2031 * Changes the associated username/password that is associated with this
2032 * connection. If the connection has been used, you must called cleanup()
2033 * before calling this method.
2034 *
2035 * @throws IllegalStateException
2036 * if the connection is in used.
2037 * @param sessionId
2038 */
2039 public void changeUserInfo(String theUserName, String thePassword)
2040 throws JMSException {
2041 if (isConnectionInfoSentToBroker)
2042 throw new IllegalStateException(
2043 "changeUserInfo used Connection is not allowed");
2044
2045 this.userName = theUserName;
2046 this.password = thePassword;
2047 }
2048
2049 protected void addToTransientConsumedRedeliverCache(ActiveMQMessage message) {
2050 transientConsumedRedeliverCache.add(message);
2051 }
2052
2053 protected void replayTransientConsumedRedeliveredMessages(
2054 ActiveMQSession session, ActiveMQMessageConsumer consumer)
2055 throws JMSException {
2056 if (consumer.getDestination().isTopic()
2057 && !transientConsumedRedeliverCache.isEmpty()) {
2058 Filter filter = getFilterFactory().createFilter(
2059 consumer.getDestination(), consumer.getMessageSelector());
2060 if (consumer.isNoLocal()) {
2061 filter = new AndFilter(filter, new NoLocalFilter(clientID));
2062 }
2063 for (Iterator i = transientConsumedRedeliverCache.iterator(); i
2064 .hasNext();) {
2065 ActiveMQMessage message = (ActiveMQMessage) i.next();
2066 if (filter.matches(message)) {
2067 transientConsumedRedeliverCache.remove(message);
2068 message.setMessageAcknowledge(session);
2069 message.setJMSRedelivered(true);
2070 message.setConsumerNos(new int[] { consumer
2071 .getConsumerNumber() });
2072 consumer.processMessage(message);
2073 }
2074 }
2075 }
2076 }
2077
2078 private FilterFactory getFilterFactory() {
2079 if (filterFactory == null) {
2080 filterFactory = new FilterFactoryImpl();
2081 }
2082 return filterFactory;
2083 }
2084
2085 protected void startTemporaryDestination(ActiveMQDestination dest)
2086 throws JMSException {
2087 if (dest != null && dest.isTemporary()) {
2088 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) tempDestinationMap
2089 .get(dest);
2090 if (event == null) {
2091 event = new TempDestinationAdvisoryEvent(dest, true);
2092 tempDestinationMap.put(dest, event);
2093 ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
2094 msg.setObject(event);
2095 msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
2096 msg.setJMSDestination(dest.getTopicForTempAdvisory());
2097 msg.setJMSMessageID("ID:" + dest.getPhysicalName()
2098 + " .started");
2099 this.syncSendPacket(msg);
2100 }
2101 }
2102 }
2103
2104 protected void stopTemporaryDestination(ActiveMQDestination dest)
2105 throws JMSException {
2106 if (dest != null && dest.isTemporary()) {
2107 TempDestinationAdvisoryEvent event = (TempDestinationAdvisoryEvent) tempDestinationMap
2108 .remove(dest);
2109 if (event != null) {
2110 event.setStarted(false);
2111 ActiveMQObjectMessage msg = new ActiveMQObjectMessage();
2112 msg.setObject(event);
2113 msg.setJMSDeliveryMode(DeliveryMode.NON_PERSISTENT);
2114 msg.setJMSDestination(dest.getTopicForTempAdvisory());
2115 msg.setJMSMessageID("ID:" + dest.getPhysicalName()
2116 + " .stopped");
2117 this.syncSendPacket(msg);
2118 }
2119 }
2120 }
2121
2122 protected void closeTemporaryDestinations() throws JMSException {
2123 for (Iterator i = tempDestinationMap.keySet().iterator(); i.hasNext();) {
2124 ActiveMQDestination dest = (ActiveMQDestination) i.next();
2125 stopTemporaryDestination(dest);
2126 }
2127 }
2128
2129 protected void startAdvisoryForTempDestination(Destination d)
2130 throws JMSException {
2131 if (d != null) {
2132 ActiveMQDestination dest = (ActiveMQDestination) ActiveMQMessageTransformation
2133 .transformDestination(d);
2134 if (dest.isTemporary()) {
2135 TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2136 .get(dest);
2137 if (test == null) {
2138 test = new TempDestinationAdvisor(this, dest);
2139 test.start();
2140 validDestinationsMap.put(dest, test);
2141 }
2142 }
2143 }
2144 }
2145
2146 protected void stopAdvisoryForTempDestination(ActiveMQDestination d)
2147 throws JMSException {
2148 if (d != null) {
2149 ActiveMQDestination dest = (ActiveMQDestination) ActiveMQMessageTransformation
2150 .transformDestination(d);
2151 if (dest.isTemporary()) {
2152 TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2153 .remove(dest);
2154 if (test != null) {
2155 test.stop();
2156 }
2157 }
2158 }
2159 }
2160
2161 protected final void validateDestination(ActiveMQDestination dest)
2162 throws JMSException {
2163 if (dest != null) {
2164 if (dest.isTemporary()) {
2165 TempDestinationAdvisor test = (TempDestinationAdvisor) validDestinationsMap
2166 .get(dest);
2167 if (dest.isDeleted() || test == null || !test.isActive(dest)) {
2168 throw new JMSException(
2169 "Cannot publish to a deleted Destination: " + dest);
2170 }
2171 }
2172 }
2173 }
2174
2175 /**
2176 * @return Returns the resourceManagerId.
2177 * @throws JMSException
2178 */
2179 synchronized public String getResourceManagerId() throws JMSException {
2180 if (resourceManagerId == null) {
2181 resourceManagerId = determineResourceManagerId();
2182 }
2183 return resourceManagerId;
2184 }
2185
2186 /**
2187 * Get's the resource manager id.
2188 */
2189 private String determineResourceManagerId() throws JMSException {
2190
2191 XATransactionInfo info = new XATransactionInfo();
2192 info.setType(TransactionInfo.GET_RM_ID);
2193
2194 ResponseReceipt receipt = (ResponseReceipt) syncSendRequest(info);
2195 String rmId = (String) receipt.getResult();
2196 assert rmId != null;
2197 return rmId;
2198 }
2199
2200 public ByteArrayFragmentation getFragmentation() {
2201 return fragmentation;
2202 }
2203
2204 public ConcurrentHashMap getAssemblies() {
2205 return assemblies;
2206 }
2207
2208
2209 }