001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.ra;
019
020 import java.io.PrintWriter;
021 import java.util.ArrayList;
022 import java.util.Iterator;
023
024 import javax.jms.Connection;
025 import javax.jms.ExceptionListener;
026 import javax.jms.JMSException;
027 import javax.resource.ResourceException;
028 import javax.resource.spi.ConnectionEvent;
029 import javax.resource.spi.ConnectionEventListener;
030 import javax.resource.spi.ConnectionRequestInfo;
031 import javax.resource.spi.LocalTransaction;
032 import javax.resource.spi.ManagedConnection;
033 import javax.resource.spi.ManagedConnectionMetaData;
034 import javax.security.auth.Subject;
035 import javax.transaction.xa.XAResource;
036
037 import org.apache.commons.logging.Log;
038 import org.apache.commons.logging.LogFactory;
039 import org.activemq.ActiveMQConnection;
040 import org.activemq.LocalTransactionEventListener;
041 import org.activemq.TransactionContext;
042
043 /**
044 * ActiveMQManagedConnection maps to real physical connection to the
045 * server. Since a ManagedConnection has to provide a transaction
046 * managment interface to the physical connection, and sessions
047 * are the objects implement transaction managment interfaces in
048 * the JMS API, this object also maps to a singe physical JMS session.
049 * <p/>
050 * The side-effect is that JMS connection the application gets
051 * will allways create the same session object. This is good if
052 * running in an app server since the sessions are elisted in the
053 * context transaction. This is bad if used outside of an app
054 * server since the user may be trying to create 2 different
055 * sessions to coordinate 2 different uow.
056 *
057 * @version $Revision: 1.1.1.1 $
058 */
059 public class ActiveMQManagedConnection implements ManagedConnection, ExceptionListener { // TODO: , DissociatableManagedConnection {
060
061 private static final Log log = LogFactory.getLog(ActiveMQManagedConnection.class);
062
063 private PrintWriter logWriter;
064
065 private final ActiveMQConnection physicalConnection;
066 private final TransactionContext transactionContext;
067 private final ArrayList proxyConnections = new ArrayList();
068 private final ArrayList listeners = new ArrayList();
069 private final LocalAndXATransaction localAndXATransaction;
070
071 private Subject subject;
072 private ActiveMQConnectionRequestInfo info;
073 private boolean destoryed;
074
075 public ActiveMQManagedConnection(Subject subject, ActiveMQConnection physicalConnection, ActiveMQConnectionRequestInfo info) throws ResourceException {
076 try {
077 this.subject = subject;
078 this.info = info;
079 this.physicalConnection = physicalConnection;
080 this.transactionContext = new TransactionContext(physicalConnection);
081
082 this.localAndXATransaction = new LocalAndXATransaction(transactionContext) {
083 public void setInManagedTx(boolean inManagedTx) throws JMSException {
084 super.setInManagedTx(inManagedTx);
085 Iterator iterator = proxyConnections.iterator();
086 while (iterator.hasNext()) {
087 JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
088 proxy.setUseSharedTxContext(inManagedTx);
089 }
090 }
091 };
092
093 this.transactionContext.setLocalTransactionEventListener( new LocalTransactionEventListener() {
094 public void beginEvent() {
095 fireBeginEvent();
096 }
097 public void commitEvent() {
098 fireCommitEvent();
099 }
100 public void rollbackEvent() {
101 fireRollbackEvent();
102 }
103 });
104
105 physicalConnection.setExceptionListener(this);
106 } catch (JMSException e) {
107 throw new ResourceException("Could not create a new connection: "+e.getMessage(), e);
108 }
109 }
110
111 public boolean isInManagedTx() {
112 return localAndXATransaction.isInManagedTx();
113 }
114
115 static public boolean matches(Object x, Object y) {
116 if (x == null ^ y == null) {
117 return false;
118 }
119 if (x != null && !x.equals(y)) {
120 return false;
121 }
122 return true;
123 }
124
125 public void associate(Subject subject, ActiveMQConnectionRequestInfo info) throws JMSException {
126
127 // Do we need to change the associated userid/password
128 if( !matches(info.getUserName(), this.info.getUserName()) || !matches(info.getPassword(), this.info.getPassword()) ) {
129 ((ActiveMQConnection)physicalConnection).changeUserInfo(info.getUserName(), info.getPassword());
130 }
131
132 // Do we need to set the clientId?
133 if( info.getClientid()!=null && info.getClientid().length()>0 )
134 physicalConnection.setClientID(info.getClientid());
135
136 this.subject = subject;
137 this.info = info;
138 }
139
140 public Connection getPhysicalConnection() {
141 return physicalConnection;
142 }
143
144 private void fireBeginEvent() {
145 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
146 ConnectionEvent.LOCAL_TRANSACTION_STARTED);
147 Iterator iterator = listeners.iterator();
148 while (iterator.hasNext()) {
149 ConnectionEventListener l = (ConnectionEventListener) iterator.next();
150 l.localTransactionStarted(event);
151 }
152 }
153
154 private void fireCommitEvent() {
155 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
156 ConnectionEvent.LOCAL_TRANSACTION_COMMITTED);
157 Iterator iterator = listeners.iterator();
158 while (iterator.hasNext()) {
159 ConnectionEventListener l = (ConnectionEventListener) iterator.next();
160 l.localTransactionCommitted(event);
161 }
162 }
163
164 private void fireRollbackEvent() {
165 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
166 ConnectionEvent.LOCAL_TRANSACTION_ROLLEDBACK);
167 Iterator iterator = listeners.iterator();
168 while (iterator.hasNext()) {
169 ConnectionEventListener l = (ConnectionEventListener) iterator.next();
170 l.localTransactionRolledback(event);
171 }
172 }
173
174 private void fireCloseEvent(JMSConnectionProxy proxy) {
175 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
176 ConnectionEvent.CONNECTION_CLOSED);
177 event.setConnectionHandle(proxy);
178
179 Iterator iterator = listeners.iterator();
180 while (iterator.hasNext()) {
181 ConnectionEventListener l = (ConnectionEventListener) iterator.next();
182 l.connectionClosed(event);
183 }
184 }
185
186 private void fireErrorOccurredEvent(Exception error) {
187 ConnectionEvent event = new ConnectionEvent(ActiveMQManagedConnection.this,
188 ConnectionEvent.CONNECTION_ERROR_OCCURRED, error);
189 Iterator iterator = listeners.iterator();
190 while (iterator.hasNext()) {
191 ConnectionEventListener l = (ConnectionEventListener) iterator.next();
192 l.connectionErrorOccurred(event);
193 }
194 }
195
196 /**
197 * @see javax.resource.spi.ManagedConnection#getConnection(javax.security.auth.Subject,
198 * javax.resource.spi.ConnectionRequestInfo)
199 */
200 public Object getConnection(Subject subject, ConnectionRequestInfo info)
201 throws ResourceException {
202 JMSConnectionProxy proxy = new JMSConnectionProxy(this);
203 proxyConnections.add(proxy);
204 return proxy;
205 }
206
207 private boolean isDestroyed() {
208 return destoryed;
209 }
210
211 /**
212 * Close down the physical connection to the server.
213 *
214 * @see javax.resource.spi.ManagedConnection#destroy()
215 */
216 public void destroy() throws ResourceException {
217 // Have we allready been destroyed??
218 if (isDestroyed()) {
219 return;
220 }
221
222 cleanup();
223
224 try {
225 physicalConnection.close();
226 destoryed = true;
227 } catch (JMSException e) {
228 log.info("Error occured during close of a JMS connection.", e);
229 }
230 }
231
232 /**
233 * Cleans up all proxy handles attached to this physical connection so that
234 * they cannot be used anymore.
235 *
236 * @see javax.resource.spi.ManagedConnection#cleanup()
237 */
238 public void cleanup() throws ResourceException {
239
240 // Have we allready been destroyed??
241 if (isDestroyed()) {
242 return;
243 }
244
245 Iterator iterator = proxyConnections.iterator();
246 while (iterator.hasNext()) {
247 JMSConnectionProxy proxy = (JMSConnectionProxy) iterator.next();
248 proxy.cleanup();
249 }
250 proxyConnections.clear();
251
252 try {
253 ((ActiveMQConnection)physicalConnection).cleanup();
254 } catch (JMSException e) {
255 throw new ResourceException("Could cleanup the ActiveMQ connection: "+e, e);
256 }
257
258 }
259
260 /**
261 * @see javax.resource.spi.ManagedConnection#associateConnection(java.lang.Object)
262 */
263 public void associateConnection(Object connection) throws ResourceException {
264 throw new ResourceException("Not supported.");
265 }
266
267 /**
268 * @see javax.resource.spi.ManagedConnection#addConnectionEventListener(javax.resource.spi.ConnectionEventListener)
269 */
270 public void addConnectionEventListener(ConnectionEventListener listener) {
271 listeners.add(listener);
272 }
273
274 /**
275 * @see javax.resource.spi.ManagedConnection#removeConnectionEventListener(javax.resource.spi.ConnectionEventListener)
276 */
277 public void removeConnectionEventListener(ConnectionEventListener listener) {
278 listeners.remove(listener);
279 }
280
281 /**
282 * @see javax.resource.spi.ManagedConnection#getXAResource()
283 */
284 public XAResource getXAResource() throws ResourceException {
285 return localAndXATransaction;
286 }
287
288 /**
289 * @see javax.resource.spi.ManagedConnection#getLocalTransaction()
290 */
291 public LocalTransaction getLocalTransaction() throws ResourceException {
292 return localAndXATransaction;
293 }
294
295 /**
296 * @see javax.resource.spi.ManagedConnection#getMetaData()
297 */
298 public ManagedConnectionMetaData getMetaData() throws ResourceException {
299 return new ManagedConnectionMetaData() {
300
301 public String getEISProductName() throws ResourceException {
302 if (physicalConnection == null) {
303 throw new ResourceException("Not connected.");
304 }
305 try {
306 return physicalConnection.getMetaData().getJMSProviderName();
307 }
308 catch (JMSException e) {
309 throw new ResourceException("Error accessing provider.", e);
310 }
311 }
312
313 public String getEISProductVersion() throws ResourceException {
314 if (physicalConnection == null) {
315 throw new ResourceException("Not connected.");
316 }
317 try {
318 return physicalConnection.getMetaData().getProviderVersion();
319 }
320 catch (JMSException e) {
321 throw new ResourceException("Error accessing provider.", e);
322 }
323 }
324
325 public int getMaxConnections() throws ResourceException {
326 if (physicalConnection == null) {
327 throw new ResourceException("Not connected.");
328 }
329 return Integer.MAX_VALUE;
330 }
331
332 public String getUserName() throws ResourceException {
333 if (physicalConnection == null) {
334 throw new ResourceException("Not connected.");
335 }
336 try {
337 return physicalConnection.getClientID();
338 }
339 catch (JMSException e) {
340 throw new ResourceException("Error accessing provider.", e);
341 }
342 }
343 };
344 }
345
346 /**
347 * @see javax.resource.spi.ManagedConnection#setLogWriter(java.io.PrintWriter)
348 */
349 public void setLogWriter(PrintWriter logWriter) throws ResourceException {
350 this.logWriter = logWriter;
351 }
352
353 /**
354 * @see javax.resource.spi.ManagedConnection#getLogWriter()
355 */
356 public PrintWriter getLogWriter() throws ResourceException {
357 return logWriter;
358 }
359
360 /**
361 * @param subject
362 * @param info
363 * @return
364 */
365 public boolean matches(Subject subject, ConnectionRequestInfo info) {
366
367 // Check to see if it is our info class
368 if (info == null) {
369 return false;
370 }
371 if (info.getClass() != ActiveMQConnectionRequestInfo.class) {
372 return false;
373 }
374
375 // Do the subjects match?
376 if (subject == null ^ this.subject == null) {
377 return false;
378 }
379 if (subject != null && !subject.equals(this.subject)) {
380 return false;
381 }
382
383 // Does the info match?
384 return info.equals(this.info);
385 }
386
387 /**
388 * When a proxy is closed this cleans up the proxy and notifys the
389 * ConnectionEventListeners that a connection closed.
390 *
391 * @param proxy
392 */
393 public void proxyClosedEvent(JMSConnectionProxy proxy) {
394 proxyConnections.remove(proxy);
395 proxy.cleanup();
396 fireCloseEvent(proxy);
397 }
398
399 public void onException(JMSException e) {
400 log.warn("Connection failed: "+e);
401 log.debug("Cause: ", e);
402
403 // Let any active proxy connections know that exception occured.
404 for (Iterator iter = proxyConnections.iterator(); iter.hasNext();) {
405 JMSConnectionProxy proxy = (JMSConnectionProxy) iter.next();
406 proxy.onException(e);
407 }
408 // Let the container know that the error occured.
409 fireErrorOccurredEvent(e);
410 }
411
412 /**
413 * @return Returns the transactionContext.
414 */
415 public TransactionContext getTransactionContext() {
416 return transactionContext;
417 }
418
419 }