001 /*
002 * Licensed to the Apache Software Foundation (ASF) under one or more
003 * contributor license agreements. See the NOTICE file distributed with
004 * this work for additional information regarding copyright ownership.
005 * The ASF licenses this file to You under the Apache License, Version 2.0
006 * (the "License"); you may not use this file except in compliance with
007 * the License. You may obtain a copy of the License at
008 *
009 * http://www.apache.org/licenses/LICENSE-2.0
010 *
011 * Unless required by applicable law or agreed to in writing, software
012 * distributed under the License is distributed on an "AS IS" BASIS,
013 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
014 * See the License for the specific language governing permissions and
015 * limitations under the License.
016 */
017 package org.apache.commons.transaction.file;
018
019 import java.io.BufferedReader;
020 import java.io.BufferedWriter;
021 import java.io.File;
022 import java.io.FileInputStream;
023 import java.io.FileNotFoundException;
024 import java.io.FileOutputStream;
025 import java.io.IOException;
026 import java.io.InputStream;
027 import java.io.InputStreamReader;
028 import java.io.OutputStream;
029 import java.io.OutputStreamWriter;
030 import java.util.ArrayList;
031 import java.util.Collection;
032 import java.util.HashMap;
033 import java.util.List;
034 import java.util.Map;
035 import java.util.Iterator;
036 import java.util.Collections;
037
038 import org.apache.commons.transaction.locking.GenericLock;
039 import org.apache.commons.transaction.locking.GenericLockManager;
040 import org.apache.commons.transaction.locking.LockException;
041 import org.apache.commons.transaction.locking.LockManager2;
042 import org.apache.commons.transaction.util.FileHelper;
043 import org.apache.commons.transaction.util.LoggerFacade;
044
045 /**
046 * A resource manager for streamable objects stored in a file system.
047 *
048 * It is intended for developer and "out of the box" use.
049 * It is <em>not</em> intended to be a real alternative for
050 * a full blown DBMS (of course it can not be compared to a RDBMS at all).
051 *
052 * Major features:<br>
053 * <ul>
054 * <li>Transactions performed with this class more or less conform to the widely accepted ACID properties
055 * <li>Reading should be as fast as from the ordinary file system (at the cost of a bit slower commits)
056 * </ul>
057 *
058 * Compared to a "real" DBMS major limitations are (in order of assumed severity):<br>
059 * <ul>
060 * <li>Number of simultaneously open resources is limited to the number of available file descriptors
061 * <li>It does not scale a bit
062 * <li>Pessimistic transaction and locking scheme
063 * <li>Isolation level currently is restricted to <em>read committed</em> and <em>repeatable read</em> (which is not that bad)
064 * </ul>
065 *
066 * <em>Important</em>: If possible you should have the work and store directory located in the
067 * same file system. If not, you might get additional problems, as there are:
068 * <ul>
069 * <li>On commit it might be necessary to copy files instead of rename/relink them. This may lead to time consuming,
070 * overly blocking commit phases and higher risk of corrupted files
071 * <li>Prepare phase might be too permissive, no check for sufficient memory on store file system is possible
072 * </ul>
073 *
074 * General limitations include:<br>
075 * <ul>
076 * <li>Due to lack of synchronization on the transaction context level, every transaction may only be
077 * accessed by a <em>single thread</em> throughout its full life.
078 * This means it is forbidden for a thread that has not started a transaction
079 * to perform any operations inside this transaction. However, threads associated
080 * with different transactions can safely access these methods concurrently.
081 * Reasons for the lack of synchronization are improved performance and simplicity (of the code of this class).
082 * <li>There is no dedicated class for a transaction. Having such a class would be better practice and
083 * make certain actions more intuitive.
084 * <li>Resource identifiers need a reasonable string representation obtainable by <code>toString</code>.
085 * More specifically, they will have to resolve to a <em>valid</em> file path that does not denote a directory.
086 * If it does, you might be able to create it, but not to read or write anything
087 * from resp. to it. Valid string representations of a resource identifier are
088 * for example "file" "/root" or "hjfhdfhuhuhsdufhdsufhdsufhdfuhdfduhduhduhdu".
089 * Invalid are for example "/" or "/root/". Invalid on some file systems are for example "c:" or "file://huhu".
090 * <li>As there are no active processes inside this RM and it shares its threads with the application,
091 * control over transactions is limited to points where the application calls the RM.
092 * In particular, this disables <em>active</em> termination of transactions upon timeout.
093 * <li>There is no notion of a connection to this file manager. This means you can not connect from hosts other than
094 * local and you will get problems when plugging this store into a J2EE store using connectors.
095 * <li>Methods should throw more specific exceptions
096 * </ul>
097 *
098 * <p><em>Caution</em>:<br>
099 * The <code>txId</code> passed to many methods as an identifier for the
100 * transaction concerned will function as a key in a <code>HashMap</code>.
101 * Thus assure that <code>equals</code> and <code>hashCode</code> are both
102 * properly implemented and match each other.</p>
103 *
104 * <p><em>Caution</em>:<br>
105 * You will have to guarantee that no other process will access either
106 * the store or the working dir concurrently to this <code>FileResourceManager</code>.</p>
107 *
108 * <p><em>Special Caution</em>:<br>
109 * Be very careful not to have two instances of <code>FileResourceManager</code>
110 * working in the same store and/or working dir.
111 *
112 * @version $Id: FileResourceManager.java 573315 2007-09-06 16:28:42Z ozeigermann $
113 */
114 public class FileResourceManager implements ResourceManager, ResourceManagerErrorCodes {
115
116 // reflects the natural isolation level of this store
117 protected static final int NATIVE_ISOLATION_LEVEL = ISOLATION_LEVEL_REPEATABLE_READ;
118 protected static final int DEFAULT_ISOLATION_LEVEL = NATIVE_ISOLATION_LEVEL; // returned by getDefaultIsolationLevel() and for unknown transactions
119
120 protected static final int NO_LOCK = 0; // base value the lock levels below are derived from
121 protected static final int LOCK_ACCESS = NO_LOCK + 1;
122 protected static final int LOCK_SHARED = NO_LOCK + 2;
123 protected static final int LOCK_EXCLUSIVE = NO_LOCK + 3; // level requested by lockResource for non-shared locks
124 protected static final int LOCK_COMMIT = NO_LOCK + 4; // passed to the GenericLockManager constructor in start(); presumably the maximum lock level - TODO confirm
125
126 protected static final int OPERATION_MODE_STOPPED = 0; // initial value of operationMode; set again after a successful stop
127 protected static final int OPERATION_MODE_STOPPING = 1; // set at the beginning of stop(int, long)
128 protected static final int OPERATION_MODE_STARTED = 2; // set at the end of start()
129 protected static final int OPERATION_MODE_STARTING = 3; // set at the beginning of start()
130 protected static final int OPERATION_MODE_RECOVERING = 4; // set while recover() is running
131
132 protected static final String DEFAULT_PARAMETER_ENCODING = "ISO-8859-15"; // NOTE(review): usage not visible here; presumably the charset of persisted transaction context data - confirm
133
134 protected static final int DEFAULT_TIMEOUT_MSECS = 5000; // initial value of defaultTimeout
135 protected static final int DEFAULT_COMMIT_TIMEOUT_FACTOR = 2; // stop(int) multiplies the default transaction timeout by this factor
136
137 protected static final String WORK_CHANGE_DIR = "change"; // NOTE(review): usage not visible here; presumably the per-transaction directory for changed/created resources - confirm
138 protected static final String WORK_DELETE_DIR = "delete"; // NOTE(review): usage not visible here; presumably the per-transaction directory for delete markers - confirm
139
140 protected static final String CONTEXT_FILE = "transaction.log"; // NOTE(review): usage not visible here; presumably the file name of the persisted transaction context - confirm
141
142 /*
143 * --- Static helper methods ---
144 *
145 *
146 */
147
148 protected static void applyDeletes(File removeDir, File targetDir, File rootDir)
149 throws IOException {
150 if (removeDir.isDirectory() && targetDir.isDirectory()) {
151 File[] files = removeDir.listFiles();
152 for (int i = 0; i < files.length; i++) {
153 File removeFile = files[i];
154 File targetFile = new File(targetDir, removeFile.getName());
155 if (removeFile.isFile()) {
156 if (targetFile.exists()) {
157 if (!targetFile.delete()) {
158 throw new IOException("Could not delete file " + removeFile.getName()
159 + " in directory targetDir");
160 }
161 }
162 // indicate, this has been done
163 removeFile.delete();
164 } else {
165 applyDeletes(removeFile, targetFile, rootDir);
166 }
167 }
168 // delete empty target directories, except root dir
169 if (!targetDir.equals(rootDir) && targetDir.list().length == 0) {
170 targetDir.delete();
171 }
172 }
173 }
174
175 /*
176 * --- object members ---
177 *
178 *
179 */
180
181 protected String workDir; // working directory as passed to the constructor
182 protected String storeDir; // store directory as passed to the constructor
183 protected boolean cleanUp = true; // NOTE(review): not used in the visible part of this class - confirm meaning
184 protected boolean dirty = false; // set by recover() when rollBackOrForward() reports failure; blocks prepareTransaction while true
185 protected int operationMode = OPERATION_MODE_STOPPED; // one of the OPERATION_MODE_* constants
186 protected long defaultTimeout = DEFAULT_TIMEOUT_MSECS; // default transaction timeout in milliseconds
187 protected boolean debug; // debug flag as passed to the constructor
188
189 protected LoggerFacade logger;
190
191 protected Map globalTransactions; // txId -> TransactionContext; synchronized map created in start()
192 protected List globalOpenResources; // synchronized list created in start()
193 protected LockManager2 lockManager; // GenericLockManager created in start()
194
195 protected ResourceIdToPathMapper idMapper = null; // may stay null when no resource id mapping was requested
196 protected TransactionIdToPathMapper txIdMapper = null; // used by startTransaction to validate the transaction id
197
198 protected int idCnt = 0; // NOTE(review): presumably the counter behind generatedUniqueTxId() - not visible here, confirm
199
200 /*
201 * --- ctor and general getter / setter methods ---
202 *
203 *
204 */
205
/**
 * Creates a resource manager on the given directories with debugging switched off.
 * Simply delegates to the five argument constructor passing <code>false</code> as
 * <code>debug</code>.
 *
 * @param storeDir directory where main data should go after commit
 * @param workDir directory where transactions store temporary data
 * @param urlEncodePath if set to <code>true</code> encodes all paths to allow for any kind of characters
 * @param logger the logger to be used by this store
 */
public FileResourceManager(
        String storeDir,
        String workDir,
        boolean urlEncodePath,
        LoggerFacade logger) {
    this(storeDir, workDir, urlEncodePath, logger, false);
}
217
/**
 * Creates a resource manager on the given directories.
 * Resource ids are mapped by a {@link URLEncodeIdMapper} when <code>urlEncodePath</code>
 * is set, otherwise no resource id mapper is installed. Transaction ids are mapped by a
 * {@link NoOpTransactionIdToPathMapper}.
 *
 * @param storeDir directory where main data should go after commit
 * @param workDir directory where transactions store temporary data
 * @param urlEncodePath if set to <code>true</code> encodes all paths to allow for any kind of characters
 * @param logger the logger to be used by this store
 * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection
 */
public FileResourceManager(String storeDir, String workDir, boolean urlEncodePath, LoggerFacade logger, boolean debug) {
    this(
        storeDir,
        workDir,
        urlEncodePath ? new URLEncodeIdMapper() : null,
        new NoOpTransactionIdToPathMapper(),
        logger,
        debug);
}
235
/**
 * Creates a resource manager on the given directories using a custom resource id mapper.
 * Transaction ids are mapped by a {@link NoOpTransactionIdToPathMapper}.
 * This constructor is reintroduced for backwards API compatibility and is used by jakarta-slide.
 *
 * @param storeDir directory where main data should go after commit
 * @param workDir directory where transactions store temporary data
 * @param idMapper mapper for resourceId to path
 * @param logger the logger to be used by this store
 * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection
 */
public FileResourceManager(String storeDir, String workDir, ResourceIdToPathMapper idMapper, LoggerFacade logger, boolean debug) {
    this(
        storeDir,
        workDir,
        idMapper,
        new NoOpTransactionIdToPathMapper(),
        logger,
        debug);
}
/**
 * Creates a resource manager on the given directories. All arguments are stored as
 * given; no directory is touched before {@link #start()} is called.
 *
 * @param storeDir directory where main data should go after commit
 * @param workDir directory where transactions store temporary data
 * @param idMapper mapper for resourceId to path
 * @param txIdMapper mapper for transaction id to path
 * @param logger the logger to be used by this store
 * @param debug if set to <code>true</code> logs all locking information to "transaction.log" for debugging inspection
 */
public FileResourceManager(String storeDir, String workDir, ResourceIdToPathMapper idMapper,
        TransactionIdToPathMapper txIdMapper, LoggerFacade logger, boolean debug) {
    // directories
    this.storeDir = storeDir;
    this.workDir = workDir;
    // id to path mapping
    this.idMapper = idMapper;
    this.txIdMapper = txIdMapper;
    // diagnostics
    this.logger = logger;
    this.debug = debug;
}
278
279 /**
280 * Gets the store directory.
281 *
282 * @return the store directory
283 * @see #FileResourceManager(String, String, boolean, LoggerFacade)
284 * @see #FileResourceManager(String, String, boolean, LoggerFacade, boolean)
285 */
286 public String getStoreDir() {
287 return storeDir;
288 }
289
290 /**
291 * Gets the working directory.
292 *
293 * @return the work directory
294 * @see #FileResourceManager(String, String, boolean, LoggerFacade)
295 * @see #FileResourceManager(String, String, boolean, LoggerFacade, boolean)
296 */
297 public String getWorkDir() {
298 return workDir;
299 }
300
301 /**
302 * Gets the logger used by this resource manager.
303 *
304 * @return used logger
305 */
306 public LoggerFacade getLogger() {
307 return logger;
308 }
309
310 /*
311 * --- public methods of interface ResourceManager ---
312 *
313 *
314 */
315
316 public boolean lockResource(Object resourceId, Object txId) throws ResourceManagerException {
317 lockResource(resourceId, txId, false);
318 // XXX will never return false as it will either throw or return true
319 return true;
320 }
321
322 public boolean lockResource(Object resourceId, Object txId, boolean shared) throws ResourceManagerException {
323 lockResource(resourceId, txId, shared, true, Long.MAX_VALUE, true);
324 // XXX will never return false as it will either throw or return true
325 return true;
326 }
327
328 public boolean lockResource(
329 Object resourceId,
330 Object txId,
331 boolean shared,
332 boolean wait,
333 long timeoutMSecs,
334 boolean reentrant)
335 throws ResourceManagerException {
336
337 TransactionContext context = (shared ? txInitialSaneCheck(txId) : txInitialSaneCheckForWriting(txId));
338 assureNotMarkedForRollback(context);
339 fileInitialSaneCheck(txId, resourceId);
340
341 // XXX allows locking of non existent resources (e.g. to prepare a create)
342 int level = (shared ? getSharedLockLevel(context) : LOCK_EXCLUSIVE);
343 try {
344 lockManager.lock(txId, resourceId, level, reentrant, Math.min(timeoutMSecs,
345 context.timeoutMSecs));
346 // XXX will never return false as it will either throw or return true
347 return true;
348 } catch (LockException e) {
349 switch (e.getCode()) {
350 case LockException.CODE_INTERRUPTED:
351 throw new ResourceManagerException("Could not get lock for resource at '"
352 + resourceId + "'", ERR_NO_LOCK, txId);
353 case LockException.CODE_TIMED_OUT:
354 throw new ResourceManagerException("Lock timed out for resource at '" + resourceId
355 + "'", ERR_NO_LOCK, txId);
356 case LockException.CODE_DEADLOCK_VICTIM:
357 throw new ResourceManagerException("Deadlock victim resource at '" + resourceId
358 + "'", ERR_DEAD_LOCK, txId);
359 default :
360 throw new ResourceManagerException("Locking exception for resource at '" + resourceId
361 + "'", ERR_DEAD_LOCK, txId);
362 }
363 }
364 }
365
366 public int getDefaultIsolationLevel() {
367 return DEFAULT_ISOLATION_LEVEL;
368 }
369
370 public int[] getSupportedIsolationLevels() throws ResourceManagerException {
371 return new int[] { ISOLATION_LEVEL_READ_COMMITTED, ISOLATION_LEVEL_REPEATABLE_READ };
372 }
373
374 public boolean isIsolationLevelSupported(int level) throws ResourceManagerException {
375 return (level == ISOLATION_LEVEL_READ_COMMITTED || level == ISOLATION_LEVEL_REPEATABLE_READ);
376 }
377
378 /**
379 * Gets the default transaction timeout in <em>milliseconds</em>.
380 */
381 public long getDefaultTransactionTimeout() {
382 return defaultTimeout;
383 }
384
385 /**
386 * Sets the default transaction timeout.
387 *
388 * @param timeout timeout in <em>milliseconds</em>
389 */
390 public void setDefaultTransactionTimeout(long timeout) {
391 defaultTimeout = timeout;
392 }
393
394 public long getTransactionTimeout(Object txId) throws ResourceManagerException {
395 assureRMReady();
396 long msecs = 0;
397 TransactionContext context = getContext(txId);
398 if (context == null) {
399 msecs = getDefaultTransactionTimeout();
400 } else {
401 msecs = context.timeoutMSecs;
402 }
403 return msecs;
404 }
405
406 public void setTransactionTimeout(Object txId, long mSecs) throws ResourceManagerException {
407 assureRMReady();
408 TransactionContext context = getContext(txId);
409 if (context != null) {
410 context.timeoutMSecs = mSecs;
411 } else {
412 throw new ResourceManagerException(ERR_NO_TX, txId);
413 }
414 }
415
416 public int getIsolationLevel(Object txId) throws ResourceManagerException {
417 assureRMReady();
418 TransactionContext context = getContext(txId);
419 if (context == null) {
420 return DEFAULT_ISOLATION_LEVEL;
421 } else {
422 return context.isolationLevel;
423 }
424 }
425
426 public void setIsolationLevel(Object txId, int level) throws ResourceManagerException {
427 assureRMReady();
428 TransactionContext context = getContext(txId);
429 if (context != null) {
430 if (level != ISOLATION_LEVEL_READ_COMMITTED || level != ISOLATION_LEVEL_REPEATABLE_READ) {
431 context.isolationLevel = level;
432 } else {
433 throw new ResourceManagerException(ERR_ISOLATION_LEVEL_UNSUPPORTED, txId);
434 }
435 } else {
436 throw new ResourceManagerException(ERR_NO_TX, txId);
437 }
438 }
439
440 public synchronized void start() throws ResourceManagerSystemException {
441
442 logger.logInfo("Starting RM at '" + storeDir + "' / '" + workDir + "'");
443
444 operationMode = OPERATION_MODE_STARTING;
445
446 globalTransactions = Collections.synchronizedMap(new HashMap());
447 lockManager = new GenericLockManager(LOCK_COMMIT, logger);
448 globalOpenResources = Collections.synchronizedList(new ArrayList());
449
450 recover();
451 sync();
452
453 operationMode = OPERATION_MODE_STARTED;
454
455 if (dirty) {
456 logger.logWarning("Started RM, but in dirty mode only (Recovery of pending transactions failed)");
457 } else {
458 logger.logInfo("Started RM");
459 }
460
461 }
462
463 public synchronized boolean stop(int mode) throws ResourceManagerSystemException {
464 return stop(mode, getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR);
465 }
466
467 public synchronized boolean stop(int mode, long timeOut) throws ResourceManagerSystemException {
468
469 logger.logInfo("Stopping RM at '" + storeDir + "' / '" + workDir + "'");
470
471 operationMode = OPERATION_MODE_STOPPING;
472
473 sync();
474 boolean success = shutdown(mode, timeOut);
475
476 releaseGlobalOpenResources();
477
478 if (success) {
479 operationMode = OPERATION_MODE_STOPPED;
480 logger.logInfo("Stopped RM");
481 } else {
482 logger.logWarning("Failed to stop RM");
483 }
484
485 return success;
486 }
487
488 public synchronized boolean recover() throws ResourceManagerSystemException {
489 if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STARTING) {
490 throw new ResourceManagerSystemException(
491 ERR_SYSTEM,
492 "Recovery is possible in started or starting resource manager only");
493 }
494 int oldMode = operationMode;
495 operationMode = OPERATION_MODE_RECOVERING;
496
497 recoverContexts();
498 if (globalTransactions.size() > 0) {
499 logger.logInfo("Recovering pending transactions");
500 }
501
502 dirty = !rollBackOrForward();
503
504 operationMode = oldMode;
505 return dirty;
506 }
507
508 public int getTransactionState(Object txId) throws ResourceManagerException {
509 TransactionContext context = getContext(txId);
510
511 if (context == null) {
512 return STATUS_NO_TRANSACTION;
513 } else {
514 return context.status;
515 }
516
517 }
518
519 public void startTransaction(Object txId) throws ResourceManagerException {
520
521 if (logger.isFineEnabled()) logger.logFine("Starting Tx " + txId);
522
523 assureStarted(); // can only start a new transaction when not already stopping
524 if (txId == null || txIdMapper.getPathForId(txId).length() == 0) {
525 throw new ResourceManagerException(ERR_TXID_INVALID, txId);
526 }
527
528 // be sure we are the only ones who create this tx
529 synchronized (globalTransactions) {
530 TransactionContext context = getContext(txId);
531
532 if (context != null) {
533 throw new ResourceManagerException(ERR_DUP_TX, txId);
534 }
535
536 context = new TransactionContext(txId);
537 context.init();
538 globalTransactions.put(txId, context);
539
540 }
541 }
542
543 public void markTransactionForRollback(Object txId) throws ResourceManagerException {
544 assureRMReady();
545 TransactionContext context = txInitialSaneCheckForWriting(txId);
546 try {
547 context.status = STATUS_MARKED_ROLLBACK;
548 context.saveState();
549 } finally {
550 // be very sure to free locks and resources, as application might crash or otherwise forget to roll this tx back
551 context.finalCleanUp();
552 }
553 }
554
555 public int prepareTransaction(Object txId) throws ResourceManagerException {
556 assureRMReady();
557 // do not allow any further writing or commit or rollback when db is corrupt
558 if (dirty) {
559 throw new ResourceManagerSystemException(
560 "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!",
561 ERR_SYSTEM,
562 txId);
563 }
564
565 if (txId == null) {
566 throw new ResourceManagerException(ERR_TXID_INVALID, txId);
567 }
568
569 TransactionContext context = getContext(txId);
570
571 if (context == null) {
572 return PREPARE_FAILURE;
573 }
574
575 synchronized (context) {
576
577 sync();
578
579 if (context.status != STATUS_ACTIVE) {
580 context.status = STATUS_MARKED_ROLLBACK;
581 context.saveState();
582 return PREPARE_FAILURE;
583 }
584
585 if (logger.isFineEnabled()) logger.logFine("Preparing Tx " + txId);
586
587 int prepareStatus = PREPARE_FAILURE;
588
589 context.status = STATUS_PREPARING;
590 context.saveState();
591 // do all checks as early as possible
592 context.closeResources();
593 if (context.readOnly) {
594 prepareStatus = PREPARE_SUCCESS_READONLY;
595 } else {
596 // do all checks as early as possible
597 try {
598 context.upgradeLockToCommit();
599 } catch (ResourceManagerException rme) {
600 // if this did not work, mark it for roll back as early as possible
601 markTransactionForRollback(txId);
602 throw rme;
603 }
604 prepareStatus = PREPARE_SUCCESS;
605 }
606 context.status = STATUS_PREPARED;
607 context.saveState();
608 if (logger.isFineEnabled()) logger.logFine("Prepared Tx " + txId);
609
610 return prepareStatus;
611 }
612 }
613
614 public void rollbackTransaction(Object txId) throws ResourceManagerException {
615 assureRMReady();
616 TransactionContext context = txInitialSaneCheckForWriting(txId);
617 // needing synchronization in order not to interfer with shutdown thread
618 synchronized (context) {
619 try {
620
621 if (logger.isFineEnabled()) logger.logFine("Rolling back Tx " + txId);
622
623 context.status = STATUS_ROLLING_BACK;
624 context.saveState();
625 context.rollback();
626 if (logger.isFineEnabled()) logger.logFine("All resources successfully removed for tx" + txId);
627 context.status = STATUS_ROLLEDBACK;
628 context.saveState();
629 globalTransactions.remove(txId);
630 context.cleanUp();
631
632 if (logger.isFineEnabled()) logger.logFine("Rolled back Tx " + txId);
633
634 // any system or runtime exceptions or errors thrown in rollback means we are in deep trouble, set the dirty flag
635 } catch (Error e) {
636 setDirty(txId, e);
637 throw e;
638 } catch (RuntimeException e) {
639 setDirty(txId, e);
640 throw e;
641 } catch (ResourceManagerSystemException e) {
642 setDirty(txId, e);
643 throw e;
644 } finally {
645 context.finalCleanUp();
646 // tell shutdown thread this tx is finished
647 context.notifyFinish();
648 }
649 }
650 }
651
652 public void commitTransaction(Object txId) throws ResourceManagerException {
653 assureRMReady();
654 TransactionContext context = txInitialSaneCheckForWriting(txId);
655 assureNotMarkedForRollback(context);
656
657 // needing synchronization in order not to interfer with shutdown thread
658 synchronized (context) {
659 try {
660
661 if (logger.isFineEnabled()) logger.logFine("Committing Tx " + txId);
662
663 context.status = STATUS_COMMITTING;
664 context.saveState();
665 context.commit();
666 if (logger.isFineEnabled()) logger.logFine("All resources successfully moved for tx" + txId);
667 context.status = STATUS_COMMITTED;
668 context.saveState();
669 globalTransactions.remove(txId);
670 context.cleanUp();
671
672 if (logger.isFineEnabled()) logger.logFine("Committed Tx " + txId);
673
674 // any system or runtime exceptions or errors thrown in rollback means we are in deep trouble, set the dirty flag
675 } catch (Error e) {
676 setDirty(txId, e);
677 throw e;
678 } catch (RuntimeException e) {
679 setDirty(txId, e);
680 throw e;
681 } catch (ResourceManagerSystemException e) {
682 setDirty(txId, e);
683 throw e;
684 // like "could not upgrade lock"
685 } catch (ResourceManagerException e) {
686 logger.logWarning("Could not commit tx " + txId + ", rolling back instead", e);
687 rollbackTransaction(txId);
688 } finally {
689 context.finalCleanUp();
690 // tell shutdown thread this tx is finished
691 context.notifyFinish();
692 }
693 }
694 }
695
696 public boolean resourceExists(Object resourceId) throws ResourceManagerException {
697 // create temporary light weight tx
698 Object txId;
699 TransactionContext context;
700 synchronized (globalTransactions) {
701 txId = generatedUniqueTxId();
702 if (logger.isFinerEnabled())
703 logger.logFiner("Creating temporary light weight tx " + txId + " to check for exists");
704 context = new TransactionContext(txId);
705 context.isLightWeight = true;
706 // XXX higher isolation might be needed to make sure upgrade to commit lock always works
707 context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED;
708 // context.isolationLevel = ISOLATION_LEVEL_REPEATABLE_READ;
709 globalTransactions.put(txId, context);
710 }
711
712 boolean exists = resourceExists(txId, resourceId);
713
714 context.freeLocks();
715 globalTransactions.remove(txId);
716 if (logger.isFinerEnabled())
717 logger.logFiner("Removing temporary light weight tx " + txId);
718
719 return exists;
720 }
721
722 public boolean resourceExists(Object txId, Object resourceId) throws ResourceManagerException {
723 lockResource(resourceId, txId, true);
724 return (getPathForRead(txId, resourceId) != null);
725 }
726
727 public void deleteResource(Object txId, Object resourceId) throws ResourceManagerException {
728 deleteResource(txId, resourceId, true);
729 }
730
731 public void deleteResource(Object txId, Object resourceId, boolean assureOnly) throws ResourceManagerException {
732
733 if (logger.isFineEnabled()) logger.logFine(txId + " deleting " + resourceId);
734
735 lockResource(resourceId, txId, false);
736
737 if (getPathForRead(txId, resourceId) == null) {
738 if (assureOnly) {
739 return;
740 }
741 throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId);
742 }
743 String txDeletePath = getDeletePath(txId, resourceId);
744 String mainPath = getMainPath(resourceId);
745 try {
746 getContext(txId).readOnly = false;
747
748 // first undo change / create when there was one
749 undoScheduledChangeOrCreate(txId, resourceId);
750
751 // if there still is a file in main store, we need to schedule
752 // a delete additionally
753 if (FileHelper.fileExists(mainPath)) {
754 FileHelper.createFile(txDeletePath);
755 }
756 } catch (IOException e) {
757 throw new ResourceManagerSystemException(
758 "Can not delete resource at '" + resourceId + "'",
759 ERR_SYSTEM,
760 txId,
761 e);
762 }
763 }
764
765 public void createResource(Object txId, Object resourceId) throws ResourceManagerException {
766 createResource(txId, resourceId, true);
767 }
768
769 public void createResource(Object txId, Object resourceId, boolean assureOnly) throws ResourceManagerException {
770
771 if (logger.isFineEnabled()) logger.logFine(txId + " creating " + resourceId);
772
773 lockResource(resourceId, txId, false);
774
775 if (getPathForRead(txId, resourceId) != null) {
776 if (assureOnly) {
777 return;
778 }
779 throw new ResourceManagerException(
780 "Resource at '" + resourceId + "', already exists",
781 ERR_RESOURCE_EXISTS,
782 txId);
783 }
784
785 String txChangePath = getChangePath(txId, resourceId);
786 try {
787 getContext(txId).readOnly = false;
788
789 // creation means either undoing a delete or actually scheduling a create
790 if (!undoScheduledDelete(txId, resourceId)) {
791 FileHelper.createFile(txChangePath);
792 }
793
794 } catch (IOException e) {
795 throw new ResourceManagerSystemException(
796 "Can not create resource at '" + resourceId + "'",
797 ERR_SYSTEM,
798 txId,
799 e);
800 }
801 }
802
803 public void copyResource(Object txId, Object fromResourceId, Object toResourceId, boolean overwrite) throws ResourceManagerException {
804 if (logger.isFineEnabled()) logger.logFine(txId + " copying " + fromResourceId + " to " + toResourceId);
805
806 lockResource(fromResourceId, txId, true);
807 lockResource(toResourceId, txId, false);
808
809 if (resourceExists(txId, toResourceId) && !overwrite) {
810 throw new ResourceManagerException(
811 "Resource at '" + toResourceId + "' already exists",
812 ERR_RESOURCE_EXISTS,
813 txId);
814 }
815
816 InputStream fromResourceStream = null;
817 OutputStream toResourceStream = null;
818 try {
819 fromResourceStream = readResource(txId, fromResourceId);
820 toResourceStream = writeResource(txId, toResourceId);
821 FileHelper.copy(fromResourceStream, toResourceStream);
822 } catch (IOException e) {
823 throw new ResourceManagerException(ERR_SYSTEM, txId, e);
824 } finally {
825 closeOpenResource(fromResourceStream);
826 closeOpenResource(toResourceStream);
827 }
828 }
829
830 public void moveResource(Object txId, Object fromResourceId, Object toResourceId, boolean overwrite) throws ResourceManagerException {
831 if (logger.isFineEnabled()) logger.logFine(txId + " moving " + fromResourceId + " to " + toResourceId);
832
833 lockResource(fromResourceId, txId, false);
834 lockResource(toResourceId, txId, false);
835
836 copyResource(txId, fromResourceId, toResourceId, overwrite);
837
838 deleteResource(txId, fromResourceId, false);
839 }
840
841 public InputStream readResource(Object resourceId) throws ResourceManagerException {
842 // create temporary light weight tx
843 Object txId;
844 synchronized (globalTransactions) {
845 txId = generatedUniqueTxId();
846 if (logger.isFinerEnabled())
847 logger.logFiner("Creating temporary light weight tx " + txId + " for reading");
848 TransactionContext context = new TransactionContext(txId);
849 context.isLightWeight = true;
850 // XXX higher isolation might be needed to make sure upgrade to commit lock always works
851 context.isolationLevel = ISOLATION_LEVEL_READ_COMMITTED;
852 // context.isolationLevel = ISOLATION_LEVEL_REPEATABLE_READ;
853 globalTransactions.put(txId, context);
854 }
855
856 InputStream is = readResource(txId, resourceId);
857 return is;
858 }
859
860 public InputStream readResource(Object txId, Object resourceId) throws ResourceManagerException {
861
862 if (logger.isFineEnabled()) logger.logFine(txId + " reading " + resourceId);
863
864 lockResource(resourceId, txId, true);
865
866 String resourcePath = getPathForRead(txId, resourceId);
867 if (resourcePath == null) {
868 throw new ResourceManagerException("No such resource at '" + resourceId + "'", ERR_NO_SUCH_RESOURCE, txId);
869 }
870
871 File file = new File(resourcePath);
872 try {
873 FileInputStream stream = new FileInputStream(file);
874 getContext(txId).registerResource(stream);
875 return new InputStreamWrapper(stream, txId, resourceId);
876 } catch (FileNotFoundException e) {
877 throw new ResourceManagerSystemException("File '" + resourcePath + "' does not exist", ERR_SYSTEM, txId);
878 }
879 }
880
881 public OutputStream writeResource(Object txId, Object resourceId) throws ResourceManagerException {
882 return writeResource(txId, resourceId, false);
883 }
884
885 public OutputStream writeResource(Object txId, Object resourceId, boolean append) throws ResourceManagerException {
886
887 if (logger.isFineEnabled()) logger.logFine(txId + " writing " + resourceId);
888
889 lockResource(resourceId, txId, false);
890
891 if (append) {
892 String mainPath = getMainPath(resourceId);
893 String txChangePath = getChangePath(txId, resourceId);
894 String txDeletePath = getDeletePath(txId, resourceId);
895
896 boolean changeExists = FileHelper.fileExists(txChangePath);
897 boolean deleteExists = FileHelper.fileExists(txDeletePath);
898 boolean mainExists = FileHelper.fileExists(mainPath);
899
900 if (mainExists && !changeExists && !deleteExists) {
901 // the read and the write path for resourceId will be different!
902 copyResource(txId, resourceId, resourceId, true);
903 }
904 }
905
906 String resourcePath = getPathForWrite(txId, resourceId);
907
908 try {
909 FileOutputStream stream = new FileOutputStream(resourcePath, append);
910 TransactionContext context = getContext(txId);
911 context.registerResource(stream);
912 context.readOnly = false;
913 return stream;
914 } catch (FileNotFoundException e) {
915 throw new ResourceManagerSystemException("File '" + resourcePath + "' does not exist", ERR_SYSTEM, txId);
916 }
917 }
918
919 /*
920 * --- additional public methods complementing implementation of interfaces ---
921 *
922 *
923 */
924
925 /**
926 * Resets the store by deleting work <em>and</em> store directory.
927 */
928 public synchronized void reset() {
929 FileHelper.removeRec(new File(storeDir));
930 FileHelper.removeRec(new File(workDir));
931 new File(storeDir).mkdirs();
932 new File(workDir).mkdirs();
933 }
934
/**
 * Synchronizes persistent data with caches. Is implemented with an empty
 * body, but called by other methods relying on synchronization. Subclasses
 * that utilize caching must implement this method reasonably.
 *
 * @throws ResourceManagerSystemException if anything fatal happened during synchronization
 */
