001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.transport.http;
019
020 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
021 import org.apache.commons.logging.Log;
022 import org.apache.commons.logging.LogFactory;
023 import org.activemq.io.TextWireFormat;
024 import org.activemq.io.WireFormat;
025 import org.activemq.transport.TransportChannelSupport;
026
027 import javax.jms.JMSException;
028
029 /**
030 * @version $Revision$
031 */
032 public abstract class HttpTransportChannelSupport extends TransportChannelSupport implements Runnable {
033 private static final Log log = LogFactory.getLog(HttpTransportChannelSupport.class);
034
035 private TextWireFormat wireFormat;
036 private String remoteUrl;
037 private Thread thread; // should use pool
038 private SynchronizedBoolean closed = new SynchronizedBoolean(false);
039 private SynchronizedBoolean started = new SynchronizedBoolean(false);
040
041 public HttpTransportChannelSupport(TextWireFormat wireFormat, String remoteUrl) {
042 this.wireFormat = wireFormat;
043 this.remoteUrl = remoteUrl;
044 }
045
046 public boolean isMulticast() {
047 return false;
048 }
049
050 public void start() throws JMSException {
051 if (started.commit(false, true)) {
052 if (getClientID() != null) {
053 startThread();
054 }
055 }
056 }
057
058 protected void startThread() {
059 thread = new Thread(this, toString());
060 thread.start();
061 }
062
063 public void stop() {
064 if (closed.commit(false, true)) {
065 super.stop();
066 }
067 }
068
069 public synchronized void setClientID(String clientID) {
070 super.setClientID(clientID);
071 if (clientID != null && thread == null && started.get()) {
072 startThread();
073 }
074 }
075
076 public String toString() {
077 return "HTTP Reader " + getRemoteUrl();
078 }
079
080 /**
081 * Can this wireformat process packets of this version
082 * @param version the version number to test
083 * @return true if can accept the version
084 */
085 public boolean canProcessWireFormatVersion(int version){
086 return wireFormat.canProcessWireFormatVersion(version);
087 }
088
089 /**
090 * @return the current version of this wire format
091 */
092 public int getCurrentWireFormatVersion(){
093 return wireFormat.getCurrentWireFormatVersion();
094 }
095
096 // Properties
097 //-------------------------------------------------------------------------
098 public String getRemoteUrl() {
099 return remoteUrl;
100 }
101
102 public WireFormat getWireFormat() {
103 return wireFormat;
104 }
105
106 public TextWireFormat getTextWireFormat(){
107 return wireFormat;
108 }
109
110 public void setWireFormat(TextWireFormat wireFormat) {
111 this.wireFormat = wireFormat;
112 }
113
114 public SynchronizedBoolean getClosed() {
115 return closed;
116 }
117
118 public SynchronizedBoolean getStarted() {
119 return started;
120 }
121 }