001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.tool;
020
021 import java.io.IOException;
022 import java.io.PrintWriter;
023 import java.util.ArrayList;
024 import java.util.Collections;
025 import java.util.Iterator;
026 import java.util.List;
027 import java.util.Random;
028
029 import javax.jms.BytesMessage;
030 import javax.jms.Connection;
031 import javax.jms.DeliveryMode;
032 import javax.jms.Destination;
033 import javax.jms.JMSException;
034 import javax.jms.Message;
035 import javax.jms.MessageConsumer;
036 import javax.jms.MessageProducer;
037 import javax.jms.Session;
038
039 import junit.framework.TestCase;
040
041 import org.activemq.ActiveMQConnectionFactory;
042 import org.activemq.message.ActiveMQQueue;
043
044 import EDU.oswego.cs.dl.util.concurrent.Latch;
045 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
046 import EDU.oswego.cs.dl.util.concurrent.WaitableInt;
047
048 /**
049 * @version $Revision$
050 */
051 public class AcidTestTool extends TestCase {
052
053 private Random random = new Random();
054 private byte data[];
055 private int workerCount = 10;
056 private PrintWriter statWriter;
057
058 // Worker configuration.
059 protected int recordSize = 1024;
060 protected int batchSize = 5;
061 protected int workerThinkTime = 500;
062 SynchronizedBoolean ignoreJMSErrors = new SynchronizedBoolean(false);
063
064 protected Destination target;
065 private ActiveMQConnectionFactory factory;
066 private Connection connection;
067
068 WaitableInt publishedBatches = new WaitableInt(0);
069 WaitableInt consumedBatches = new WaitableInt(0);
070
071 List errors = Collections.synchronizedList(new ArrayList());
072
073 private interface Worker extends Runnable {
074 public boolean waitForExit(long i) throws InterruptedException;
075 }
076
077 private final class ProducerWorker implements Worker {
078
079 Session session;
080 private MessageProducer producer;
081 private BytesMessage message;
082 Latch doneLatch = new Latch();
083 private final String workerId;
084
085 ProducerWorker(Session session, String workerId) throws JMSException {
086 this.session = session;
087 this.workerId = workerId;
088 producer = session.createProducer(target);
089 producer.setDeliveryMode(DeliveryMode.PERSISTENT);
090 message = session.createBytesMessage();
091 message.setStringProperty("workerId", workerId);
092 message.writeBytes(data);
093 }
094
095 public void run() {
096 try {
097 for( int batchId=0; true; batchId++ ) {
098 // System.out.println("Sending batch: "+workerId+" "+batchId);
099 for( int msgId=0; msgId < batchSize; msgId++ ) {
100 // Sleep some random amount of time less than workerThinkTime
101 try {
102 Thread.sleep(random.nextInt(workerThinkTime));
103 } catch (InterruptedException e1) {
104 return;
105 }
106
107 message.setIntProperty("batch-id",batchId);
108 message.setIntProperty("msg-id",msgId);
109
110
111 producer.send(message);
112 }
113 session.commit();
114 publishedBatches.increment();
115 // System.out.println("Commited send batch: "+workerId+" "+batchId);
116 }
117 } catch (JMSException e) {
118 if( !ignoreJMSErrors.get() ) {
119 e.printStackTrace();
120 errors.add(e);
121 }
122 return;
123 } catch (Throwable e) {
124 e.printStackTrace();
125 errors.add(e);
126 return;
127 } finally {
128 System.out.println("Producer exiting.");
129 doneLatch.release();
130 }
131 }
132
133 public boolean waitForExit(long i) throws InterruptedException {
134 return doneLatch.attempt(i);
135 }
136 }
137
138 private final class ConsumerWorker implements Worker {
139
140 Session session;
141 private MessageConsumer consumer;
142 private final long timeout;
143 Latch doneLatch = new Latch();
144 private final String workerId;
145
146 ConsumerWorker(Session session, String workerId, long timeout) throws JMSException {
147 this.session = session;
148 this.workerId = workerId;
149 this.timeout = timeout;
150 consumer = session.createConsumer(target,"workerId='"+workerId+"'");
151 }
152
153 public void run() {
154
155 try {
156 int batchId=0;
157 while( true ) {
158 for( int msgId=0; msgId < batchSize; msgId++ ) {
159
160 // Sleep some random amount of time less than workerThinkTime
161 try {
162 Thread.sleep(random.nextInt(workerThinkTime));
163 } catch (InterruptedException e1) {
164 return;
165 }
166
167 Message message = consumer.receive(timeout);
168 if( msgId > 0 ) {
169 assertNotNull(message);
170 assertEquals(message.getIntProperty("batch-id"), batchId);
171 assertEquals(message.getIntProperty("msg-id"), msgId);
172 } else {
173 if( message==null ) {
174 System.out.println("At end of batch an don't have a next batch to process. done.");
175 return;
176 }
177 assertEquals(msgId, message.getIntProperty("msg-id") );
178 batchId = message.getIntProperty("batch-id");
179 // System.out.println("Receiving batch: "+workerId+" "+batchId);
180 }
181
182 }
183 session.commit();
184 consumedBatches.increment();
185 // System.out.println("Commited receive batch: "+workerId+" "+batchId);
186 }
187 } catch (JMSException e) {
188 if( !ignoreJMSErrors.get() ) {
189 e.printStackTrace();
190 errors.add(e);
191 }
192 return;
193 } catch (Throwable e) {
194 e.printStackTrace();
195 errors.add(e);
196 return;
197 } finally {
198 System.out.println("Consumer exiting.");
199 doneLatch.release();
200 }
201 }
202
203 public boolean waitForExit(long i) throws InterruptedException {
204 return doneLatch.attempt(i);
205 }
206 }
207
208 /**
209 * @see junit.framework.TestCase#setUp()
210 */
211 protected void setUp() throws Exception {
212 factory = new ActiveMQConnectionFactory("tcp://localhost:61616");
213 this.target = new ActiveMQQueue(getClass().getName());
214 }
215
216 protected void tearDown() throws Exception {
217 if( connection!=null ) {
218 try { connection.close(); } catch (Throwable ignore) {}
219 connection = null;
220 }
221 }
222
223 /**
224 * @throws InterruptedException
225 * @throws JMSException
226 * @throws JMSException
227 *
228 */
229 private void reconnect() throws InterruptedException, JMSException {
230 if( connection!=null ) {
231 try { connection.close(); } catch (Throwable ignore) {}
232 connection = null;
233 }
234
235 long reconnectDelay=1000;
236 JMSException lastError=null;
237
238 while( connection == null) {
239 if( reconnectDelay > 1000*10 ) {
240 reconnectDelay = 1000*10;
241 }
242 try {
243 connection = factory.createConnection();
244 connection.start();
245 } catch (JMSException e) {
246 lastError = e;
247 Thread.sleep(reconnectDelay);
248 reconnectDelay*=2;
249 }
250 }
251 }
252
253 /**
254 * @throws Throwable
255 * @throws IOException
256 *
257 */
258 public void testAcidTransactions() throws Throwable {
259
260 System.out.println("Client threads write records using: Record Size: " + recordSize + ", Batch Size: "
261 + batchSize + ", Worker Think Time: " + workerThinkTime);
262
263 // Create the record and fill it with some values.
264 data = new byte[recordSize];
265 for (int i = 0; i < data.length; i++) {
266 data[i] = (byte) i;
267 }
268
269 System.out.println("==============================================");
270 System.out.println("===> Start the server now.");
271 System.out.println("==============================================");
272 reconnect();
273
274 System.out.println("Starting " + workerCount + " Workers...");
275 ArrayList workers = new ArrayList();
276 for( int i=0; i< workerCount; i++ ){
277 String workerId = "worker-"+i;
278
279 Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 1000*5);
280 workers.add(w);
281 new Thread(w,"Consumer:"+workerId).start();
282
283 w = new ProducerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId);
284 workers.add(w);
285 new Thread(w,"Producer:"+workerId).start();
286 }
287
288 System.out.println("Waiting for "+(workerCount*10)+" batches to be delivered.");
289
290 //
291 // Wait for about 5 batches of messages per worker to be consumed before restart.
292 //
293 while( publishedBatches.get() < workerCount*5) {
294 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
295 Thread.sleep(1000);
296 }
297
298 System.out.println("==============================================");
299 System.out.println("===> Server is under load now. Kill it!");
300 System.out.println("==============================================");
301 ignoreJMSErrors.set(true);
302
303 // Wait for all the workers to finish.
304 System.out.println("Waiting for all workers to exit due to server shutdown.");
305 for (Iterator iter = workers.iterator(); iter.hasNext();) {
306 Worker worker = (Worker) iter.next();
307 while( !worker.waitForExit(1000) ) {
308 System.out.println("==============================================");
309 System.out.println("===> Server is under load now. Kill it!");
310 System.out.println("==============================================");
311 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
312 }
313 }
314 workers.clear();
315
316 // No errors should have occured so far.
317 if( errors.size()>0 )
318 throw (Throwable) errors.get(0);
319
320 System.out.println("==============================================");
321 System.out.println("===> Start the server now.");
322 System.out.println("==============================================");
323 reconnect();
324
325 System.out.println("Restarted.");
326
327 // Validate the all transactions were commited as a uow. Looking for partial commits.
328 for( int i=0; i< workerCount; i++ ){
329 String workerId = "worker-"+i;
330 Worker w = new ConsumerWorker(connection.createSession(true,Session.SESSION_TRANSACTED), workerId, 5*1000);
331 workers.add(w);
332 new Thread(w, "Consumer:"+workerId).start();
333 }
334
335 System.out.println("Waiting for restarted consumers to finish consuming all messages..");
336 for (Iterator iter = workers.iterator(); iter.hasNext();) {
337 Worker worker = (Worker) iter.next();
338 while( !worker.waitForExit(1000*5) ) {
339 System.out.println("Waiting for restarted consumers to finish consuming all messages..");
340 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
341 }
342 }
343 workers.clear();
344
345 System.out.println("Workers finished..");
346 System.out.println("Stats: Produced Batches: "+this.publishedBatches.get()+", Consumed Batches: "+this.consumedBatches.get());
347
348 if( errors.size()>0 )
349 throw (Throwable) errors.get(0);
350
351 }
352
353 public static void main(String[] args) {
354 try {
355 AcidTestTool tool = new AcidTestTool();
356 tool.setUp();
357 tool.testAcidTransactions();
358 tool.tearDown();
359 } catch (Throwable e) {
360 System.out.println("Test Failed: "+e.getMessage());
361 e.printStackTrace();
362 }
363 }
364 }