001 /**
002 *
003 * Copyright 2004 Hiram Chirino
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.ra;
019
020 import java.util.ArrayList;
021 import java.util.Iterator;
022 import java.util.LinkedList;
023 import java.util.List;
024
025 import javax.jms.JMSException;
026 import javax.jms.ServerSession;
027 import javax.jms.ServerSessionPool;
028 import javax.jms.Session;
029 import javax.resource.spi.UnavailableException;
030 import javax.resource.spi.endpoint.MessageEndpoint;
031
032 import org.activemq.ActiveMQQueueSession;
033 import org.activemq.ActiveMQSession;
034 import org.activemq.ActiveMQTopicSession;
035 import org.activemq.message.ActiveMQMessage;
036 import org.apache.commons.logging.Log;
037 import org.apache.commons.logging.LogFactory;
038
039 /**
040 * @version $Revision: 1.1.1.1 $ $Date: 2005/03/11 21:15:11 $
041 */
042 public class ServerSessionPoolImpl implements ServerSessionPool {
043
044 private static final Log log = LogFactory.getLog(ServerSessionPoolImpl.class);
045
046 private final ActiveMQAsfEndpointWorker activeMQAsfEndpointWorker;
047 private final int maxSessions;
048
049 private ArrayList idleSessions = new ArrayList();
050 private LinkedList activeSessions = new LinkedList();
051 private boolean closing = false;
052
053 public ServerSessionPoolImpl(ActiveMQAsfEndpointWorker activeMQAsfEndpointWorker, int maxSessions) {
054 this.activeMQAsfEndpointWorker = activeMQAsfEndpointWorker;
055 this.maxSessions=maxSessions;
056 }
057
058 private ServerSessionImpl createServerSessionImpl() throws JMSException {
059 ActiveMQActivationSpec activationSpec = activeMQAsfEndpointWorker.endpointActivationKey.getActivationSpec();
060 int acknowledge = (activeMQAsfEndpointWorker.transacted) ? Session.SESSION_TRANSACTED : activationSpec.getAcknowledgeModeForSession();
061 final ActiveMQSession session = (ActiveMQSession) activeMQAsfEndpointWorker.connection.createSession(activeMQAsfEndpointWorker.transacted,acknowledge);
062 MessageEndpoint endpoint;
063 try {
064 int batchSize = 0;
065 if (activationSpec.getEnableBatchBooleanValue()) {
066 batchSize = activationSpec.getMaxMessagesPerBatchIntValue();
067 }
068 if( activationSpec.isUseRAManagedTransactionEnabled() ) {
069 // The RA will manage the transaction commit.
070 endpoint = createEndpoint(null);
071 return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, true, batchSize);
072 } else {
073 // Give the container an object to manage to transaction with.
074 endpoint = createEndpoint(new LocalAndXATransaction(session.getTransactionContext()));
075 return new ServerSessionImpl(this, (ActiveMQSession)session, activeMQAsfEndpointWorker.workManager, endpoint, false, batchSize);
076 }
077 } catch (UnavailableException e) {
078 // The container could be limiting us on the number of endpoints
079 // that are being created.
080 log.debug("Could not create an endpoint.", e);
081 session.close();
082 return null;
083 }
084 }
085
086 private MessageEndpoint createEndpoint(LocalAndXATransaction txResourceProxy) throws UnavailableException {
087 MessageEndpoint endpoint;
088 endpoint = activeMQAsfEndpointWorker.endpointFactory.createEndpoint(txResourceProxy);
089 MessageEndpointProxy endpointProxy = new MessageEndpointProxy(endpoint);
090 return endpointProxy;
091 }
092
093 /**
094 */
095 synchronized public ServerSession getServerSession() throws JMSException {
096 log.debug("ServerSession requested.");
097 if (closing) {
098 throw new JMSException("Session Pool Shutting Down.");
099 }
100
101 if (idleSessions.size() > 0) {
102 ServerSessionImpl ss = (ServerSessionImpl) idleSessions.remove(idleSessions.size() - 1);
103 activeSessions.addLast(ss);
104 log.debug("Using idle session: " + ss);
105 return ss;
106 } else {
107 // Are we at the upper limit?
108 if (activeSessions.size() >= maxSessions) {
109 // then reuse the allready created sessions..
110 // This is going to queue up messages into a session for
111 // processing.
112 return getExistingServerSession();
113 }
114 ServerSessionImpl ss = createServerSessionImpl();
115 // We may not be able to create a session due to the container
116 // restricting us.
117 if (ss == null) {
118 if( idleSessions.size() == 0 ) {
119 throw new JMSException("Endpoint factory did not allows to any endpoints.");
120 }
121
122 return getExistingServerSession();
123 }
124 activeSessions.addLast(ss);
125 log.debug("Created a new session: " + ss);
126 return ss;
127 }
128 }
129
130 /**
131 * @param message
132 * @throws JMSException
133 */
134 private void dispatchToSession(ActiveMQMessage message) throws JMSException {
135
136 ServerSession serverSession = getServerSession();
137 Session nestedSession = serverSession.getSession();
138 ActiveMQSession session = null;
139 if (nestedSession instanceof ActiveMQSession) {
140 session = (ActiveMQSession) nestedSession;
141 } else if (nestedSession instanceof ActiveMQTopicSession) {
142 ActiveMQTopicSession topicSession = (ActiveMQTopicSession) nestedSession;
143 session = (ActiveMQSession) topicSession.getNext();
144 } else if (nestedSession instanceof ActiveMQQueueSession) {
145 ActiveMQQueueSession queueSession = (ActiveMQQueueSession) nestedSession;
146 session = (ActiveMQSession) queueSession.getNext();
147 } else {
148 throw new JMSException("Invalid instance of session obtained from server session." +
149 "The instance should be one of the following: ActiveMQSession, ActiveMQTopicSession, ActiveMQQueueSession. " +
150 "Found instance of " + nestedSession.getClass().getName());
151 }
152 session.dispatch(message);
153 serverSession.start();
154 }
155
156
157 /**
158 * @return
159 */
160 private ServerSession getExistingServerSession() {
161 ServerSessionImpl ss = (ServerSessionImpl) activeSessions.removeFirst();
162 activeSessions.addLast(ss);
163 log.debug("Reusing an active session: " + ss);
164 return ss;
165 }
166
167 synchronized public void returnToPool(ServerSessionImpl ss) {
168 log.debug("Session returned to pool: " + ss);
169 activeSessions.remove(ss);
170 idleSessions.add(ss);
171 notify();
172 }
173
174 synchronized public void removeFromPool(ServerSessionImpl ss) {
175 activeSessions.remove(ss);
176 try {
177 ActiveMQSession session = (ActiveMQSession) ss.getSession();
178 List l = session.getUnconsumedMessages();
179 for (Iterator i = l.iterator(); i.hasNext();) {
180 dispatchToSession((ActiveMQMessage) i.next());
181 }
182 } catch (Throwable t) {
183 log.error("Error redispatching unconsumed messages from stale session", t);
184 }
185 ss.close();
186 notify();
187 }
188
189 public void close() {
190 synchronized (this) {
191 closing = true;
192 closeIdleSessions();
193 while( activeSessions.size() > 0 ) {
194 try {
195 wait();
196 } catch (InterruptedException e) {
197 Thread.currentThread().interrupt();
198 return;
199 }
200 closeIdleSessions();
201 }
202 }
203 }
204
205 private void closeIdleSessions() {
206 for (Iterator iter = idleSessions.iterator(); iter.hasNext();) {
207 ServerSessionImpl ss = (ServerSessionImpl) iter.next();
208 ss.close();
209 }
210 }
211
212 }