001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.ra;
019
020 import java.util.ArrayList;
021 import java.util.Iterator;
022
023 import javax.jms.Connection;
024 import javax.jms.ConnectionConsumer;
025 import javax.jms.ConnectionMetaData;
026 import javax.jms.Destination;
027 import javax.jms.ExceptionListener;
028 import javax.jms.IllegalStateException;
029 import javax.jms.JMSException;
030 import javax.jms.Queue;
031 import javax.jms.QueueConnection;
032 import javax.jms.QueueSession;
033 import javax.jms.ServerSessionPool;
034 import javax.jms.Session;
035 import javax.jms.Topic;
036 import javax.jms.TopicConnection;
037 import javax.jms.TopicSession;
038
039 import org.activemq.ActiveMQQueueSession;
040 import org.activemq.ActiveMQSession;
041 import org.activemq.ActiveMQTopicSession;
042
043
044 /**
045 * Acts as a pass through proxy for a JMS Connection object.
046 * It intercepts events that are of interest of the ActiveMQManagedConnection.
047 *
048 * @version $Revision: 1.1.1.1 $
049 */
050 public class JMSConnectionProxy implements Connection, QueueConnection, TopicConnection, ExceptionListener {
051
052 private ActiveMQManagedConnection managedConnection;
053 private ArrayList sessions = new ArrayList();
054 private ExceptionListener exceptionListener;
055
056 public JMSConnectionProxy(ActiveMQManagedConnection managedConnection) {
057 this.managedConnection = managedConnection;
058 }
059
060 /**
061 * Used to let the ActiveMQManagedConnection that this connection
062 * handel is not needed by the app.
063 *
064 * @throws JMSException
065 */
066 public void close() throws JMSException {
067 if( managedConnection!=null ) {
068 managedConnection.proxyClosedEvent(this);
069 }
070 }
071
072 /**
073 * Called by the ActiveMQManagedConnection to invalidate this proxy.
074 */
075 public void cleanup() {
076 exceptionListener=null;
077 managedConnection = null;
078 for (Iterator iter = sessions.iterator(); iter.hasNext();) {
079 JMSSessionProxy p = (JMSSessionProxy) iter.next();
080 try {
081 p.cleanup();
082 } catch (JMSException ignore) {
083 }
084 iter.remove();
085 }
086 }
087
088 /**
089 *
090 */
091 private Connection getConnection() throws JMSException {
092 if (managedConnection == null) {
093 throw new IllegalStateException("The Connection is closed");
094 }
095 return managedConnection.getPhysicalConnection();
096 }
097
098 /**
099 * @param transacted
100 * @param acknowledgeMode
101 * @return
102 * @throws JMSException
103 */
104 public Session createSession(boolean transacted, int acknowledgeMode)
105 throws JMSException {
106 return createSessionProxy(transacted, acknowledgeMode);
107 }
108
109 /**
110 * @param acknowledgeMode
111 * @param transacted
112 * @return
113 * @throws JMSException
114 */
115 private JMSSessionProxy createSessionProxy(boolean transacted, int acknowledgeMode) throws JMSException {
116 ActiveMQSession session = (ActiveMQSession) getConnection().createSession(transacted, acknowledgeMode);
117 RATransactionContext txContext = new RATransactionContext(managedConnection.getTransactionContext());
118 session.setTransactionContext(txContext);
119 JMSSessionProxy p = new JMSSessionProxy(session);
120 p.setUseSharedTxContext(managedConnection.isInManagedTx());
121 sessions.add(p);
122 return p;
123 }
124
125 public void setUseSharedTxContext(boolean enable) throws JMSException {
126 for (Iterator iter = sessions.iterator(); iter.hasNext();) {
127 JMSSessionProxy p = (JMSSessionProxy) iter.next();
128 p.setUseSharedTxContext(enable);
129 }
130 }
131
132 /**
133 * @param transacted
134 * @param acknowledgeMode
135 * @return
136 * @throws JMSException
137 */
138 public QueueSession createQueueSession(boolean transacted,
139 int acknowledgeMode) throws JMSException {
140 return new ActiveMQQueueSession(createSessionProxy(transacted, acknowledgeMode));
141 }
142
143 /**
144 * @param transacted
145 * @param acknowledgeMode
146 * @return
147 * @throws JMSException
148 */
149 public TopicSession createTopicSession(boolean transacted,
150 int acknowledgeMode) throws JMSException {
151 return new ActiveMQTopicSession(createSessionProxy(transacted, acknowledgeMode));
152 }
153
154 /**
155 * @return
156 * @throws JMSException
157 */
158 public String getClientID() throws JMSException {
159 return getConnection().getClientID();
160 }
161
162 /**
163 * @return
164 * @throws JMSException
165 */
166 public ExceptionListener getExceptionListener() throws JMSException {
167 return getConnection().getExceptionListener();
168 }
169
170 /**
171 * @return
172 * @throws JMSException
173 */
174 public ConnectionMetaData getMetaData() throws JMSException {
175 return getConnection().getMetaData();
176 }
177
178 /**
179 * @param clientID
180 * @throws JMSException
181 */
182 public void setClientID(String clientID) throws JMSException {
183 getConnection().setClientID(clientID);
184 }
185
186 /**
187 * @param listener
188 * @throws JMSException
189 */
190 public void setExceptionListener(ExceptionListener listener)
191 throws JMSException {
192 getConnection();
193 exceptionListener = listener;
194 }
195
196 /**
197 * @throws JMSException
198 */
199 public void start() throws JMSException {
200 getConnection().start();
201 }
202
203 /**
204 * @throws JMSException
205 */
206 public void stop() throws JMSException {
207 getConnection().stop();
208 }
209
210
211 /**
212 * @param queue
213 * @param messageSelector
214 * @param sessionPool
215 * @param maxMessages
216 * @return
217 * @throws JMSException
218 */
219 public ConnectionConsumer createConnectionConsumer(Queue queue,
220 String messageSelector, ServerSessionPool sessionPool,
221 int maxMessages) throws JMSException {
222 throw new JMSException("Not Supported.");
223 }
224
225 /**
226 * @param topic
227 * @param messageSelector
228 * @param sessionPool
229 * @param maxMessages
230 * @return
231 * @throws JMSException
232 */
233 public ConnectionConsumer createConnectionConsumer(Topic topic,
234 String messageSelector, ServerSessionPool sessionPool,
235 int maxMessages) throws JMSException {
236 throw new JMSException("Not Supported.");
237 }
238
239 /**
240 * @param destination
241 * @param messageSelector
242 * @param sessionPool
243 * @param maxMessages
244 * @return
245 * @throws JMSException
246 */
247 public ConnectionConsumer createConnectionConsumer(Destination destination,
248 String messageSelector, ServerSessionPool sessionPool,
249 int maxMessages) throws JMSException {
250 throw new JMSException("Not Supported.");
251 }
252
253 /**
254 * @param topic
255 * @param subscriptionName
256 * @param messageSelector
257 * @param sessionPool
258 * @param maxMessages
259 * @return
260 * @throws JMSException
261 */
262 public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
263 String subscriptionName, String messageSelector,
264 ServerSessionPool sessionPool, int maxMessages) throws JMSException {
265 throw new JMSException("Not Supported.");
266 }
267
268 /**
269 * @return Returns the managedConnection.
270 */
271 public ActiveMQManagedConnection getManagedConnection() {
272 return managedConnection;
273 }
274
275 public void onException(JMSException e) {
276 if(exceptionListener!=null && managedConnection!=null) {
277 try {
278 exceptionListener.onException(e);
279 } catch (Throwable ignore) {
280 // We can never trust user code so ignore any exceptions.
281 }
282 }
283 }
284
285 }