001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 * Copyright 2004 Hiram Chirino
005 *
006 * Licensed under the Apache License, Version 2.0 (the "License");
007 * you may not use this file except in compliance with the License.
008 * You may obtain a copy of the License at
009 *
010 * http://www.apache.org/licenses/LICENSE-2.0
011 *
012 * Unless required by applicable law or agreed to in writing, software
013 * distributed under the License is distributed on an "AS IS" BASIS,
014 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
015 * See the License for the specific language governing permissions and
016 * limitations under the License.
017 *
018 **/
019
020 package org.activemq.io.util;
021 import java.util.ArrayList;
022 import java.util.Iterator;
023 import java.util.List;
024
025 import EDU.oswego.cs.dl.util.concurrent.ConcurrentHashMap;
026 import EDU.oswego.cs.dl.util.concurrent.SynchronizedBoolean;
027
028 /**
029 * A factory manager for MemoryBoundedQueue and also ensures that the maximum memory used by all active
030 * MemoryBoundedQueues created by this instance stays within the memory usage bounds set.
031 *
032 * @version $Revision: 1.1.1.1 $
033 */
034 public class MemoryBoundedQueueManager {
035
036 private final ConcurrentHashMap activeQueues = new ConcurrentHashMap();
037 private final MemoryBoundedObjectManager memoryManager;
038 private final SynchronizedBoolean memoryLimitEnforced = new SynchronizedBoolean(true);
039
040 /**
041 * @param name
042 * @param maxSize
043 */
044 public MemoryBoundedQueueManager(MemoryBoundedObjectManager memoryManager) {
045 this.memoryManager = memoryManager;
046 }
047
048 /**
049 * retrieve a named MemoryBoundedQueue or creates one if not found
050 *
051 * @param name
052 * @return an named instance of a MemoryBoundedQueue
053 */
054 public MemoryBoundedQueue getMemoryBoundedQueue(String name) {
055 MemoryBoundedQueue result = null;
056 synchronized (activeQueues) {
057 result = (MemoryBoundedQueue) activeQueues.get(name);
058 if (result == null) {
059 if (memoryManager.isSupportJMSPriority())
060 result = new MemoryBoundedPrioritizedQueue(this, name);
061 else
062 result = new MemoryBoundedQueue(this, name);
063 activeQueues.put(name, result);
064 }
065 }
066 return result;
067 }
068
069 /**
070 * close this queue manager and all associated MemoryBoundedQueues
071 */
072 public void close() {
073 memoryManager.close();
074 }
075
076 /**
077 * @return Returns the memoryManager.
078 */
079 public MemoryBoundedObjectManager getMemoryManager() {
080 return memoryManager;
081 }
082
083 public int getCurrentCapacity() {
084 return memoryManager.getCurrentCapacity();
085 }
086
087 public void add(MemoryBoundedQueue queue) {
088 memoryManager.add(queue);
089 }
090
091 public void remove(MemoryBoundedQueue queue) {
092 memoryManager.remove(queue);
093 activeQueues.remove(queue.getName());
094 }
095
096 public boolean isFull() {
097 return memoryManager.isFull();
098 }
099
100 public void incrementMemoryUsed(int size) {
101 memoryManager.incrementMemoryUsed(size);
102 }
103
104 public void decrementMemoryUsed(int size) {
105 memoryManager.decrementMemoryUsed(size);
106 }
107
108 public List getMemoryBoundedQueues(){
109 return new ArrayList(activeQueues.values());
110 }
111
112 public String dumpContents(){
113 String result = "Memory = " + memoryManager.getTotalMemoryUsedSize() + " , capacity = " + memoryManager.getCurrentCapacity() + "\n";
114 for (Iterator i = activeQueues.values().iterator(); i.hasNext(); ){
115 MemoryBoundedQueue q = (MemoryBoundedQueue)i.next();
116 result += "\t" + q.getName() + " enqueued = " + q.getContents().size() + " memory = " + q.getLocalMemoryUsedByThisQueue() + "\n";
117 }
118 result += "\n\n";
119 return result;
120 }
121
122 public void setMemoryLimitEnforced(boolean enable) {
123 memoryLimitEnforced.set(enable);
124 }
125
126 public boolean isMemoryLimitEnforced() {
127 return memoryLimitEnforced.get();
128 }
129 }