001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.transport.tcp;
020
021 import EDU.oswego.cs.dl.util.concurrent.BoundedBuffer;
022 import EDU.oswego.cs.dl.util.concurrent.BoundedChannel;
023 import EDU.oswego.cs.dl.util.concurrent.BoundedLinkedQueue;
024 import EDU.oswego.cs.dl.util.concurrent.Executor;
025 import EDU.oswego.cs.dl.util.concurrent.PooledExecutor;
026 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
027 import org.apache.commons.logging.Log;
028 import org.apache.commons.logging.LogFactory;
029 import org.activemq.io.WireFormat;
030 import org.activemq.io.WireFormatLoader;
031 import org.activemq.message.Packet;
032 import org.activemq.transport.TransportChannelSupport;
033 import org.activemq.transport.TransportStatusEvent;
034 import org.activemq.util.JMSExceptionHelper;
035
036 import javax.jms.JMSException;
037 import java.io.BufferedInputStream;
038 import java.io.DataInputStream;
039 import java.io.DataOutputStream;
040 import java.io.EOFException;
041 import java.io.IOException;
042 import java.io.InterruptedIOException;
043 import java.net.InetAddress;
044 import java.net.InetSocketAddress;
045 import java.net.Socket;
046 import java.net.SocketAddress;
047 import java.net.SocketException;
048 import java.net.SocketTimeoutException;
049 import java.net.URI;
050 import java.net.UnknownHostException;
051
052 /**
053 * A tcp implementation of a TransportChannel
054 *
055 * @version $Revision: 1.2 $
056 */
057 public class TcpTransportChannel extends TransportChannelSupport implements Runnable {
058 private static final int DEFAULT_SOCKET_BUFFER_SIZE = 64 * 1024;
059 private static final Log log = LogFactory.getLog(TcpTransportChannel.class);
060 protected Socket socket;
061 protected DataOutputStream dataOut;
062 protected DataInputStream dataIn;
063
064 private WireFormatLoader wireFormatLoader;
065 private SynchronizedBoolean closed;
066 private SynchronizedBoolean started;
067 private Object outboundLock;
068 private Executor executor;
069 private Thread thread;
070 private boolean useAsyncSend = false;
071 private int soTimeout = 10000;
072 private int socketBufferSize = DEFAULT_SOCKET_BUFFER_SIZE;
073 private BoundedChannel exceptionsList;
074 private TcpTransportServerChannel serverChannel;
075
076 /**
077 * Construct basic helpers
078 *
079 * @param wireFormat
080 */
081 protected TcpTransportChannel(WireFormat wireFormat) {
082 super(wireFormat);
083 this.wireFormatLoader = new WireFormatLoader(wireFormat);
084 closed = new SynchronizedBoolean(false);
085 started = new SynchronizedBoolean(false);
086 // there's not much point logging all exceptions, lets just keep a few around
087 exceptionsList = new BoundedLinkedQueue(10);
088 outboundLock = new Object();
089 setUseAsyncSend(useAsyncSend);
090 super.setCachingEnabled(true);
091 }
092
093 /**
094 * Connect to a remote Node - e.g. a Broker
095 *
096 * @param wireFormat
097 * @param remoteLocation
098 * @throws JMSException
099 */
100 public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation) throws JMSException {
101 this(wireFormat);
102 try {
103 this.socket = createSocket(remoteLocation);
104 initializeStreams();
105 }
106 catch (Exception ioe) {
107 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed. " + "URI was: "
108 + remoteLocation + " Reason: " + ioe, ioe);
109 }
110 }
111
112 /**
113 * Connect to a remote Node - e.g. a Broker
114 *
115 * @param wireFormat
116 * @param remoteLocation
117 * @param localLocation - e.g. local InetAddress and local port
118 * @throws JMSException
119 */
120 public TcpTransportChannel(WireFormat wireFormat, URI remoteLocation, URI localLocation) throws JMSException {
121 this(wireFormat);
122 try {
123 this.socket = createSocket(remoteLocation, localLocation);
124 initializeStreams();
125 }
126 catch (Exception ioe) {
127 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
128 }
129 }
130
131 /**
132 * Initialize from a ServerSocket
133 * @param serverChannel
134 * @param wireFormat
135 * @param socket
136 * @param executor
137 * @throws JMSException
138 */
139 public TcpTransportChannel(TcpTransportServerChannel serverChannel,WireFormat wireFormat, Socket socket, Executor executor) throws JMSException {
140 this(wireFormat);
141 this.socket = socket;
142 this.executor = executor;
143 this.serverChannel = serverChannel;
144 setServerSide(true);
145 try {
146 initialiseSocket(socket);
147 initializeStreams();
148 }
149 catch (IOException ioe) {
150 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
151 }
152 }
153
154 public TcpTransportChannel(WireFormat wireFormat, Socket socket, Executor executor) throws JMSException {
155 this(wireFormat);
156 this.socket = socket;
157 this.executor = executor;
158 try {
159 initialiseSocket(socket);
160 initializeStreams();
161 }
162 catch (IOException ioe) {
163 throw JMSExceptionHelper.newJMSException("Initialization of TcpTransportChannel failed: " + ioe, ioe);
164 }
165 }
166
167 /**
168 * start listeneing for events
169 *
170 * @throws JMSException if an error occurs
171 */
172 public void start() throws JMSException {
173 if (started.commit(false, true)) {
174 thread = new Thread(this, toString());
175 try {
176 if (isServerSide()) {
177 thread.setDaemon(true);
178 readWireFormat();
179 getWireFormat().registerTransportStreams(dataOut, dataIn);
180 getWireFormat().initiateServerSideProtocol();
181 }
182 else {
183 getWireFormat().registerTransportStreams(dataOut, dataIn);
184 thread.setPriority(Thread.NORM_PRIORITY + 2);
185 }
186 //enable caching on the wire format
187 currentWireFormat.setCachingEnabled(isCachingEnabled());
188 thread.start();
189 //send the wire format
190 if (!isServerSide()) {
191 getWireFormat().initiateClientSideProtocol();
192 }
193 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.CONNECTED));
194 }
195 catch (EOFException e) {
196 doClose(e);
197 }
198 catch (IOException e) {
199 JMSException jmsEx = new JMSException("start failed: " + e.getMessage());
200 jmsEx.initCause(e);
201 jmsEx.setLinkedException(e);
202 throw jmsEx;
203 }
204 }
205 }
206
207 protected void readWireFormat() throws JMSException, IOException {
208 WireFormat wf = wireFormatLoader.getWireFormat(dataIn);
209 if (wf != null) {
210 setWireFormat(wf);
211 }
212 }
213
214 /**
215 * close the channel
216 */
217 public void stop() {
218 if (closed.commit(false, true)) {
219 super.stop();
220 try {
221 if (executor != null) {
222 stopExecutor(executor);
223 }
224 closeStreams();
225 socket.close();
226 }
227 catch (Exception e) {
228 log.warn("Caught while closing: " + e + ". Now Closed", e);
229 }
230 }
231 closed.set(true);
232 if (this.serverChannel != null){
233 this.serverChannel.removeClient(this);
234 }
235 }
236
237 public void forceDisconnect() {
238 log.debug("Forcing disconnect");
239 if (socket != null && socket.isConnected()) {
240 try {
241 socket.close();
242 }
243 catch (IOException e) {
244 // Ignore
245 }
246 }
247 }
248
249 /**
250 * Asynchronously send a Packet
251 *
252 * @param packet
253 * @throws JMSException
254 */
255 public void asyncSend(final Packet packet) throws JMSException {
256 if (executor != null) {
257 try {
258 executor.execute(new Runnable() {
259 public void run() {
260 try {
261 if (!isClosed()) {
262 doAsyncSend(packet);
263 }
264 }
265 catch (JMSException e) {
266 try {
267 exceptionsList.put(e);
268 }
269 catch (InterruptedException e1) {
270 log.warn("Failed to add element to exception list: " + e1);
271 }
272 }
273 }
274 });
275 }
276 catch (InterruptedException e) {
277 log.info("Caught: " + e, e);
278 }
279 try {
280 JMSException e = (JMSException) exceptionsList.poll(0);
281 if (e != null) {
282 throw e;
283 }
284 }
285 catch (InterruptedException e1) {
286 log.warn("Failed to remove element to exception list: " + e1);
287 }
288 }
289 else {
290 doAsyncSend(packet);
291 }
292 }
293
294 /**
295 * @return false
296 */
297 public boolean isMulticast() {
298 return false;
299 }
300
301 /**
302 * reads packets from a Socket
303 */
304 public void run() {
305 log.trace("TCP consumer thread starting");
306 int count = 0;
307 while (!isClosed()) {
308 if (isServerSide() && ++count > 500) {
309 count = 0;
310 Thread.yield();
311 }
312 try {
313 Packet packet = getWireFormat().readPacket(dataIn);
314 if (packet != null) {
315 doConsumePacket(packet);
316 }
317 }
318 catch (SocketTimeoutException e) {
319 //onAsyncException(JMSExceptionHelper.newJMSException(e));
320 }
321 catch (InterruptedIOException e) {
322 // TODO confirm that this really is a bug in the AS/400 JVM
323 // Patch for AS/400 JVM
324 // lets ignore these exceptions
325 // as they typically just indicate the thread was interupted
326 // while waiting for input, not that the socket is in error
327 //onAsyncException(JMSExceptionHelper.newJMSException(e));
328 }
329 catch (IOException e) {
330 doClose(e);
331 }
332 }
333 }
334
335 public boolean isClosed() {
336 return closed.get();
337 }
338
339 /**
340 * pretty print for object
341 *
342 * @return String representation of this object
343 */
344 public String toString() {
345 return "TcpTransportChannel: " + socket;
346 }
347
348 /**
349 * @return the socket used by the TcpTransportChannel
350 */
351 public Socket getSocket() {
352 return socket;
353 }
354
355 /**
356 * Can this wireformat process packets of this version
357 *
358 * @param version the version number to test
359 * @return true if can accept the version
360 */
361 public boolean canProcessWireFormatVersion(int version) {
362 return getWireFormat().canProcessWireFormatVersion(version);
363 }
364
365 /**
366 * @return the current version of this wire format
367 */
368 public int getCurrentWireFormatVersion() {
369 return getWireFormat().getCurrentWireFormatVersion();
370 }
371
372 // Properties
373 //-------------------------------------------------------------------------
374
375 /**
376 * @return true if packets are enqueued to a separate queue before dispatching
377 */
378 public boolean isUseAsyncSend() {
379 return useAsyncSend;
380 }
381
382 /**
383 * set the useAsync flag
384 *
385 * @param useAsyncSend
386 */
387 public void setUseAsyncSend(boolean useAsyncSend) {
388 this.useAsyncSend = useAsyncSend;
389 try {
390 if (useAsyncSend && executor==null ) {
391 PooledExecutor pe = new PooledExecutor(new BoundedBuffer(10), 1);
392 pe.waitWhenBlocked();
393 pe.setKeepAliveTime(1000);
394 executor = pe;
395 }
396 else if (!useAsyncSend && executor != null) {
397 stopExecutor(executor);
398 }
399 }
400 catch (Exception e) {
401 log.warn("problem closing executor", e);
402 }
403 }
404
405
406
407 /**
408 * @return the current so timeout used on the socket
409 */
410 public int getSoTimeout() {
411 return soTimeout;
412 }
413
414 /**
415 * set the socket so timeout
416 *
417 * @param soTimeout
418 * @throws JMSException
419 */
420 public void setSoTimeout(int soTimeout) throws JMSException {
421 this.soTimeout = soTimeout;
422 if (this.socket != null){
423 try {
424 socket.setSoTimeout(soTimeout);
425 }
426 catch (SocketException e) {
427 JMSException jmsEx = new JMSException("Failed to set soTimeout: ", e.getMessage());
428 jmsEx.setLinkedException(e);
429 throw jmsEx;
430 }
431 }
432 }
433
434 /**
435 * @param noDelay The noDelay to set.
436 */
437 public void setNoDelay(boolean noDelay) {
438 super.setNoDelay(noDelay);
439 if (socket != null){
440 try {
441 socket.setTcpNoDelay(noDelay);
442 }
443 catch (SocketException e) {
444 log.warn("failed to set noDelay on the socket");//should never happen
445 }
446 }
447 }
448
449 /**
450 * @return Returns the socketBufferSize.
451 */
452 public int getSocketBufferSize() {
453 return socketBufferSize;
454 }
455 /**
456 * @param socketBufferSize The socketBufferSize to set.
457 */
458 public void setSocketBufferSize(int socketBufferSize) {
459 this.socketBufferSize = socketBufferSize;
460 }
461 // Implementation methods
462 //-------------------------------------------------------------------------
463 /**
464 * Actually performs the async send of a packet
465 *
466 * @param packet
467 * @return a response or null
468 * @throws JMSException
469 */
470 protected Packet doAsyncSend(Packet packet) throws JMSException {
471 Packet response = null;
472 try {
473 synchronized (outboundLock) {
474 response = getWireFormat().writePacket(packet, dataOut);
475 dataOut.flush();
476 }
477 }
478 catch (IOException e) {
479 // if (closed.get()) {
480 // log.trace("Caught exception while closed: " + e, e);
481 // }
482 // else {
483 JMSException exception = JMSExceptionHelper.newJMSException("asyncSend failed: " + e, e);
484 onAsyncException(exception);
485 throw exception;
486 // }
487 }
488 catch (JMSException e) {
489 if (isClosed()) {
490 log.trace("Caught exception while closed: " + e, e);
491 }
492 else {
493 throw e;
494 }
495 }
496 return response;
497 }
498
499 protected void doClose(Exception ex) {
500 if (!isClosed()) {
501 if (!pendingStop) {
502 setPendingStop(true);
503 setTransportConnected(false);
504 if (ex instanceof EOFException) {
505 if (!isServerSide() && !isUsedInternally()){
506 log.warn("Peer closed connection", ex);
507 }
508 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
509 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
510 }
511 else {
512 fireStatusEvent(new TransportStatusEvent(this,TransportStatusEvent.DISCONNECTED));
513 onAsyncException(JMSExceptionHelper.newJMSException("Error reading socket: " + ex, ex));
514 }
515 }
516 stop();
517 }
518 }
519
520 /**
521 * Configures the socket for use
522 * @param sock
523 * @throws SocketException
524 */
525 protected void initialiseSocket(Socket sock) throws SocketException {
526 try {
527 sock.setReceiveBufferSize(socketBufferSize);
528 sock.setSendBufferSize(socketBufferSize);
529 }
530 catch (SocketException se) {
531 log.debug("Cannot set socket buffer size = " + socketBufferSize, se);
532 }
533 sock.setSoTimeout(soTimeout);
534 sock.setTcpNoDelay(isNoDelay());
535 }
536
537 protected void initializeStreams() throws IOException{
538 BufferedInputStream buffIn = new BufferedInputStream(socket.getInputStream(),8192);
539 this.dataIn = new DataInputStream(buffIn);
540 TcpBufferedOutputStream buffOut = new TcpBufferedOutputStream(socket.getOutputStream(),8192);
541 this.dataOut = new DataOutputStream(buffOut);
542 }
543
544 protected void closeStreams() throws IOException {
545 if (dataOut != null) {
546 dataOut.close();
547 }
548 if (dataIn != null) {
549 dataIn.close();
550 }
551 }
552
553 /**
554 * Factory method to create a new socket
555 *
556 * @param remoteLocation the URI to connect to
557 * @return the newly created socket
558 * @throws UnknownHostException
559 * @throws IOException
560 */
561 protected Socket createSocket(URI remoteLocation) throws UnknownHostException, IOException {
562 SocketAddress sockAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort());
563 Socket sock = new Socket();
564 initialiseSocket(sock);
565 sock.connect(sockAddress);
566 return sock;
567 }
568
569 /**
570 * Factory method to create a new socket
571 *
572 * @param remoteLocation
573 * @param localLocation
574 * @return @throws IOException
575 * @throws IOException
576 * @throws UnknownHostException
577 */
578 protected Socket createSocket(URI remoteLocation, URI localLocation) throws IOException, UnknownHostException {
579 SocketAddress sockAddress = new InetSocketAddress(remoteLocation.getHost(), remoteLocation.getPort());
580 SocketAddress localAddress = new InetSocketAddress(InetAddress.getByName(localLocation.getHost()), localLocation.getPort());
581 Socket sock = new Socket();
582 initialiseSocket(sock);
583 sock.bind(localAddress);
584 sock.connect(sockAddress);
585 return sock;
586 }
587
588 }