public synchronized void sync() throws ResourceManagerSystemException {
    // intentionally empty: nothing to synchronize in this class itself
}
944
945 /**
946 * Generates a transaction identifier unique to this resource manager. To do so
947 * it requires this resource manager to be started.
948 *
949 * @return generated transaction identifier
950 * @throws ResourceManagerSystemException if this resource manager has not been started, yet
951 */
952 public String generatedUniqueTxId() throws ResourceManagerSystemException {
953 assureRMReady();
954 String txId;
955 synchronized (globalTransactions) {
956 do {
957 txId = Long.toHexString(System.currentTimeMillis()) + "-"
958 + Integer.toHexString(idCnt++);
959 // XXX busy loop
960 } while (getContext(txId) != null);
961 }
962 return txId;
963 }
964
965 /*
966 * --- sane checks ---
967 *
968 *
969 */
970
971 protected void fileInitialSaneCheck(Object txId, Object path) throws ResourceManagerException {
972 if (path == null || path.toString().length() == 0) {
973 throw new ResourceManagerException(ERR_RESOURCEID_INVALID, txId);
974 }
975 }
976
977 protected void assureStarted() throws ResourceManagerSystemException {
978 if (operationMode != OPERATION_MODE_STARTED) {
979 throw new ResourceManagerSystemException("Resource Manager Service not started", ERR_SYSTEM, null);
980 }
981 }
982
983 protected void assureRMReady() throws ResourceManagerSystemException {
984 if (operationMode != OPERATION_MODE_STARTED && operationMode != OPERATION_MODE_STOPPING) {
985 throw new ResourceManagerSystemException("Resource Manager Service not ready", ERR_SYSTEM, null);
986 }
987 }
988
989 protected void assureNotMarkedForRollback(TransactionContext context) throws ResourceManagerException {
990 if (context.status == STATUS_MARKED_ROLLBACK) {
991 throw new ResourceManagerException(ERR_MARKED_FOR_ROLLBACK, context.txId);
992 }
993 }
994
995 protected TransactionContext txInitialSaneCheckForWriting(Object txId) throws ResourceManagerException {
996 assureRMReady();
997 // do not allow any further writing or commit or rollback when db is corrupt
998 if (dirty) {
999 throw new ResourceManagerSystemException(
1000 "Database is set to dirty, this *may* mean it is corrupt. No modifications are allowed until a recovery run has been performed!",
1001 ERR_SYSTEM,
1002 txId);
1003 }
1004 return txInitialSaneCheck(txId);
1005 }
1006
1007 protected TransactionContext txInitialSaneCheck(Object txId) throws ResourceManagerException {
1008 assureRMReady();
1009 if (txId == null) {
1010 throw new ResourceManagerException(ERR_TXID_INVALID, txId);
1011 }
1012
1013 TransactionContext context = getContext(txId);
1014
1015 if (context == null) {
1016 throw new ResourceManagerException(ERR_NO_TX, txId);
1017 }
1018
1019 return context;
1020 }
1021
1022 /*
1023 * --- General Helpers ---
1024 *
1025 *
1026 */
1027
1028 protected TransactionContext getContext(Object txId) {
1029 return (TransactionContext) globalTransactions.get(txId);
1030 }
1031
1032 protected String assureLeadingSlash(Object pathObject) {
1033 String path = "";
1034 if (pathObject != null) {
1035 if (idMapper != null) {
1036 path = idMapper.getPathForId(pathObject);
1037 } else {
1038 path = pathObject.toString();
1039 }
1040 if (path.length() > 0 && path.charAt(0) != '/' && path.charAt(0) != '\\') {
1041 path = "/" + path;
1042 }
1043 }
1044 return path;
1045 }
1046
1047 protected String getMainPath(Object path) {
1048 StringBuffer buf = new StringBuffer(storeDir.length() + path.toString().length() + 5);
1049 buf.append(storeDir).append(assureLeadingSlash(path));
1050 return buf.toString();
1051 }
1052
1053 protected String getTransactionBaseDir(Object txId) {
1054 return workDir + '/' + txIdMapper.getPathForId(txId);
1055 }
1056
1057 protected String getChangePath(Object txId, Object path) {
1058 String txBaseDir = getTransactionBaseDir(txId);
1059 StringBuffer buf = new StringBuffer(txBaseDir.length() + path.toString().length()
1060 + WORK_CHANGE_DIR.length() + 5);
1061 buf.append(txBaseDir).append('/').append(WORK_CHANGE_DIR).append(assureLeadingSlash(path));
1062 return buf.toString();
1063 }
1064
1065 protected String getDeletePath(Object txId, Object path) {
1066 String txBaseDir = getTransactionBaseDir(txId);
1067 StringBuffer buf = new StringBuffer(txBaseDir.length() + path.toString().length()
1068 + WORK_DELETE_DIR.length() + 5);
1069 buf.append(txBaseDir).append('/').append(WORK_DELETE_DIR).append(assureLeadingSlash(path));
1070 return buf.toString();
1071 }
1072
1073 protected boolean undoScheduledDelete(Object txId, Object resourceId) throws ResourceManagerException {
1074 String txDeletePath = getDeletePath(txId, resourceId);
1075 File deleteFile = new File(txDeletePath);
1076 if (deleteFile.exists()) {
1077 if (!deleteFile.delete()) {
1078 throw new ResourceManagerSystemException(
1079 "Failed to undo delete of '" + resourceId + "'",
1080 ERR_SYSTEM,
1081 txId);
1082 }
1083 return true;
1084 }
1085 return false;
1086 }
1087
1088 protected boolean undoScheduledChangeOrCreate(Object txId, Object resourceId) throws ResourceManagerException {
1089 String txChangePath = getChangePath(txId, resourceId);
1090 File changeFile = new File(txChangePath);
1091 if (changeFile.exists()) {
1092 if (!changeFile.delete()) {
1093 throw new ResourceManagerSystemException(
1094 "Failed to undo change / create of '" + resourceId + "'",
1095 ERR_SYSTEM,
1096 txId);
1097 }
1098 return true;
1099 }
1100 return false;
1101 }
1102
1103 protected String getPathForWrite(Object txId, Object resourceId) throws ResourceManagerException {
1104 try {
1105 // when we want to write, be sure to write to a local copy
1106 String txChangePath = getChangePath(txId, resourceId);
1107 if (!FileHelper.fileExists(txChangePath)) {
1108 FileHelper.createFile(txChangePath);
1109 }
1110 return txChangePath;
1111 } catch (IOException e) {
1112 throw new ResourceManagerSystemException(
1113 "Can not write to resource at '" + resourceId + "'",
1114 ERR_SYSTEM,
1115 txId,
1116 e);
1117 }
1118 }
1119
1120 protected String getPathForRead(Object txId, Object resourceId) throws ResourceManagerException {
1121
1122 String mainPath = getMainPath(resourceId);
1123 String txChangePath = getChangePath(txId, resourceId);
1124 String txDeletePath = getDeletePath(txId, resourceId);
1125
1126 // now, this gets a bit complicated:
1127
1128 boolean changeExists = FileHelper.fileExists(txChangePath);
1129 boolean deleteExists = FileHelper.fileExists(txDeletePath);
1130 boolean mainExists = FileHelper.fileExists(mainPath);
1131 boolean resourceIsDir =
1132 ((mainExists && new File(mainPath).isDirectory())
1133 || (changeExists && new File(txChangePath).isDirectory()));
1134 if (resourceIsDir) {
1135 logger.logWarning("Resource at '" + resourceId + "' maps to directory");
1136 }
1137
1138 // first do some sane checks
1139
1140 // this may never be, two cases are possible, both disallowing to have a delete together with a change
1141 // 1. first there was a change, than a delete -> at least delete file exists (when there is a file in main store)
1142 // 2. first there was a delete, than a change -> only change file exists
1143 if (!resourceIsDir && changeExists && deleteExists) {
1144 throw new ResourceManagerSystemException(
1145 "Inconsistent delete and change combination for resource at '" + resourceId + "'",
1146 ERR_TX_INCONSISTENT,
1147 txId);
1148 }
1149
1150 // you should not have been allowed to delete a file that does not exist at all
1151 if (deleteExists && !mainExists) {
1152 throw new ResourceManagerSystemException(
1153 "Inconsistent delete for resource at '" + resourceId + "'",
1154 ERR_TX_INCONSISTENT,
1155 txId);
1156 }
1157
1158 if (changeExists) {
1159 return txChangePath;
1160 } else if (mainExists && !deleteExists) {
1161 return mainPath;
1162 } else {
1163 return null;
1164 }
1165 }
1166
1167 /*
1168 * --- Locking Helpers ---
1169 *
1170 *
1171 */
1172
1173 protected int getSharedLockLevel(TransactionContext context) throws ResourceManagerException {
1174 if (context.isolationLevel == ISOLATION_LEVEL_READ_COMMITTED
1175 || context.isolationLevel == ISOLATION_LEVEL_READ_UNCOMMITTED) {
1176 return LOCK_ACCESS;
1177 } else if (
1178 context.isolationLevel == ISOLATION_LEVEL_REPEATABLE_READ
1179 || context.isolationLevel == ISOLATION_LEVEL_SERIALIZABLE) {
1180 return LOCK_SHARED;
1181 } else {
1182 return LOCK_ACCESS;
1183 }
1184 }
1185
1186 /*
1187 * --- Resource Management ---
1188 *
1189 *
1190 */
1191
1192 protected void registerOpenResource(Object openResource) {
1193 if (logger.isFinerEnabled())
1194 logger.logFiner("Registering open resource " + openResource);
1195 globalOpenResources.add(openResource);
1196 }
1197
1198 protected void releaseGlobalOpenResources() {
1199 ArrayList copy;
1200 synchronized (globalOpenResources) {
1201 // XXX need to copy in order to allow removal in releaseOpenResource
1202 copy = new ArrayList(globalOpenResources);
1203 for (Iterator it = copy.iterator(); it.hasNext();) {
1204 Object stream = it.next();
1205 closeOpenResource(stream);
1206 }
1207 }
1208 }
1209
1210 protected void closeOpenResource(Object openResource) {
1211 if (logger.isFinerEnabled()) logger.logFiner("Releasing resource " + openResource);
1212 globalOpenResources.remove(openResource);
1213 if (openResource instanceof InputStream) {
1214 InputStream is = (InputStream) openResource;
1215 try {
1216 is.close();
1217 } catch (IOException e) {
1218 // do not care, as it might have been closed somewhere else, before
1219 }
1220 } else if (openResource instanceof OutputStream) {
1221 OutputStream os = (OutputStream) openResource;
1222 try {
1223 os.close();
1224 } catch (IOException e) {
1225 // do not care, as it might have been closed somewhere else, before
1226 }
1227 }
1228 }
1229
1230 /*
1231 * --- Recovery / Shutdown Support ---
1232 *
1233 *
1234 */
1235
/**
 * Brings every registered transaction to a final state, depending on the
 * status it currently has:
 * <ul>
 * <li>committing: the commit is completed (roll forward)</li>
 * <li>committed: only the clean up is done</li>
 * <li>anything else: the transaction is rolled back</li>
 * </ul>
 * Successfully handled transactions are removed from the global
 * transaction map. The lock on that map is held for the whole run.
 *
 * @return <code>false</code> if a roll forward or the clean up of an
 *         already committed transaction failed, <code>true</code> otherwise
 */
