001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018 package org.activemq.broker.impl;
019
020 import org.activemq.broker.BrokerClient;
021 import org.activemq.broker.BrokerConnector;
022 import org.activemq.broker.BrokerContainer;
023 import org.activemq.io.WireFormat;
024 import org.activemq.message.ActiveMQMessage;
025 import org.activemq.message.ActiveMQXid;
026 import org.activemq.message.BrokerInfo;
027 import org.activemq.message.ConnectionInfo;
028 import org.activemq.message.ConsumerInfo;
029 import org.activemq.message.DurableUnsubscribe;
030 import org.activemq.message.MessageAck;
031 import org.activemq.message.ProducerInfo;
032 import org.activemq.message.SessionInfo;
033 import org.activemq.transport.TransportChannel;
034 import org.activemq.transport.TransportChannelListener;
035 import org.activemq.transport.TransportServerChannel;
036 import org.activemq.transport.TransportServerChannelProvider;
037 import org.apache.commons.logging.Log;
038 import org.apache.commons.logging.LogFactory;
039
040 import javax.jms.JMSException;
041 import javax.jms.JMSSecurityException;
042 import javax.transaction.xa.XAException;
043 import java.net.URI;
044 import java.net.URISyntaxException;
045 import java.util.Collections;
046 import java.util.HashMap;
047 import java.util.Map;
048
049 /**
050 * An implementation of the broker (the JMS server)
051 *
052 * @version $Revision: 1.1.1.1 $
053 */
054 public class BrokerConnectorImpl implements BrokerConnector, TransportChannelListener {
055
056 private TransportServerChannel serverChannel;
057 private Log log;
058 private BrokerContainer container;
059 private Map clients = Collections.synchronizedMap(new HashMap());
060
061 /**
062 * Helper constructor for TCP protocol with the given bind address
063 *
064 * @param container
065 * @param bindAddress
066 * @throws JMSException
067 */
068 public BrokerConnectorImpl(BrokerContainer container, String bindAddress, WireFormat wireFormat) throws JMSException {
069 this(container, createTransportServerChannel(wireFormat, bindAddress));
070 }
071
072 /**
073 * @param container
074 * @param serverChannel
075 */
076 public BrokerConnectorImpl(BrokerContainer container, TransportServerChannel serverChannel) {
077 this(container);
078 this.serverChannel = serverChannel;
079 serverChannel.setTransportChannelListener(this);
080 }
081
082 /**
083 * @param container
084 * @param serverChannel
085 */
086 public BrokerConnectorImpl(BrokerContainer container) {
087 assert container != null;
088 this.log = LogFactory.getLog(getClass().getName());
089 this.container = container;
090 this.container.addConnector(this);
091
092 }
093
094 /**
095 * @return infomation about the Broker
096 */
097 public BrokerInfo getBrokerInfo() {
098 return container.getBroker().getBrokerInfo();
099 }
100
101 /**
102 * Get a hint about the broker capacity for more messages
103 *
104 * @return percentage value (0-100) about how much capacity the
105 * broker has
106 */
107 public int getBrokerCapacity() {
108 return container.getBroker().getRoundedCapacity();
109 }
110
111 /**
112 * @return Get the server channel
113 */
114 public TransportServerChannel getServerChannel() {
115 return serverChannel;
116 }
117
118 /**
119 * start the Broker
120 *
121 * @throws JMSException
122 */
123 public void start() throws JMSException {
124 if (this.serverChannel != null){
125 this.serverChannel.start();
126 }
127 log.info("ActiveMQ connector started: " + serverChannel);
128 }
129
130 /**
131 * Stop the Broker
132 *
133 * @throws JMSException
134 */
135 public void stop() throws JMSException {
136 this.container.removeConnector(this);
137 if (this.serverChannel != null){
138 this.serverChannel.stop();
139 }
140 log.info("ActiveMQ connector stopped: " + serverChannel);
141 }
142
143 /**
144 * Register a Broker Client
145 *
146 * @param client
147 * @param info contains infomation about the Connection this Client represents
148 * @throws JMSException
149 * @throws javax.jms.InvalidClientIDException
150 * if the JMS client specifies an invalid or duplicate client ID.
151 * @throws JMSSecurityException if client authentication fails due to an invalid user name or password.
152 */
153 public void registerClient(BrokerClient client, ConnectionInfo info) throws JMSException {
154 this.container.registerConnection(client, info);
155 }
156
157 /**
158 * Deregister a Broker Client
159 *
160 * @param client
161 * @param info
162 * @throws JMSException if some internal error occurs
163 */
164 public void deregisterClient(BrokerClient client, ConnectionInfo info) throws JMSException {
165 this.container.deregisterConnection(client, info);
166 }
167
168 /**
169 * Registers a MessageConsumer
170 *
171 * @param client
172 * @param info
173 * @throws JMSException
174 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
175 */
176 public void registerMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
177 if (info.getDestination() == null) {
178 throw new JMSException("No Destination specified on consumerInfo for client: " + client + " info: " + info);
179 }
180 this.container.registerMessageConsumer(client, info);
181
182 }
183
184 /**
185 * De-register a MessageConsumer from the Broker
186 *
187 * @param client
188 * @param info
189 * @throws JMSException
190 */
191 public void deregisterMessageConsumer(BrokerClient client, ConsumerInfo info) throws JMSException {
192 this.container.deregisterMessageConsumer(client, info);
193 }
194
195 /**
196 * Registers a MessageProducer
197 *
198 * @param client
199 * @param info
200 * @throws JMSException
201 * @throws JMSSecurityException if client authentication fails for the Destination the Consumer applies for
202 */
203 public void registerMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
204 this.container.registerMessageProducer(client, info);
205 }
206
207 /**
208 * De-register a MessageProducer from the Broker
209 *
210 * @param client
211 * @param info
212 * @throws JMSException
213 */
214 public void deregisterMessageProducer(BrokerClient client, ProducerInfo info) throws JMSException {
215 this.container.deregisterMessageProducer(client, info);
216 }
217
218 /**
219 * Register a client-side Session (used for Monitoring)
220 *
221 * @param client
222 * @param info
223 * @throws JMSException
224 */
225 public void registerSession(BrokerClient client, SessionInfo info) throws JMSException {
226 this.container.registerSession(client, info);
227 }
228
229 /**
230 * De-register a client-side Session from the Broker (used for monitoring)
231 *
232 * @param client
233 * @param info
234 * @throws JMSException
235 */
236 public void deregisterSession(BrokerClient client, SessionInfo info) throws JMSException {
237 this.container.deregisterSession(client, info);
238 }
239
240 /**
241 * Start a transaction from the Client session
242 *
243 * @param client
244 * @param transactionId
245 * @throws JMSException
246 */
247 public void startTransaction(BrokerClient client, String transactionId) throws JMSException {
248 this.container.startTransaction(client, transactionId);
249 }
250
251 /**
252 * Rollback a transacton
253 *
254 * @param client
255 * @param transactionId
256 * @throws JMSException
257 */
258 public void rollbackTransaction(BrokerClient client, String transactionId) throws JMSException {
259 this.container.rollbackTransaction(client, transactionId);
260 }
261
262 /**
263 * Commit a transaction
264 *
265 * @param client
266 * @param transactionId
267 * @throws JMSException
268 */
269 public void commitTransaction(BrokerClient client, String transactionId) throws JMSException {
270 this.container.commitTransaction(client, transactionId);
271 }
272
273 /**
274 * Send a non-transacted message to the Broker
275 *
276 * @param client
277 * @param message
278 * @throws JMSException
279 */
280 public void sendMessage(BrokerClient client, ActiveMQMessage message) throws JMSException {
281 this.container.sendMessage(client, message);
282 }
283
284 /**
285 * Acknowledge reciept of a message
286 *
287 * @param client
288 * @param ack
289 * @throws JMSException
290 */
291 public void acknowledgeMessage(BrokerClient client, MessageAck ack) throws JMSException {
292 this.container.acknowledgeMessage(client, ack);
293 }
294
295 /**
296 * Command to delete a durable topic subscription
297 *
298 * @param client
299 * @param ds
300 * @throws JMSException
301 */
302 public void durableUnsubscribe(BrokerClient client, DurableUnsubscribe ds) throws JMSException {
303 this.container.durableUnsubscribe(client, ds);
304 }
305
306
307 /**
308 * @param channel - client to add
309 */
310 public void addClient(TransportChannel channel) {
311 try {
312 BrokerClient client = new BrokerClientImpl();
313 client.initialize(this, channel);
314 if (log.isDebugEnabled()) {
315 log.debug("Starting new client: " + client);
316 }
317 channel.setServerSide(true);
318 channel.start();
319 clients.put(channel, client);
320 }
321 catch (JMSException e) {
322 log.error("Failed to add client due to: " + e, e);
323 }
324 }
325
326 /**
327 * @param channel - client to remove
328 */
329 public void removeClient(TransportChannel channel) {
330 BrokerClient client = (BrokerClient) clients.remove(channel);
331 if (client != null) {
332 if (log.isDebugEnabled()) {
333 log.debug("Client leaving client: " + client);
334 }
335
336 // we may have already been closed, if not then lets simulate a normal shutdown
337 client.cleanUp();
338 }
339 else {
340 // might have got a duplicate callback
341 log.warn("No such client for channel: " + channel);
342 }
343 }
344
345 /**
346 * @return the BrokerContainer for this Connector
347 */
348 public BrokerContainer getBrokerContainer() {
349 return this.container;
350 }
351
352 /**
353 * Start an XA transaction.
354 *
355 * @see org.activemq.broker.BrokerConnector#startTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
356 */
357 public void startTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
358 this.container.startTransaction(client, xid);
359 }
360
361 /**
362 * Gets the prepared XA transactions.
363 *
364 * @see org.activemq.broker.BrokerConnector#getPreparedTransactions(org.activemq.broker.BrokerClient)
365 */
366 public ActiveMQXid[] getPreparedTransactions(BrokerClient client) throws XAException {
367 return this.container.getPreparedTransactions(client);
368 }
369
370 /**
371 * Prepare an XA transaction.
372 *
373 * @see org.activemq.broker.BrokerConnector#prepareTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
374 */
375 public int prepareTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
376 return this.container.prepareTransaction(client, xid);
377 }
378
379 /**
380 * Rollback an XA transaction.
381 *
382 * @see org.activemq.broker.BrokerConnector#rollbackTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid)
383 */
384 public void rollbackTransaction(BrokerClient client, ActiveMQXid xid) throws XAException {
385 this.container.rollbackTransaction(client, xid);
386 }
387
388 /**
389 * Commit an XA transaction.
390 *
391 * @see org.activemq.broker.BrokerConnector#commitTransaction(org.activemq.broker.BrokerClient, org.activemq.message.ActiveMQXid, boolean)
392 */
393 public void commitTransaction(BrokerClient client, ActiveMQXid xid, boolean onePhase) throws XAException {
394 this.container.commitTransaction(client, xid, onePhase);
395 }
396
397 /**
398 * @see org.activemq.broker.BrokerConnector#getResourceManagerId(org.activemq.broker.BrokerClient)
399 */
400 public String getResourceManagerId(BrokerClient client) {
401 // TODO: I think we need to return a better (more unique) RM id.
402 return getBrokerInfo().getBrokerName();
403 }
404
405
406 // Implementation methods
407 //-------------------------------------------------------------------------
408 /**
409 * Factory method ot create a transport channel
410 *
411 * @param bindAddress
412 * @return @throws JMSException
413 * @throws JMSException
414 */
415 protected static TransportServerChannel createTransportServerChannel(WireFormat wireFormat, String bindAddress) throws JMSException {
416 URI url;
417 try {
418 url = new URI(bindAddress);
419 }
420 catch (URISyntaxException e) {
421 JMSException jmsEx = new JMSException("Badly formated bindAddress: " + e.getMessage());
422 jmsEx.setLinkedException(e);
423 throw jmsEx;
424 }
425 return TransportServerChannelProvider.create(wireFormat, url);
426 }
427
428 }