001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq;
020
021 import java.util.Iterator;
022 import java.util.List;
023
024 import javax.jms.JMSException;
025
026 import org.activemq.io.util.MemoryBoundedQueue;
027 import org.activemq.message.ActiveMQMessage;
028
029 /**
030 * A utility class used by the Session for dispatching messages asynchronously to consumers
031 *
032 * @version $Revision: 1.1.1.1 $
033 * @see javax.jms.Session
034 */
035 public class ActiveMQSessionExecutor implements Runnable {
036 private ActiveMQSession session;
037 private MemoryBoundedQueue messageQueue;
038 private boolean closed;
039 private Thread runner;
040 private boolean dispatchedBySessionPool;
041 private boolean optimizedMessageDispatch;
042
043 ActiveMQSessionExecutor(ActiveMQSession session, MemoryBoundedQueue queue) {
044 this.session = session;
045 this.messageQueue = queue;
046 }
047
048 void setDispatchedBySessionPool(boolean value) {
049 dispatchedBySessionPool = value;
050 }
051
052 /**
053 * @return Returns the optimizedMessageDispatch.
054 */
055 boolean isOptimizedMessageDispatch() {
056 return optimizedMessageDispatch;
057 }
058 /**
059 * @param optimizedMessageDispatch The optimizedMessageDispatch to set.
060 */
061 void setOptimizedMessageDispatch(boolean optimizedMessageDispatch) {
062 this.optimizedMessageDispatch = optimizedMessageDispatch;
063 }
064
065 void execute(ActiveMQMessage message) {
066 if (optimizedMessageDispatch && !dispatchedBySessionPool){
067 dispatch(message);
068 }else {
069 messageQueue.enqueue(message);
070 }
071
072 }
073
074 void executeFirst(ActiveMQMessage message) {
075 messageQueue.enqueueFirstNoBlock(message);
076 }
077
078 boolean hasUncomsumedMessages() {
079 return !messageQueue.isEmpty();
080 }
081
082 List getUnconsumedMessages() {
083 return messageQueue.getContents();
084 }
085
086 /**
087 * implementation of Runnable
088 */
089 public void run() {
090 while (!closed && !dispatchedBySessionPool) {
091 ActiveMQMessage message = null;
092 try {
093 message = (ActiveMQMessage) messageQueue.dequeue(100);
094 }
095 catch (InterruptedException ie) {
096 }
097 if (!closed) {
098 if (message != null) {
099 if (!dispatchedBySessionPool) {
100 dispatch(message);
101 }
102 else {
103 messageQueue.enqueueFirstNoBlock(message);
104 }
105 }
106 }
107 }
108 }
109
110 void dispatch(ActiveMQMessage message){
111 for (Iterator i = this.session.consumers.iterator(); i.hasNext();) {
112 ActiveMQMessageConsumer consumer = (ActiveMQMessageConsumer) i.next();
113 if (message.isConsumerTarget(consumer.getConsumerNumber())) {
114 try {
115 consumer.processMessage(message.shallowCopy());
116 }
117 catch (JMSException e) {
118 this.session.connection.handleAsyncException(e);
119 }
120 }
121 }
122 }
123
124 synchronized void start() {
125 messageQueue.start();
126 if (runner == null && (!dispatchedBySessionPool || optimizedMessageDispatch)) {
127 runner = new Thread(this, "JmsSessionDispatcher: " + session.getSessionId());
128 runner.setPriority(Thread.MAX_PRIORITY);
129 //runner.setDaemon(true);
130 runner.start();
131 }
132 }
133
134 synchronized void stop() {
135 messageQueue.stop();
136 }
137
138 synchronized void close() {
139 closed = true;
140 messageQueue.close();
141 }
142
143 void clear() {
144 messageQueue.clear();
145 }
146
147 ActiveMQMessage dequeueNoWait() {
148 try {
149 return (ActiveMQMessage) messageQueue.dequeueNoWait();
150 }
151 catch (InterruptedException ie) {
152 return null;
153 }
154 }
155
156 protected void clearMessagesInProgress(){
157 messageQueue.clear();
158 }
159
160
161 }