001 /*
002 * Copyright (c) 2005 Your Corporation. All Rights Reserved.
003 */
004 package org.activemq.transport.stomp;
005
006 import EDU.oswego.cs.dl.util.concurrent.Channel;
007 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
008 import EDU.oswego.cs.dl.util.concurrent.CopyOnWriteArrayList;
009 import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
010 import org.activemq.io.WireFormat;
011 import org.activemq.message.ActiveMQDestination;
012 import org.activemq.message.ActiveMQTextMessage;
013 import org.activemq.message.ConnectionInfo;
014 import org.activemq.message.ConsumerInfo;
015 import org.activemq.message.Packet;
016 import org.activemq.message.Receipt;
017 import org.activemq.message.SessionInfo;
018 import org.activemq.message.ActiveMQBytesMessage;
019 import org.activemq.util.IdGenerator;
020
021 import javax.jms.JMSException;
022 import javax.jms.Session;
023 import java.io.BufferedReader;
024 import java.io.DataInput;
025 import java.io.DataInputStream;
026 import java.io.DataOutput;
027 import java.io.DataOutputStream;
028 import java.io.IOException;
029 import java.io.InputStreamReader;
030 import java.net.DatagramPacket;
031 import java.net.ProtocolException;
032 import java.util.List;
033 import java.util.Map;
034 import java.util.Properties;
035
036 /**
037 * Implements the TTMP protocol.
038 */
039 public class StompWireFormat implements WireFormat
040 {
041
042 static final IdGenerator PACKET_IDS = new IdGenerator();
043 static final IdGenerator clientIds = new IdGenerator();
044
045 private CommandParser commandParser = new CommandParser(this);
046 private HeaderParser headerParser = new HeaderParser();
047
048 private DataInputStream in;
049
050 private String clientId;
051
052 private Channel pendingReadPackets = new LinkedQueue();
053 private Channel pendingWriteFrames = new LinkedQueue();
054 private List receiptListeners = new CopyOnWriteArrayList();
055 private short sessionId;
056 private Map subscriptions = new ConcurrentHashMap();
057 private List ackListeners = new CopyOnWriteArrayList();
058 private final Map transactions = new ConcurrentHashMap();
059
060
061
062 void addReceiptListener(ReceiptListener listener)
063 {
064 receiptListeners.add(listener);
065 }
066
067
068 public Packet readPacket(DataInput in) throws IOException
069 {
070 Packet pending = (Packet) AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.HelperWithReturn()
071 {
072 public Object cycle() throws InterruptedException
073 {
074 return pendingReadPackets.poll(0);
075 }
076 });
077 if (pending != null)
078 {
079 return pending;
080 }
081
082 try
083 {
084 return commandParser.parse(in);
085 }
086 catch (ProtocolException e)
087 {
088 sendError(e.getMessage());
089 return FlushPacket.PACKET;
090 }
091 }
092
093 public Packet writePacket(final Packet packet, final DataOutput out) throws IOException, JMSException
094 {
095 flushPendingFrames(out);
096
097 // It may have just been a flush request.
098 if( packet == null )
099 return null;
100
101 if (packet.getPacketType() == Packet.RECEIPT_INFO)
102 {
103 assert(packet instanceof Receipt);
104 Receipt receipt = (Receipt) packet;
105 for (int i = 0; i < receiptListeners.size(); i++)
106 {
107 ReceiptListener listener = (ReceiptListener) receiptListeners.get(i);
108 if (listener.onReceipt(receipt, out))
109 {
110 receiptListeners.remove(listener);
111 return null;
112 }
113 }
114 }
115
116 if (packet.getPacketType() == Packet.ACTIVEMQ_TEXT_MESSAGE)
117 {
118 assert(packet instanceof ActiveMQTextMessage);
119 ActiveMQTextMessage msg = (ActiveMQTextMessage) packet;
120 Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
121 sub.receive(msg, out);
122 }
123 else if (packet.getPacketType() == Packet.ACTIVEMQ_BYTES_MESSAGE)
124 {
125 assert(packet instanceof ActiveMQBytesMessage);
126 ActiveMQBytesMessage msg = (ActiveMQBytesMessage) packet;
127 Subscription sub = (Subscription) subscriptions.get(msg.getJMSDestination());
128 sub.receive(msg, out);
129 }
130 return null;
131 }
132
133 private void flushPendingFrames(final DataOutput out) throws IOException
134 {
135 boolean interrupted = false;
136 do
137 {
138 try
139 {
140 byte[] frame = (byte[]) pendingWriteFrames.poll(0);
141 if (frame == null) return;
142 out.write(frame);
143 }
144 catch (InterruptedException e)
145 {
146 interrupted = true;
147 }
148 }
149 while (interrupted);
150 }
151
152 private void sendError(final String message)
153 {
154 // System.err.println("sending error [" + message + "]");
155 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
156 {
157 public void cycle() throws InterruptedException
158 {
159 pendingWriteFrames.put(new FrameBuilder(Stomp.Responses.ERROR)
160 .addHeader(Stomp.Headers.Error.MESSAGE, message)
161 .toFrame());
162 }
163 });
164 }
165
166 /**
167 * some transports may register their streams (e.g. Tcp)
168 *
169 * @param dataOut
170 * @param dataIn
171 */
172 public void registerTransportStreams(DataOutputStream dataOut, DataInputStream dataIn)
173 {
174 this.in = dataIn;
175 }
176
177 /**
178 * Some wire formats require a handshake at start-up
179 *
180 * @throws java.io.IOException
181 */
182 public void initiateServerSideProtocol() throws IOException
183 {
184 BufferedReader in = new BufferedReader(new InputStreamReader(this.in));
185 String first_line = in.readLine();
186 if (!first_line.startsWith(Stomp.Commands.CONNECT))
187 {
188 throw new IOException("First line does not begin with with " + Stomp.Commands.CONNECT);
189 }
190
191 Properties headers = headerParser.parse(in);
192 //if (!headers.containsKey(TTMP.Headers.Connect.LOGIN))
193 // System.err.println("Required header [" + TTMP.Headers.Connect.LOGIN + "] missing");
194 //if (!headers.containsKey(TTMP.Headers.Connect.PASSCODE))
195 // System.err.println("Required header [" + TTMP.Headers.Connect.PASSCODE + "] missing");
196
197 // allow anyone to login for now
198
199 String login = headers.getProperty(Stomp.Headers.Connect.LOGIN);
200 String passcode = headers.getProperty(Stomp.Headers.Connect.PASSCODE);
201
202 // skip to end of the packet
203 while (in.read() != 0) {}
204 final ConnectionInfo info = new ConnectionInfo();
205 final Short packet_id = new Short(PACKET_IDS.getNextShortSequence());
206 clientId = clientIds.generateId();
207 commandParser.setClientId(clientId);
208
209 info.setClientId(clientId);
210 info.setReceiptRequired(true);
211 info.setClientVersion(Integer.toString(getCurrentWireFormatVersion()));
212 info.setClosed(false);
213 info.setHostName("ttmp.fake.host.name");
214 info.setId(packet_id.shortValue());
215 info.setUserName(login);
216 info.setPassword(passcode);
217 info.setStartTime(System.currentTimeMillis());
218 info.setStarted(true);
219
220 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
221 {
222 public void cycle() throws InterruptedException
223 {
224 pendingReadPackets.put(info);
225 }
226 });
227
228 addReceiptListener(new ReceiptListener()
229 {
230 public boolean onReceipt(Receipt receipt, DataOutput out)
231 {
232 if (receipt.getCorrelationId() != packet_id.shortValue()) return false;
233 final Short session_packet_id = new Short(PACKET_IDS.getNextShortSequence());
234 sessionId = clientIds.getNextShortSequence();
235
236 final SessionInfo info = new SessionInfo();
237 info.setStartTime(System.currentTimeMillis());
238 info.setId(session_packet_id.shortValue());
239 info.setClientId(clientId);
240 info.setSessionId(sessionId);
241 info.setStarted(true);
242 info.setSessionMode(Session.AUTO_ACKNOWLEDGE);
243 info.setReceiptRequired(true);
244
245 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
246 {
247 public void cycle() throws InterruptedException
248 {
249 pendingReadPackets.put(info);
250 }
251 });
252
253 addReceiptListener(new ReceiptListener()
254 {
255 public boolean onReceipt(Receipt receipt, DataOutput out) throws IOException
256 {
257 if (receipt.getCorrelationId() != session_packet_id.shortValue()) return false;
258 StringBuffer buffer = new StringBuffer();
259 buffer.append(Stomp.Responses.CONNECTED).append(Stomp.NEWLINE);
260 buffer.append(Stomp.Headers.Connected.SESSION)
261 .append(Stomp.Headers.SEPERATOR)
262 .append(clientId)
263 .append(Stomp.NEWLINE)
264 .append(Stomp.NEWLINE);
265 buffer.append(Stomp.NULL);
266 out.writeBytes(buffer.toString());
267 return true;
268 }
269 });
270
271 return true;
272 }
273 });
274 }
275
276 /**
277 * Creates a new copy of this wire format so it can be used in another thread/context
278 */
279 public WireFormat copy()
280 {
281 return new StompWireFormat();
282 }
283
284 /* Stuff below here is leaky stuff we don't actually need */
285
286 /**
287 * Some wire formats require a handshake at start-up
288 *
289 * @throws java.io.IOException
290 */
291 public void initiateClientSideProtocol() throws IOException
292 {
293 throw new UnsupportedOperationException("Not yet implemented!");
294 }
295
296 /**
297 * Can this wireformat process packets of this version
298 *
299 * @param version the version number to test
300 * @return true if can accept the version
301 */
302 public boolean canProcessWireFormatVersion(int version)
303 {
304 return version == getCurrentWireFormatVersion();
305 }
306
307 /**
308 * @return the current version of this wire format
309 */
310 public int getCurrentWireFormatVersion()
311 {
312 return 1;
313 }
314
315 /**
316 * @return Returns the enableCaching.
317 */
318 public boolean isCachingEnabled()
319 {
320 return false;
321 }
322
323 /**
324 * @param enableCaching The enableCaching to set.
325 */
326 public void setCachingEnabled(boolean enableCaching)
327 {
328 // never
329 }
330
331 /**
332 * some wire formats will implement their own fragementation
333 *
334 * @return true unless a wire format supports it's own fragmentation
335 */
336 public boolean doesSupportMessageFragmentation()
337 {
338 return false;
339 }
340
341 /**
342 * Some wire formats will not be able to understand compressed messages
343 *
344 * @return true unless a wire format cannot understand compression
345 */
346 public boolean doesSupportMessageCompression()
347 {
348 return false;
349 }
350
351 /**
352 * Writes the given package to a new datagram
353 *
354 * @param channelID is the unique channel ID
355 * @param packet is the packet to write
356 * @return
357 * @throws java.io.IOException
358 * @throws javax.jms.JMSException
359 */
360 public DatagramPacket writePacket(String channelID, Packet packet) throws IOException, JMSException
361 {
362 throw new UnsupportedOperationException("Will not be implemented");
363 }
364
365 /**
366 * Reads the packet from the given byte[]
367 *
368 * @param bytes
369 * @param offset
370 * @param length
371 * @return
372 * @throws java.io.IOException
373 */
374 public Packet fromBytes(byte[] bytes, int offset, int length) throws IOException
375 {
376 throw new UnsupportedOperationException("Will not be implemented");
377 }
378
379 /**
380 * Reads the packet from the given byte[]
381 *
382 * @param bytes
383 * @return
384 * @throws java.io.IOException
385 */
386 public Packet fromBytes(byte[] bytes) throws IOException
387 {
388 throw new UnsupportedOperationException("Will not be implemented");
389 }
390
391 /**
392 * A helper method which converts a packet into a byte array
393 *
394 * @param packet
395 * @return a byte array representing the packet using some wire protocol
396 * @throws java.io.IOException
397 * @throws javax.jms.JMSException
398 */
399 public byte[] toBytes(Packet packet) throws IOException, JMSException
400 {
401 throw new UnsupportedOperationException("Will not be implemented");
402 }
403
404 /**
405 * A helper method for working with sockets where the first byte is read
406 * first, then the rest of the message is read.
407 * <p/>
408 * Its common when dealing with sockets to have different timeout semantics
409 * until the first non-zero byte is read of a message, after which
410 * time a zero timeout is used.
411 *
412 * @param firstByte the first byte of the packet
413 * @param in the rest of the packet
414 * @return
415 * @throws java.io.IOException
416 */
417 public Packet readPacket(int firstByte, DataInput in) throws IOException
418 {
419 throw new UnsupportedOperationException("Will not be implemented");
420 }
421
422 /**
423 * Read a packet from a Datagram packet from the given channelID. If the
424 * packet is from the same channel ID as it was sent then we have a
425 * loop-back so discard the packet
426 *
427 * @param channelID is the unique channel ID
428 * @param dpacket
429 * @return the packet read from the datagram or null if it should be
430 * discarded
431 * @throws java.io.IOException
432 */
433 public Packet readPacket(String channelID, DatagramPacket dpacket) throws IOException
434 {
435 throw new UnsupportedOperationException("Will not be implemented");
436 }
437
438 void clearTransactionId(String user_tx_id)
439 {
440 this.transactions.remove(user_tx_id);
441 }
442
443 String getClientId()
444 {
445 return this.clientId;
446 }
447
448 public short getSessionId()
449 {
450 return sessionId;
451 }
452
453 public void addSubscription(Subscription s)
454 {
455 if (subscriptions.containsKey(s.getDestination()))
456 {
457 Subscription old = (Subscription) subscriptions.get(s.getDestination());
458 ConsumerInfo p = old.close();
459 enqueuePacket(p);
460 subscriptions.put(s.getDestination(), s);
461 }
462 else
463 {
464 subscriptions.put(s.getDestination(), s);
465 }
466 }
467
468 public void enqueuePacket(final Packet ack)
469 {
470 AsyncHelper.tryUntilNotInterrupted(new AsyncHelper.Helper()
471 {
472 public void cycle() throws InterruptedException
473 {
474 pendingReadPackets.put(ack);
475 }
476 });
477 }
478
479 public Subscription getSubscriptionFor(ActiveMQDestination destination)
480 {
481 return (Subscription) subscriptions.get(destination);
482 }
483
484 public void addAckListener(AckListener listener)
485 {
486 this.ackListeners.add(listener);
487 }
488
489 public List getAckListeners()
490 {
491 return ackListeners;
492 }
493
494 public String getTransactionId(String key)
495 {
496 return (String) transactions.get(key);
497 }
498
499 public void registerTransactionId(String user_tx_id, String tx_id)
500 {
501 transactions.put(user_tx_id, tx_id);
502 }
503 }