001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.gbean;
019
020 import org.activemq.ActiveMQConnectionFactory;
021 import org.activemq.broker.BrokerConnector;
022 import org.activemq.broker.impl.BrokerConnectorImpl;
023 import org.activemq.io.WireFormat;
024 import org.activemq.io.impl.DefaultWireFormat;
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.apache.geronimo.gbean.GBeanInfo;
028 import org.apache.geronimo.gbean.GBeanInfoBuilder;
029 import org.apache.geronimo.gbean.GBeanLifecycle;
030 import org.apache.geronimo.gbean.GConstructorInfo;
031
032 import javax.jms.JMSException;
033 import java.net.InetSocketAddress;
034 import java.net.URI;
035 import java.net.URISyntaxException;
036
037 /**
038 * Default implementation of the ActiveMQ connector
039 *
040 * @version $Revision: 1.1.1.1 $
041 */
042 public class ActiveMQConnectorGBean implements GBeanLifecycle, ActiveMQConnector {
043 private Log log = LogFactory.getLog(getClass().getName());
044
045 private BrokerConnector brokerConnector;
046 private ActiveMQContainer container;
047 private WireFormat wireFormat = new DefaultWireFormat();
048 private String protocol;
049 private String host;
050 private int port;
051 private String path;
052 private String query;
053 private String urlAsStarted;
054
055 public ActiveMQConnectorGBean(ActiveMQContainer container, String protocol, String host, int port) {
056 this.container = container;
057 this.protocol = protocol;
058 this.host = host;
059 this.port = port;
060 }
061
062 public String getProtocol() {
063 return protocol;
064 }
065
066 public void setProtocol(String protocol) {
067 this.protocol = protocol;
068 }
069
070 public String getHost() {
071 return host;
072 }
073
074 public void setHost(String host) {
075 this.host = host;
076 }
077
078 public int getPort() {
079 return port;
080 }
081
082 public void setPort(int port) {
083 this.port = port;
084 }
085
086 public String getPath() {
087 return path;
088 }
089
090 public void setPath(String path) {
091 this.path = path;
092 }
093
094 public String getQuery() {
095 return query;
096 }
097
098 public void setQuery(String query) {
099 this.query = query;
100 }
101
102 public String getUrl() {
103 try {
104 return new URI(protocol, null, host, port, path, query, null).toString();
105 } catch (URISyntaxException e) {
106 throw new IllegalStateException("Attributes don't form a valid URI: "+protocol+"://"+host+":"+port+"/"+path+"?"+query);
107 }
108 }
109
110 public WireFormat getWireFormat() {
111 return wireFormat;
112 }
113
114 public void setWireFormat(WireFormat wireFormat) {
115 this.wireFormat = wireFormat;
116 }
117
118 public InetSocketAddress getListenAddress() {
119 return brokerConnector == null ? null : brokerConnector.getServerChannel().getSocketAddress();
120 }
121
122 public synchronized void doStart() throws Exception {
123 ClassLoader old = Thread.currentThread().getContextClassLoader();
124 Thread.currentThread().setContextClassLoader(ActiveMQContainerGBean.class.getClassLoader());
125 try {
126 if (brokerConnector == null) {
127 urlAsStarted = getUrl();
128 brokerConnector = createBrokerConnector(urlAsStarted);
129 brokerConnector.start();
130 ActiveMQConnectionFactory.registerBroker(urlAsStarted, brokerConnector);
131 }
132 } finally {
133 Thread.currentThread().setContextClassLoader(old);
134 }
135 }
136
137 public synchronized void doStop() throws Exception {
138 if (brokerConnector != null) {
139 ActiveMQConnectionFactory.unregisterBroker(urlAsStarted);
140 BrokerConnector temp = brokerConnector;
141 brokerConnector = null;
142 temp.stop();
143 }
144 }
145
146 public synchronized void doFail() {
147 if (brokerConnector != null) {
148 BrokerConnector temp = brokerConnector;
149 brokerConnector = null;
150 try {
151 temp.stop();
152 }
153 catch (JMSException e) {
154 log.info("Caught while closing due to failure: " + e, e);
155 }
156 }
157 }
158
159 protected BrokerConnector createBrokerConnector(String url) throws Exception {
160 return new BrokerConnectorImpl(container.getBrokerContainer(), url, wireFormat);
161 }
162
163 public static final GBeanInfo GBEAN_INFO;
164
165 static {
166 GBeanInfoBuilder infoFactory = GBeanInfoBuilder.createStatic("ActiveMQ Message Broker Connector", ActiveMQConnectorGBean.class, CONNECTOR_J2EE_TYPE);
167 infoFactory.addAttribute("url", String.class.getName(), false);
168 infoFactory.addAttribute("wireFormat", WireFormat.class.getName(), false);
169 infoFactory.addReference("activeMQContainer", ActiveMQContainer.class);
170 infoFactory.addInterface(ActiveMQConnector.class, new String[]{"host","port","protocol","path","query"},
171 new String[]{"host","port"});
172 infoFactory.setConstructor(new GConstructorInfo(new String[]{"activeMQContainer", "protocol", "host", "port"}));
173 GBEAN_INFO = infoFactory.getBeanInfo();
174 }
175
176 public static GBeanInfo getGBeanInfo() {
177 return GBEAN_INFO;
178 }
179 }