001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.ra;
019
020 import java.util.HashMap;
021
022 import javax.jms.Connection;
023 import javax.jms.JMSException;
024 import javax.jms.XAConnection;
025 import javax.jms.XASession;
026 import javax.resource.NotSupportedException;
027 import javax.resource.ResourceException;
028 import javax.resource.spi.ActivationSpec;
029 import javax.resource.spi.BootstrapContext;
030 import javax.resource.spi.ResourceAdapter;
031 import javax.resource.spi.ResourceAdapterInternalException;
032 import javax.resource.spi.endpoint.MessageEndpointFactory;
033 import javax.transaction.xa.XAResource;
034
035 import org.apache.commons.logging.Log;
036 import org.apache.commons.logging.LogFactory;
037 import org.activemq.ActiveMQConnection;
038 import org.activemq.ActiveMQConnectionFactory;
039 import org.activemq.XmlConfigHelper;
040 import org.activemq.broker.BrokerContainer;
041 import org.activemq.broker.BrokerContainerFactory;
042 import org.activemq.broker.BrokerContext;
043 import org.activemq.util.IdGenerator;
044
045 /**
046 * Knows how to connect to one ActiveMQ server. It can then activate endpoints
047 * and deliver messages to those enpoints using the connection configure in the
048 * resource adapter. <p/>Must override equals and hashCode (JCA spec 16.4)
049 *
050 * @version $Revision: 1.2 $
051 */
052 public class ActiveMQResourceAdapter implements ResourceAdapter {
053 private static final Log log = LogFactory.getLog(ActiveMQResourceAdapter.class);
054
055 private static final String ASF_ENDPOINT_WORKER_TYPE = "asf";
056
057 private static final String POLLING_ENDPOINT_WORKER_TYPE = "polling";
058
059 private BootstrapContext bootstrapContext;
060
061 private HashMap endpointWorkers = new HashMap();
062
063 final private ActiveMQConnectionRequestInfo info = new ActiveMQConnectionRequestInfo();
064
065 private String endpointWorkerType = ASF_ENDPOINT_WORKER_TYPE;
066
067 private ActiveMQConnectionFactory connectionFactory;
068
069 private BrokerContainer container;
070
071 private Boolean useEmbeddedBroker;
072 private String brokerXmlConfig;
073
074 private HashMap connectionFactoryMap = new HashMap(1);
075
076 public ActiveMQResourceAdapter() {
077 }
078
079 /**
080 * @see javax.resource.spi.ResourceAdapter#start(javax.resource.spi.BootstrapContext)
081 */
082 public void start(BootstrapContext bootstrapContext) throws ResourceAdapterInternalException {
083 this.bootstrapContext = bootstrapContext;
084 if (isUseEmbeddedBroker() != null && isUseEmbeddedBroker().booleanValue()) {
085 createBroker();
086 }
087 }
088
089 private void createBroker() throws ResourceAdapterInternalException {
090 try {
091 BrokerContainerFactory brokerContainerFactory = XmlConfigHelper.createBrokerContainerFactory(getBrokerXmlConfig());
092
093 IdGenerator idgen = new IdGenerator();
094 container = brokerContainerFactory.createBrokerContainer(idgen.generateId(), BrokerContext.getInstance());
095 container.start();
096 connectionFactory = new ActiveMQConnectionFactory(container, getServerUrl());
097 } catch (JMSException e) {
098 log.error(e.toString(), e);
099 throw new ResourceAdapterInternalException("Failed to startup an embedded broker", e);
100 }
101 }
102
103 /**
104 * Return a connection using the default connection request info from the RAR
105 * deployment.
106 */
107 public ActiveMQConnection makeConnection() throws JMSException {
108 return makeConnection(info);
109 }
110
111 /**
112 * Return a connection using a specific connection request info.
113 */
114 public ActiveMQConnection makeConnection(ActiveMQConnectionRequestInfo crInfo) throws JMSException {
115
116 ActiveMQConnectionFactory connectionFactory = getConnectionFactory(crInfo);
117
118 String userName = info.getUserName();
119 String password = info.getPassword();
120 ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
121
122 String clientId = info.getClientid();
123 if (clientId != null) {
124 physicalConnection.setClientID(clientId);
125 }
126 return physicalConnection;
127 }
128
129 /**
130 * @param activationSpec
131 */
132 public ActiveMQConnection makeConnection(ActiveMQActivationSpec activationSpec) throws JMSException {
133 //use the default RA connection request for info
134 ActiveMQConnectionFactory connectionFactory = getConnectionFactory(info);
135 String userName = defaultValue(activationSpec.getUserName(), info.getUserName());
136 String password = defaultValue(activationSpec.getPassword(), info.getPassword());
137 ActiveMQConnection physicalConnection = (ActiveMQConnection) connectionFactory.createConnection(userName, password);
138 if (activationSpec.isDurableSubscription()) {
139 physicalConnection.setClientID(activationSpec.getClientId());
140 }
141 return physicalConnection;
142 }
143
144 /**
145 * Returns a connection factory given a connection configuration.
146 * The implementation of this method treats the factories as singletons
147 * only creating one factory for a given set of configuration data.
148 */
149 private ActiveMQConnectionFactory getConnectionFactory(ActiveMQConnectionRequestInfo crInfo) {
150 //use adapter default if none provided
151 if(crInfo == null) {
152 crInfo = info;
153 }
154
155 if(!(connectionFactoryMap.containsKey(crInfo))) {
156 //slightly possible the factory can be set twice here
157 //but highly unlikely and no real functional impact
158 //other than an extra reference
159 synchronized(connectionFactoryMap) {
160 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(crInfo.getServerUrl());
161 connectionFactoryMap.put(crInfo, factory);
162 return factory;
163 }
164 }
165 return (ActiveMQConnectionFactory)connectionFactoryMap.get(crInfo);
166 }
167
168 private String defaultValue(String value, String defaultValue) {
169 if (value != null)
170 return value;
171 return defaultValue;
172 }
173
174 /**
175 * @see javax.resource.spi.ResourceAdapter#stop()
176 */
177 public void stop() {
178 while (endpointWorkers.size() > 0) {
179 ActiveMQEndpointActivationKey key = (ActiveMQEndpointActivationKey) endpointWorkers.keySet().iterator().next();
180 endpointDeactivation(key.getMessageEndpointFactory(), key.getActivationSpec());
181 }
182 stopBroker();
183 this.bootstrapContext = null;
184 }
185
186 private void stopBroker() {
187 if (container != null) {
188 try {
189 container.stop();
190 } catch (JMSException e) {
191 log.warn("Exception while stopping the broker container", e);
192 }
193 }
194 }
195
196 /**
197 * @return
198 */
199 public BootstrapContext getBootstrapContext() {
200 return bootstrapContext;
201 }
202
203 /**
204 * @see javax.resource.spi.ResourceAdapter#endpointActivation(javax.resource.spi.endpoint.MessageEndpointFactory,
205 * javax.resource.spi.ActivationSpec)
206 */
207 public void endpointActivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec)
208 throws ResourceException {
209
210 // spec section 5.3.3
211 if (activationSpec.getResourceAdapter() != this) {
212 throw new ResourceException("Activation spec not initialized with this ResourceAdapter instance");
213 }
214
215 if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
216
217 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory,
218 (ActiveMQActivationSpec) activationSpec);
219 // This is weird.. the same endpoint activated twice.. must be a
220 // container error.
221 if (endpointWorkers.containsKey(key)) {
222 throw new IllegalStateException("Endpoint previously activated");
223 }
224
225 ActiveMQBaseEndpointWorker worker;
226 if (POLLING_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) {
227 worker = new ActiveMQPollingEndpointWorker(this, key);
228 } else if (ASF_ENDPOINT_WORKER_TYPE.equals(getEndpointWorkerType())) {
229 worker = new ActiveMQAsfEndpointWorker(this, key);
230 } else {
231 throw new NotSupportedException("That type of EndpointWorkerType is not supported: "
232 + getEndpointWorkerType());
233 }
234
235 endpointWorkers.put(key, worker);
236 worker.start();
237
238 } else {
239 throw new NotSupportedException("That type of ActicationSpec not supported: " + activationSpec.getClass());
240 }
241
242 }
243
244 /**
245 * @see javax.resource.spi.ResourceAdapter#endpointDeactivation(javax.resource.spi.endpoint.MessageEndpointFactory,
246 * javax.resource.spi.ActivationSpec)
247 */
248 public void endpointDeactivation(MessageEndpointFactory endpointFactory, ActivationSpec activationSpec) {
249
250 if (activationSpec.getClass().equals(ActiveMQActivationSpec.class)) {
251 ActiveMQEndpointActivationKey key = new ActiveMQEndpointActivationKey(endpointFactory,
252 (ActiveMQActivationSpec) activationSpec);
253 ActiveMQBaseEndpointWorker worker = (ActiveMQBaseEndpointWorker) endpointWorkers.remove(key);
254 if (worker == null) {
255 // This is weird.. that endpoint was not activated.. oh well..
256 // this method
257 // does not throw exceptions so just return.
258 return;
259 }
260 try {
261 worker.stop();
262 } catch (InterruptedException e) {
263 // We interrupted.. we won't throw an exception but will stop
264 // waiting for the worker
265 // to stop.. we tried our best. Keep trying to interrupt the
266 // thread.
267 Thread.currentThread().interrupt();
268 }
269
270 }
271
272 }
273
274 /**
275 * We only connect to one resource manager per ResourceAdapter instance, so
276 * any ActivationSpec will return the same XAResource.
277 *
278 * @see javax.resource.spi.ResourceAdapter#getXAResources(javax.resource.spi.ActivationSpec[])
279 */
280 public XAResource[] getXAResources(ActivationSpec[] activationSpecs) throws ResourceException {
281 Connection connection = null;
282 try {
283 connection = makeConnection();
284 if (connection instanceof XAConnection) {
285 XASession session = ((XAConnection) connection).createXASession();
286 XAResource xaResource = session.getXAResource();
287 return new XAResource[] { xaResource };
288 } else {
289 return new XAResource[] {};
290 }
291 } catch (JMSException e) {
292 throw new ResourceException(e);
293 } finally {
294 try {
295 connection.close();
296 } catch (Throwable ignore) {
297 }
298 }
299 }
300
301 // ///////////////////////////////////////////////////////////////////////
302 //
303 // Java Bean getters and setters for this ResourceAdapter class.
304 //
305 // ///////////////////////////////////////////////////////////////////////
306
307 /**
308 * @return
309 */
310 public String getClientid() {
311 return emptyToNull(info.getClientid());
312 }
313
314 /**
315 * @return
316 */
317 public String getPassword() {
318 return emptyToNull(info.getPassword());
319 }
320
321 /**
322 * @return
323 */
324 public String getServerUrl() {
325 return info.getServerUrl();
326 }
327
328 /**
329 * @return
330 */
331 public String getUserName() {
332 return emptyToNull(info.getUserName());
333 }
334
335 /**
336 * @param clientid
337 */
338 public void setClientid(String clientid) {
339 info.setClientid(clientid);
340 }
341
342 /**
343 * @param password
344 */
345 public void setPassword(String password) {
346 info.setPassword(password);
347 }
348
349 /**
350 * @param url
351 */
352 public void setServerUrl(String url) {
353 info.setServerUrl(url);
354 }
355
356 /**
357 * @param userid
358 */
359 public void setUserName(String userid) {
360 info.setUserName(userid);
361 }
362
363 /**
364 * @return Returns the endpointWorkerType.
365 */
366 public String getEndpointWorkerType() {
367 return endpointWorkerType;
368 }
369
370 /**
371 * @param endpointWorkerType
372 * The endpointWorkerType to set.
373 */
374 public void setEndpointWorkerType(String endpointWorkerType) {
375 this.endpointWorkerType = endpointWorkerType.toLowerCase();
376 }
377
378 public String getBrokerXmlConfig() {
379 return brokerXmlConfig;
380 }
381
382 /**
383 * Sets the <a href="http://activemq.org/Xml+Configuration">XML
384 * configuration file </a> used to configure the ActiveMQ broker via Spring
385 * if using embedded mode.
386 *
387 * @param brokerXmlConfig
388 * is the filename which is assumed to be on the classpath unless
389 * a URL is specified. So a value of <code>foo/bar.xml</code>
390 * would be assumed to be on the classpath whereas
391 * <code>file:dir/file.xml</code> would use the file system.
392 * Any valid URL string is supported.
393 * @see #setUseEmbeddedBroker(Boolean)
394 */
395 public void setBrokerXmlConfig(String brokerXmlConfig) {
396 this.brokerXmlConfig=brokerXmlConfig;
397 }
398
399 public Boolean isUseEmbeddedBroker() {
400 return useEmbeddedBroker;
401 }
402
403 public void setUseEmbeddedBroker(Boolean useEmbeddedBroker) {
404 this.useEmbeddedBroker = useEmbeddedBroker;
405 }
406
407 /**
408 * @return Returns the info.
409 */
410 public ActiveMQConnectionRequestInfo getInfo() {
411 return info;
412 }
413
414 public boolean equals(Object o) {
415 if (this == o) {
416 return true;
417 }
418 if (!(o instanceof ActiveMQResourceAdapter)) {
419 return false;
420 }
421
422 final ActiveMQResourceAdapter activeMQResourceAdapter = (ActiveMQResourceAdapter) o;
423
424 if (!endpointWorkerType.equals(activeMQResourceAdapter.endpointWorkerType)) {
425 return false;
426 }
427 if (!info.equals(activeMQResourceAdapter.info)) {
428 return false;
429 }
430 if ( notEqual(useEmbeddedBroker, activeMQResourceAdapter.useEmbeddedBroker) ) {
431 return false;
432 }
433 if ( notEqual(brokerXmlConfig, activeMQResourceAdapter.brokerXmlConfig) ) {
434 return false;
435 }
436
437 return true;
438 }
439
440 private boolean notEqual(Object o1, Object o2) {
441 return (o1 == null ^ o2 == null) || (o1 != null && !o1.equals(o2));
442 }
443
444
445 public int hashCode() {
446 int result;
447 result = info.hashCode();
448 result = 29 * result + endpointWorkerType.hashCode();
449 if (useEmbeddedBroker != null && useEmbeddedBroker.booleanValue()) {
450 result = result * 29 + 1;
451 }
452 if( brokerXmlConfig !=null ) {
453 result ^= brokerXmlConfig.hashCode();
454 }
455 return result;
456 }
457
458 private String emptyToNull(String value) {
459 if (value == null || value.length() == 0) {
460 return null;
461 }
462 return value;
463 }
464
465 public Boolean getUseEmbeddedBroker() {
466 return useEmbeddedBroker;
467 }
468
469 public Boolean getUseInboundSession() {
470 return info.getUseInboundSession();
471 }
472
473 public void setUseInboundSession(Boolean useInboundSession) {
474 info.setUseInboundSession(useInboundSession);
475 }
476
477 }