Path Locking in Python3

By Scram Software 02 October 2017

Path locking in Python 3: a presentation by Andrew Peel to the Melbourne Python Users Group (MPUG) on 2 October 2017.

A 15-minute presentation and demonstration of a short Python module that provides shared and exclusive locks for strings representing paths on a file system.

The slides from the presentation are available at the link below.

The code referred to in the presentation is available here:

"""Shared and exclusive locks for paths.

A separate lock is provided for each path (eg. '/a/b/c').  When acquiring
a lock for a path, shared locks are first acquired on the ancestor directories
('/', '/a' and '/a/b').  This allows a thread to prevent concurrent
changes in the ancestor directories as well as the path it is
accessing.

Shared and exclusive locks are provided.  Multiple shared locks may be
acquired simultaneously on a path.  A shared lock may not be acquired
on a path that has an exclusive lock, except by the same thread.
An exclusive lock may not be acquired on a path that has an exclusive lock
held by another thread, or a shared lock, even when it is held by the
same thread attempting to acquire the exclusive lock.

The locks are re-entrant: they may be acquired multiple times by the same
thread, but must be released the same number of times.

Stale locks that are acquired but not released by a thread before it terminates
are removed after a watchdog timeout.

References to the locks for all paths are stored in PathLocker using a
WeakValueDictionary so that multiple threads accessing a single path
will obtain the same lock, and then the lock is garbage collected after
last thread deletes its PathLock containing a reference to the lock.

Example usage:

    # In FS.__init__()
    self.path_locker = PathLocker()

    # In an FS method (eg. FS.makedir(path))
    lock = PathLock(path, self.path_locker)
    lock.acquire_exclusive_lock()
    # Do the makedir()
    lock.release_exclusive_lock()

    # Or using the context manager
    lock = PathLock(path, self.path_locker)
    with lock.exclusive_lock():
        # Do the makedir
"""

import threading
import contextlib
import warnings
import time
from weakref import WeakValueDictionary
from fs.path import normpath, abspath, recursepath
import sys


class PathLocker(object):
    """Registry that hands out one SharedLock per path.

    Locks are kept in a WeakValueDictionary keyed by absolute path, so a
    lock stays registered only while something (normally a PathLock) holds
    a strong reference to it.  get_lock() performs its lookup-or-create
    step under an internal mutex, so concurrent callers asking for the
    same path receive the same SharedLock object.

    :param float thread_watchdog_period:  forwarded to every new SharedLock().
    :param bool suppress_warnings:  forwarded to every new SharedLock().
    """

    def __init__(self, thread_watchdog_period=0.1, suppress_warnings=False):
        # Serialises the lookup-then-insert sequence in get_lock().
        self._lock = threading.Lock()
        self._thread_watchdog_period = thread_watchdog_period
        self._suppress_warnings = suppress_warnings
        # path -> SharedLock; entries vanish once no strong refs remain.
        self._path_wdict = WeakValueDictionary()

    def get_lock(self, path):
        """Return the SharedLock registered for *path*, creating it if absent.

        :param str path:  path whose lock is retrieved.  Must be an
            absolute and normalised path.
        :return SharedLock:  the lock shared by every caller for this path.
        """
        with self._lock:
            existing = self._path_wdict.get(path)
            if existing is not None:
                return existing
            # The local name keeps the new lock alive until the caller
            # takes its own reference.
            fresh = SharedLock(
                thread_watchdog_period=self._thread_watchdog_period,
                suppress_warnings=self._suppress_warnings)
            self._path_wdict[path] = fresh
            return fresh


