001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.transport.jrms;
019
020 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
021 import com.sun.multicast.reliable.RMException;
022 import com.sun.multicast.reliable.transport.RMPacketSocket;
023 import com.sun.multicast.reliable.transport.SessionDoneException;
024 import com.sun.multicast.reliable.transport.TransportProfile;
025 import com.sun.multicast.reliable.transport.lrmp.LRMPTransportProfile;
026 import org.apache.commons.logging.Log;
027 import org.apache.commons.logging.LogFactory;
028 import org.activemq.io.WireFormat;
029 import org.activemq.message.Packet;
030 import org.activemq.transport.TransportChannelSupport;
031 import org.activemq.util.IdGenerator;
032
033 import javax.jms.JMSException;
034 import java.io.IOException;
035 import java.net.DatagramPacket;
036 import java.net.InetAddress;
037 import java.net.URI;
038
039 /**
040 * A JRMS implementation of a TransportChannel
041 *
042 * @version $Revision$
043 */
044 public class JRMSTransportChannel extends TransportChannelSupport implements Runnable {
045
046 private static final int SOCKET_BUFFER_SIZE = 32 * 1024;
047 private static final Log log = LogFactory.getLog(JRMSTransportChannel.class);
048
049 private WireFormat wireFormat;
050 private SynchronizedBoolean closed;
051 private SynchronizedBoolean started;
052 private Thread thread; //need to change this - and use a thread pool
053 // need to see our own messages
054 private RMPacketSocket socket;
055 private IdGenerator idGenerator;
056 private String channelId;
057 private int port;
058 private InetAddress inetAddress;
059 private Object lock;
060
061 /**
062 * Construct basic helpers
063 */
064 protected JRMSTransportChannel(WireFormat wireFormat) {
065 this.wireFormat = wireFormat;
066 idGenerator = new IdGenerator();
067 channelId = idGenerator.generateId();
068 closed = new SynchronizedBoolean(false);
069 started = new SynchronizedBoolean(false);
070 lock = new Object();
071 }
072
073 /**
074 * Connect to a remote Node - e.g. a Broker
075 *
076 * @param remoteLocation
077 * @throws JMSException
078 */
079 public JRMSTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
080 this(wireFormat);
081 try {
082 this.port = remoteLocation.getPort();
083 this.inetAddress = InetAddress.getByName(remoteLocation.getHost());
084 LRMPTransportProfile profile = new LRMPTransportProfile(inetAddress, port);
085 profile.setTTL((byte) 1);
086 profile.setOrdered(true);
087 this.socket = profile.createRMPacketSocket(TransportProfile.SEND_RECEIVE);
088 }
089 catch (Exception ioe) {
090 ioe.printStackTrace();
091 JMSException jmsEx = new JMSException("Initialization of JRMSTransportChannel failed: " + ioe);
092 jmsEx.setLinkedException(ioe);
093 throw jmsEx;
094 }
095 }
096
097 /**
098 * close the channel
099 */
100 public void stop() {
101 if (closed.commit(false, true)) {
102 super.stop();
103 try {
104 socket.close();
105 }
106 catch (Exception e) {
107 log.trace(toString() + " now closed");
108 }
109 }
110 }
111
112 /**
113 * start listeneing for events
114 *
115 * @throws JMSException if an error occurs
116 */
117 public void start() throws JMSException {
118 if (started.commit(false, true)) {
119 thread = new Thread(this, toString());
120 if (isServerSide()) {
121 thread.setDaemon(true);
122 }
123 thread.start();
124 }
125 }
126
127 /**
128 * Asynchronously send a Packet
129 *
130 * @param packet
131 * @throws JMSException
132 */
133 public void asyncSend(Packet packet) throws JMSException {
134 try {
135 DatagramPacket dpacket = createDatagramPacket(packet);
136
137 // lets sync to avoid concurrent writes
138 //synchronized (lock) {
139 socket.send(dpacket);
140 //}
141 }
142 catch (RMException rme) {
143 JMSException jmsEx = new JMSException("syncSend failed " + rme.getMessage());
144 jmsEx.setLinkedException(rme);
145 throw jmsEx;
146 }
147 catch (IOException e) {
148 JMSException jmsEx = new JMSException("asyncSend failed " + e.getMessage());
149 jmsEx.setLinkedException(e);
150 throw jmsEx;
151 }
152 }
153
154
155 public boolean isMulticast() {
156 return true;
157 }
158
159 /**
160 * reads packets from a Socket
161 */
162 public void run() {
163 try {
164 while (!closed.get()) {
165 DatagramPacket dpacket = socket.receive();
166 Packet packet = wireFormat.readPacket(channelId, dpacket);
167 if (packet != null) {
168 doConsumePacket(packet);
169 }
170 }
171 log.trace("The socket peer is now closed");
172 //doClose(new IOException("Socket peer is now closed"));
173 stop();
174 }
175 catch (SessionDoneException e) {
176 // this isn't really an exception, it just indicates
177 // that the socket has closed normally
178 log.trace("Session completed", e);
179 stop();
180 }
181 catch (RMException ste) {
182 doClose(ste);
183 }
184 catch (IOException e) {
185 doClose(e);
186 }
187 }
188
189 /**
190 * Can this wireformat process packets of this version
191 * @param version the version number to test
192 * @return true if can accept the version
193 */
194 public boolean canProcessWireFormatVersion(int version){
195 return wireFormat.canProcessWireFormatVersion(version);
196 }
197
198 /**
199 * @return the current version of this wire format
200 */
201 public int getCurrentWireFormatVersion(){
202 return wireFormat.getCurrentWireFormatVersion();
203 }
204
205 protected DatagramPacket createDatagramPacket() {
206 DatagramPacket answer = new DatagramPacket(new byte[SOCKET_BUFFER_SIZE], SOCKET_BUFFER_SIZE);
207 answer.setPort(port);
208 answer.setAddress(inetAddress);
209 return answer;
210 }
211
212 protected DatagramPacket createDatagramPacket(Packet packet) throws IOException, JMSException {
213 DatagramPacket answer = wireFormat.writePacket(channelId, packet);
214 answer.setPort(port);
215 answer.setAddress(inetAddress);
216 return answer;
217 }
218
219 private void doClose(Exception ex) {
220 if (!closed.get()) {
221 JMSException jmsEx = new JMSException("Error reading socket: " + ex);
222 jmsEx.setLinkedException(ex);
223 onAsyncException(jmsEx);
224 stop();
225 }
226 }
227
228 /**
229 * pretty print for object
230 *
231 * @return String representation of this object
232 */
233 public String toString() {
234 return "JRMSTransportChannel: " + socket;
235 }
236
237 public void forceDisconnect() {
238 // TODO: implement me.
239 throw new RuntimeException("Not yet Implemented.");
240 }
241 }