001 /**
002 *
003 * Copyright 2004 Protique Ltd
004 *
005 * Licensed under the Apache License, Version 2.0 (the "License");
006 * you may not use this file except in compliance with the License.
007 * You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 *
017 **/
018
019 package org.activemq.service;
020 import javax.jms.JMSException;
021 import javax.jms.DeliveryMode;
022 import org.apache.commons.logging.*;
023 import org.activemq.broker.BrokerContainer;
024 import org.activemq.broker.Broker;
025 import org.activemq.message.ActiveMQDestination;
026 import org.activemq.message.ActiveMQMessage;
027 import org.activemq.message.ActiveMQQueue;
028 import org.activemq.store.PersistenceAdapter;
029 import org.activemq.util.IdGenerator;
030
031 /**
032 * Determines how messages are stored in a dead letter queue
033 *
034 * @version $Revision: 1.1.1.1 $
035 */
036 public class DeadLetterPolicy {
037 /**
038 * Prefix used by dead letter queues
039 */
040 public static final String DEAD_LETTER_PREFIX = "org.activemq.deadletter.";
041 private static final String DEFAULT_DEAD_LETTER_NAME = "DLQ";
042 private static final Log log = LogFactory.getLog(DeadLetterPolicy.class);
043 private Broker broker;
044 private String deadLetterPrefix = DEAD_LETTER_PREFIX;
045 private String deadLetterName = DEFAULT_DEAD_LETTER_NAME;
046 private boolean deadLetterEnabled = true;
047 private boolean deadLetterPerDestinationName = true;
048 private boolean storeNonPersistentMessages = true;
049 private boolean noTopicConsumerEnabled = true;
050 private boolean allowDuplicates = false;
051 private boolean useDatabaseLocking = false;
052 private long deadLetterQueueTTL = 0L;
053 private long deadLetterTopicTTL = 0L;
054 private IdGenerator idGenerator = new IdGenerator();
055
056 /**
057 * Construct a dead letter policy
058 *
059 * @param broker
060 */
061 public DeadLetterPolicy(Broker broker) {
062 this.broker = broker;
063 }
064
065 public DeadLetterPolicy(BrokerContainer brokerContainer) {
066 this(brokerContainer.getBroker());
067 }
068
069 /**
070 * Default constructor
071 */
072 public DeadLetterPolicy() {
073 }
074
075 /**
076 * @return Returns the broker.
077 */
078 public Broker getBroker() {
079 return broker;
080 }
081
082 /**
083 * @param broker The broker to set.
084 */
085 public void setBroker(Broker broker) {
086 this.broker = broker;
087 }
088
089 /**
090 * @return Returns the deadLetterEnabled.
091 */
092 public boolean isDeadLetterEnabled() {
093 return deadLetterEnabled;
094 }
095
096 /**
097 * @param deadLetterEnabled The deadLetterEnabled to set.
098 */
099 public void setDeadLetterEnabled(boolean deadLetterEnabled) {
100 this.deadLetterEnabled = deadLetterEnabled;
101 }
102
103 /**
104 * @return Returns the deadLetterPerDestinationName.
105 */
106 public boolean isDeadLetterPerDestinationName() {
107 return deadLetterPerDestinationName;
108 }
109
110 /**
111 * @param deadLetterPerDestinationName The deadLetterPerDestinationName to set.
112 */
113 public void setDeadLetterPerDestinationName(boolean deadLetterPerDestinationName) {
114 this.deadLetterPerDestinationName = deadLetterPerDestinationName;
115 }
116
117 /**
118 * @return Returns the deadLetterName.
119 */
120 public String getDeadLetterName() {
121 return deadLetterName;
122 }
123
124 /**
125 * @param deadLetterName The deadLetterName to set.
126 */
127 public void setDeadLetterName(String deadLetterName) {
128 this.deadLetterName = deadLetterName;
129 }
130
131 /**
132 * @return Returns the deadLetterPrefix.
133 */
134 public String getDeadLetterPrefix() {
135 return deadLetterPrefix;
136 }
137
138 /**
139 * @param deadLetterPrefix The deadLetterPrefix to set.
140 */
141 public void setDeadLetterPrefix(String deadLetterPrefix) {
142 this.deadLetterPrefix = deadLetterPrefix;
143 }
144
145 /**
146 * @return Returns the storeNonPersistentMessages.
147 */
148 public boolean isStoreNonPersistentMessages() {
149 return storeNonPersistentMessages;
150 }
151
152 /**
153 * @param storeNonPersistentMessages The storeNonPersistentMessages to set.
154 */
155 public void setStoreNonPersistentMessages(boolean storeNonPersistentMessages) {
156 this.storeNonPersistentMessages = storeNonPersistentMessages;
157 }
158
159 /**
160 * @return Returns the noTopicConsumerEnabled.
161 */
162 public boolean isNoTopicConsumerEnabled() {
163 return noTopicConsumerEnabled;
164 }
165 /**
166 * @param noTopicConsumerEnabled The noTopicConsumerEnabled to set.
167 */
168 public void setNoTopicConsumerEnabled(boolean noTopicConsumerEnabled) {
169 this.noTopicConsumerEnabled = noTopicConsumerEnabled;
170 }
171
172 /**
173 * @return Returns the allowDuplicates.
174 */
175 public boolean isAllowDuplicates() {
176 return allowDuplicates;
177 }
178 /**
179 * @param allowDuplicates The allowDuplicates to set.
180 */
181 public void setAllowDuplicates(boolean allowDuplicates) {
182 this.allowDuplicates = allowDuplicates;
183 }
184 /**
185 * @return Returns the useDatabaseLocking.
186 */
187 public boolean isUseDatabaseLocking() {
188 return useDatabaseLocking;
189 }
190 /**
191 * @param useDatabaseLocking The useDatabaseLocking to set.
192 */
193 public void setUseDatabaseLocking(boolean useDatabaseLocking) {
194 this.useDatabaseLocking = useDatabaseLocking;
195 }
196 /**
197 * @param deadLetterQueueTTL The deadLetterQueueTTL to set.
198 */
199 public void setDeadLetterQueueTTL(long deadLetterQueueTTL) {
200 this.deadLetterQueueTTL = deadLetterQueueTTL;
201 }
202 /**
203 * @param deadLetterTopicTTL The deadLetterTopicTTL to set.
204 */
205 public void setDeadLetterTopicTTL(long deadLetterTopicTTL) {
206 this.deadLetterTopicTTL = deadLetterTopicTTL;
207 }
208 /**
209 * Get the name of the DLQ from the destination provided
210 * @param destination
211 * @return the name of the DLQ for this Destination
212 */
213 public String getDeadLetterNameFromDestination(ActiveMQDestination destination){
214 String answer = this.deadLetterPrefix;
215 if (deadLetterPerDestinationName) {
216 answer += destination.getPhysicalName();
217 }
218 else {
219 answer += this.deadLetterName;
220 }
221 return answer;
222 }
223
224 /**
225 * Send a message to a dead letter queue
226 *
227 * @param message
228 * @throws JMSException
229 */
230 public void sendToDeadLetter(ActiveMQMessage message) {
231 if (deadLetterEnabled && message != null && (message.isPersistent() || storeNonPersistentMessages) && !message.isDispatchedFromDLQ()) {
232 if (broker != null) {
233 // process duplicates
234 if (!isAllowDuplicates()) {
235 PersistenceAdapter persistenceAdapter = getBroker().getPersistenceAdapter();
236 // make sure no previous dead letter was already sent
237 if (persistenceAdapter!=null
238 && message.getJMSMessageIdentity()!=null
239 && message.getJMSMessageIdentity().getSequenceNumber()!=null
240 && persistenceAdapter.deadLetterAlreadySent(((Long)message.getJMSMessageIdentity().getSequenceNumber()).longValue(), isUseDatabaseLocking())) {
241 if (log.isDebugEnabled()) log.debug("Dead letter has been already sent for this message: " + message.getJMSMessageID());
242 return;
243 }
244 }
245
246 // send a dead letter message
247 String dlqName = getDeadLetterNameFromDestination(message.getJMSActiveMQDestination());
248 try {
249 ActiveMQMessage deadMessage = createDeadLetterMessage(dlqName, message);
250 broker.sendToDeadLetterQueue(dlqName, deadMessage);
251 if (log.isDebugEnabled()) log.debug("Passed message: " + deadMessage + " to DLQ: " + dlqName);
252 } catch (JMSException e) {
253 log.warn("Failed to send message to dead letter due to: " + e, e);
254 }
255 }
256 else {
257 log.warn("Broker is not initialized - cannot add to DLQ: " + message);
258 }
259 }else if (log.isDebugEnabled()){
260 log.debug("DLQ not storing message: " + message);
261 }
262 }
263
264 protected ActiveMQMessage createDeadLetterMessage(String dlqName, ActiveMQMessage message) throws JMSException {
265 // make a shallow copy of the orginal message
266 ActiveMQMessage deadMessage = message.shallowCopy();
267
268 // generate a new producer and message ID
269 String id = idGenerator.generateId();
270 String producerKey = IdGenerator.getSeedFromId(id);
271 long seq = IdGenerator.getCountFromId(id);
272 deadMessage.setProducerKey(producerKey);
273 deadMessage.setJMSMessageID(id);
274 deadMessage.setSequenceNumber(seq);
275 deadMessage.getJMSMessageIdentity().setMessageID(id);
276 deadMessage.getJMSMessageIdentity().setSequenceNumber(new Long(seq));
277
278 ActiveMQQueue destination = new ActiveMQQueue(dlqName);
279 deadMessage.setJMSDestination(destination);
280 deadMessage.setDispatchedFromDLQ(true);
281
282 // set the expiration of the dead letter message
283 long expiration = 0L;
284 long timeStamp = System.currentTimeMillis();
285 if (message.getJMSActiveMQDestination().isTopic()) {
286 if (deadLetterTopicTTL > 0) {
287 expiration = deadLetterTopicTTL + timeStamp;
288 }
289 } else {
290 if (deadLetterQueueTTL > 0) {
291 expiration = deadLetterQueueTTL + timeStamp;
292 }
293 }
294 deadMessage.setJMSExpiration(expiration);
295 deadMessage.setJMSDeliveryMode(DeliveryMode.PERSISTENT);
296
297 return deadMessage;
298 }
299 }