/**
 *
 * Copyright 2004 Hiram Chirino
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
018 package org.activemq.ra;
019
020 import javax.jms.ConnectionConsumer;
021 import javax.jms.ExceptionListener;
022 import javax.jms.JMSException;
023 import javax.jms.Session;
024 import javax.jms.Topic;
025 import javax.resource.ResourceException;
026 import javax.resource.spi.work.Work;
027 import javax.resource.spi.work.WorkException;
028 import javax.resource.spi.work.WorkManager;
029
030 import org.activemq.ActiveMQConnection;
031 import org.activemq.message.ActiveMQDestination;
032 import org.activemq.message.ActiveMQQueue;
033 import org.activemq.message.ActiveMQTopic;
034 import org.apache.commons.logging.Log;
035 import org.apache.commons.logging.LogFactory;
036
037 /**
038 * @version $Revision: 1.1.1.1 $ $Date: 2005/03/11 21:15:09 $
039 */
040 public class ActiveMQAsfEndpointWorker extends ActiveMQBaseEndpointWorker {
041
042 private static final Log log = LogFactory.getLog(ActiveMQAsfEndpointWorker.class);
043
044 private static final long INITIAL_RECONNECT_DELAY = 1000; // 1 second.
045 private static final long MAX_RECONNECT_DELAY = 1000*30; // 30 seconds.
046 private static final ThreadLocal threadLocal = new ThreadLocal();
047
048 private ConnectionConsumer consumer;
049 private ServerSessionPoolImpl serverSessionPool;
050 private ActiveMQDestination dest;
051 private boolean running;
052 private Work connectWork;
053 protected ActiveMQConnection connection;
054
055 private long reconnectDelay=INITIAL_RECONNECT_DELAY;
056
057 /**
058 * @param adapter
059 * @param key
060 * @throws ResourceException
061 */
062 public ActiveMQAsfEndpointWorker(final ActiveMQResourceAdapter adapter, ActiveMQEndpointActivationKey key)
063 throws ResourceException {
064 super(adapter, key);
065
066 connectWork = new Work() {
067
068 public void release() {
069 }
070
071 synchronized public void run() {
072 if( !isRunning() )
073 return;
074 if( connection!=null )
075 return;
076
077 ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
078 try {
079 connection = adapter.makeConnection(activationSpec);
080 connection.start();
081 connection.setExceptionListener(new ExceptionListener() {
082 public void onException(JMSException error) {
083 reconnect(error);
084 }
085 });
086
087 if (activationSpec.isDurableSubscription()) {
088 consumer = connection.createDurableConnectionConsumer(
089 (Topic) dest,
090 activationSpec.getSubscriptionName(),
091 emptyToNull(activationSpec.getMessageSelector()),
092 serverSessionPool,
093 activationSpec.getMaxMessagesPerSessionsIntValue(),
094 activationSpec.getNoLocalBooleanValue());
095 } else {
096 consumer = connection.createConnectionConsumer(
097 dest,
098 emptyToNull(activationSpec.getMessageSelector()),
099 serverSessionPool,
100 activationSpec.getMaxMessagesPerSessionsIntValue(),
101 activationSpec.getNoLocalBooleanValue());
102 }
103
104 } catch (JMSException error) {
105 reconnect(error);
106 }
107 }
108 };
109
110 ActiveMQActivationSpec activationSpec = endpointActivationKey.getActivationSpec();
111 if ("javax.jms.Queue".equals(activationSpec.getDestinationType())) {
112 dest = new ActiveMQQueue(activationSpec.getDestination());
113 } else if ("javax.jms.Topic".equals(activationSpec.getDestinationType())) {
114 dest = new ActiveMQTopic(activationSpec.getDestination());
115 } else {
116 throw new ResourceException("Unknown destination type: " + activationSpec.getDestinationType());
117 }
118 }
119
120 synchronized public void start() throws WorkException, ResourceException {
121 if (running)
122 return;
123 running = true;
124
125 log.debug("Starting");
126 serverSessionPool = new ServerSessionPoolImpl(this, endpointActivationKey.getActivationSpec().getMaxSessionsIntValue());
127 connect();
128 log.debug("Started");
129 }
130
131 /**
132 *
133 */
134 synchronized public void stop() throws InterruptedException {
135 if (!running)
136 return;
137 running = false;
138 serverSessionPool.close();
139 disconnect();
140 }
141
142 private boolean isRunning() {
143 return running;
144 }
145
146 synchronized private void connect() {
147 if (!running)
148 return;
149
150 try {
151 workManager.scheduleWork(connectWork, WorkManager.INDEFINITE, null, null);
152 } catch (WorkException e) {
153 running = false;
154 log.error("Work Manager did not accept work: ",e);
155 }
156 }
157
158 /**
159 *
160 */
161 synchronized private void disconnect() {
162 safeClose(consumer);
163 consumer=null;
164 safeClose(connection);
165 connection=null;
166 }
167
168 private void reconnect(JMSException error){
169 log.debug("Reconnect cause: ",error);
170 long reconnectDelay;
171 synchronized(this) {
172 reconnectDelay = this.reconnectDelay;
173 // Only log errors if the server is really down.. And not a temp failure.
174 if (reconnectDelay == MAX_RECONNECT_DELAY) {
175 log.info("Endpoint connection to JMS broker failed: " + error.getMessage());
176 log.info("Endpoint will try to reconnect to the JMS broker in "+(MAX_RECONNECT_DELAY/1000)+" seconds");
177 }
178 }
179 try {
180 disconnect();
181 Thread.sleep(reconnectDelay);
182
183 synchronized(this) {
184 // Use exponential rollback.
185 this.reconnectDelay*=2;
186 if (this.reconnectDelay > MAX_RECONNECT_DELAY)
187 this.reconnectDelay=MAX_RECONNECT_DELAY;
188 }
189 connect();
190 } catch(InterruptedException e) {}
191 }
192
193 protected void registerThreadSession(Session session) {
194 threadLocal.set(session);
195 }
196
197 protected void unregisterThreadSession(Session session) {
198 threadLocal.set(null);
199 }
200
201 private String emptyToNull(String value) {
202 if (value == null || value.length() == 0) {
203 return null;
204 }
205 return value;
206 }
207
208 }