001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.commons.transaction.memory;
018
019 import java.io.PrintWriter;
020 import java.util.HashSet;
021 import java.util.Iterator;
022 import java.util.Map;
023 import java.util.Set;
024 import java.util.Collections;
025
026 import org.apache.commons.transaction.locking.ReadWriteLock;
027 import org.apache.commons.transaction.util.LoggerFacade;
028 import org.apache.commons.transaction.util.PrintWriterLogger;
029
030 /**
031 * Wrapper that adds transactional control to all kinds of maps that implement the {@link Map} interface. By using
032 * a naive optimistic transaction control this wrapper has better isolation than {@link TransactionalMapWrapper}, but
033 * may also fail to commit.
034 *
035 * <br>
036 * Start a transaction by calling {@link #startTransaction()}. Then perform the normal actions on the map and
037 * finally either call {@link #commitTransaction()} to make your changes permanent or {@link #rollbackTransaction()} to
038 * undo them.
039 * <br>
040 * <em>Caution:</em> Do not modify values retrieved by {@link #get(Object)} as this will circumvent the transactional mechanism.
041 * Rather clone the value or copy it in a way you see fit and store it back using {@link #put(Object, Object)}.
042 * <br>
043 * <em>Note:</em> This wrapper guarantees isolation level <code>SERIALIZABLE</code>.
044 * <br>
045 * <em>Caution:</em> This implementation might be slow when large amounts of data is changed in a transaction as much references will need to be copied around.
046 *
047 * @version $Id: OptimisticMapWrapper.java 493628 2007-01-07 01:42:48Z joerg $
048 * @see TransactionalMapWrapper
049 * @see PessimisticMapWrapper
050 */
051 public class OptimisticMapWrapper extends TransactionalMapWrapper {
052
053 protected static final int COMMIT_TIMEOUT = 1000 * 60; // 1 minute
054 protected static final int ACCESS_TIMEOUT = 1000 * 30; // 30 seconds
055
056 protected Set activeTransactions;
057
058 protected LoggerFacade logger;
059
060 protected ReadWriteLock commitLock;
061
062 /**
063 * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
064 * data will be instances of {@link java.util.HashMap} and {@link java.util.HashSet}.
065 *
066 * @param wrapped map to be wrapped
067 */
068 public OptimisticMapWrapper(Map wrapped) {
069 this(wrapped, new HashMapFactory(), new HashSetFactory());
070 }
071
072 /**
073 * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
074 * data will be created and disposed using {@link MapFactory} and {@link SetFactory}.
075 *
076 * @param wrapped map to be wrapped
077 * @param mapFactory factory for temporary maps
078 * @param setFactory factory for temporary sets
079 */
080 public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory) {
081 this(wrapped, mapFactory, setFactory, new PrintWriterLogger(new PrintWriter(System.out),
082 OptimisticMapWrapper.class.getName(), false));
083 }
084
085 /**
086 * Creates a new optimistic transactional map wrapper. Temporary maps and sets to store transactional
087 * data will be created and disposed using {@link MapFactory} and {@link SetFactory}.
088 *
089 * @param wrapped map to be wrapped
090 * @param mapFactory factory for temporary maps
091 * @param setFactory factory for temporary sets
092 * @param logger
093 * generic logger used for all kinds of logging
094 */
095 public OptimisticMapWrapper(Map wrapped, MapFactory mapFactory, SetFactory setFactory, LoggerFacade logger) {
096 super(wrapped, mapFactory, setFactory);
097 activeTransactions = Collections.synchronizedSet(new HashSet());
098 this.logger = logger;
099 commitLock = new ReadWriteLock("COMMIT", logger);
100 }
101
102 public void startTransaction() {
103 if (getActiveTx() != null) {
104 throw new IllegalStateException(
105 "Active thread " + Thread.currentThread() + " already associated with a transaction!");
106 }
107 CopyingTxContext context = new CopyingTxContext();
108 activeTransactions.add(context);
109 setActiveTx(context);
110 }
111
112 public void rollbackTransaction() {
113 TxContext txContext = getActiveTx();
114 super.rollbackTransaction();
115 activeTransactions.remove(txContext);
116 }
117
118 public void commitTransaction() throws ConflictException {
119 commitTransaction(false);
120 }
121
122 public void commitTransaction(boolean force) throws ConflictException {
123 TxContext txContext = getActiveTx();
124
125 if (txContext == null) {
126 throw new IllegalStateException(
127 "Active thread " + Thread.currentThread() + " not associated with a transaction!");
128 }
129
130 if (txContext.status == STATUS_MARKED_ROLLBACK) {
131 throw new IllegalStateException("Active thread " + Thread.currentThread() + " is marked for rollback!");
132 }
133
134 try {
135 // in this final commit phase we need to be the only one access the map
136 // to make sure no one adds an entry after we checked for conflicts
137 commitLock.acquireWrite(txContext, COMMIT_TIMEOUT);
138
139 if (!force) {
140 Object conflictKey = checkForConflicts();
141 if (conflictKey != null) {
142 throw new ConflictException(conflictKey);
143 }
144 }
145
146 activeTransactions.remove(txContext);
147 copyChangesToConcurrentTransactions();
148 super.commitTransaction();
149
150 } catch (InterruptedException e) {
151 // XXX a bit dirty ;)
152 throw new ConflictException(e);
153 } finally {
154 commitLock.release(txContext);
155 }
156 }
157
158 // TODO: Shouldn't we return a collection rather than a single key here?
159 public Object checkForConflicts() {
160 CopyingTxContext txContext = (CopyingTxContext) getActiveTx();
161
162 Set keys = txContext.changedKeys();
163 Set externalKeys = txContext.externalChangedKeys();
164
165 for (Iterator it2 = keys.iterator(); it2.hasNext();) {
166 Object key = it2.next();
167 if (externalKeys.contains(key)) {
168 return key;
169 }
170 }
171 return null;
172 }
173
174 protected void copyChangesToConcurrentTransactions() {
175 CopyingTxContext thisTxContext = (CopyingTxContext) getActiveTx();
176
177 synchronized (activeTransactions) {
178 for (Iterator it = activeTransactions.iterator(); it.hasNext();) {
179 CopyingTxContext otherTxContext = (CopyingTxContext) it.next();
180
181 // no need to copy data if the other transaction does not access global map anyway
182 if (otherTxContext.cleared)
183 continue;
184
185 if (thisTxContext.cleared) {
186 // we will clear everything, so we have to copy everything before
187 otherTxContext.externalChanges.putAll(wrapped);
188 } else // no need to check if we have already copied everthing
189 {
190 for (Iterator it2 = thisTxContext.changes.entrySet().iterator(); it2.hasNext();) {
191 Map.Entry entry = (Map.Entry) it2.next();
192 Object value = wrapped.get(entry.getKey());
193 if (value != null) {
194 // undo change
195 otherTxContext.externalChanges.put(entry.getKey(), value);
196 } else {
197 // undo add
198 otherTxContext.externalDeletes.add(entry.getKey());
199 }
200 }
201
202 for (Iterator it2 = thisTxContext.deletes.iterator(); it2.hasNext();) {
203 // undo delete
204 Object key = it2.next();
205 Object value = wrapped.get(key);
206 otherTxContext.externalChanges.put(key, value);
207 }
208 }
209 }
210 }
211 }
212
213 public class CopyingTxContext extends TxContext {
214 protected Map externalChanges;
215 protected Map externalAdds;
216 protected Set externalDeletes;
217
218 protected CopyingTxContext() {
219 super();
220 externalChanges = mapFactory.createMap();
221 externalDeletes = setFactory.createSet();
222 externalAdds = mapFactory.createMap();
223 }
224
225 protected Set externalChangedKeys() {
226 Set keySet = new HashSet();
227 keySet.addAll(externalDeletes);
228 keySet.addAll(externalChanges.keySet());
229 keySet.addAll(externalAdds.keySet());
230 return keySet;
231 }
232
233 protected Set changedKeys() {
234 Set keySet = new HashSet();
235 keySet.addAll(deletes);
236 keySet.addAll(changes.keySet());
237 keySet.addAll(adds.keySet());
238 return keySet;
239 }
240
241 protected Set keys() {
242 try {
243 commitLock.acquireRead(this, ACCESS_TIMEOUT);
244 Set keySet = super.keys();
245 keySet.removeAll(externalDeletes);
246 keySet.addAll(externalAdds.keySet());
247 return keySet;
248 } catch (InterruptedException e) {
249 return null;
250 } finally {
251 commitLock.release(this);
252 }
253 }
254
255 protected Object get(Object key) {
256 try {
257 commitLock.acquireRead(this, ACCESS_TIMEOUT);
258
259 if (deletes.contains(key)) {
260 // reflects that entry has been deleted in this tx
261 return null;
262 }
263
264 Object changed = changes.get(key);
265 if (changed != null) {
266 return changed;
267 }
268
269 Object added = adds.get(key);
270 if (added != null) {
271 return added;
272 }
273
274 if (cleared) {
275 return null;
276 } else {
277 if (externalDeletes.contains(key)) {
278 // reflects that entry has been deleted in this tx
279 return null;
280 }
281
282 changed = externalChanges.get(key);
283 if (changed != null) {
284 return changed;
285 }
286
287 added = externalAdds.get(key);
288 if (added != null) {
289 return added;
290 }
291
292 // not modified in this tx
293 return wrapped.get(key);
294 }
295 } catch (InterruptedException e) {
296 return null;
297 } finally {
298 commitLock.release(this);
299 }
300 }
301
302 protected void put(Object key, Object value) {
303 try {
304 commitLock.acquireRead(this, ACCESS_TIMEOUT);
305 super.put(key, value);
306 } catch (InterruptedException e) {
307 } finally {
308 commitLock.release(this);
309 }
310 }
311
312 protected void remove(Object key) {
313 try {
314 commitLock.acquireRead(this, ACCESS_TIMEOUT);
315 super.remove(key);
316 } catch (InterruptedException e) {
317 } finally {
318 commitLock.release(this);
319 }
320 }
321
322 protected int size() {
323 try {
324 commitLock.acquireRead(this, ACCESS_TIMEOUT);
325 int size = super.size();
326
327 size -= externalDeletes.size();
328 size += externalAdds.size();
329
330 return size;
331 } catch (InterruptedException e) {
332 return -1;
333 } finally {
334 commitLock.release(this);
335 }
336 }
337
338 protected void clear() {
339 try {
340 commitLock.acquireRead(this, ACCESS_TIMEOUT);
341 super.clear();
342 externalDeletes.clear();
343 externalChanges.clear();
344 externalAdds.clear();
345 } catch (InterruptedException e) {
346 } finally {
347 commitLock.release(this);
348 }
349 }
350
351 protected void merge() {
352 try {
353 commitLock.acquireRead(this, ACCESS_TIMEOUT);
354 super.merge();
355 } catch (InterruptedException e) {
356 } finally {
357 commitLock.release(this);
358 }
359 }
360
361 protected void dispose() {
362 try {
363 commitLock.acquireRead(this, ACCESS_TIMEOUT);
364 super.dispose();
365 setFactory.disposeSet(externalDeletes);
366 externalDeletes = null;
367 mapFactory.disposeMap(externalChanges);
368 externalChanges = null;
369 mapFactory.disposeMap(externalAdds);
370 externalAdds = null;
371 } catch (InterruptedException e) {
372 } finally {
373 commitLock.release(this);
374 }
375 }
376
377 protected void finalize() throws Throwable {
378 activeTransactions.remove(this);
379 super.finalize();
380 }
381 }
382 }