001 package org.activemq.ra;
002
003 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
004
005 /**
006 */
007 public class CircularQueue {
008
009 private final int size;
010
011 private final SynchronizedBoolean stopping;
012
013
014 // For pooling objects
015 private final Object[] contents;
016 final private Object mutex = new Object();
017 //where the next worker to be supplied currently is.
018 private int start=0;
019 //where the next worker to be inserted will go
020 private int end=0;
021
022 public CircularQueue(int size, SynchronizedBoolean stopping) {
023 this.size = size;
024 contents = new Object[size];
025 this.stopping = stopping;
026 }
027
028 public Object get() {
029 synchronized(mutex) {
030 while( true ) {
031 Object ew = contents[start];
032 if (ew != null) {
033 start++;
034 if(start == contents.length) {
035 start=0;
036 }
037 return ew;
038 } else {
039 try {
040 mutex.wait();
041 if(stopping.get()) {
042 return null;
043 }
044 } catch (InterruptedException e) {
045 return null;
046 }
047 }
048 }
049 }
050 }
051
052 public void returnObject(Object worker) {
053 synchronized(mutex) {
054 contents[end++] = worker;
055 if( end == contents.length) {
056 end=0;
057 }
058 mutex.notify();
059 }
060 }
061
062 public int size() {
063 return contents.length;
064 }
065
066 public void drain() {
067 int i = 0;
068 while (i < size) {
069 if (get() != null) {
070 i++;
071 }
072 }
073 }
074
075
076 public void notifyWaiting() {
077 synchronized(mutex) {
078 mutex.notifyAll();
079 }
080 }
081
082 }