001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport;
020 import java.util.Iterator;
021 import java.util.Map;
022 import javax.jms.JMSException;
023 import javax.jms.Session;
024 import org.apache.commons.logging.Log;
025 import org.apache.commons.logging.LogFactory;
026 import org.activemq.ActiveMQConnection;
027 import org.activemq.ActiveMQConnectionFactory;
028 import org.activemq.ActiveMQPrefetchPolicy;
029 import org.activemq.advisories.ConnectionAdvisor;
030 import org.activemq.advisories.ConnectionAdvisoryEvent;
031 import org.activemq.advisories.ConnectionAdvisoryEventListener;
032 import org.activemq.broker.BrokerClient;
033 import org.activemq.broker.BrokerContainer;
034 import org.activemq.broker.ConsumerInfoListener;
035 import org.activemq.message.ActiveMQDestination;
036 import org.activemq.message.BrokerInfo;
037 import org.activemq.message.ConsumerInfo;
038 import org.activemq.message.Receipt;
039 import org.activemq.service.MessageContainerManager;
040 import org.activemq.service.Service;
041 import org.activemq.transport.composite.CompositeTransportChannel;
042 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
043 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
044 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
045
046 /**
047 * Represents a broker's connection with a single remote broker which bridges the two brokers to form a network. <p/>
048 * The NetworkChannel contains a JMS connection with the remote broker. <p/>New subscriptions on the local broker are
049 * multiplexed into the JMS connection so that messages published on the remote broker can be replayed onto the local
050 * broker.
051 *
052 * @version $Revision: 1.1.1.1 $
053 */
054 public class NetworkChannel
055 implements
056 Service,
057 ConsumerInfoListener,
058 ConnectionAdvisoryEventListener,
059 TransportStatusEventListener {
060 private static final Log log = LogFactory.getLog(NetworkChannel.class);
061 protected String uri;
062 protected BrokerContainer brokerContainer;
063 protected ActiveMQConnection localConnection;
064 protected ActiveMQConnection remoteConnection;
065 protected ConcurrentHashMap topicConsumerMap;
066 protected ConcurrentHashMap queueConsumerMap;
067 protected String remoteUserName;
068 protected String remotePassword;
069 protected String remoteBrokerName;
070 protected String remoteClusterName;
071 protected int maximumRetries = 0;
072 protected long reconnectSleepTime = 2000L;
073 protected PooledExecutor threadPool;
074 private boolean remote = false;
075 private SynchronizedBoolean started = new SynchronizedBoolean(false);
076 private SynchronizedBoolean connected = new SynchronizedBoolean(false);
077 private SynchronizedBoolean stopped = new SynchronizedBoolean(false);
078 private ConnectionAdvisor connectionAdvisor;
079 private ActiveMQPrefetchPolicy localPrefetchPolicy;
080 private ActiveMQPrefetchPolicy remotePrefetchPolicy;
081 private boolean demandBasedForwarding = true;
082
083 /**
084 * Default constructor
085 */
086 public NetworkChannel() {
087 this.topicConsumerMap = new ConcurrentHashMap();
088 this.queueConsumerMap = new ConcurrentHashMap();
089 }
090
091 /**
092 * Default Constructor
093 *
094 * @param tp
095 */
096 public NetworkChannel(PooledExecutor tp) {
097 this();
098 this.threadPool = tp;
099 }
100
101 /**
102 * Constructor
103 *
104 * @param connector
105 * @param brokerContainer
106 * @param uri
107 */
108 public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, String uri) {
109 this(connector.threadPool);
110 this.brokerContainer = brokerContainer;
111 this.uri = uri;
112 }
113
114 /**
115 * Create a NetworkConnector from a TransportChannel
116 *
117 * @param connector
118 * @param brokerContainer
119 * @param channel
120 * @param remoteBrokerName
121 * @param remoteclusterName
122 * @throws JMSException
123 */
124 public NetworkChannel(NetworkConnector connector, BrokerContainer brokerContainer, TransportChannel channel,
125 String remoteBrokerName, String remoteclusterName) throws JMSException {
126 this(connector.threadPool);
127 this.brokerContainer = brokerContainer;
128 this.uri = "";
129 this.remoteBrokerName = remoteBrokerName;
130 this.remoteClusterName = remoteclusterName;
131 ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory();
132 fac.setJ2EEcompliant(false);
133 fac.setTurboBoost(true);
134 remoteConnection = new ActiveMQConnection(fac, remoteUserName, remotePassword, channel);
135 remoteConnection.setClientID("Boondocks:" + remoteClusterName + ":" + remoteBrokerName);
136 remoteConnection.setQuickClose(true);
137 remoteConnection.start();
138 BrokerInfo info = new BrokerInfo();
139 info.setBrokerName(brokerContainer.getBroker().getBrokerName());
140 info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
141 channel.asyncSend(info);
142 remote = true;
143 }
144
145 /**
146 * @see org.activemq.transport.TransportStatusEventListener#statusChanged(org.activemq.transport.TransportStatusEvent)
147 */
148 public void statusChanged(TransportStatusEvent event) {
149 if (event != null
150 && (event.getChannelStatus() == TransportStatusEvent.CONNECTED
151 || event.getChannelStatus() == TransportStatusEvent.RECONNECTED)) {
152 connected.set(true);
153 }else {
154 connected.set(false);
155 }
156 }
157
158 private void doSetConnected() {
159 synchronized (connected) {
160 connected.set(true);
161 connected.notifyAll();
162 }
163 }
164
165 /**
166 * @return text info on this
167 */
168 public String toString() {
169 return "NetworkChannel{ " + ", uri = '" + uri + "' " + ", remoteBrokerName = '" + remoteBrokerName + "' "
170 + " }";
171 }
172
173 /**
174 * Start the channel
175 */
176 public void start() {
177 if (started.commit(false, true)) {
178 try {
179 stopped.set(false);
180 threadPool.execute(new Runnable() {
181 public void run() {
182 String originalName = Thread.currentThread().getName();
183 try {
184 Thread.currentThread().setName("NetworkChannel Initiator to " + uri);
185 initialize();
186 startSubscriptions();
187 log.info("Started NetworkChannel to " + uri);
188 }
189 catch (JMSException jmsEx) {
190 log.error("Failed to start NetworkChannel: " + uri, jmsEx);
191 }
192 finally {
193 Thread.currentThread().setName(originalName);
194 }
195 }
196 });
197 }
198 catch (InterruptedException e) {
199 log.warn("Failed to start - interuppted", e);
200 }
201 }
202 }
203
204 /**
205 * stop the channel
206 *
207 * @throws JMSException on error
208 */
209 public void stop() throws JMSException {
210 if (started.commit(true, false)) {
211 stopped.set(true);
212 topicConsumerMap.clear();
213 if (remoteConnection != null) {
214 remoteConnection.close();
215 remoteConnection = null;
216 }
217 if (localConnection != null) {
218 localConnection.close();
219 localConnection = null;
220 }
221 for (Iterator i = topicConsumerMap.values().iterator();i.hasNext();) {
222 NetworkMessageBridge consumer = (NetworkMessageBridge) i.next();
223 consumer.stop();
224 }
225 }
226 }
227
228 /**
229 * Listen for new Consumer events at this broker
230 *
231 * @param client
232 * @param info
233 */
234 public void onConsumerInfo(final BrokerClient client, final ConsumerInfo info) {
235 String brokerName = client.getBrokerConnector().getBrokerInfo().getBrokerName();
236 if (!client.isClusteredConnection()) {
237 if (connected.get()) {
238 if (!info.hasVisited(remoteBrokerName)) {
239 if (info.isStarted()) {
240 addConsumerInfo(info);
241 }
242 else {
243 removeConsumerInfo(info);
244 }
245 }
246 }
247 else {
248 try {
249 threadPool.execute(new Runnable() {
250 public void run() {
251 if (!client.isClusteredConnection()) {
252 if (!info.hasVisited(remoteBrokerName)) {
253 synchronized (connected) {
254 while (!connected.get() && !stopped.get()) {
255 try {
256 connected.wait(500);
257 }
258 catch (InterruptedException e) {
259 log.debug("interuppted", e);
260 }
261 }
262 if (info.isStarted()) {
263 addConsumerInfo(info);
264 }
265 else {
266 removeConsumerInfo(info);
267 }
268 }
269 }
270 }
271 }
272 });
273 }
274 catch (InterruptedException e) {
275 log.warn("Failed to process ConsumerInfo: " + info, e);
276 }
277 }
278 }
279 }
280
281 /**
282 * @return the uri of the broker(s) this channel is connected to
283 */
284 public String getUri() {
285 return uri;
286 }
287
288 /**
289 * set the uri of the broker(s) this channel is connected to
290 *
291 * @param uri
292 */
293 public void setUri(String uri) {
294 this.uri = uri;
295 }
296
297 /**
298 * @return Returns the remotePassword.
299 */
300 public String getRemotePassword() {
301 return remotePassword;
302 }
303
304 /**
305 * @param remotePassword The remotePassword to set.
306 */
307 public void setRemotePassword(String remotePassword) {
308 this.remotePassword = remotePassword;
309 }
310
311 /**
312 * @return Returns the remoteUserName.
313 */
314 public String getRemoteUserName() {
315 return remoteUserName;
316 }
317
318 /**
319 * @param remoteUserName The remoteUserName to set.
320 */
321 public void setRemoteUserName(String remoteUserName) {
322 this.remoteUserName = remoteUserName;
323 }
324
325 /**
326 * @return Returns the brokerContainer.
327 */
328 public BrokerContainer getBrokerContainer() {
329 return brokerContainer;
330 }
331
332 /**
333 * @param brokerContainer The brokerContainer to set.
334 */
335 public void setBrokerContainer(BrokerContainer brokerContainer) {
336 this.brokerContainer = brokerContainer;
337 }
338
339 public int getMaximumRetries() {
340 return maximumRetries;
341 }
342
343 public void setMaximumRetries(int maximumRetries) {
344 this.maximumRetries = maximumRetries;
345 }
346
347 public long getReconnectSleepTime() {
348 return reconnectSleepTime;
349 }
350
351 public void setReconnectSleepTime(long reconnectSleepTime) {
352 this.reconnectSleepTime = reconnectSleepTime;
353 }
354
355 public String getRemoteBrokerName() {
356 return remoteBrokerName;
357 }
358
359 public void setRemoteBrokerName(String remoteBrokerName) {
360 this.remoteBrokerName = remoteBrokerName;
361 }
362
363 /**
364 * @return Returns the threadPool.
365 */
366 protected PooledExecutor getThreadPool() {
367 return threadPool;
368 }
369
370 /**
371 * @param threadPool The threadPool to set.
372 */
373 protected void setThreadPool(PooledExecutor threadPool) {
374 this.threadPool = threadPool;
375 }
376
377 private synchronized ActiveMQConnection getLocalConnection() throws JMSException {
378 if (localConnection == null) {
379 initializeLocal();
380 }
381 return localConnection;
382 }
383
384 private synchronized ActiveMQConnection getRemoteConnection() throws JMSException {
385 if (remoteConnection == null) {
386 initializeRemote();
387 }
388 return remoteConnection;
389 }
390
391 /**
392 * @return Returns the localPrefetchPolicy.
393 */
394 public ActiveMQPrefetchPolicy getLocalPrefetchPolicy() {
395 return localPrefetchPolicy;
396 }
397
398 /**
399 * @param localPrefetchPolicy The localPrefetchPolicy to set.
400 */
401 public void setLocalPrefetchPolicy(ActiveMQPrefetchPolicy localPrefetchPolicy) {
402 this.localPrefetchPolicy = localPrefetchPolicy;
403 }
404
405 /**
406 * @return Returns the remotePrefetchPolicy.
407 */
408 public ActiveMQPrefetchPolicy getRemotePrefetchPolicy() {
409 return remotePrefetchPolicy;
410 }
411
412 /**
413 * @param remotePrefetchPolicy The remotePrefetchPolicy to set.
414 */
415 public void setRemotePrefetchPolicy(ActiveMQPrefetchPolicy remotePrefetchPolicy) {
416 this.remotePrefetchPolicy = remotePrefetchPolicy;
417 }
418
419 /**
420 * @return Returns the demandBasedForwarding.
421 */
422 public boolean isDemandBasedForwarding() {
423 return demandBasedForwarding;
424 }
425
426 /**
427 * @param demandBasedForwarding The demandBasedForwarding to set.
428 */
429 public void setDemandBasedForwarding(boolean demandBasedForwarding) {
430 this.demandBasedForwarding = demandBasedForwarding;
431 }
432
433 // Implementation methods
434 //-------------------------------------------------------------------------
435 /**
436 * Implementation of ConnectionAdvisoryEventListener
437 *
438 * @param event
439 */
440 public void onEvent(ConnectionAdvisoryEvent event) {
441 String localBrokerName = brokerContainer.getBroker().getBrokerName();
442 if (!event.getInfo().isClosed()) {
443 brokerContainer.registerRemoteClientID(event.getInfo().getClientId());
444 }
445 else {
446 brokerContainer.deregisterRemoteClientID(event.getInfo().getClientId());
447 }
448 }
449
450 private void addConsumerInfo(ConsumerInfo info) {
451 addConsumerInfo(info.getDestination(), info.getDestination().isTopic(), info.isDurableTopic());
452 }
453
454 private void addConsumerInfo(ActiveMQDestination destination, boolean topic, boolean durableTopic) {
455 Map map = topic ? topicConsumerMap : queueConsumerMap;
456 NetworkMessageBridge bridge = (NetworkMessageBridge) map.get(destination.getPhysicalName());
457 if (bridge == null) {
458 bridge = createBridge(map, destination, durableTopic);
459 }
460 else if (durableTopic && !bridge.isDurableTopic() && !demandBasedForwarding) {
461 //upgrade our subscription
462 bridge.decrementReferenceCount();
463 upgradeBridge(bridge);
464 }
465 bridge.incrementReferenceCount();
466 }
467
468 private void upgradeBridge(NetworkMessageBridge bridge) {
469 try {
470 remoteConnection.stop();
471 bridge.upgrade();
472 }
473 catch (JMSException e) {
474 log.warn("Could not upgrade the NetworkMessageBridge to a durable subscription for destination: "
475 + bridge.getDestination(), e);
476 }
477 try {
478 remoteConnection.start();
479 }
480 catch (JMSException e) {
481 log.error("Failed to restart the NetworkMessageBridge", e);
482 }
483 }
484
485 private NetworkMessageBridge createBridge(Map map, ActiveMQDestination destination, boolean durableTopic) {
486 NetworkMessageBridge bridge = new NetworkMessageBridge();
487 try {
488 bridge.setDestination(destination);
489 bridge.setDurableTopic(durableTopic);
490 bridge.setLocalBrokerName(brokerContainer.getBroker().getBrokerName());
491 bridge.setLocalSession(getLocalConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE));
492 bridge.setRemoteSession(getRemoteConnection().createSession(false, Session.CLIENT_ACKNOWLEDGE));
493 map.put(destination.getPhysicalName(), bridge);
494 bridge.start();
495 log.info("started NetworkMessageBridge for destination: " + destination + " -- NetworkChannel: "
496 + this.toString());
497 }
498 catch (JMSException jmsEx) {
499 log.error("Failed to start NetworkMessageBridge for destination: " + destination, jmsEx);
500 }
501 return bridge;
502 }
503
504 private void removeConsumerInfo(final ConsumerInfo info) {
505 final String physicalName = info.getDestination().getPhysicalName();
506 Map map = (demandBasedForwarding || info.getDestination().isTopic()) ? topicConsumerMap : queueConsumerMap;
507 final NetworkMessageBridge bridge = (NetworkMessageBridge) map.get(physicalName);
508 if (bridge != null) {
509 if (bridge.decrementReferenceCount() <= 0) {
510 try {
511 threadPool.execute(new Runnable() {
512 public void run() {
513 bridge.stop();
514 topicConsumerMap.remove(physicalName);
515 log.info("stopped MetworkMessageBridge for destination: " + info.getDestination());
516 }
517 });
518 }
519 catch (InterruptedException e) {
520 log.warn("got interrupted stoping NetworkBridge", e);
521 }
522 }
523 }
524 }
525
526 private void startSubscriptions() {
527 if (!demandBasedForwarding) {
528 if (!remote) {
529 MessageContainerManager mcm = brokerContainer.getBroker().getPersistentTopicContainerManager();
530 if (mcm != null) {
531 Map map = mcm.getLocalDestinations();
532 startSubscriptions(map, true, true);
533 }
534 mcm = brokerContainer.getBroker().getTransientTopicContainerManager();
535 if (mcm != null) {
536 Map map = mcm.getLocalDestinations();
537 startSubscriptions(map, true, false);
538 }
539 mcm = brokerContainer.getBroker().getTransientQueueContainerManager();
540 if (mcm != null) {
541 Map map = mcm.getLocalDestinations();
542 startSubscriptions(map, false, false);
543 }
544 mcm = brokerContainer.getBroker().getPersistentQueueContainerManager();
545 if (mcm != null) {
546 Map map = mcm.getLocalDestinations();
547 startSubscriptions(map, false, false);
548 }
549 }
550 }
551 }
552
553 private void startSubscriptions(Map destinations, boolean topic, boolean durableTopic) {
554 if (destinations != null) {
555 for (Iterator i = destinations.values().iterator();i.hasNext();) {
556 ActiveMQDestination dest = (ActiveMQDestination) i.next();
557 addConsumerInfo(dest, topic, durableTopic);
558 }
559 }
560 }
561
562 protected void initialize() throws JMSException {
563 // force lazy construction
564 initializeLocal();
565 initializeRemote();
566 brokerContainer.getBroker().addConsumerInfoListener(NetworkChannel.this);
567 }
568
569 private synchronized void initializeRemote() throws JMSException {
570 if (remoteConnection == null) {
571 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(remoteUserName, remotePassword, uri);
572 //factory.setTurboBoost(true);
573 factory.setJ2EEcompliant(false);
574 factory.setQuickClose(true);
575 factory.setInternalConnection(true);
576 remoteConnection = (ActiveMQConnection) factory.createConnection();
577 TransportChannel transportChannel = remoteConnection.getTransportChannel();
578 if (transportChannel instanceof CompositeTransportChannel) {
579 CompositeTransportChannel composite = (CompositeTransportChannel) transportChannel;
580 composite.setMaximumRetries(maximumRetries);
581 composite.setFailureSleepTime(reconnectSleepTime);
582 composite.setIncrementTimeout(false);
583 }
584 transportChannel.addTransportStatusEventListener(this);
585 remoteConnection.setClientID(brokerContainer.getBroker().getBrokerName() + "_NetworkChannel");
586 remoteConnection.start();
587 BrokerInfo info = new BrokerInfo();
588 info.setBrokerName(brokerContainer.getBroker().getBrokerName());
589 info.setClusterName(brokerContainer.getBroker().getBrokerClusterName());
590 Receipt receipt = remoteConnection.syncSendRequest(info);
591 if (receipt != null) {
592 remoteBrokerName = receipt.getBrokerName();
593 remoteClusterName = receipt.getClusterName();
594 }
595 connectionAdvisor = new ConnectionAdvisor(remoteConnection);
596 connectionAdvisor.addListener(this);
597 connectionAdvisor.start();
598 if (remotePrefetchPolicy != null) {
599 remoteConnection.setPrefetchPolicy(remotePrefetchPolicy);
600 }
601 }
602 doSetConnected();
603 }
604
605 private void initializeLocal() throws JMSException {
606 String brokerName = brokerContainer.getBroker().getBrokerName();
607 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://" + brokerName);
608 factory.setTurboBoost(true);
609 factory.setJ2EEcompliant(false);
610 factory.setBrokerName(brokerName);
611 factory.setQuickClose(true);
612 factory.setInternalConnection(true);
613 localConnection = (ActiveMQConnection) factory.createConnection();
614 localConnection.start();
615 BrokerInfo info = new BrokerInfo();
616 info.setBrokerName(remoteBrokerName);
617 info.setClusterName(remoteClusterName);
618 localConnection.asyncSendPacket(info);
619 if (localPrefetchPolicy != null) {
620 localConnection.setPrefetchPolicy(localPrefetchPolicy);
621 }
622 }
623
624 /*private synchronized void releaseRemote() throws JMSException {
625 if (remoteConnection != null) {
626 TransportChannel transportChannel = remoteConnection.getTransportChannel();
627 transportChannel.stop();
628 if (connectionAdvisor != null) {
629 connectionAdvisor.stop();
630 }
631 try {
632 remoteConnection.stop();
633 } catch (JMSException e) {
634 // ignore this exception, since the remote broker is most likely down
635 }
636 remoteConnection = null;
637 }
638 }*/
639
640 }