001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.ra;
019
020 import java.lang.reflect.Method;
021
022 import javax.jms.JMSException;
023 import javax.jms.Message;
024 import javax.jms.MessageListener;
025 import javax.jms.MessageProducer;
026 import javax.jms.ServerSession;
027 import javax.jms.Session;
028 import javax.resource.spi.endpoint.MessageEndpoint;
029 import javax.resource.spi.work.Work;
030 import javax.resource.spi.work.WorkEvent;
031 import javax.resource.spi.work.WorkException;
032 import javax.resource.spi.work.WorkListener;
033 import javax.resource.spi.work.WorkManager;
034
035 import org.activemq.ActiveMQSession;
036 import org.activemq.ActiveMQSession.DeliveryListener;
037 import org.activemq.util.JMSExceptionHelper;
038 import org.apache.commons.logging.Log;
039 import org.apache.commons.logging.LogFactory;
040
041 /**
042 * @version $Revision: 1.1.1.1 $
043 */
044 public class ServerSessionImpl implements ServerSession, SessionAndProducer, Work, DeliveryListener {
045
046 public static final Method ON_MESSAGE_METHOD;
047
048 static {
049 try {
050 ON_MESSAGE_METHOD = MessageListener.class.getMethod("onMessage", new Class[]{Message.class});
051 }
052 catch (Exception e) {
053 throw new ExceptionInInitializerError(e);
054 }
055 }
056
057 private static int nextLogId=0;
058 synchronized static private int getNextLogId() {
059 return nextLogId++;
060 }
061
062 private int serverSessionId = getNextLogId();
063 private final Log log = LogFactory.getLog( ServerSessionImpl.class.getName()+":"+serverSessionId );
064
065 private ActiveMQSession session;
066 private WorkManager workManager;
067 private MessageEndpoint endpoint;
068 private MessageProducer messageProducer;
069 private final ServerSessionPoolImpl pool;
070
071 private Object runControlMutex = new Object();
072 private boolean runningFlag = false;
073 /**
074 * True if an error was detected that cause this session to be stale. When a session
075 * is stale, it should not be used again for proccessing.
076 */
077 private boolean stale;
078 /**
079 * Does the TX commit need to be managed by the RA?
080 */
081 private final boolean useRAManagedTx;
082 /**
083 * The maximum number of messages to batch
084 */
085 private final int batchSize;
086 /**
087 * The current number of messages in the batch
088 */
089 private int currentBatchSize;
090
091 public ServerSessionImpl(ServerSessionPoolImpl pool, ActiveMQSession session, WorkManager workManager, MessageEndpoint endpoint, boolean useRAManagedTx, int batchSize) throws JMSException {
092 this.pool = pool;
093 this.session = session;
094 this.workManager = workManager;
095 this.endpoint = endpoint;
096 this.useRAManagedTx = useRAManagedTx;
097 this.session.setMessageListener((MessageListener) endpoint);
098 this.session.setDeliveryListener(this);
099 this.batchSize = batchSize;
100 }
101
102 public Session getSession() throws JMSException {
103 return session;
104 }
105
106 public MessageProducer getMessageProducer() throws JMSException {
107 if (messageProducer == null) {
108 messageProducer = getSession().createProducer(null);
109 }
110 return messageProducer;
111 }
112
113 /**
114 * @see javax.jms.ServerSession#start()
115 */
116 public void start() throws JMSException {
117
118 synchronized (runControlMutex) {
119 if (runningFlag) {
120 log.debug("Start request ignored, allready running.");
121 return;
122 }
123 runningFlag = true;
124 }
125
126 // We get here because we need to start a async worker.
127 log.debug("Starting run.");
128 try {
129 workManager.scheduleWork(this, WorkManager.INDEFINITE, null,
130 new WorkListener() {
131 //The work listener is useful only for debugging...
132 public void workAccepted(WorkEvent event) {
133 log.debug("Work accepted: " + event);
134 }
135
136 public void workRejected(WorkEvent event) {
137 log.debug("Work rejected: " + event);
138 }
139
140 public void workStarted(WorkEvent event) {
141 log.debug("Work started: " + event);
142 }
143
144 public void workCompleted(WorkEvent event) {
145 log.debug("Work completed: " + event);
146 }
147
148 });
149 }
150 catch (WorkException e) {
151 throw (JMSException) new JMSException("Start failed: " + e).initCause(e);
152 }
153 }
154
155 /**
156 * @see java.lang.Runnable#run()
157 */
158 synchronized public void run() {
159 log.debug("Running");
160 while (true) {
161 log.debug("run loop start");
162 try {
163 SessionAndProducerHelper.register(this);
164 currentBatchSize = 0;
165 session.run();
166 }
167 catch (Throwable e) {
168 stale=true;
169 log.debug("Endpoint failed to process message.", e);
170 log.info("Endpoint failed to process message. Reason: " + e);
171 }
172 finally {
173 SessionAndProducerHelper.unregister(this);
174 log.debug("run loop end");
175 synchronized (runControlMutex) {
176 // This endpoint may have gone stale due to error
177 if( stale) {
178 runningFlag = false;
179 pool.removeFromPool(this);
180 break;
181 }
182 if( !session.hasUncomsumedMessages() ) {
183 runningFlag = false;
184 pool.returnToPool(this);
185 break;
186 }
187 }
188 }
189 }
190 log.debug("Run finished");
191 }
192
193
194 /**
195 * The ActiveMQSession's run method will call back to this method before
196 * dispactching a message to the MessageListener.
197 */
198 public void beforeDelivery(ActiveMQSession session, Message msg) {
199 if (currentBatchSize == 0) {
200 try {
201 endpoint.beforeDelivery(ON_MESSAGE_METHOD);
202 } catch (Throwable e) {
203 throw new RuntimeException("Endpoint before delivery notification failure", e);
204 }
205 }
206 }
207
208 /**
209 * The ActiveMQSession's run method will call back to this method after
210 * dispactching a message to the MessageListener.
211 */
212 public void afterDelivery(ActiveMQSession session, Message msg) {
213 if (++currentBatchSize >= batchSize || !session.hasUncomsumedMessages()) {
214 currentBatchSize = 0;
215 try {
216 endpoint.afterDelivery();
217 } catch (Throwable e) {
218 throw new RuntimeException("Endpoint after delivery notification failure", e);
219 } finally {
220 if( session.getTransactionContext().isInLocalTransaction() ) {
221 if( !useRAManagedTx ) {
222 // Sanitiy Check: If the local transaction has not been commited..
223 // Commit it now.
224 log.warn("Local transaction had not been commited. Commiting now.");
225 }
226 try {
227 session.commit();
228 } catch (JMSException e) {
229 log.info("Commit failed:", e);
230 }
231 }
232 }
233 }
234 }
235
236 /**
237 * @see javax.resource.spi.work.Work#release()
238 */
239 public void release() {
240 log.debug("release called");
241 }
242
243 /**
244 * @see java.lang.Object#toString()
245 */
246 public String toString() {
247 return "ServerSessionImpl:"+serverSessionId;
248 }
249
250 public void close() {
251 try {
252 endpoint.release();
253 } catch (Throwable e) {
254 log.debug("Endpoint did not release properly: "+e,e);
255 }
256 try {
257 session.close();
258 } catch (Throwable e) {
259 log.debug("Session did not close properly: "+e,e);
260 }
261 }
262
263 }