/**
 *
 * Copyright 2004 Protique Ltd
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 **/
018 package org.activemq.transport.http;
019
020 import org.apache.commons.logging.Log;
021 import org.apache.commons.logging.LogFactory;
022 import org.activemq.io.TextWireFormat;
023 import org.activemq.message.Packet;
024 import org.activemq.util.Callback;
025 import org.activemq.util.ExceptionTemplate;
026 import org.activemq.util.JMSExceptionHelper;
027
028 import javax.jms.JMSException;
029 import java.io.DataInputStream;
030 import java.io.IOException;
031 import java.io.OutputStreamWriter;
032 import java.io.Writer;
033 import java.net.HttpURLConnection;
034 import java.net.MalformedURLException;
035 import java.net.URL;
036
037 /**
038 * @version $Revision$
039 */
040 public class HttpTransportChannel extends HttpTransportChannelSupport {
041 private static final Log log = LogFactory.getLog(HttpTransportChannel.class);
042 private URL url;
043 private HttpURLConnection sendConnection;
044 private HttpURLConnection receiveConnection;
045
046
047 public HttpTransportChannel(TextWireFormat wireFormat, String remoteUrl) throws MalformedURLException {
048 super(wireFormat, remoteUrl);
049 url = new URL(remoteUrl);
050 }
051
052 public void asyncSend(Packet packet) throws JMSException {
053 try {
054
055 HttpURLConnection connection = getSendConnection();
056 String text = getTextWireFormat().toString(packet);
057 Writer writer = new OutputStreamWriter(connection.getOutputStream());
058 writer.write(text);
059 writer.flush();
060 int answer = connection.getResponseCode();
061 if (answer != HttpURLConnection.HTTP_OK) {
062 throw new JMSException("Failed to post packet: " + packet + " as response was: " + answer);
063 }
064 }
065 catch (IOException e) {
066 throw JMSExceptionHelper.newJMSException("Could not post packet: " + packet + " due to: " + e, e);
067 }
068 }
069
070 public void stop() {
071 ExceptionTemplate template = new ExceptionTemplate();
072 if (sendConnection != null) {
073 template.run(new Callback() {
074 public void execute() throws Throwable {
075 sendConnection.disconnect();
076 }
077 });
078 }
079 if (receiveConnection != null) {
080 template.run(new Callback() {
081 public void execute() throws Throwable {
082 receiveConnection.disconnect();
083 }
084 });
085 }
086 super.stop();
087 Throwable firstException = template.getFirstException();
088 if (firstException != null) {
089 log.warn("Failed to shut down cleanly: " + firstException, firstException);
090 }
091 }
092
093 public boolean isMulticast() {
094 return false;
095 }
096
097 public void run() {
098 log.trace("HTTP GET consumer thread starting for clientID: " + getClientID());
099 String remoteUrl = getRemoteUrl();
100 while (!getClosed().get()) {
101 try {
102 HttpURLConnection connection = getReceiveConnection();
103 int answer = connection.getResponseCode();
104 if (answer != HttpURLConnection.HTTP_OK) {
105 if (answer == HttpURLConnection.HTTP_CLIENT_TIMEOUT) {
106 log.trace("GET timed out");
107 }
108 else {
109 log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
110 }
111 }
112 else {
113 Packet packet = getWireFormat().readPacket(new DataInputStream(connection.getInputStream()));
114 //Packet packet = getWireFormat().fromString(connection.getContent().toString());
115 if (packet == null) {
116 log.warn("Received null packet from url: " + remoteUrl);
117 }
118 else {
119 doConsumePacket(packet);
120 }
121 }
122 }
123 catch (Exception e) {
124 if (!getClosed().get()) {
125 log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
126 }
127 else {
128 log.trace("Caught error after closed: " + e, e);
129 }
130 }
131 }
132 }
133
134
135 // Implementation methods
136 //-------------------------------------------------------------------------
137 protected HttpURLConnection createSendConnection() throws IOException {
138 HttpURLConnection conn = (HttpURLConnection) getRemoteURL().openConnection();
139 conn.setDoOutput(true);
140 conn.setRequestMethod("POST");
141 configureConnection(conn);
142 conn.connect();
143 return conn;
144 }
145
146 protected HttpURLConnection createReceiveConnection() throws IOException {
147 HttpURLConnection conn = (HttpURLConnection) getRemoteURL().openConnection();
148 conn.setDoOutput(false);
149 conn.setDoInput(true);
150 conn.setRequestMethod("GET");
151 configureConnection(conn);
152 conn.connect();
153 return conn;
154 }
155
156 protected void configureConnection(HttpURLConnection connection) {
157 String clientID = getClientID();
158 if (clientID != null) {
159 connection.setRequestProperty("clientID", clientID);
160 //connection.addRequestProperty("clientID", clientID);
161 }
162 }
163
164 protected URL getRemoteURL() {
165 return url;
166 }
167
168 protected HttpURLConnection getSendConnection() throws IOException {
169 setSendConnection( createSendConnection() );
170 return sendConnection;
171 }
172
173 protected HttpURLConnection getReceiveConnection() throws IOException {
174 setReceiveConnection( createReceiveConnection() );
175 return receiveConnection;
176 }
177
178 protected void setSendConnection( HttpURLConnection conn ) {
179 if ( sendConnection != null ) {
180 sendConnection.disconnect();
181 }
182 sendConnection = conn;
183 }
184
185 protected void setReceiveConnection( HttpURLConnection conn ) {
186 if ( receiveConnection != null ) {
187 receiveConnection.disconnect();
188 }
189 receiveConnection = conn;
190 }
191
192 public void forceDisconnect() {
193 // TODO: implement me.
194 throw new RuntimeException("Not yet Implemented.");
195 }
196
197 }