001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.broker.impl;
020
021 import java.io.File;
022 import java.io.IOException;
023 import java.util.ArrayList;
024 import java.util.Hashtable;
025 import java.util.Iterator;
026 import java.util.Map;
027 import javax.jms.JMSException;
028 import javax.naming.Context;
029 import javax.transaction.xa.XAException;
030 import org.activemq.broker.Broker;
031 import org.activemq.broker.BrokerAdmin;
032 import org.activemq.broker.BrokerClient;
033 import org.activemq.broker.ConsumerInfoListener;
034 import org.activemq.capacity.DelegateCapacityMonitor;
035 import org.activemq.io.util.MemoryBoundedObjectManager;
036 import org.activemq.io.util.MemoryBoundedQueueManager;
037 import org.activemq.jndi.ReadOnlyContext;
038 import org.activemq.message.ActiveMQDestination;
039 import org.activemq.message.ActiveMQMessage;
040 import org.activemq.message.ActiveMQXid;
041 import org.activemq.message.BrokerInfo;
042 import org.activemq.message.ConnectionInfo;
043 import org.activemq.message.ConsumerInfo;
044 import org.activemq.message.MessageAck;
045 import org.activemq.message.ProducerInfo;
046 import org.activemq.security.SecurityAdapter;
047 import org.activemq.service.DeadLetterPolicy;
048 import org.activemq.service.MessageContainerAdmin;
049 import org.activemq.service.MessageContainerManager;
050 import org.activemq.service.RedeliveryPolicy;
051 import org.activemq.service.Transaction;
052 import org.activemq.service.TransactionManager;
053 import org.activemq.service.boundedvm.DurableQueueBoundedMessageManager;
054 import org.activemq.service.boundedvm.TransientQueueBoundedMessageManager;
055 import org.activemq.service.boundedvm.TransientTopicBoundedMessageManager;
056 import org.activemq.service.impl.DurableTopicMessageContainerManager;
057 import org.activemq.store.PersistenceAdapter;
058 import org.activemq.store.PersistenceAdapterFactory;
059 import org.activemq.store.TransactionStore;
060 import org.activemq.store.vm.VMPersistenceAdapter;
061 import org.activemq.store.vm.VMTransactionManager;
062 import org.activemq.util.Callback;
063 import org.activemq.util.ExceptionTemplate;
064 import org.activemq.util.JMSExceptionHelper;
065 import org.apache.commons.logging.Log;
066 import org.apache.commons.logging.LogFactory;
067 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
068 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
069 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
070
071 /**
072 * The default {@link Broker} implementation
073 *
074 * @version $Revision: 1.1.1.1 $
075 */
076 public class DefaultBroker extends DelegateCapacityMonitor implements Broker, BrokerAdmin {
077
078 private static final Log log = LogFactory.getLog(DefaultBroker.class);
079
080 protected static final String PROPERTY_STORE_DIRECTORY = "activemq.store.dir";
081 protected static final String PERSISTENCE_ADAPTER_FACTORY = "activemq.persistenceAdapterFactory";
082
083 protected static final Class[] NEWINSTANCE_PARAMETER_TYPES = {File.class};
084
085 private static final long DEFAULT_MAX_MEMORY_USAGE = 20 * 1024 * 1024; //20mb
086
087 private PersistenceAdapter persistenceAdapter;
088 private TransactionManager transactionManager;
089 private MessageContainerManager[] containerManagers;
090 private File tempDir;
091 private MemoryBoundedObjectManager memoryManager;
092 private MemoryBoundedQueueManager queueManager;
093 private TransactionStore preparedTransactionStore;
094 private Map containerManagerMap;
095 private CopyOnWriteArrayList consumerInfoListeners;
096 private MessageContainerManager persistentTopicMCM;
097 private MessageContainerManager transientTopicMCM;
098 private TransientQueueBoundedMessageManager transientQueueMCM;
099 private DurableQueueBoundedMessageManager persistentQueueMCM;
100 private SecurityAdapter securityAdapter;
101 private RedeliveryPolicy redeliveryPolicy;
102 private DeadLetterPolicy deadLetterPolicy;
103 private AdvisorySupport advisory;
104 private Map messageConsumers = new ConcurrentHashMap();
105 private BrokerInfo brokerInfo;
106 private SynchronizedBoolean started = new SynchronizedBoolean(false);
107 private BrokerContainerImpl brokerContainer;
108
109
110
111 public DefaultBroker(String brokerName, String brokerClusterName, MemoryBoundedObjectManager memoryManager) {
112 this.brokerInfo = new LocalBrokerInfo(this);
113 this.brokerInfo.setBrokerName(brokerName);
114 this.brokerInfo.setClusterName(brokerClusterName);
115 this.memoryManager = memoryManager;
116 queueManager = new MemoryBoundedQueueManager(memoryManager);
117 setDelegate(memoryManager);
118 containerManagerMap = new ConcurrentHashMap();
119 consumerInfoListeners = new CopyOnWriteArrayList();
120 this.advisory = new AdvisorySupport(this);
121 }
122
123 public DefaultBroker(String brokerName, MemoryBoundedObjectManager memoryManager) {
124 this(brokerName, "default", memoryManager);
125 }
126
127 public DefaultBroker(String brokerName, String cluserName) {
128 this(brokerName, cluserName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
129 }
130
131 public DefaultBroker(String brokerName) {
132 this(brokerName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
133 }
134
135 public DefaultBroker(String brokerName, String brokerClusterName, PersistenceAdapter persistenceAdapter) {
136 this(brokerName, brokerClusterName, new MemoryBoundedObjectManager("Broker Memory Manager", DEFAULT_MAX_MEMORY_USAGE));
137 this.persistenceAdapter = persistenceAdapter;
138 }
139
140 public DefaultBroker(String brokerName, PersistenceAdapter persistenceAdapter) {
141 this(brokerName);
142 this.persistenceAdapter = persistenceAdapter;
143 }
144
145 public boolean isStarted(){
146 return started.get();
147 }
148
149 /**
150 * Start this Service
151 *
152 * @throws JMSException
153 */
154 public void start() throws JMSException{
155 if(started.commit(false,true)){
156 if(redeliveryPolicy==null){
157 redeliveryPolicy = new RedeliveryPolicy();
158 }
159 if(deadLetterPolicy==null){
160 deadLetterPolicy = new DeadLetterPolicy(this);
161 }
162 if(persistenceAdapter==null){
163 persistenceAdapter = createPersistenceAdapter();
164 }
165 persistenceAdapter.start();
166
167 if(transactionManager==null){
168 preparedTransactionStore = persistenceAdapter.createTransactionStore();
169 transactionManager = new VMTransactionManager(this,preparedTransactionStore);
170 }
171
172 // force containers to be created
173 if(containerManagerMap.isEmpty()){
174 makeDefaultContainerManagers();
175 }
176 getContainerManagers();
177
178 for(int i = 0;i<containerManagers.length;i++){
179 containerManagers[i].setDeadLetterPolicy(this.deadLetterPolicy);
180 containerManagers[i].start();
181 }
182 transactionManager.start();
183 }
184 }
185
186
187 /**
188 * stop this Service
189 *
190 * @throws JMSException
191 */
192
193 public void stop() throws JMSException{
194 if(started.commit(true,false)){
195 ExceptionTemplate template = new ExceptionTemplate();
196
197 if(containerManagers!=null){
198 for(int i = 0;i<containerManagers.length;i++){
199 final MessageContainerManager containerManager = containerManagers[i];
200 template.run(new Callback(){
201
202 public void execute() throws Throwable{
203 containerManager.stop();
204 }
205 });
206 }
207 }
208 if(transactionManager!=null){
209 template.run(new Callback(){
210
211 public void execute() throws Throwable{
212 transactionManager.stop();
213 }
214 });
215 }
216
217 template.run(new Callback(){
218
219 public void execute() throws Throwable{
220 persistenceAdapter.stop();
221 }
222 });
223
224 template.throwJMSException();
225 }
226 }
227
228 // Broker interface
229 //-------------------------------------------------------------------------
230
231 public void addClient(BrokerClient client, ConnectionInfo info) throws JMSException {
232 if (securityAdapter != null) {
233 securityAdapter.authorizeConnection(client, info);
234 }
235 advisory.addConnection(client,info);
236 }
237
238 public void removeClient(BrokerClient client, ConnectionInfo info) throws JMSException {
239 if (transactionManager != null) {
240 transactionManager.cleanUpClient(client);
241 }
242 advisory.removeConnection(client,info);
243 }
244
245 public void addMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
246 if (securityAdapter != null) {
247 securityAdapter.authorizeProducer(client, info);
248 }
249 advisory.addProducer(client,info);
250 }
251
252 public void removeMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
253 advisory.removeProducer(client,info);
254 }
255
256 /**
257 * Add an active message consumer
258 */
259 public void addMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
260 validateConsumer(info);
261 if (securityAdapter != null) {
262 securityAdapter.authorizeConsumer(client, info);
263 }
264 advisory.addAdvisory(client, info);
265 MessageContainerManager[] array = getContainerManagers();
266 for (int i = 0;i < array.length;i++) {
267 array[i].addMessageConsumer(client, info);
268 }
269 fireConsumerInfo(client, info);
270 messageConsumers.put(info,client);
271 }
272
273 /**
274 * remove an active message consumer
275 */
276 public void removeMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
277 validateConsumer(info);
278 advisory.removeAdvisory(client, info);
279 for (int i = 0;i < containerManagers.length;i++) {
280 containerManagers[i].removeMessageConsumer(client, info);
281 }
282 fireConsumerInfo(client, info);
283 messageConsumers.remove(info);
284 }
285
286
287 /**
288 * send a message to the broker
289 */
290 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
291 checkValid();
292 ActiveMQDestination destination = message.getJMSActiveMQDestination();
293 if (destination == null) {
294 throw new JMSException("No destination specified for the Message");
295 }
296 if (message.getJMSMessageID() == null && !destination.isAdvisory()) {
297 throw new JMSException("No messageID specified for the Message");
298 }
299 associateTransaction(message);
300 try {
301 if (destination.isComposite()) {
302 boolean first = true;
303 for (Iterator iter = destination.getChildDestinations().iterator();iter.hasNext();) {
304 ActiveMQDestination childDestination = (ActiveMQDestination) iter.next();
305 // lets shallow copy just in case
306 if (first) {
307 first = false;
308 }
309 else {
310 message = message.shallowCopy();
311 }
312 message.setJMSDestination(childDestination);
313 doMessageSend(client, message);
314 }
315 }
316 else {
317 if (destination.isTempDestinationAdvisory() && !client.isBrokerConnection()) {
318 advisory.processTempDestinationAdvisory(client,message);
319 }
320 doMessageSend(client, message);
321 }
322 }
323 finally {
324 disAssociateTransaction();
325 }
326 }
327
328 /**
329 * Acknowledge consumption of a message by the Message Consumer
330 */
331 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
332
333 associateTransaction(ack);
334 try {
335 for (int i = 0; i < containerManagers.length; i++) {
336 containerManagers[i].acknowledgeMessage(client, ack);
337 }
338 } finally {
339 disAssociateTransaction();
340 }
341
342 }
343
344 public void deleteSubscription(String clientId, String subscriberName) throws JMSException {
345 for (int i = 0; i < containerManagers.length; i++) {
346 containerManagers[i].deleteSubscription(clientId, subscriberName);
347 }
348 }
349
350
351 /**
352 * Start a transaction.
353 *
354 * @see org.activemq.broker.Broker#startTransaction(org.activemq.broker.BrokerClient, java.lang.String)
355 */
356 public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
357 transactionManager.createLocalTransaction(client, transactionId);
358 }
359
360 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
361 try {
362 Transaction transaction = transactionManager.getLocalTransaction(transactionId);
363 transaction.commit(true);
364 }
365 catch (XAException e) {
366 // TODO: I think the XAException should propagate all the way to the client.
367 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
368 }
369 }
370
371 /**
372 * rollback a transaction
373 */
374 public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
375 try {
376 Transaction transaction = transactionManager.getLocalTransaction(transactionId);
377 transaction.rollback();
378 }
379 catch (XAException e) {
380 // TODO: I think the XAException should propagate all the way to the client.
381 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
382 }
383 }
384
385 /**
386 * Starts an XA Transaction.
387 *
388 * @see org.activemq.broker.Broker#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
389 */
390 public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
391 transactionManager.createXATransaction(client, xid);
392 }
393
394 /**
395 * Prepares an XA Transaciton.
396 *
397 * @see org.activemq.broker.Broker#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
398 */
399 public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
400 Transaction transaction = transactionManager.getXATransaction(xid);
401 return transaction.prepare();
402 }
403
404 /**
405 * Rollback an XA Transaction.
406 *
407 * @see org.activemq.broker.Broker#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
408 */
409 public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
410 Transaction transaction = transactionManager.getXATransaction(xid);
411 transaction.rollback();
412 }
413
414 /**
415 * Commit an XA Transaction.
416 *
417 * @see org.activemq.broker.Broker#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean)
418 */
419 public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
420 Transaction transaction = transactionManager.getXATransaction(xid);
421 transaction.commit(onePhase);
422 }
423
424 /**
425 * Gets the prepared XA transactions.
426 *
427 * @see org.activemq.broker.Broker#getPreparedTransactions(org.activemq.broker.BrokerClient)
428 */
429 public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
430 return transactionManager.getPreparedXATransactions();
431 }
432
433
434
435
436 // Properties
437 //-------------------------------------------------------------------------
438
439 /**
440 * Get a temp directory - used for spooling
441 *
442 * @return a File ptr to the directory
443 */
444 public File getTempDir() {
445 if (tempDir == null) {
446 String dirName = System.getProperty("activemq.store.tempdir", "ActiveMQTemp");
447 tempDir = new File(dirName);
448 }
449 return tempDir;
450 }
451
452 public String getBrokerName() {
453 return brokerInfo.getBrokerName();
454 }
455
456 /**
457 * @return Returns the brokerClusterName.
458 */
459 public String getBrokerClusterName() {
460 return brokerInfo.getClusterName();
461 }
462
463
464 public void setTempDir(File tempDir) {
465 this.tempDir = tempDir;
466 }
467
468 public MessageContainerManager[] getContainerManagers() {
469 if (containerManagers == null) {
470 containerManagers = createContainerManagers();
471 }
472 return containerManagers;
473 }
474
475 public Map getContainerManagerMap() {
476 return containerManagerMap;
477 }
478
479 public void setContainerManagerMap(Map containerManagerMap) {
480 this.containerManagerMap = containerManagerMap;
481 this.containerManagers = null;
482 }
483
484 public PersistenceAdapter getPersistenceAdapter() {
485 return persistenceAdapter;
486 }
487
488 public void setPersistenceAdapter(PersistenceAdapter persistenceAdapter) {
489 this.persistenceAdapter = persistenceAdapter;
490 }
491
492 public TransactionManager getTransactionManager() {
493 return transactionManager;
494 }
495
496 public void setTransactionManager(TransactionManager transactionManager) {
497 this.transactionManager = transactionManager;
498 }
499
500 public SecurityAdapter getSecurityAdapter() {
501 return securityAdapter;
502 }
503
504 public void setSecurityAdapter(SecurityAdapter securityAdapter) {
505 this.securityAdapter = securityAdapter;
506 }
507
508 public RedeliveryPolicy getRedeliveryPolicy() {
509 return redeliveryPolicy;
510 }
511
512 public void setRedeliveryPolicy(RedeliveryPolicy redeliveryPolicy) {
513 this.redeliveryPolicy = redeliveryPolicy;
514 }
515
516 public TransactionStore getPreparedTransactionStore() {
517 return preparedTransactionStore;
518 }
519
520 public void setPreparedTransactionStore(TransactionStore preparedTransactionStore) {
521 this.preparedTransactionStore = preparedTransactionStore;
522 }
523
524 /**
525 * @return the DeadLetterPolicy
526 */
527 public DeadLetterPolicy getDeadLetterPolicy(){
528 return deadLetterPolicy;
529 }
530
531 /**
532 * set the dead letter policy
533 * @param deadLetterPolicy
534 */
535 public void setDeadLetterPolicy(DeadLetterPolicy deadLetterPolicy){
536 this.deadLetterPolicy = deadLetterPolicy;
537 }
538
539 /**
540 * @return Returns the maximumMemoryUsage.
541 */
542 public long getMaximumMemoryUsage() {
543 return memoryManager.getValueLimit();
544 }
545
546 /**
547 * @param maximumMemoryUsage The maximumMemoryUsage to set.
548 */
549 public void setMaximumMemoryUsage(long maximumMemoryUsage) {
550 this.memoryManager.setValueLimit(maximumMemoryUsage);
551 }
552
553
554 public Context getDestinationContext(Hashtable environment) {
555 Map data = new ConcurrentHashMap();
556 for (Iterator iter = containerManagerMap.entrySet().iterator(); iter.hasNext();) {
557 Map.Entry entry = (Map.Entry) iter.next();
558 String name = entry.getKey().toString();
559 MessageContainerManager manager = (MessageContainerManager) entry.getValue();
560 Context context = new ReadOnlyContext(environment, manager.getDestinations());
561 data.put(name, context);
562 }
563 return new ReadOnlyContext(environment, data);
564 }
565
566 // Implementation methods
567 //-------------------------------------------------------------------------
568
569
570 protected void doMessageSend(BrokerClient client, ActiveMQMessage message) throws JMSException {
571 if (securityAdapter != null) {
572 securityAdapter.authorizeSendMessage(client, message);
573 }
574 ActiveMQDestination dest = message.getJMSActiveMQDestination();
575 if (dest.isTopic()){
576 if (message.isPersistent() && !dest.isTemporary()){
577 persistentTopicMCM.sendMessage(client,message);
578 }
579 transientTopicMCM.sendMessage(client, message);
580 }else {
581 transientQueueMCM.sendMessage(client, message);
582 persistentQueueMCM.sendMessage(client, message);
583 }
584 }
585
586 /**
587 * Factory method to create a default persistence adapter
588 *
589 * @return
590 */
591 protected PersistenceAdapter createPersistenceAdapter() throws JMSException {
592 File directory = new File(getStoreDirectory());
593
594 // lets use reflection to avoid runtime dependency on persistence libraries
595 PersistenceAdapter answer = null;
596 String property = System.getProperty(PERSISTENCE_ADAPTER_FACTORY);
597 if (property != null) {
598 answer = tryCreatePersistenceAdapter(property, directory, false);
599 }
600 if (answer == null) {
601 answer = tryCreatePersistenceAdapter("org.activemq.broker.impl.DefaultPersistenceAdapterFactory", directory, true);
602 }
603 if (answer != null) {
604 return answer;
605 }
606 else {
607 log.warn("Default message store (journal+derby) could not be found in the classpath or property '" + PERSISTENCE_ADAPTER_FACTORY
608 + "' not specified so defaulting to use RAM based message persistence");
609 return new VMPersistenceAdapter();
610 }
611 }
612
613 protected PersistenceAdapter tryCreatePersistenceAdapter(String className, File directory, boolean ignoreErrors) throws JMSException {
614 Class adapterClass = loadClass(className, ignoreErrors);
615 if (adapterClass != null) {
616 try {
617 PersistenceAdapterFactory factory = (PersistenceAdapterFactory) adapterClass.newInstance();
618 PersistenceAdapter answer = factory.createPersistenceAdapter(directory, memoryManager);
619 log.info("Persistence adapter created using: " + className);
620 return answer;
621 }
622 catch (IOException cause) {
623 throw createInstantiateAdapterException(className, (Exception) cause);
624 }
625 catch (Throwable e) {
626 if (!ignoreErrors) {
627 throw createInstantiateAdapterException(className, e);
628 }
629 }
630 }
631 return null;
632 }
633
634 protected JMSException createInstantiateAdapterException(String className, Throwable e) {
635 return JMSExceptionHelper.newJMSException("Persistence adapter could not be created using: "
636 + className + ". Reason: " + e, e);
637 }
638
639 /**
640 * Tries to load the given class from the current context class loader or
641 * class loader which loaded us or return null if the class could not be found
642 */
643 protected Class loadClass(String name, boolean ignoreErrors) throws JMSException {
644 try {
645 return Thread.currentThread().getContextClassLoader().loadClass(name);
646 }
647 catch (ClassNotFoundException e) {
648 try {
649 return getClass().getClassLoader().loadClass(name);
650 }
651 catch (ClassNotFoundException e2) {
652 if (ignoreErrors) {
653 log.trace("Could not find class: " + name + " on the classpath");
654 return null;
655 }
656 else {
657 throw JMSExceptionHelper.newJMSException("Could not find class: " + name + " on the classpath. Reason: " + e, e);
658 }
659 }
660 }
661 }
662
663 protected String getStoreDirectory() {
664 String defaultDirectory = "ActiveMQ" + File.separator + sanitizeString(getBrokerInfo().getBrokerName());
665 return System.getProperty(PROPERTY_STORE_DIRECTORY, defaultDirectory);
666 }
667
668 /**
669 * Factory method to create the default container managers
670 *
671 * @return
672 */
673 protected MessageContainerManager[] createContainerManagers() {
674 int size = containerManagerMap.size();
675 MessageContainerManager[] answer = new MessageContainerManager[size];
676 containerManagerMap.values().toArray(answer);
677 return answer;
678 }
679
680 protected void makeDefaultContainerManagers() {
681 transientTopicMCM = new TransientTopicBoundedMessageManager(queueManager);
682 containerManagerMap.put("transientTopicContainer", transientTopicMCM);
683 persistentTopicMCM = new DurableTopicMessageContainerManager(persistenceAdapter, redeliveryPolicy, deadLetterPolicy);
684 containerManagerMap.put("persistentTopicContainer", persistentTopicMCM);
685 persistentQueueMCM = new DurableQueueBoundedMessageManager(persistenceAdapter, queueManager, redeliveryPolicy, deadLetterPolicy);
686 containerManagerMap.put("persistentQueueContainer", persistentQueueMCM);
687 transientQueueMCM = new TransientQueueBoundedMessageManager(queueManager,redeliveryPolicy, deadLetterPolicy);
688 containerManagerMap.put("transientQueueContainer", transientQueueMCM);
689 }
690
691 /**
692 * Ensures the consumer is valid, throwing a meaningful exception if not
693 *
694 * @param info
695 * @throws JMSException
696 */
697 protected void validateConsumer(ConsumerInfo info) throws JMSException {
698 if (info.getConsumerId() == null) {
699 throw new JMSException("No consumerId specified for the ConsumerInfo");
700 }
701 }
702
703 protected void checkValid() throws JMSException {
704 if (containerManagers == null) {
705 throw new JMSException("This Broker has not yet been started. Ensure start() is called before invoking action methods");
706 }
707 }
708
709 /**
710 * Add a ConsumerInfoListener to the Broker
711 *
712 * @param l
713 */
714 public void addConsumerInfoListener(ConsumerInfoListener l) {
715 if (l != null){
716 consumerInfoListeners.add(l);
717 //fire any existing infos to the listener
718 for (Iterator i = messageConsumers.entrySet().iterator(); i.hasNext();){
719 Map.Entry entry = (Map.Entry)i.next();
720 ConsumerInfo info = (ConsumerInfo) entry.getKey();
721 BrokerClient client = (BrokerClient) entry.getValue();
722 l.onConsumerInfo(client, info);
723 }
724 }
725 }
726
727 /**
728 * Remove a ConsumerInfoListener from the Broker
729 *
730 * @param l
731 */
732 public void removeConsumerInfoListener(ConsumerInfoListener l) {
733 consumerInfoListeners.remove(l);
734 }
735
736 protected void fireConsumerInfo(BrokerClient client, ConsumerInfo info) {
737 for (Iterator i = consumerInfoListeners.iterator(); i.hasNext();) {
738 ConsumerInfoListener l = (ConsumerInfoListener) i.next();
739 l.onConsumerInfo(client, info);
740 }
741 }
742
743 /**
744 * @return the MessageContainerManager for durable topics
745 */
746 public MessageContainerManager getPersistentTopicContainerManager() {
747 return persistentTopicMCM;
748 }
749
750 /**
751 * @return the MessageContainerManager for transient topics
752 */
753 public MessageContainerManager getTransientTopicContainerManager() {
754 return transientTopicMCM;
755 }
756
757 /**
758 * @return the MessageContainerManager for persistent queues
759 */
760 public MessageContainerManager getPersistentQueueContainerManager() {
761 return persistentQueueMCM;
762 }
763
764 /**
765 * @return the MessageContainerManager for transient queues
766 */
767 public MessageContainerManager getTransientQueueContainerManager() {
768 return transientQueueMCM;
769 }
770
771 /**
772 * @see org.activemq.broker.Broker#getBrokerAdmin()
773 */
774 public BrokerAdmin getBrokerAdmin() {
775 return this;
776 }
777
778 public void createMessageContainer(ActiveMQDestination dest) throws JMSException {
779 for (int i = 0; i < containerManagers.length; i++) {
780 containerManagers[i].createMessageContainer(dest);
781 }
782 }
783
784 public void destoryMessageContainer(ActiveMQDestination dest) throws JMSException {
785 for (int i = 0; i < containerManagers.length; i++) {
786 containerManagers[i].destroyMessageContainer(dest);
787 }
788 }
789
790 public MessageContainerAdmin getMessageContainerAdmin(ActiveMQDestination dest) throws JMSException {
791 for (int i = 0; i < containerManagers.length; i++) {
792 Map messageContainerAdmins = containerManagers[i].getMessageContainerAdmins();
793 MessageContainerAdmin mca = (MessageContainerAdmin) messageContainerAdmins.get(dest);
794 if( mca != null ) {
795 return mca;
796 }
797 }
798 return null;
799 }
800
801 /**
802 * @throws JMSException
803 * @see org.activemq.broker.BrokerAdmin#listDestinations()
804 */
805 public MessageContainerAdmin[] listMessageContainerAdmin() throws JMSException {
806
807 ArrayList l = new ArrayList();
808 for (int i = 0; i < containerManagers.length; i++) {
809 Map messageContainerAdmins = containerManagers[i].getMessageContainerAdmins();
810 for (Iterator iter = messageContainerAdmins.values().iterator(); iter.hasNext();) {
811 MessageContainerAdmin mca = (MessageContainerAdmin) iter.next();
812 l.add(mca);
813 }
814 }
815
816 MessageContainerAdmin answer[] = new MessageContainerAdmin[l.size()];
817 l.toArray(answer);
818 return answer;
819 }
820
821
822 /**
823 * Add a message to a dead letter queue
824 * @param deadLetterName
825 * @param message
826 * @throws JMSException
827 */
828 public void sendToDeadLetterQueue(String deadLetterName,ActiveMQMessage expiredMessage) throws JMSException {
829 if (persistentQueueMCM != null) {
830 Transaction original = TransactionManager.getContexTransaction();
831 try {
832 TransactionManager.setContexTransaction(null);
833 persistentQueueMCM.sendToDeadLetterQueue(deadLetterName, expiredMessage);
834 log.debug(expiredMessage + " sent to DLQ: " + deadLetterName);
835 } finally {
836 TransactionManager.setContexTransaction(original);
837 }
838 }
839 }
840
841 /**
842 * send a message to the broker within a transaction public void
843 * sendTransactedMessage(final BrokerClient client, final String
844 * transactionId, final ActiveMQMessage message) throws JMSException {
845 * getTransactionFor(message).addPostCommitTask(new
846 * SendMessageTransactionTask(client, message)); }
847 */
848
849 /**
850 * Acknowledge consumption of a message within a transaction
851 public void acknowledgeTransactedMessage(final BrokerClient client, final String transactionId, final MessageAck ack) throws JMSException {
852 Transaction transaction;
853 if (ack.isXaTransacted()) {
854 try {
855 transaction = transactionManager.getXATransaction(new ActiveMQXid(transactionId));
856 }
857 catch (XAException e) {
858 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
859 }
860 }
861 else {
862 transaction = transactionManager.getLocalTransaction(transactionId);
863 }
864 transaction.addPostCommitTask(new MessageAckTransactionTask(client, ack));
865 transaction.addPostRollbackTask(new RedeliverMessageTransactionTask(client, ack));
866
867 // we need to tell the dispatcher that we can now accept another message
868 // even though we don't really ack the message until the commit
869 // this is because if we have a prefetch value of 1, we can never consume 2 messages
870 // in a transaction, since the ack for the first message never arrives until the commit
871 for (int i = 0; i < containerManagers.length; i++) {
872 containerManagers[i].acknowledgeTransactedMessage(client, transactionId, ack);
873 }
874 }
875 */
876
877
878 /**
879 * @param message
880 * @return
881 * @throws JMSException
882 private Transaction getTransactionFor(ActiveMQMessage message) throws JMSException {
883 String transactionId = message.getTransactionId();
884 if (message.isXaTransacted()) {
885 try {
886 return transactionManager.getXATransaction(new ActiveMQXid(transactionId));
887 }
888 catch (XAException e) {
889 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
890 }
891 }
892 return transactionManager.getLocalTransaction(transactionId);
893 }
894
895
896 public void acknowledgeMessageRecover(MessageAck ack) {
897 }
898 public void sendMessageRecover(ActiveMQMessage message) throws JMSException {
899 }
900 */
901
902 /**
903 * Associates a Transaction with the current thread. Once this call is finished,
904 * the Transactio ncan be obtained via TransactionManager.getContexTransaction().
905 * @param message
906 * @throws JMSException
907 */
908 private final void associateTransaction(ActiveMQMessage message) throws JMSException {
909 Transaction transaction;
910 if( message.isPartOfTransaction() ) {
911 if (message.isXaTransacted()) {
912 try {
913 transaction = transactionManager.getXATransaction((ActiveMQXid) message.getTransactionId());
914 }
915 catch (XAException e) {
916 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
917 }
918 } else {
919 transaction = transactionManager.getLocalTransaction((String) message.getTransactionId());
920 }
921
922 } else {
923 transaction = null;
924 }
925 TransactionManager.setContexTransaction(transaction);
926 }
927
928 private void disAssociateTransaction() {
929 TransactionManager.setContexTransaction(null);
930 }
931
932 /**
933 * Associates a Transaction with the current thread. Once this call is finished,
934 * the Transactio ncan be obtained via TransactionManager.getContexTransaction().
935 * @param ack
936 * @throws JMSException
937 */
938 private void associateTransaction(MessageAck ack) throws JMSException {
939 Transaction transaction;
940 if( ack.isPartOfTransaction() ) {
941 if (ack.isXaTransacted()) {
942 try {
943 transaction = transactionManager.getXATransaction((ActiveMQXid) ack.getTransactionId());
944 }
945 catch (XAException e) {
946 throw (JMSException) new JMSException(e.getMessage()).initCause(e);
947 }
948 } else {
949 transaction = transactionManager.getLocalTransaction((String) ack.getTransactionId());
950 }
951
952 } else {
953 transaction = null;
954 }
955 TransactionManager.setContexTransaction(transaction);
956 }
957
958 private String sanitizeString(String in) {
959 String result = null;
960 if (in != null) {
961 result = in.replace(':', '_');
962 result = result.replace('/', '_');
963 result = result.replace('\\', '_');
964 }
965 return result;
966 }
967
968 /**
969 * @return Returns the memoryManager.
970 */
971 public MemoryBoundedObjectManager getMemoryManager() {
972 return memoryManager;
973 }
974
975
976 /**
977 * @return Returns the queueManager.
978 */
979 public MemoryBoundedQueueManager getQueueManager() {
980 return queueManager;
981 }
982
983
984 public String getName() {
985 return getBrokerName();
986 }
987
988
989 public String toString (){
990 return "broker: " + getName();
991 }
992
993 /**
994 * @see org.activemq.broker.Broker#getBrokerInfo()
995 */
996 public BrokerInfo getBrokerInfo(){
997 return brokerInfo;
998 }
999
1000 protected void setBrokercontainer(BrokerContainerImpl container){
1001 this.brokerContainer = container;
1002 }
1003
1004 protected BrokerContainerImpl getBrokerContainer(){
1005 return brokerContainer;
1006 }
1007
1008
1009
1010
1011 }