001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.transport.http;
019
020 import org.apache.commons.httpclient.HttpClient;
021 import org.apache.commons.httpclient.HttpMethod;
022 import org.apache.commons.httpclient.HttpStatus;
023 import org.apache.commons.httpclient.methods.GetMethod;
024 import org.apache.commons.httpclient.methods.PostMethod;
025 import org.apache.commons.logging.Log;
026 import org.apache.commons.logging.LogFactory;
027 import org.activemq.io.TextWireFormat;
028 import org.activemq.message.Packet;
029 import org.activemq.util.JMSExceptionHelper;
030
031 import javax.jms.JMSException;
032 import java.io.DataInputStream;
033 import java.io.IOException;
034
035 /**
036 * A HTTP {@link org.activemq.transport.TransportChannel} which uses the
037 * <a href="http://jakarta.apache.org/commons/httpclient/">commons-httpclient</a> library
038 *
039 * @version $Revision$
040 */
041 public class HttpClientTransportChannel extends HttpTransportChannelSupport {
042 private static final Log log = LogFactory.getLog(HttpClientTransportChannel.class);
043
044 private HttpClient sendHttpClient;
045 private HttpClient receiveHttpClient;
046
047 public HttpClientTransportChannel(TextWireFormat wireFormat, String remoteUrl) {
048 super(wireFormat, remoteUrl);
049 }
050
051 public void asyncSend(Packet packet) throws JMSException {
052 PostMethod httpMethod = new PostMethod(getRemoteUrl());
053 configureMethod(httpMethod);
054 httpMethod.setRequestBody(getTextWireFormat().toString(packet));
055 try {
056 int answer = getSendHttpClient().executeMethod(httpMethod);
057 if (answer != HttpStatus.SC_OK) {
058 throw new JMSException("Failed to post packet: " + packet + " as response was: " + answer);
059 }
060 }
061 catch (IOException e) {
062 throw JMSExceptionHelper.newJMSException("Could not post packet: " + packet + " due to: " + e, e);
063 }
064 }
065
066 public boolean isMulticast() {
067 return false;
068 }
069
070 public void run() {
071 log.trace("HTTP GET consumer thread starting for clientID: " + getClientID());
072 HttpClient httpClient = getReceiveHttpClient();
073 String remoteUrl = getRemoteUrl();
074 while (!getClosed().get()) {
075 GetMethod httpMethod = new GetMethod(remoteUrl);
076 configureMethod(httpMethod);
077 try {
078 int answer = httpClient.executeMethod(httpMethod);
079 if (answer != HttpStatus.SC_OK) {
080 if (answer == HttpStatus.SC_REQUEST_TIMEOUT) {
081 log.info("GET timed out");
082 }
083 else {
084 log.warn("Failed to perform GET on: " + remoteUrl + " as response was: " + answer);
085 }
086 }
087 else {
088 Packet packet = getWireFormat().readPacket(new DataInputStream(httpMethod.getResponseBodyAsStream()));
089 if (packet == null) {
090 log.warn("Received null packet from url: " + remoteUrl);
091 }
092 else {
093 doConsumePacket(packet);
094 }
095 }
096 }
097 catch (IOException e) {
098 log.warn("Failed to perform GET on: " + remoteUrl + " due to: " + e, e);
099 }
100 }
101 }
102
103 // Properties
104 //-------------------------------------------------------------------------
105 public HttpClient getSendHttpClient() {
106 if (sendHttpClient == null) {
107 sendHttpClient = createHttpClient();
108 }
109 return sendHttpClient;
110 }
111
112 public void setSendHttpClient(HttpClient sendHttpClient) {
113 this.sendHttpClient = sendHttpClient;
114 }
115
116 public HttpClient getReceiveHttpClient() {
117 if (receiveHttpClient == null) {
118 receiveHttpClient = createHttpClient();
119 }
120 return receiveHttpClient;
121 }
122
123 public void setReceiveHttpClient(HttpClient receiveHttpClient) {
124 this.receiveHttpClient = receiveHttpClient;
125 }
126
127 // Implementation methods
128 //-------------------------------------------------------------------------
129 protected HttpClient createHttpClient() {
130 return new HttpClient();
131 }
132
133 protected void configureMethod(HttpMethod method) {
134 String clientID = getClientID();
135 if (clientID != null) {
136 method.setRequestHeader("clientID", clientID);
137 }
138 }
139
140 public void forceDisconnect() {
141 // TODO: implement me.
142 throw new RuntimeException("Not yet Implemented.");
143 }
144
145 }