class PathLock(object):
    """Shared and exclusive locks for paths.

    Acquiring a lock for a path first obtains shared locks for the
    ancestor paths, in order starting from '/'.  For example, calling
    acquire_exclusive_lock() on PathLock('a/b/c', path_locker) does
        - acquire shared lock for '/'
        - acquire shared lock for '/a'
        - acquire shared lock for '/a/b'
        - acquire exclusive lock for '/a/b/c'
    The locks are released in the reverse order.

    If any step fails, or raises (for example SharedLock's RuntimeError
    when the thread already holds a shared lock on the path), the locks
    already taken by that call are released again.  This happens before
    False is returned or the exception propagates, so no ancestor lock is
    leaked.

    The PathLock object contains references to the SharedLocks for each
    path component.  While the PathLock object is alive, these references
    ensure the SharedLocks are not garbage collected from the PathLocker
    WeakValueDictionary containing all path locks.

    :param str path:  The path that is locked.  It is normalised and made
        absolute before use.
    :param PathLocker path_locker:   PathLocker object containing the
        WeakValueDictionary of all the SharedLocks, with absolute paths
        as the keys.
    """

    def __init__(self, path, path_locker, *args, **kwargs):
        # *args and **kwargs are accepted for compatibility but not used.
        self.path = abspath(normpath(path))
        # One SharedLock per path element, root first (the order produced
        # by recursepath()), so self._locks[-1] is the lock for self.path.
        self._locks = [path_locker.get_lock(path_element)
                       for path_element in recursepath(self.path)]

    @staticmethod
    def _release_shared(locks):
        """Release shared locks taken by an aborted acquire, innermost first."""
        for lock in reversed(locks):
            lock.release_shared_lock()

    def _acquire(self, exclusive, blocking, timeout):
        """Take shared locks on the ancestors, then the lock on the path.

        :param bool exclusive:  take the final (path) lock exclusively when
            True, shared when False.  Ancestors are always taken shared.
        :param bool blocking:  passed through to SharedLock.
        :param float timeout:  when positive, the overall time budget for
            the whole chain of locks.  Otherwise it is passed through
            unchanged (wait indefinitely).
        :return bool:  True if every lock was acquired.  On False, or on
            an exception, nothing acquired by this call remains held.
        """
        deadline = None
        # Only numeric, positive timeouts define a budget.  Anything else
        # is handed to SharedLock unchanged, so its validation still raises
        # ValueError exactly as before.
        if isinstance(timeout, (int, float)) and timeout > 0:
            deadline = time.monotonic() + timeout
        last = len(self._locks) - 1
        # Shared locks taken so far by this call, for rollback.
        acquired = []
        try:
            for index, lock in enumerate(self._locks):
                step_blocking, step_timeout = blocking, timeout
                if deadline is not None:
                    remaining = deadline - time.monotonic()
                    if remaining > 0:
                        step_timeout = remaining
                    else:
                        # Budget spent: one non-blocking attempt only.
                        step_blocking = False
                if exclusive and index == last:
                    return_val = lock.acquire_exclusive_lock(
                        blocking=step_blocking, timeout=step_timeout)
                    if not return_val:
                        self._release_shared(acquired)
                    # The exclusive lock is always the final step.
                    return return_val
                if not lock.acquire_shared_lock(
                        blocking=step_blocking, timeout=step_timeout):
                    self._release_shared(acquired)
                    return False
                acquired.append(lock)
        except BaseException:
            # Don't leave ancestor locks held if a component lock raised.
            self._release_shared(acquired)
            raise
        return True

    def acquire_shared_lock(self, blocking=True, timeout=-1):
        """Acquire the shared lock for the path.

        This involves obtaining shared locks on the ancestor paths
        as well as the path.  For example, if path='/a/b/c',
        shared locks for '/', '/a', '/a/b', and '/a/b/c' are acquired
        in that sequence.

        Waits until the lock is acquired when blocking is True, or the
        timeout elapses.

        The lock is re-entrant - it can be acquired multiple times by the
        same thread, provided it is released the same number of times.

        :param bool blocking:  When False, returns immediately with value
            False instead of waiting.
        :param float timeout:  When set to a positive value, return False
            if all the locks have not been acquired after this time
            (seconds) has elapsed.  The budget covers the whole chain
            of path components.
        :return bool:  True if the lock is acquired, False otherwise.  On
            False no locks acquired by this call remain held.
        """
        return self._acquire(False, blocking, timeout)

    def release_shared_lock(self):
        """Release the shared lock.

        The shared locks acquired for ancestor directories are also
        released, in the sequence starting furthest from the root.
        For example, when path='/a/b/c', the locks are released in
        order '/a/b/c', '/a/b', '/a', '/'.
        """
        for lock in reversed(self._locks):
            lock.release_shared_lock()

    def acquire_exclusive_lock(self, blocking=True, timeout=-1):
        """Acquire the exclusive lock for the path.

        This involves obtaining shared locks on the ancestor paths
        as well.  For example, if path='/a/b/c', shared locks are
        acquired for '/', '/a', '/a/b', then the exclusive lock
        for '/a/b/c' is acquired.

        Waits until the lock is acquired when blocking is True, or the
        timeout elapses.

        The lock is re-entrant - it can be acquired multiple times by the
        same thread, provided it is released the same number of times.

        :param bool blocking:  When False, returns immediately with value
            False instead of waiting.
        :param float timeout:  When set to a positive value, return False
            if all the locks have not been acquired after this time
            (seconds) has elapsed.  The budget covers the whole chain
            of path components.
        :return bool:  True if the lock is acquired, False otherwise.  On
            False no locks acquired by this call remain held.
        :raises RuntimeError:  propagated from SharedLock (e.g. the thread
            holds a shared lock on the path); ancestor locks taken by this
            call are released first.
        """
        return self._acquire(True, blocking, timeout)

    def release_exclusive_lock(self):
        """Release the exclusive lock.

        The shared locks acquired for ancestor directories are also
        released, in the sequence starting furthest from the root.
        For example, when path='/a/b/c', the locks are released in
        order '/a/b/c', '/a/b', '/a', '/'.
        """
        self._locks[-1].release_exclusive_lock()
        for lock in reversed(self._locks[:-1]):
            lock.release_shared_lock()

    @contextlib.contextmanager
    def shared_lock(self):
        """Context manager: hold the shared lock for the body; yields self."""
        self.acquire_shared_lock()
        try:
            yield self
        finally:
            self.release_shared_lock()

    @contextlib.contextmanager
    def exclusive_lock(self):
        """Context manager: hold the exclusive lock for the body; yields self."""
        self.acquire_exclusive_lock()
        try:
            yield self
        finally:
            self.release_exclusive_lock()

    def __pretty_print__(self, file=None):
        """Print this PathLock, its path and the state of each component lock.

        :param file:  writable text stream; defaults to the sys.stdout in
            effect at call time.
        """
        if file is None:
            # Resolve at call time so stdout redirection is honoured.
            file = sys.stdout
        print('{:s}'.format(repr(self)), file=file)
        print('  path={:s}'.format(self.path), file=file)
        for lock in self._locks:
            lock.__pretty_print__(file=file)


