001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.transport.http;
019
020 import java.io.BufferedReader;
021 import java.io.DataOutputStream;
022 import java.io.IOException;
023 import java.util.HashMap;
024 import java.util.Map;
025
026 import javax.jms.JMSException;
027 import javax.servlet.ServletException;
028 import javax.servlet.http.HttpServlet;
029 import javax.servlet.http.HttpServletRequest;
030 import javax.servlet.http.HttpServletResponse;
031
032 import org.apache.commons.logging.Log;
033 import org.apache.commons.logging.LogFactory;
034 import org.activemq.io.TextWireFormat;
035 import org.activemq.message.Packet;
036 import org.activemq.message.WireFormatInfo;
037 import org.activemq.message.PacketListener;
038 import org.activemq.transport.TransportChannelListener;
039 import org.activemq.transport.xstream.XStreamWireFormat;
040 import org.activemq.util.JMSExceptionHelper;
041
042 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
043
044 /**
045 * A servlet which handles server side HTTP transport, delegaging to the ActiveMQ broker.
046 * This servlet is designed for being embedded inside an ActiveMQ Broker using an embedded
047 * Jetty or Tomcat instance.
048 *
049 * @version $Revision$
050 */
051 public class HttpTunnelServlet extends HttpServlet {
052
053 private static final Log log = LogFactory.getLog(HttpTunnelServlet.class);
054
055 private TransportChannelListener listener;
056 private TextWireFormat wireFormat;
057 private Map clients = new HashMap();
058 private long requestTimeout = 30000L;
059
060 public void init() throws ServletException {
061 super.init();
062 listener = (TransportChannelListener) getServletContext().getAttribute("transportChannelListener");
063 if (listener == null) {
064 throw new ServletException("No such attribute 'transportChannelListener' available in the ServletContext");
065 }
066 wireFormat = (TextWireFormat) getServletContext().getAttribute("wireFormat");
067 if (wireFormat == null) {
068 wireFormat = createWireFormat();
069 }
070 }
071
072 protected void doGet(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
073 // lets return the next response
074 Packet packet = null;
075 try {
076 HttpServerTransportChannel transportChannel = getTransportChannel(request);
077 if (transportChannel == null) {
078 return;
079 }
080 packet = (Packet) transportChannel.getChannel().poll(requestTimeout);
081 }
082 catch (InterruptedException e) {
083 // ignore
084 }
085 if (packet == null) {
086 response.setStatus(HttpServletResponse.SC_REQUEST_TIMEOUT);
087 }
088 else {
089 try {
090 wireFormat.writePacket(packet, new DataOutputStream(response.getOutputStream()));
091 }
092 catch (JMSException e) {
093 throw JMSExceptionHelper.newIOException(e);
094 }
095 }
096 }
097
098 protected void doPost(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException {
099 try {
100 Packet packet = wireFormat.fromString(readRequestBody(request));
101
102 if( packet.getPacketType() == Packet.WIRE_FORMAT_INFO ) {
103
104 // Can we handle the requested wire format?
105 WireFormatInfo info = (WireFormatInfo) packet;
106 if (!canProcessWireFormatVersion(info.getVersion())) {
107 response.sendError(HttpServletResponse.SC_NOT_FOUND, "Cannot process wire format of version: " + info.getVersion());
108 }
109
110 } else {
111 HttpServerTransportChannel transportChannel = getTransportChannel(request);
112 if (transportChannel == null) {
113 response.setStatus(HttpServletResponse.SC_NOT_FOUND);
114 }
115 else {
116 PacketListener packetListener = transportChannel.getPacketListener();
117 if (packetListener == null) {
118 log.error("No packetListener available to process inbound packet: " + packet);
119 }
120 else {
121 packetListener.consume(packet);
122 }
123 }
124 }
125 }
126 catch (IOException e) {
127 log.error("Caught: " + e, e);
128 }
129 catch (JMSException e) {
130 throw JMSExceptionHelper.newIOException(e);
131 }
132 }
133
134 /**
135 * @param version
136 * @return
137 */
138 private boolean canProcessWireFormatVersion(int version) {
139 // TODO:
140 return true;
141 }
142
143 protected String readRequestBody(HttpServletRequest request) throws IOException {
144 StringBuffer buffer = new StringBuffer();
145 BufferedReader reader = request.getReader();
146 while (true) {
147 String line = reader.readLine();
148 if (line == null) {
149 break;
150 }
151 else {
152 buffer.append(line);
153 buffer.append("\n");
154 }
155 }
156 return buffer.toString();
157 }
158
159 protected HttpServerTransportChannel getTransportChannel(HttpServletRequest request) {
160 String clientID = request.getHeader("clientID");
161 if (clientID == null) {
162 clientID = request.getParameter("clientID");
163 }
164 if (clientID == null) {
165 log.warn("No clientID header so ignoring request");
166 return null;
167 }
168 synchronized (this) {
169 HttpServerTransportChannel answer = (HttpServerTransportChannel) clients.get(clientID);
170 if (answer == null) {
171 answer = createTransportChannel();
172 clients.put(clientID, answer);
173 listener.addClient(answer);
174 }
175 else {
176 // this lookup should keep the client alive, otherwise we need to discard it
177 keepAlivePing(answer);
178 }
179 return answer;
180 }
181 }
182
183 /**
184 * Disable this channel from being auto-disconnected after a timeout period
185 */
186 protected void keepAlivePing(HttpServerTransportChannel channel) {
187 /** TODO */
188 }
189
190 protected HttpServerTransportChannel createTransportChannel() {
191 return new HttpServerTransportChannel(new BoundedLinkedQueue(10));
192 }
193
194 protected TextWireFormat createWireFormat() {
195 return new XStreamWireFormat();
196 }
197 }