001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.web;
020
021 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
022 import org.apache.commons.logging.Log;
023 import org.apache.commons.logging.LogFactory;
024 import org.activemq.ActiveMQConnection;
025 import org.activemq.ActiveMQConnectionFactory;
026 import org.activemq.ActiveMQSession;
027
028 import javax.jms.ConnectionFactory;
029 import javax.jms.DeliveryMode;
030 import javax.jms.Destination;
031 import javax.jms.JMSException;
032 import javax.jms.Message;
033 import javax.jms.MessageConsumer;
034 import javax.jms.MessageProducer;
035 import javax.jms.Session;
036 import javax.jms.Topic;
037 import javax.servlet.ServletContext;
038 import javax.servlet.http.HttpSession;
039 import javax.servlet.http.HttpSessionActivationListener;
040 import javax.servlet.http.HttpSessionEvent;
041 import java.io.Externalizable;
042 import java.io.IOException;
043 import java.io.ObjectInput;
044 import java.io.ObjectOutput;
045 import java.util.HashMap;
046 import java.util.Map;
047
048 /**
049 * Represents a messaging client used from inside a web container
050 * typically stored inside a HttpSession
051 *
052 * @version $Revision: 1.1.1.1 $
053 */
054 public class WebClient implements HttpSessionActivationListener, Externalizable {
055 public static final String webClientAttribute = "org.activemq.webclient";
056 public static final String connectionFactoryAttribute = "org.activemq.connectionFactory";
057 public static final String queueConsumersAttribute = "org.activemq.queueConsumers";
058 public static final String brokerUrlInitParam = "org.activemq.brokerURL";
059 public static final String embeddedBrokerInitParam = "org.activemq.embeddedBroker";
060
061 private static final Log log = LogFactory.getLog(WebClient.class);
062
063 private static transient ConnectionFactory factory;
064 private static transient Map queueConsumers;
065
066 private transient ServletContext context;
067 private transient ActiveMQConnection connection;
068 private transient ActiveMQSession session;
069 private transient MessageProducer producer;
070 private transient Map topicConsumers = new ConcurrentHashMap();
071 private int deliveryMode = DeliveryMode.NON_PERSISTENT;
072
073
074 /**
075 * @return the web client for the current HTTP session or null if there is not a web client created yet
076 */
077 public static WebClient getWebClient(HttpSession session) {
078 return (WebClient) session.getAttribute(webClientAttribute);
079 }
080
081
082 public static void initContext(ServletContext context) {
083 factory = initConnectionFactory(context);
084 if (factory == null) {
085 log.warn("No ConnectionFactory available in the ServletContext for: " + connectionFactoryAttribute);
086 factory = new ActiveMQConnectionFactory("vm://localhost");
087 context.setAttribute(connectionFactoryAttribute, factory);
088 }
089 queueConsumers = initQueueConsumers(context);
090 }
091
092 /**
093 * Only called by serialization
094 */
095 public WebClient() {
096 }
097
098 public WebClient(ServletContext context) {
099 this.context = context;
100 initContext(context);
101 }
102
103
104 public int getDeliveryMode() {
105 return deliveryMode;
106 }
107
108
109 public void setDeliveryMode(int deliveryMode) {
110 this.deliveryMode = deliveryMode;
111 }
112
113
114 public void start() throws JMSException {
115 }
116
117 public void stop() throws JMSException {
118 System.out.println("Closing the WebClient!!! " + this);
119
120 try {
121 connection.close();
122 }
123 finally {
124 producer = null;
125 session = null;
126 connection = null;
127 topicConsumers.clear();
128 }
129 }
130
131 public void writeExternal(ObjectOutput out) throws IOException {
132 }
133
134 public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
135 topicConsumers = new HashMap();
136 }
137
138 public void send(Destination destination, Message message) throws JMSException {
139 if (producer == null) {
140 producer = getSession().createProducer(null);
141 producer.setDeliveryMode(deliveryMode );
142 }
143 log.info("Sending to destination: " + destination);
144 producer.send(destination, message);
145 log.info("Sent! message: " + message);
146 }
147
148 public Session getSession() throws JMSException {
149 if (session == null) {
150 session = createSession();
151 }
152 return session;
153 }
154
155 public ActiveMQConnection getConnection() throws JMSException {
156 if (connection == null) {
157 connection = (ActiveMQConnection) factory.createConnection();
158 connection.start();
159 }
160 return connection;
161 }
162
163 public void sessionWillPassivate(HttpSessionEvent event) {
164 try {
165 stop();
166 }
167 catch (JMSException e) {
168 log.warn("Could not close connection: " + e, e);
169 }
170 }
171
172 public void sessionDidActivate(HttpSessionEvent event) {
173 // lets update the connection factory from the servlet context
174 context = event.getSession().getServletContext();
175 initContext(context);
176 }
177
178 public static Map initQueueConsumers(ServletContext context) {
179 Map answer = (Map) context.getAttribute(queueConsumersAttribute);
180 if (answer == null) {
181 answer = new HashMap();
182 context.setAttribute(queueConsumersAttribute, answer);
183 }
184 return answer;
185 }
186
187
188 public static ConnectionFactory initConnectionFactory(ServletContext servletContext) {
189 ConnectionFactory connectionFactory = (ConnectionFactory) servletContext.getAttribute(connectionFactoryAttribute);
190 if (connectionFactory == null) {
191 String brokerURL = (String) servletContext.getInitParameter(brokerUrlInitParam);
192
193 servletContext.log("Value of: " + brokerUrlInitParam + " is: " + brokerURL);
194
195 if (brokerURL == null) {
196 brokerURL = "vm://localhost";
197 }
198
199 boolean embeddedBroker = MessageServletSupport.asBoolean(servletContext.getInitParameter(embeddedBrokerInitParam));
200 servletContext.log("Use embedded broker: " + embeddedBroker);
201
202 ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(brokerURL);
203 factory.setUseEmbeddedBroker(embeddedBroker);
204
205 connectionFactory = factory;
206 servletContext.setAttribute(connectionFactoryAttribute, connectionFactory);
207 }
208 return connectionFactory;
209 }
210
211 public synchronized MessageConsumer getConsumer(Destination destination) throws JMSException {
212 if (destination instanceof Topic) {
213 MessageConsumer consumer = (MessageConsumer) topicConsumers.get(destination);
214 if (consumer == null) {
215 consumer = getSession().createConsumer(destination);
216 topicConsumers.put(destination, consumer);
217 }
218 return consumer;
219 }
220 else {
221 synchronized (queueConsumers) {
222 SessionConsumerPair pair = (SessionConsumerPair) queueConsumers.get(destination);
223 if (pair == null) {
224 pair = createSessionConsumerPair(destination);
225 queueConsumers.put(destination, pair);
226 }
227 return pair.consumer;
228 }
229 }
230 }
231
232 protected ActiveMQSession createSession() throws JMSException {
233 return (ActiveMQSession) getConnection().createSession(false, Session.AUTO_ACKNOWLEDGE);
234 }
235
236 protected SessionConsumerPair createSessionConsumerPair(Destination destination) throws JMSException {
237 SessionConsumerPair answer = new SessionConsumerPair();
238 answer.session = createSession();
239 answer.consumer = answer.session.createConsumer(destination);
240 return answer;
241 }
242
243 protected static class SessionConsumerPair {
244 public Session session;
245 public MessageConsumer consumer;
246 }
247 }