001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport.remote;
020
021 import javax.jms.JMSException;
022
023 import org.activemq.broker.BrokerConnector;
024 import org.activemq.broker.BrokerContainer;
025 import org.activemq.broker.impl.BrokerConnectorImpl;
026 import org.activemq.broker.impl.BrokerContainerImpl;
027 import org.activemq.io.WireFormat;
028 import org.activemq.transport.NetworkChannel;
029 import org.activemq.transport.NetworkConnector;
030 import org.activemq.transport.RemoteNetworkConnector;
031 import org.activemq.transport.vm.VmTransportChannel;
032 import org.apache.commons.logging.Log;
033 import org.apache.commons.logging.LogFactory;
034
035 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
036
037 /**
038 * A <CODE>RemoteTransportChannel</CODE> creates an embedded broker that creates a remote connection to another
039 * broker. This connection type is designed for reliable connections, that can use the storage mechansims of an embedded
040 * broker to be decoupled from the remote broker - i.e. for connections that need to be reliable, don't block but maybe
041 * using a transport across an unreliable network connection
042 * <P>
043 *
044 * <P>
045 * An example of the expected format is: <CODE>remote://tcp://remotebroker:5060</CODE>
046 * <P>
047 *
048 * @version $Revision: 1.1.1.1 $
049 */
050 public class RemoteTransportChannel extends VmTransportChannel{
051
052 private static final Log log = LogFactory.getLog(RemoteTransportChannel.class);
053 private WireFormat wireFormat;
054 private String remoteUserName;
055 private String remotePassword;
056 private String brokerName;
057 private String remoteLocation;
058 private BrokerContainer container;
059 private NetworkConnector networkConnector;
060 private SynchronizedBoolean brokerConnectorStarted = new SynchronizedBoolean(false);
061
062 /**
063 * Construct a RemoteTransportChannel
064 *
065 * @param wireFormat
066 * @param peerURIs
067 * @throws JMSException
068 */
069 protected RemoteTransportChannel(WireFormat wireFormat,String remoteLocation) throws JMSException{
070 this.wireFormat = wireFormat;
071 this.remoteLocation = remoteLocation;
072 }
073
074 /**
075 * @return true if the transport channel is active, this value will be false through reconnecting
076 */
077 public boolean isTransportConnected(){
078 return true;
079 }
080
081 /**
082 * Some transports rely on an embedded broker (beer based protocols)
083 *
084 * @return true if an embedded broker required
085 */
086 public boolean requiresEmbeddedBroker(){
087 return true;
088 }
089
090 /**
091 * Provides a way to specify the client ID that this channel is using
092 *
093 * @param clientID
094 */
095 public void setClientID(String clientID){
096 super.setClientID(clientID);
097 if(brokerConnectorStarted.commit(false,true)){
098 if(brokerName==null){
099 brokerName = clientID;
100 }
101 if(container!=null){
102 container.getBroker().getBrokerInfo().setBrokerName(brokerName);
103 container.getBroker().getBrokerInfo().setClusterName(brokerName);
104 try{
105 container.start();
106 Thread.sleep(1000);
107
108
109 }catch(Exception e){
110 log.error("Failed to start brokerConnector",e);
111 }
112 }
113 }
114 }
115
116 public void stop(){
117 super.stop();
118 if(container!=null){
119 try{
120 container.stop();
121 }catch(JMSException e){
122 log.warn("Caught exception stopping Broker",e);
123 }
124 }
125 }
126
127 /**
128 * Some transports that rely on an embedded broker need to create the connector used by the broker
129 *
130 * @return the BrokerConnector or null if not applicable
131 * @throws JMSException
132 */
133 public BrokerConnector getEmbeddedBrokerConnector() throws JMSException{
134 try{
135 BrokerConnector brokerConnector = null;
136 if(container==null){
137
138 container = new BrokerContainerImpl("NotSet","NotSet");
139
140 networkConnector = networkConnector = new RemoteNetworkConnector(container);
141 NetworkChannel channel = networkConnector.addNetworkChannel(remoteLocation);
142 channel.setRemoteUserName(remoteUserName);
143 channel.setRemotePassword(remotePassword);
144
145 container.addNetworkConnector(networkConnector);
146 brokerConnector = new BrokerConnectorImpl(container);
147
148 }
149 return brokerConnector;
150 }catch(Exception e){
151 e.printStackTrace();
152 String errorStr = "Failed to get embedded connector";
153 log.error(errorStr,e);
154 JMSException jmsEx = new JMSException(errorStr);
155 jmsEx.setLinkedException(e);
156 throw jmsEx;
157 }
158 }
159
160 /**
161 * @return Returns the remoteLocation.
162 */
163 public String getRemoteLocation(){
164 return remoteLocation;
165 }
166
167 /**
168 * @param remoteLocation
169 * The remoteLocation to set.
170 */
171 public void setRemoteLocation(String remoteLocation){
172 this.remoteLocation = remoteLocation;
173 }
174
175 /**
176 * @return Returns the remotePassword.
177 */
178 public String getRemotePassword(){
179 return remotePassword;
180 }
181
182 /**
183 * @param remotePassword
184 * The remotePassword to set.
185 */
186 public void setRemotePassword(String remotePassword){
187 this.remotePassword = remotePassword;
188 }
189
190 /**
191 * @return Returns the remoteUserName.
192 */
193 public String getRemoteUserName(){
194 return remoteUserName;
195 }
196
197 /**
198 * @param remoteUserName
199 * The remoteUserName to set.
200 */
201 public void setRemoteUserName(String remoteUserName){
202 this.remoteUserName = remoteUserName;
203 }
204
205 /**
206 * @return Returns the wireFormat.
207 */
208 public WireFormat getWireFormat(){
209 return wireFormat;
210 }
211
212 /**
213 * @param wireFormat
214 * The wireFormat to set.
215 */
216 public void setWireFormat(WireFormat wireFormat){
217 this.wireFormat = wireFormat;
218 }
219
220 /**
221 * @return Returns the brokerName.
222 */
223 public String getBrokerName(){
224 return brokerName;
225 }
226
227 /**
228 * @param brokerName
229 * The brokerName to set.
230 */
231 public void setBrokerName(String brokerName){
232 this.brokerName = brokerName;
233 }
234
235 }