class SharedLock(object):
    """A re-entrant shared locking scheme.

    An exclusive lock is provided that may only be acquired by a single
    thread when no threads have the shared lock.  When the exclusive lock has
    not been acquired, multiple threads may acquire the shared lock.
    If the thread already holds the exclusive lock, it may also
    acquire the shared lock.  A thread holding the shared lock but not the
    exclusive lock may not acquire the exclusive lock (RuntimeError).

    The exclusive and shared locks may be acquired multiple times by the same
    thread, but must release the lock the same number of times.

    Context managers are provided so the locks may be used in constructs
    such as
        with lock.shared_lock():
            ...
    or
        with lock.exclusive_lock():
            ...

    The locking scheme handles stale locks.  While a thread waits for the
    lock, locks acquired but not released by terminated threads are removed
    roughly once per watchdog period.
    """

    def __init__(self, thread_watchdog_period=0.1, suppress_warnings=False):
        """Create a SharedLock.

        :param float thread_watchdog_period:  Period for checking that all
            active locks have active threads.  Locks acquired by threads
            that have since terminated are removed.
        :param bool suppress_warnings:  When False, a RuntimeWarning
            is issued when a stale lock is removed.  These warnings are
            suppressed when True.
        :raises ValueError:  on a non-positive or non-numeric period, or a
            non-boolean suppress_warnings.
        """
        if (not isinstance(thread_watchdog_period, float) and
                not isinstance(thread_watchdog_period, int)) or \
                thread_watchdog_period <= 0:
            raise ValueError(
                'thread_watchdog_period={:s} '.format(
                    repr(thread_watchdog_period)) +
                'argument must be a positive float or int.')
        if not isinstance(suppress_warnings, bool):
            raise ValueError('suppress_warnings={:s} '.format(repr(
                suppress_warnings)) +
                             'argument must be boolean.')
        # Guards all lock state below; waiters block on it.
        self._condition = threading.Condition()
        # Dictionary of the threads holding the shared lock.
        # The key is the unique thread id (see _set_unique_thread_id), and
        # the value is the number of times the lock has been acquired by
        # the thread.
        self._shared_threads = dict()
        self._exclusive_thread_id = None
        self._exclusive_thread_count = 0
        # Timeout for wait() before checking for stale locks.
        self._thread_watchdog_period = thread_watchdog_period
        self._suppress_warnings = suppress_warnings

    def _set_unique_thread_id(self):
        """Set a unique thread id on the current thread object.

        Thread.ident is re-used by new threads, so it cannot be used
        as the key for the dictionary of shared locks.  Instead the id
        is stored as ``shared_lock_thread_id`` on the Thread object, if it
        is not already set.
        """
        # The identifier is a 64 bit integer: bits 0-47 hold the low 48
        # bits of time.perf_counter() in nanoseconds (wrapping roughly
        # every 3.26 days), and bits 48-63 hold bits 8-23 of Thread.ident.
        # Collisions are unlikely but not impossible.
        time_part = int(time.perf_counter() * 1000 * 1000 * 1000) & \
            0xffffffffffff
        thread_id_part = threading.current_thread().ident & 0xffff00
        thread_id = time_part | (thread_id_part << 40)
        this_thread_obj = threading.current_thread()
        if not hasattr(this_thread_obj, 'shared_lock_thread_id'):
            this_thread_obj.shared_lock_thread_id = thread_id

    def _current_thread(self):
        """Return (Thread object, unique id), assigning the id on first use."""
        this_thread_obj = threading.current_thread()
        if not hasattr(this_thread_obj, 'shared_lock_thread_id'):
            self._set_unique_thread_id()
        return this_thread_obj, this_thread_obj.shared_lock_thread_id

    @staticmethod
    def _check_acquire_args(blocking, timeout):
        """Validate the arguments shared by both acquire methods."""
        if not isinstance(timeout, float) and not isinstance(timeout, int):
            raise ValueError(
                'timeout argument {:s} must be a float or int.'.format(
                    repr(timeout)))
        if not isinstance(blocking, bool):
            raise ValueError('blocking={:s} '.format(repr(blocking)) +
                             'argument must be boolean.')

    def _wait_until(self, available, end_time):
        """Wait on the condition until available() is true or end_time passes.

        Must be called with self._condition held.  While waiting, stale
        locks left by terminated threads are purged about once per
        watchdog period.

        :param callable available:  predicate evaluated with the lock held.
        :param end_time:  time.monotonic() deadline, or None to wait
            indefinitely.
        :return bool:  True once available() is true, False on timeout.
        """
        watchdog_time = time.monotonic() + self._thread_watchdog_period
        while not available():
            wait_period = self._thread_watchdog_period
            if end_time is not None:
                remaining = end_time - time.monotonic()
                if remaining <= 0:
                    return False
                # Never sleep past the caller's deadline.
                wait_period = min(wait_period, remaining)
            notified = self._condition.wait(timeout=wait_period)
            if not notified or time.monotonic() > watchdog_time:
                # No release seen recently; a holder may have terminated.
                self._clean_stale_locks()
                watchdog_time = time.monotonic() + \
                    self._thread_watchdog_period
        return True

    def acquire_shared_lock(self, blocking=True, timeout=-1):
        """Acquire the shared lock.

        Waits until the lock is acquired when blocking is True, or the
        timeout elapses.  The shared lock is acquired when the
        exclusive lock is None or is held by the current thread.

        The lock is re-entrant - it can be acquired multiple times by the
        same thread, provided it is released the same number of times.

        :param bool blocking:  When False, returns immediately with value
            False instead of waiting.
        :param float timeout:  When set to a positive value, return False
            if the lock has not been acquired after this time (seconds)
            has elapsed.  Otherwise wait indefinitely.
        :return bool:  True if the lock is acquired, False otherwise.
        :raises ValueError:  on an invalid blocking or timeout argument.
        """
        self._check_acquire_args(blocking, timeout)
        # Monotonic clock: immune to wall-clock adjustments.
        end_time = time.monotonic() + timeout if timeout > 0 else None
        this_thread = self._current_thread()[1]

        def available():
            return (self._exclusive_thread_id is None or
                    self._exclusive_thread_id == this_thread)

        with self._condition:
            if blocking:
                return_val = self._wait_until(available, end_time)
            else:
                return_val = available()
            if return_val:
                self._shared_threads[this_thread] = \
                    self._shared_threads.get(this_thread, 0) + 1
        return return_val

    def release_shared_lock(self):
        """Release the shared lock.

        :raises RuntimeError:  if the current thread does not hold it.
        """
        this_thread_obj = threading.current_thread()
        # A thread that never acquired a lock has no id yet; report that
        # as the documented RuntimeError rather than an AttributeError.
        this_thread = getattr(this_thread_obj, 'shared_lock_thread_id', None)
        with self._condition:
            num_acquired = self._shared_threads.get(this_thread)
            if num_acquired is None:
                raise RuntimeError(
                    'Thread {:d} '.format(this_thread_obj.ident) +
                    'attempted to release a shared lock ' +
                    'it had not previously acquired.')
            if num_acquired > 1:
                self._shared_threads[this_thread] = num_acquired - 1
            else:
                # Release the lock.
                del self._shared_threads[this_thread]
                self._condition.notify_all()

    def acquire_exclusive_lock(self, blocking=True, timeout=-1):
        """Acquire the exclusive lock.

        Waits until the lock is acquired when blocking is True, or the
        timeout elapses.

        The lock is re-entrant - it can be acquired multiple times by the
        same thread, provided it is released the same number of times.

        :param bool blocking:  When False, returns immediately with value
            False instead of waiting.
        :param float timeout:  When set to a positive value, return False
            if the lock has not been acquired after this time (seconds)
            has elapsed.  Otherwise wait indefinitely.
        :return bool:  True if the lock is acquired, False otherwise.
        :raises ValueError:  on an invalid blocking or timeout argument.
        :raises RuntimeError:  if the thread holds the shared lock but not
            the exclusive lock.
        """
        self._check_acquire_args(blocking, timeout)
        end_time = time.monotonic() + timeout if timeout > 0 else None
        this_thread_obj, this_thread = self._current_thread()

        def available():
            return (self._exclusive_thread_id is None and
                    not self._shared_threads)

        with self._condition:
            if self._shared_threads.get(this_thread) is not None and \
                    self._exclusive_thread_id != this_thread:
                # Upgrading shared -> exclusive would wait on ourselves.
                raise RuntimeError(
                    'Thread {:d} '.format(this_thread_obj.ident) +
                    'attempted to acquire the exclusive lock ' +
                    'before releasing the shared lock.')
            if this_thread == self._exclusive_thread_id:
                # This thread already has the lock.
                self._exclusive_thread_count += 1
                return True
            if blocking:
                return_val = self._wait_until(available, end_time)
            else:
                return_val = available()
            if return_val:
                self._exclusive_thread_id = this_thread
                self._exclusive_thread_count = 1
        return return_val

    def release_exclusive_lock(self):
        """Release the exclusive lock.

        :raises RuntimeError:  if the current thread does not hold it.
        """
        this_thread_obj = threading.current_thread()
        this_thread = getattr(this_thread_obj, 'shared_lock_thread_id', None)
        with self._condition:
            # The None check stops a thread without an id from "matching"
            # an unheld lock (whose owner is also None).
            if this_thread is None or \
                    self._exclusive_thread_id != this_thread:
                raise RuntimeError(
                    'Thread {:d} '.format(this_thread_obj.ident) +
                    'attempted to release an exclusive lock ' +
                    'it had not previously acquired.')
            if self._exclusive_thread_count > 1:
                self._exclusive_thread_count -= 1
            else:
                # Release the lock.
                self._exclusive_thread_count = 0
                self._exclusive_thread_id = None
                self._condition.notify_all()

    def _clean_stale_locks(self):
        """Remove locks held by threads that terminated without releasing.

        A RuntimeWarning is issued per removed lock unless warnings are
        suppressed.  If anything was removed, all waiters are notified so
        they need not wait for their own watchdog timeout.
        """
        with self._condition:
            # Ids of threads that are still alive (set for O(1) lookups).
            live_ids = {
                thread_.shared_lock_thread_id
                for thread_ in threading.enumerate()
                if thread_.is_alive() and
                hasattr(thread_, 'shared_lock_thread_id')}
            removed = False
            # Remove shared locks for threads that have terminated.
            for sthread in list(self._shared_threads):
                if sthread not in live_ids:
                    del self._shared_threads[sthread]
                    removed = True
                    if not self._suppress_warnings:
                        warnings.warn(
                            'Removed shared lock acquired by thread with ' +
                            'shared_lock_thread_id={:d} '.format(sthread) +
                            'that terminated without releasing.',
                            RuntimeWarning, stacklevel=0)
            # Remove exclusive locks for threads that have terminated.
            if self._exclusive_thread_id is not None and \
                    self._exclusive_thread_id not in live_ids:
                ex_thread = self._exclusive_thread_id
                self._exclusive_thread_id = None
                self._exclusive_thread_count = 0
                removed = True
                if not self._suppress_warnings:
                    warnings.warn(
                        'Removed exclusive lock acquired by thread with ' +
                        'shared_lock_thread_id={:d} '.format(ex_thread) +
                        'that terminated without releasing.',
                        RuntimeWarning, stacklevel=0)
            if removed:
                self._condition.notify_all()

    @contextlib.contextmanager
    def shared_lock(self):
        """Context manager: hold the shared lock for the body; yields self."""
        self.acquire_shared_lock()
        try:
            yield self
        finally:
            self.release_shared_lock()

    @contextlib.contextmanager
    def exclusive_lock(self):
        """Context manager: hold the exclusive lock for the body; yields self."""
        self.acquire_exclusive_lock()
        try:
            yield self
        finally:
            self.release_exclusive_lock()

    def __pretty_print__(self, file=None):
        """Print the lock's owner, re-entry count and shared holders.

        :param file:  writable text stream; defaults to the sys.stdout in
            effect at call time.
        """
        if file is None:
            # Resolve at call time so stdout redirection is honoured.
            file = sys.stdout
        print('SharedLock: {:s}'.format(repr(self)), file=file)
        print('  _exclusive_thread_id={:s}'.format(
            repr(self._exclusive_thread_id)), file=file)
        print('  _exclusive_thread_count={:d}'.format(
            self._exclusive_thread_count), file=file)
        print('  _shared_threads={', file=file)
        for key, val in self._shared_threads.items():
            print('    {:s}: {:s},'.format(repr(key), repr(val)), file=file)
        print('  }', flush=True, file=file)

Presentation Slides – File Locking in Python

Leave a comment

Send us a message

The field is required.




Can't read the image? Click here to refresh.