001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.service.boundedvm;
020 import java.util.ArrayList;
021 import java.util.Iterator;
022 import java.util.List;
023 import java.util.Set;
024
025 import javax.jms.JMSException;
026
027 import org.apache.commons.logging.Log;
028 import org.apache.commons.logging.LogFactory;
029 import org.activemq.broker.BrokerClient;
030 import org.activemq.filter.DestinationMap;
031 import org.activemq.filter.Filter;
032 import org.activemq.io.util.MemoryBoundedQueue;
033 import org.activemq.message.ActiveMQDestination;
034 import org.activemq.message.ActiveMQMessage;
035 import org.activemq.message.ConsumerInfo;
036 import org.activemq.message.MessageAck;
037 import org.activemq.service.MessageContainer;
038 import org.activemq.service.MessageContainerAdmin;
039 import org.activemq.service.MessageIdentity;
040 import org.activemq.service.Service;
041
042 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
043 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
044 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
045
046 /**
047 * A MessageContainer for transient topics One of these exists for every active Connection consuming transient Topic
048 * messages
049 *
050 * @version $Revision: 1.1.1.1 $
051 */
052 public class TransientTopicBoundedMessageContainer
053 implements
054 MessageContainer,
055 Service,
056 Runnable,
057 MessageContainerAdmin {
058 private SynchronizedBoolean started;
059 private TransientTopicBoundedMessageManager manager;
060 private BrokerClient client;
061 private MemoryBoundedQueue queue;
062 private Thread worker;
063 private CopyOnWriteArrayList subscriptions;
064 private DestinationMap accel;
065 private ConcurrentHashMap subMap;
066 private Log log;
067
068 /**
069 * Construct this beast
070 *
071 * @param manager
072 * @param client
073 * @param queue
074 */
075 public TransientTopicBoundedMessageContainer(TransientTopicBoundedMessageManager manager, BrokerClient client,
076 MemoryBoundedQueue queue) {
077 this.manager = manager;
078 this.client = client;
079 this.queue = queue;
080 this.started = new SynchronizedBoolean(false);
081 this.subscriptions = new CopyOnWriteArrayList();
082 this.accel = new DestinationMap();
083 this.subMap = new ConcurrentHashMap(100,0.25f);
084 this.log = LogFactory.getLog("TransientTopicBoundedMessageContainer:- " + client);
085 }
086
087 /**
088 * @return true if this Container has no active subscriptions
089 */
090 public boolean isInactive() {
091 return subscriptions.isEmpty();
092 }
093
094 /**
095 * @return the BrokerClient this Container is dispatching to
096 */
097 public BrokerClient getBrokerClient() {
098 return client;
099 }
100
101 /**
102 * Add a consumer to dispatch messages to
103 *
104 * @param filter
105 * @param info
106 */
107 public TransientTopicSubscription addConsumer(Filter filter, ConsumerInfo info) {
108 TransientTopicSubscription ts = findMatch(info);
109 if (ts == null) {
110 ts = new TransientTopicSubscription(filter, info, client);
111 subscriptions.add(ts);
112 accel.put(info.getDestination(),ts);
113 subMap.put(info,ts);
114 }
115 return ts;
116 }
117
118 /**
119 * Remove a consumer
120 *
121 * @param info
122 */
123 public void removeConsumer(ConsumerInfo info) {
124 TransientTopicSubscription ts = findMatch(info);
125 if (ts != null) {
126 subscriptions.remove(ts);
127 accel.remove(info.getDestination(),ts);
128 subMap.remove(info);
129 }
130 }
131
132 /**
133 * start working
134 */
135 public void start() {
136 if (started.commit(false, true)) {
137 if (manager.isDecoupledDispatch()) {
138 worker = new Thread(this, "TransientTopicDispatcher");
139 worker.setPriority(Thread.NORM_PRIORITY + 2);
140 worker.start();
141 }
142 }
143 }
144
145 /**
146 * See if this container should get this message and dispatch it
147 *
148 * @param sender the BrokerClient the message came from
149 * @param message
150 * @return true if it is a valid container
151 * @throws JMSException
152 */
153 public boolean targetAndDispatch(BrokerClient sender, ActiveMQMessage message) throws JMSException {
154 boolean result = false;
155 if (!this.client.isClusteredConnection() || !sender.isClusteredConnection()) {
156 List tmpList = null;
157
158 Set set = accel.get(message.getJMSActiveMQDestination());
159 if (!set.isEmpty()) {
160 for (Iterator i = set.iterator(); i.hasNext();) {
161 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
162 if (ts.isTarget(message)) {
163 if (tmpList == null) {
164 tmpList = new ArrayList();
165 }
166 tmpList.add(ts);
167 }
168 }
169 }
170 dispatchToQueue(message, tmpList);
171 result = tmpList != null;
172 }
173 return result;
174 }
175
176 /**
177 * stop working
178 */
179 public void stop() {
180 started.set(false);
181 queue.clear();
182 }
183
184 /**
185 * close down this container
186 */
187 public void close() {
188 if (started.get()) {
189 stop();
190 }
191 queue.close();
192 }
193
194
195 /**
196 * do some dispatching
197 */
198 public void run() {
199 int count = 0;
200 ActiveMQMessage message = null;
201 while (started.get()) {
202 try {
203 message = (ActiveMQMessage) queue.dequeue(2000);
204 if (message != null) {
205 if (!message.isExpired()) {
206 client.dispatch(message);
207 if (++count == 250) {
208 count = 0;
209 Thread.yield();
210 }
211 }else {
212 if (log.isDebugEnabled()){
213 log.debug("Message: " + message + " has expired");
214 }
215 }
216 }
217 }
218 catch (Exception e) {
219 stop();
220 log.warn("stop dispatching", e);
221 }
222 }
223 }
224
225 private void dispatchToQueue(ActiveMQMessage message, List list) throws JMSException {
226 if (list != null && !list.isEmpty()) {
227 int[] ids = new int[list.size()];
228 for (int i = 0;i < list.size();i++) {
229 TransientTopicSubscription ts = (TransientTopicSubscription) list.get(i);
230 ids[i] = ts.getConsumerInfo().getConsumerNo();
231 }
232 message = message.shallowCopy();
233 message.setConsumerNos(ids);
234 if (manager.isDecoupledDispatch()) {
235 queue.enqueue(message);
236 }
237 else {
238 client.dispatch(message);
239 }
240 }
241 }
242
243 private TransientTopicSubscription findMatch(ConsumerInfo info) {
244 return (TransientTopicSubscription) subMap.get(info);
245 }
246
247 /**
248 * @param destination
249 * @return true if a
250 */
251 public boolean hasConsumerFor(ActiveMQDestination destination) {
252 for (Iterator i = subscriptions.iterator();i.hasNext();) {
253 TransientTopicSubscription ts = (TransientTopicSubscription) i.next();
254 ConsumerInfo info = ts.getConsumerInfo();
255 if (info.getDestination().matches(destination)) {
256 return true;
257 }
258 }
259 return false;
260 }
261
262 /**
263 * @return the destination name
264 */
265 public String getDestinationName() {
266 return "";
267 }
268
269 /**
270 * @param msg
271 * @return @throws JMSException
272 */
273 public void addMessage(ActiveMQMessage msg) throws JMSException {
274 }
275
276 /**
277 * @param messageIdentity
278 * @param ack
279 * @throws JMSException
280 */
281 public void delete(MessageIdentity messageIdentity, MessageAck ack) throws JMSException {
282 }
283
284 /**
285 * @param messageIdentity
286 * @return @throws JMSException
287 */
288 public ActiveMQMessage getMessage(MessageIdentity messageIdentity) throws JMSException {
289 return null;
290 }
291
292 /**
293 * @param messageIdentity
294 * @throws JMSException
295 */
296 public void registerMessageInterest(MessageIdentity messageIdentity) throws JMSException {
297 }
298
299 /**
300 * @param messageIdentity
301 * @param ack
302 * @throws JMSException
303 */
304 public void unregisterMessageInterest(MessageIdentity messageIdentity) throws JMSException {
305 }
306
307 /**
308 * @param messageIdentity
309 * @return @throws JMSException
310 */
311 public boolean containsMessage(MessageIdentity messageIdentity) throws JMSException {
312 return false;
313 }
314
315 /**
316 * @see org.activemq.service.MessageContainer#getMessageContainerAdmin()
317 */
318 public MessageContainerAdmin getMessageContainerAdmin() {
319 return this;
320 }
321
322 /**
323 * @see org.activemq.service.MessageContainerAdmin#empty()
324 */
325 public void empty() throws JMSException {
326 // TODO implement me
327 }
328
329 /**
330 * @see org.activemq.service.MessageContainer#isDeadLetterQueue()
331 */
332 public boolean isDeadLetterQueue() {
333 return false;
334 }
335 }