protected boolean rollBackOrForward() {
    boolean allCool = true;

    synchronized (globalTransactions) {
        // iterate over a copy, as entries are removed from the map inside the loop
        ArrayList contexts = new ArrayList(globalTransactions.values());
        for (Iterator it = contexts.iterator(); it.hasNext();) {
            TransactionContext context = (TransactionContext) it.next();
            if (context.status == STATUS_COMMITTING) {
                // roll forward
                logger.logInfo("Rolling forward " + context.txId);

                try {
                    context.commit();
                    context.status = STATUS_COMMITTED;
                    // state is saved before clean up, as clean up may remove the tx directory
                    context.saveState();
                    globalTransactions.remove(context.txId);
                    context.cleanUp();
                } catch (ResourceManagerException e) {
                    // this is not good, but what can we do now?
                    allCool = false;
                    logger.logSevere("Rolling forward of " + context.txId + " failed", e);
                }
            } else if (context.status == STATUS_COMMITTED) {
                logger.logInfo("Cleaning already commited " + context.txId);
                globalTransactions.remove(context.txId);
                try {
                    context.cleanUp();
                } catch (ResourceManagerException e) {
                    // this is not good, but what can we do now?
                    allCool = false;
                    logger.logWarning("Cleaning of " + context.txId + " failed", e);
                }
            } else {
                // in all other cases roll back and warn when rollback was not explicitly selected for tx
                if (context.status != STATUS_ROLLING_BACK
                    && context.status != STATUS_ROLLEDBACK
                    && context.status != STATUS_MARKED_ROLLBACK) {
                    logger.logWarning("Irregularly rolling back " + context.txId);
                } else {
                    logger.logInfo("Rolling back " + context.txId);
                }
                try {
                    context.rollback();
                    context.status = STATUS_ROLLEDBACK;
                    context.saveState();
                    globalTransactions.remove(context.txId);
                    context.cleanUp();
                } catch (ResourceManagerException e) {
                    // NOTE(review): unlike the two branches above a failure here does not
                    // reset allCool, so it is not reflected in the return value - confirm
                    // this is intended
                    logger.logWarning("Rolling back of " + context.txId + " failed", e);
                }
            }
        }

    }
    return allCool;
}
1292
1293 protected void recoverContexts() {
1294 File dir = new File(workDir);
1295 File[] files = dir.listFiles();
1296 if (files == null)
1297 return;
1298 for (int i = 0; i < files.length; i++) {
1299 File file = files[i];
1300 Object txId = txIdMapper.getIdForPath(file.getName());
1301 // recover all transactions we do not already know
1302 if (!globalTransactions.containsKey(txId)) {
1303
1304 logger.logInfo("Recovering " + txId);
1305 TransactionContext context;
1306 try {
1307 context = new TransactionContext(txId);
1308 context.recoverState();
1309 globalTransactions.put(txId, context);
1310 } catch (ResourceManagerException e) {
1311 // this is not good, but the best we get, just log as warning
1312 logger.logWarning("Recovering of " + txId + " failed");
1313 }
1314 }
1315 }
1316 }
1317
1318 protected boolean waitForAllTxToStop(long timeoutMSecs) {
1319 long startTime = System.currentTimeMillis();
1320
1321 // be sure not to lock globalTransactions for too long, as we need to give
1322 // txs the chance to complete (otherwise deadlocks are very likely to occur)
1323 // instead iterate over a copy as we can be sure no new txs will be registered
1324 // after operation level has been set to stopping
1325
1326 Collection transactionsToStop;
1327 synchronized (globalTransactions) {
1328 transactionsToStop = new ArrayList(globalTransactions.values());
1329 }
1330 for (Iterator it = transactionsToStop.iterator(); it.hasNext();) {
1331 long remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs;
1332
1333 if (remainingTimeout <= 0) {
1334 return false;
1335 }
1336
1337 TransactionContext context = (TransactionContext) it.next();
1338 synchronized (context) {
1339 if (!context.finished) {
1340 logger.logInfo(
1341 "Waiting for tx " + context.txId + " to finish for " + remainingTimeout + " milli seconds");
1342 }
1343 while (!context.finished && remainingTimeout > 0) {
1344 try {
1345 context.wait(remainingTimeout);
1346 } catch (InterruptedException e) {
1347 return false;
1348 }
1349 remainingTimeout = startTime - System.currentTimeMillis() + timeoutMSecs;
1350 }
1351 if (context.finished) {
1352 logger.logInfo("Tx " + context.txId + " finished");
1353 } else {
1354 logger.logWarning("Tx " + context.txId + " failed to finish in given time");
1355 }
1356 }
1357 }
1358
1359 return (globalTransactions.size() == 0);
1360 }
1361
1362 protected boolean shutdown(int mode, long timeoutMSecs) {
1363 switch (mode) {
1364 case SHUTDOWN_MODE_NORMAL :
1365 return waitForAllTxToStop(timeoutMSecs);
1366 case SHUTDOWN_MODE_ROLLBACK :
1367 return rollBackOrForward();
1368 case SHUTDOWN_MODE_KILL :
1369 return true;
1370 default :
1371 return false;
1372 }
1373 }
1374
1375 protected void setDirty(Object txId, Throwable t) {
1376 logger.logSevere(
1377 "Fatal error during critical commit/rollback of transaction " + txId + ", setting database to dirty.",
1378 t);
1379 dirty = true;
1380 }
1381
1382 /**
1383 * Inner class to hold the complete context, i.e. all information needed, for a transaction.
1384 *
1385 */
1386 protected class TransactionContext {
1387
1388 protected Object txId;
1389 protected int status = STATUS_ACTIVE;
1390 protected int isolationLevel = DEFAULT_ISOLATION_LEVEL;
1391 protected long timeoutMSecs = getDefaultTransactionTimeout();
1392 protected long startTime;
1393 protected long commitTime = -1L;
1394 protected boolean isLightWeight = false;
1395 protected boolean readOnly = true;
1396 protected boolean finished = false;
1397
1398 // list of streams participating in this tx
1399 private List openResources = new ArrayList();
1400
1401 public TransactionContext(Object txId) throws ResourceManagerException {
1402 this.txId = txId;
1403 startTime = System.currentTimeMillis();
1404 }
1405
1406 public long getRemainingTimeout() {
1407 long now = System.currentTimeMillis();
1408 return (startTime - now + timeoutMSecs);
1409 }
1410
1411 public synchronized void init() throws ResourceManagerException {
1412 String baseDir = getTransactionBaseDir(txId);
1413 String changeDir = baseDir + "/" + WORK_CHANGE_DIR;
1414 String deleteDir = baseDir + "/" + WORK_DELETE_DIR;
1415
1416 new File(changeDir).mkdirs();
1417 new File(deleteDir).mkdirs();
1418
1419 saveState();
1420 }
1421
1422 public synchronized void rollback() throws ResourceManagerException {
1423 closeResources();
1424 freeLocks();
1425 }
1426
1427 public synchronized void commit() throws ResourceManagerException {
1428 String baseDir = getTransactionBaseDir(txId);
1429 String changeDir = baseDir + "/" + WORK_CHANGE_DIR;
1430 String deleteDir = baseDir + "/" + WORK_DELETE_DIR;
1431
1432 closeResources();
1433 upgradeLockToCommit();
1434 try {
1435 applyDeletes(new File(deleteDir), new File(storeDir), new File(storeDir));
1436 FileHelper.moveRec(new File(changeDir), new File(storeDir));
1437 } catch (IOException e) {
1438 throw new ResourceManagerSystemException("Commit failed", ERR_SYSTEM, txId, e);
1439 }
1440 freeLocks();
1441 commitTime = System.currentTimeMillis();
1442 }
1443
1444 public synchronized void notifyFinish() {
1445 finished = true;
1446 notifyAll();
1447 }
1448
1449 public synchronized void cleanUp() throws ResourceManagerException {
1450 if (!cleanUp)
1451 return; // XXX for debugging only
1452 boolean clean = true;
1453 Exception cleanException = null;
1454 String baseDir = getTransactionBaseDir(txId);
1455 FileHelper.removeRec(new File(baseDir));
1456 if (!clean) {
1457 throw new ResourceManagerSystemException(
1458 "Clean up failed due to unreleasable lock",
1459 ERR_SYSTEM,
1460 txId,
1461 cleanException);
1462 }
1463 }
1464
1465 public synchronized void finalCleanUp() throws ResourceManagerException {
1466 closeResources();
1467 freeLocks();
1468 }
1469
1470 public synchronized void upgradeLockToCommit() throws ResourceManagerException {
1471 for (Iterator it = lockManager.getAll(txId).iterator(); it.hasNext();) {
1472 GenericLock lock = (GenericLock) it.next();
1473 // only upgrade if we had write access
1474 if (lock.getLockLevel(txId) == LOCK_EXCLUSIVE) {
1475 try {
1476 // in case of deadlocks, make failure of non-committing tx more likely
1477 if (!lock
1478 .acquire(
1479 txId,
1480 LOCK_COMMIT,
1481 true,
1482 true,
1483 getDefaultTransactionTimeout() * DEFAULT_COMMIT_TIMEOUT_FACTOR)) {
1484 throw new ResourceManagerException(
1485 "Could not upgrade to commit lock for resource at '"
1486 + lock.getResourceId().toString()
1487 + "'",
1488 ERR_NO_LOCK,
1489 txId);
1490 }
1491 } catch (InterruptedException e) {
1492 throw new ResourceManagerSystemException(ERR_SYSTEM, txId, e);
1493 }
1494 }
1495
1496 }
1497 }
1498
1499 public synchronized void freeLocks() {
1500 lockManager.releaseAll(txId);
1501 }
1502
1503 public synchronized void closeResources() {
1504 synchronized (globalOpenResources) {
1505 for (Iterator it = openResources.iterator(); it.hasNext();) {
1506 Object stream = it.next();
1507 closeOpenResource(stream);
1508 }
1509 }
1510 }
1511
1512 public synchronized void registerResource(Object openResource) {
1513 synchronized (globalOpenResources) {
1514 registerOpenResource(openResource);
1515 openResources.add(openResource);
1516 }
1517 }
1518
1519 public synchronized void saveState() throws ResourceManagerException {
1520 String statePath = getTransactionBaseDir(txId) + "/" + CONTEXT_FILE;
1521 File file = new File(statePath);
1522 BufferedWriter writer = null;
1523 try {
1524 OutputStream os = new FileOutputStream(file);
1525 writer = new BufferedWriter(new OutputStreamWriter(os, DEFAULT_PARAMETER_ENCODING));
1526 writer.write(toString());
1527 } catch (FileNotFoundException e) {
1528 String msg = "Saving status information to '" + statePath + "' failed! Could not create file";
1529 logger.logSevere(msg, e);
1530 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e);
1531 } catch (IOException e) {
1532 String msg = "Saving status information to '" + statePath + "' failed";
1533 logger.logSevere(msg, e);
1534 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e);
1535 } finally {
1536 if (writer != null) {
1537 try {
1538 writer.close();
1539 } catch (IOException e) {
1540 }
1541
1542 }
1543 }
1544 }
1545
1546 public synchronized void recoverState() throws ResourceManagerException {
1547 String statePath = getTransactionBaseDir(txId) + "/" + CONTEXT_FILE;
1548 File file = new File(statePath);
1549 BufferedReader reader = null;
1550 try {
1551 InputStream is = new FileInputStream(file);
1552
1553 reader = new BufferedReader(new InputStreamReader(is, DEFAULT_PARAMETER_ENCODING));
1554 txId = reader.readLine();
1555 status = Integer.parseInt(reader.readLine());
1556 isolationLevel = Integer.parseInt(reader.readLine());
1557 timeoutMSecs = Long.parseLong(reader.readLine());
1558 startTime = Long.parseLong(reader.readLine());
1559 } catch (FileNotFoundException e) {
1560 String msg = "Recovering status information from '" + statePath + "' failed! Could not find file";
1561 logger.logSevere(msg, e);
1562 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId);
1563 } catch (IOException e) {
1564 String msg = "Recovering status information from '" + statePath + "' failed";
1565 logger.logSevere(msg, e);
1566 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, e);
1567 } catch (Throwable t) {
1568 String msg = "Recovering status information from '" + statePath + "' failed";
1569 logger.logSevere(msg, t);
1570 throw new ResourceManagerSystemException(msg, ERR_SYSTEM, txId, t);
1571 } finally {
1572 if (reader != null) {
1573 try {
1574 reader.close();
1575 } catch (IOException e) {
1576 }
1577
1578 }
1579 }
1580 }
1581
1582 public synchronized String toString() {
1583 StringBuffer buf = new StringBuffer();
1584 buf.append(txId).append('\n');
1585 buf.append(Integer.toString(status)).append('\n');
1586 buf.append(Integer.toString(isolationLevel)).append('\n');
1587 buf.append(Long.toString(timeoutMSecs)).append('\n');
1588 buf.append(Long.toString(startTime)).append('\n');
1589 if (debug) {
1590 buf.append("----- Lock Debug Info -----\n");
1591
1592 for (Iterator it = lockManager.getAll(txId).iterator(); it.hasNext();) {
1593 GenericLock lock = (GenericLock) it.next();
1594 buf.append(lock.toString()+"\n");
1595 }
1596
1597 }
1598 return buf.toString();
1599 }
1600
1601 }
1602
1603 private class InputStreamWrapper extends InputStream {
1604 private InputStream is;
1605 private Object txId;
1606 private Object resourceId;
1607
1608 public InputStreamWrapper(InputStream is, Object txId, Object resourceId) {
1609 this.is = is;
1610 this.txId = txId;
1611 this.resourceId = resourceId;
1612 }
1613
1614 public int read() throws IOException {
1615 return is.read();
1616 }
1617
1618 public int read(byte b[]) throws IOException {
1619 return is.read(b);
1620 }
1621
1622 public int read(byte b[], int off, int len) throws IOException {
1623 return is.read(b, off, len);
1624 }
1625
1626 public int available() throws IOException {
1627 return is.available();
1628 }
1629
1630 public void close() throws IOException {
1631 try {
1632 is.close();
1633 } finally {
1634 TransactionContext context;
1635 synchronized (globalTransactions) {
1636 context = getContext(txId);
1637 if (context == null) {
1638 return;
1639 }
1640 }
1641 synchronized (context) {
1642 if (context.isLightWeight) {
1643 if (logger.isFinerEnabled())
1644 logger.logFiner("Upon close of resource removing temporary light weight tx " + txId);
1645 context.freeLocks();
1646 globalTransactions.remove(txId);
1647 } else {
1648 // release access lock in order to allow other transactions to commit
1649 if (lockManager.getLevel(txId, resourceId) == LOCK_ACCESS) {
1650 if (logger.isFinerEnabled()) {
1651 logger.logFiner("Upon close of resource releasing access lock for tx " + txId + " on resource at " + resourceId);
1652 }
1653 lockManager.release(txId, resourceId);
1654 }
1655 }
1656 }
1657 }
1658 }
1659
1660 public void mark(int readlimit) {
1661 is.mark(readlimit);
1662 }
1663
1664 public void reset() throws IOException {
1665 is.reset();
1666 }
1667
1668 public boolean markSupported() {
1669 return is.markSupported();
1670
1671 }
1672
1673 }
1674
1675 }