001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.tool;
019
020 import javax.jms.Connection;
021 import javax.jms.JMSException;
022 import javax.jms.Message;
023 import javax.jms.MessageConsumer;
024 import javax.jms.MessageListener;
025 import javax.jms.Session;
026 import javax.jms.TextMessage;
027 import javax.jms.Topic;
028 import java.io.IOException;
029
030 /**
031 * A simple tool for consuming messages
032 *
033 * @version $Revision$
034 */
035 public class ConsumerTool extends ToolSupport implements MessageListener {
036
037 protected int count = 0;
038 protected int dumpCount = 10;
039 protected boolean verbose = true;
040 protected int maxiumMessages = 0;
041 private boolean pauseBeforeShutdown;
042
043
044 public static void main(String[] args) {
045 ConsumerTool tool = new ConsumerTool();
046 if (args.length > 0) {
047 tool.url = args[0];
048 }
049 if (args.length > 1) {
050 tool.topic = args[1].equalsIgnoreCase("true");
051 }
052 if (args.length > 2) {
053 tool.subject = args[2];
054 }
055 if (args.length > 3) {
056 tool.durable = args[3].equalsIgnoreCase("true");
057 }
058 if (args.length > 4) {
059 tool.maxiumMessages = Integer.parseInt(args[4]);
060 }
061 tool.run();
062 }
063
064 public void run() {
065 try {
066 System.out.println("Connecting to URL: " + url);
067 System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
068 System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
069
070 Connection connection = createConnection();
071 Session session = createSession(connection);
072 MessageConsumer consumer = null;
073 if (durable && topic) {
074 consumer = session.createDurableSubscriber((Topic) destination, consumerName);
075 }
076 else {
077 consumer = session.createConsumer(destination);
078 }
079 if (maxiumMessages <= 0) {
080 consumer.setMessageListener(this);
081 }
082 connection.start();
083
084 if (maxiumMessages > 0) {
085 consumeMessagesAndClose(connection, session, consumer);
086 }
087 }
088 catch (Exception e) {
089 System.out.println("Caught: " + e);
090 e.printStackTrace();
091 }
092 }
093
094 public void onMessage(Message message) {
095 try {
096 if (message instanceof TextMessage) {
097 TextMessage txtMsg = (TextMessage) message;
098 if (verbose) {
099
100 String msg = txtMsg.getText();
101 if( msg.length() > 50 )
102 msg = msg.substring(0, 50)+"...";
103
104 System.out.println("Received: " + msg);
105 }
106 }
107 else {
108 if (verbose) {
109 System.out.println("Received: " + message);
110 }
111 }
112 /*
113 if (++count % dumpCount == 0) {
114 dumpStats(connection);
115 }
116 */
117 }
118 catch (JMSException e) {
119 System.out.println("Caught: " + e);
120 e.printStackTrace();
121 }
122 }
123
124
125 protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
126 System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");
127
128 for (int i = 0; i < maxiumMessages; i++) {
129 Message message = consumer.receive();
130 onMessage(message);
131 }
132 System.out.println("Closing connection");
133 consumer.close();
134 session.close();
135 connection.close();
136 if (pauseBeforeShutdown) {
137 System.out.println("Press return to shut down");
138 System.in.read();
139 }
140 }
141 }