001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.benchmark;
019
020 import javax.jms.Destination;
021 import javax.jms.JMSException;
022 import javax.jms.Message;
023 import javax.jms.MessageConsumer;
024 import javax.jms.MessageListener;
025 import javax.jms.Session;
026 import javax.jms.TextMessage;
027 import javax.jms.Topic;
028
029 /**
030 * @author James Strachan
031 * @version $Revision$
032 */
033 public class Consumer extends BenchmarkSupport implements MessageListener {
034
035 public static void main(String[] args) {
036 Consumer tool = new Consumer();
037 if (args.length > 0) {
038 tool.setUrl(args[0]);
039 }
040 if (args.length > 1) {
041 tool.setTopic(parseBoolean(args[1]));
042 }
043 if (args.length > 2) {
044 tool.setSubject(args[2]);
045 }
046 if (args.length > 3) {
047 tool.setDurable(parseBoolean(args[3]));
048 }
049 if (args.length > 4) {
050 tool.setConnectionCount(Integer.parseInt(args[4]));
051 }
052
053 try {
054 tool.run();
055 }
056 catch (Exception e) {
057 System.out.println("Caught: " + e);
058 e.printStackTrace();
059 }
060 }
061
062 public Consumer() {
063 }
064
065 public void run() throws JMSException {
066 start();
067 subscribe();
068 }
069
070 protected void subscribe() throws JMSException {
071 for (int i = 0; i < subjects.length; i++) {
072 subscribe(subjects[i]);
073 }
074 }
075
076 protected void subscribe(String subject) throws JMSException {
077 Session session = createSession();
078
079 Destination destination = createDestination(session, subject);
080
081 System.out.println("Consuming on : " + destination + " of type: " + destination.getClass().getName());
082
083 MessageConsumer consumer = null;
084 if (isDurable() && isTopic()) {
085 consumer = session.createDurableSubscriber((Topic) destination, getClass().getName());
086 }
087 else {
088 consumer = session.createConsumer(destination);
089 }
090 consumer.setMessageListener(this);
091 addResource(consumer);
092 }
093
094 public void onMessage(Message message) {
095 try {
096 TextMessage textMessage = (TextMessage) message;
097
098 // lets force the content to be deserialized
099 String text = textMessage.getText();
100 count(1);
101
102 // lets count the messages
103
104 //message.acknowledge();
105 }
106 catch (JMSException e) {
107 // TODO Auto-generated catch block
108 e.printStackTrace();
109 }
110 }
111
112 }