Path locking in Python3, presentation by Andrew Peel to the Melbourne Python Users Group (MPUG) on 2 October, 2017.
A 15 minute presentation and demonstration describing a short python module that provides shared locks, applied to strings representing paths on a file system.
The slides from the presentation are attached in the link below.
The code referred to in the presentation is available here:
"""Shared and exclusive locks for paths.
A separate lock is provided for each path (eg. '/a/b/c'). When acquiring
a lock for a path, shared locks are first acquired on the ancestor directories
('/', '/a' and '/a/b'). This allows a thread to prevent concurrent
changes in the ancestor directories as well as the path it is
accessing.
Shared and exclusive locks are provided. Multiple shared locks may be
acquired simultaneously on a path. A shared lock may not be acquired
on a path that has an exclusive lock, except by the same thread.
An exclusive lock may not be acquired on a path that has an exclusive lock
held by another thread, or a shared lock, even when it is held by the
same thread attempting to acquire the exclusive lock.
The locks are re-entrant: they may be acquired multiple times by the same
thread, but must be released the same number of times.
Stale locks that are acquired but not released by a thread before it terminates
are removed after a watchdog timeout.
References to the locks for all paths are stored in PathLocker using a
WeakValueDictionary so that multiple threads accessing a single path
will obtain the same lock, and the lock is garbage collected after the
last thread deletes its PathLock containing a reference to the lock.
Example usage:
# In FS.__init__()
self.path_locker = PathLocker()
# In an FS method (eg. FS.makedir(path))
lock = PathLock(path, self.path_locker)
lock.acquire_exclusive_lock()
# Do the makedir()
lock.release_exclusive_lock()
# Or using the context manager
lock = PathLock(path, self.path_locker)
with lock.exclusive_lock():
# Do the makedir
"""
import threading
import contextlib
import warnings
import time
from weakref import WeakValueDictionary
from fs.path import normpath, abspath, recursepath
import sys
class PathLocker(object):
    """Container for the per-path SharedLock objects.

    The locks are stored in a WeakValueDictionary keyed by path, and
    get_lock() (called by PathLock()) adds new locks to the dictionary
    in a synchronised manner.

    :param float thread_watchdog_period: passed to SharedLock().
    :param bool suppress_warnings: passed to SharedLock().
    """

    def __init__(self, thread_watchdog_period=0.1, suppress_warnings=False):
        self._path_wdict = WeakValueDictionary()
        self._thread_watchdog_period = thread_watchdog_period
        self._suppress_warnings = suppress_warnings
        # Lock to synchronise the addition of new locks to the dictionary.
        self._lock = threading.Lock()

    def get_lock(self, path):
        """Get the SharedLock for a given path.

        If the lock does not already exist, a new lock is created and
        added to the WeakValueDictionary.

        :param str path: path whose lock is retrieved. Must be an
            absolute and normalised path.
        :return SharedLock: the lock registered for ``path``.
        """
        # The lookup and the insertion happen under one lock so that two
        # threads asking for the same new path cannot each create a lock.
        with self._lock:
            shared_lock = self._path_wdict.get(path)
            if shared_lock is None:
                shared_lock = SharedLock(
                    thread_watchdog_period=self._thread_watchdog_period,
                    suppress_warnings=self._suppress_warnings)
                self._path_wdict[path] = shared_lock
            return shared_lock
class PathLock(object):
    """Shared and exclusive locks for paths.

    Acquiring a lock for a path first obtains shared locks for
    the ancestor paths, in order starting from '/'. For example,
    acquiring the exclusive lock for '/a/b/c' does

    - acquire shared lock for '/'
    - acquire shared lock for '/a'
    - acquire shared lock for '/a/b'
    - acquire exclusive lock for '/a/b/c'

    The locks are released in the reverse order.

    The PathLock object contains references to the SharedLocks for each
    path component. While the PathLock object is still in context,
    these references ensure the SharedLocks are not garbage collected
    from the PathLocker WeakValueDictionary containing all path locks.

    :param str path: The path that is locked.
    :param PathLocker path_locker: PathLocker object containing the
        WeakValueDictionary of all the SharedLocks, with absolute paths
        as the keys.
    """

    def __init__(self, path, path_locker, *args, **kwargs):
        self.path = abspath(normpath(path))
        # One lock per path component; the last entry is the lock for
        # self.path itself. Retrieved from the dictionary, or newly made.
        self._locks = [path_locker.get_lock(path_element)
                       for path_element in recursepath(self.path)]

    def acquire_shared_lock(self, blocking=True, timeout=-1):
        """Acquire the shared lock for the path.

        This involves obtaining shared locks on the ancestor paths
        as well as the path. For example, if path='/a/b/c',
        shared locks for '/', '/a', '/a/b', and '/a/b/c' are acquired
        in that sequence.

        Waits until the lock is acquired when blocking is True, or the
        timeout elapses. The same timeout value is handed to every
        component lock in turn.

        The lock is re-entrant - it can be acquired multiple times by the
        same thread, provided it is released the same number of times.

        :param bool blocking: When False, returns immediately with value
            False instead of waiting.
        :param float timeout: When set to a positive value, return False
            if the lock has not been acquired after this time (seconds)
            has elapsed.
        :return bool: True if the lock is acquired, False otherwise.
        """
        held = []
        success = False
        try:
            for lock in self._locks:
                if not lock.acquire_shared_lock(
                        blocking=blocking, timeout=timeout):
                    return False
                held.append(lock)
            success = True
            return True
        finally:
            if not success:
                # Failed (or raised) part way through, so release the
                # locks we already have, deepest first.
                for lock in reversed(held):
                    lock.release_shared_lock()

    def release_shared_lock(self):
        """Release the shared lock.

        The shared locks acquired for ancestor directories are also
        released, in the sequence starting furthest from the root.
        For example, when path='/a/b/c', the locks are released in
        order '/a/b/c', '/a/b', '/a', '/'.
        """
        for lock in reversed(self._locks):
            lock.release_shared_lock()

    def acquire_exclusive_lock(self, blocking=True, timeout=-1):
        """Acquire the exclusive lock for the path.

        This involves obtaining shared locks on the ancestor paths
        as well. For example, if path='/a/b/c', shared locks are
        acquired for '/', '/a', '/a/b', then the exclusive lock
        for '/a/b/c' is acquired.

        Waits until the lock is acquired when blocking is True, or the
        timeout elapses. The same timeout value is handed to every
        component lock in turn.

        The lock is re-entrant - it can be acquired multiple times by the
        same thread, provided it is released the same number of times.

        If acquiring any component lock raises, the ancestor shared locks
        taken so far are released before the exception propagates.

        :param bool blocking: When False, returns immediately with value
            False instead of waiting.
        :param float timeout: When set to a positive value, return False
            if the lock has not been acquired after this time (seconds)
            has elapsed.
        :return bool: True if the lock is acquired, False otherwise.
        """
        held = []
        success = False
        try:
            for lock in self._locks[:-1]:
                if not lock.acquire_shared_lock(
                        blocking=blocking, timeout=timeout):
                    return False
                held.append(lock)
            if self._locks[-1].acquire_exclusive_lock(
                    blocking=blocking, timeout=timeout):
                success = True
            return success
        finally:
            if not success:
                # Failed (or raised) part way through, so release the
                # ancestor locks we already have, deepest first.
                for lock in reversed(held):
                    lock.release_shared_lock()

    def release_exclusive_lock(self):
        """Release the exclusive lock.

        The shared locks acquired for ancestor directories are also
        released, in the sequence starting furthest from the root.
        For example, when path='/a/b/c', the locks are released in
        order '/a/b/c', '/a/b', '/a', '/'.
        """
        self._locks[-1].release_exclusive_lock()
        for lock in reversed(self._locks[:-1]):
            lock.release_shared_lock()

    @contextlib.contextmanager
    def shared_lock(self):
        """Context manager holding the shared lock for the path."""
        self.acquire_shared_lock()
        try:
            yield self
        finally:
            self.release_shared_lock()

    @contextlib.contextmanager
    def exclusive_lock(self):
        """Context manager holding the exclusive lock for the path."""
        self.acquire_exclusive_lock()
        try:
            yield self
        finally:
            self.release_exclusive_lock()

    def __pretty_print__(self, file=sys.stdout):
        """Print this object, its path and each component lock to file."""
        print('{:s}'.format(repr(self)), file=file)
        print(' path={:s}'.format(self.path), file=file)
        for lock in self._locks:
            lock.__pretty_print__(file=file)
class SharedLock(object):
    """A re-entrant shared locking scheme.

    An exclusive lock is provided that may only be acquired by a single
    thread when no threads have the shared lock. When the exclusive lock has
    not been acquired, multiple threads may acquire the shared lock.
    If the thread already holds the exclusive lock, it may also
    acquire the shared lock.

    The exclusive and shared locks may be acquired multiple times by the same
    thread, but must release the lock the same number of times.

    A context handler is provided so the locks may be used in constructs
    such as

        with lock.shared_lock():
            ...

    or

        with lock.exclusive_lock():
            ...

    The locking scheme handles stale locks. After a short delay, locks
    acquired but not released by a terminated thread are removed.
    """

    def __init__(self, thread_watchdog_period=0.1, suppress_warnings=False):
        """Create a SharedLock.

        :param float thread_watchdog_period: Period for checking that all
            active locks have active threads. Locks acquired by threads
            that have since terminated are removed.
        :param bool suppress_warnings: When False, a RuntimeWarning
            is raised when a stale lock is removed. These warnings are
            suppressed when True.
        :raises ValueError: if either argument has the wrong type, or the
            period is not positive.
        """
        if not isinstance(thread_watchdog_period, (float, int)) or \
                thread_watchdog_period <= 0:
            raise ValueError(
                'thread_watchdog_period={:s} '.format(
                    repr(thread_watchdog_period)) +
                'argument must be a positive float or int.')
        if not isinstance(suppress_warnings, bool):
            raise ValueError('suppress_warnings={:s} '.format(repr(
                suppress_warnings)) +
                'argument must be boolean.')
        self._condition = threading.Condition()
        # Dictionary of the threads holding the shared lock.
        # The key is our own thread id (see _set_unique_thread_id), and
        # the value is the number of times the lock has been acquired by
        # the thread.
        self._shared_threads = dict()
        self._exclusive_thread_id = None
        self._exclusive_thread_count = 0
        # Timeout for wait() before checking for stale locks.
        self._thread_watchdog_period = thread_watchdog_period
        self._suppress_warnings = suppress_warnings

    def _set_unique_thread_id(self):
        """Set a unique thread id on the current Thread object.

        Thread.ident is re-used by new threads, so it cannot be used
        as the key for the dictionary of shared locks. So we make an
        identifier from part of Thread.ident and the least significant
        bytes of a high resolution timer, and store it on the Thread
        object as ``shared_lock_thread_id``. An id that is already set
        is left untouched.
        """
        # The identifier is a 64 bit integer, with 48 bits from the time
        # in nanoseconds, which rolls over every 3 days, and 16 bits
        # (bits 8-23) from Thread.ident placed in the top 16 bits.
        time_part = int(time.perf_counter() * 1000 * 1000 * 1000) & \
            0xffffffffffff
        thread_id_part = threading.current_thread().ident & 0xffff00
        thread_id = time_part | (thread_id_part << 40)
        this_thread_obj = threading.current_thread()
        if not hasattr(this_thread_obj, 'shared_lock_thread_id'):
            this_thread_obj.shared_lock_thread_id = thread_id

    @staticmethod
    def _check_acquire_args(blocking, timeout):
        """Raise ValueError unless the acquire arguments are well typed."""
        if not isinstance(timeout, (float, int)):
            raise ValueError(
                'timeout argument {:s} must be a float or int.'.format(
                    repr(timeout)))
        if not isinstance(blocking, bool):
            raise ValueError('blocking={:s} '.format(repr(blocking)) +
                             'argument must be boolean.')

    def _wait_until(self, is_available, timeout):
        """Wait on the condition until is_available() returns True.

        Must be called with self._condition held. Stale locks are
        cleaned whenever a wait times out or the watchdog period has
        elapsed.

        :param callable is_available: predicate evaluated with the
            condition held.
        :param float timeout: give up after this many seconds when
            positive; wait indefinitely otherwise.
        :return bool: True when the predicate became true, False when the
            timeout elapsed first.
        """
        period = self._thread_watchdog_period
        # A monotonic clock keeps the deadline immune to wall clock jumps.
        deadline = time.monotonic() + timeout if timeout > 0 else None
        watchdog_time = time.monotonic() + period
        while not is_available():
            wait_time = period
            if deadline is not None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    return False
                # Never sleep past the caller's deadline.
                wait_time = min(period, remaining)
            notified = self._condition.wait(timeout=wait_time)
            if not notified or time.monotonic() > watchdog_time:
                # wait() timed out, or we have been kept busy by
                # notifications for a whole watchdog period.
                self._clean_stale_locks()
                watchdog_time = time.monotonic() + period
        return True

    def acquire_shared_lock(self, blocking=True, timeout=-1):
        """Acquire the shared lock.

        Waits until the lock is acquired when blocking is True, or the
        timeout elapses. The shared lock is acquired when the
        exclusive lock is None or is held by the current thread.

        The lock is re-entrant - it can be acquired multiple times by the
        same thread, provided it is released the same number of times.

        :param bool blocking: When False, returns immediately with value
            False instead of waiting.
        :param float timeout: When set to a positive value, return False
            if the lock has not been acquired after this time (seconds)
            has elapsed.
        :return bool: True if the lock is acquired, False otherwise.
        :raises ValueError: if blocking or timeout has the wrong type.
        """
        self._check_acquire_args(blocking, timeout)
        this_thread_obj = threading.current_thread()
        if not hasattr(this_thread_obj, 'shared_lock_thread_id'):
            self._set_unique_thread_id()
        this_thread = this_thread_obj.shared_lock_thread_id

        def available():
            return (self._exclusive_thread_id is None or
                    self._exclusive_thread_id == this_thread)

        with self._condition:
            if not available():
                if not blocking or not self._wait_until(available, timeout):
                    return False
            # Lock is available, so get it.
            self._shared_threads[this_thread] = \
                self._shared_threads.get(this_thread, 0) + 1
            return True

    def release_shared_lock(self):
        """Release the shared lock.

        :raises RuntimeError: if the current thread does not hold the
            shared lock.
        """
        this_thread_obj = threading.current_thread()
        # A thread that never acquired any lock has no id yet; treat it
        # like any other thread that does not hold this lock.
        this_thread = getattr(this_thread_obj, 'shared_lock_thread_id', None)
        with self._condition:
            num_acquired = self._shared_threads.get(this_thread)
            if num_acquired is None:
                raise RuntimeError(
                    'Thread {:d} '.format(this_thread_obj.ident) +
                    'attempted to release a shared lock ' +
                    'it had not previously acquired.')
            if num_acquired > 1:
                self._shared_threads[this_thread] = num_acquired - 1
            else:
                # Release the lock.
                self._shared_threads.pop(this_thread)
            self._condition.notify_all()

    def acquire_exclusive_lock(self, blocking=True, timeout=-1):
        """Acquire the exclusive lock.

        Waits until the lock is acquired when blocking is True, or the
        timeout elapses.

        The lock is re-entrant - it can be acquired multiple times by the
        same thread, provided it is released the same number of times.

        :param bool blocking: When False, returns immediately with value
            False instead of waiting.
        :param float timeout: When set to a positive value, return False
            if the lock has not been acquired after this time (seconds)
            has elapsed.
        :return bool: True if the lock is acquired, False otherwise.
        :raises ValueError: if blocking or timeout has the wrong type.
        :raises RuntimeError: if the current thread holds the shared lock
            without already holding the exclusive lock.
        """
        self._check_acquire_args(blocking, timeout)
        this_thread_obj = threading.current_thread()
        if not hasattr(this_thread_obj, 'shared_lock_thread_id'):
            self._set_unique_thread_id()
        this_thread = this_thread_obj.shared_lock_thread_id

        def available():
            return (self._exclusive_thread_id is None and
                    not self._shared_threads)

        with self._condition:
            if this_thread == self._exclusive_thread_id:
                # This thread already has the lock.
                self._exclusive_thread_count += 1
                return True
            if self._shared_threads.get(this_thread) is not None:
                # Waiting here could never succeed: our own shared lock
                # would keep the exclusive lock unavailable.
                raise RuntimeError(
                    'Thread {:d} '.format(this_thread_obj.ident) +
                    'attempted to acquire the exclusive lock ' +
                    'before releasing the shared lock.')
            if not available():
                # Wait until the other locks are released.
                if not blocking or not self._wait_until(available, timeout):
                    return False
            # Get the lock.
            self._exclusive_thread_id = this_thread
            self._exclusive_thread_count = 1
            return True

    def release_exclusive_lock(self):
        """Release the exclusive lock.

        :raises RuntimeError: if the current thread does not hold the
            exclusive lock.
        """
        this_thread_obj = threading.current_thread()
        # A thread that never acquired any lock has no id yet.
        this_thread = getattr(this_thread_obj, 'shared_lock_thread_id', None)
        with self._condition:
            if this_thread is None or \
                    self._exclusive_thread_id != this_thread:
                raise RuntimeError(
                    'Thread {:d} '.format(this_thread_obj.ident) +
                    'attempted to release an exclusive lock ' +
                    'it had not previously acquired.')
            if self._exclusive_thread_count > 1:
                self._exclusive_thread_count -= 1
            else:
                # Release the lock.
                self._exclusive_thread_count = 0
                self._exclusive_thread_id = None
            self._condition.notify_all()

    def _clean_stale_locks(self):
        """Remove locks for threads that terminated without releasing."""
        with self._condition:
            # Collect the ids of the active threads.
            thread_ids = set()
            for thread_ in threading.enumerate():
                if thread_.is_alive() and \
                        hasattr(thread_, 'shared_lock_thread_id'):
                    thread_ids.add(thread_.shared_lock_thread_id)
            # Remove shared locks for threads that have terminated.
            # Iterate over a copy because entries are removed in the loop.
            for sthread in list(self._shared_threads):
                if sthread not in thread_ids:
                    self._shared_threads.pop(sthread)
                    if not self._suppress_warnings:
                        warnings.warn(
                            'Removed shared lock acquired by thread with ' +
                            'shared_lock_thread_id={:d} '.format(sthread) +
                            'that terminated without releasing.',
                            RuntimeWarning, stacklevel=0)
            # Remove exclusive locks for threads that have terminated.
            if self._exclusive_thread_id is not None and \
                    self._exclusive_thread_id not in thread_ids:
                ex_thread = self._exclusive_thread_id
                self._exclusive_thread_id = None
                self._exclusive_thread_count = 0
                if not self._suppress_warnings:
                    warnings.warn(
                        'Removed exclusive lock acquired by thread with ' +
                        'shared_lock_thread_id={:d} '.format(ex_thread) +
                        'that terminated without releasing.',
                        RuntimeWarning, stacklevel=0)

    @contextlib.contextmanager
    def shared_lock(self):
        """Context manager holding the shared lock."""
        self.acquire_shared_lock()
        try:
            yield self
        finally:
            self.release_shared_lock()

    @contextlib.contextmanager
    def exclusive_lock(self):
        """Context manager holding the exclusive lock."""
        self.acquire_exclusive_lock()
        try:
            yield self
        finally:
            self.release_exclusive_lock()

    def __pretty_print__(self, file=sys.stdout):
        """Print the lock state (owners and counts) to file."""
        print('SharedLock: {:s}'.format(repr(self)), file=file)
        print(' _exclusive_thread_id={:s}'.format(
            repr(self._exclusive_thread_id)), file=file)
        print(' _exclusive_thread_count={:d}'.format(
            self._exclusive_thread_count), file=file)
        print(' _shared_threads={', file=file)
        for key, val in self._shared_threads.items():
            print(' {:s}: {:s},'.format(repr(key), repr(val)), file=file)
        print(' }', flush=True, file=file)
A separate lock is provided for each path (eg. '/a/b/c'). When acquiring
a lock for a path, shared locks are first acquired on the ancestor directories
('/', '/a' and '/a/b'). This allows a thread to prevent concurrent
changes in the ancestor directories as well as the path it is
accessing.
Shared and exclusive locks are provided. Multiple shared locks may be
acquired simultaneously on a path. A shared lock may not be acquired
on a path that has an exclusive lock, except by the same thread.
An exclusive lock may not be acquired on a path that has an exclusive lock
held by another thread, or a shared lock, even when it is held by the
same thread attempting to acquire the exclusive lock.
The locks are re-entrant: they may be acquired multiple times by the same
thread, but must be released the same number of times.
Stale locks that are acquired but not released by a thread before it terminates
are removed after a watchdog timeout.
References to the locks for all paths are stored in PathLocker using a
WeakValueDictionary so that multiple threads accessing a single path
will obtain the same lock, and the lock is garbage collected after the
last thread deletes its PathLock containing a reference to the lock.
Example usage:
# In FS.__init__()
self.path_locker = PathLocker()
# In an FS method (eg. FS.makedir(path))
lock = PathLock(path, self.path_locker)
lock.acquire_exclusive_lock()
# Do the makedir()
lock.release_exclusive_lock()
# Or using the context manager
lock = PathLock(path, self.path_locker)
with lock.exclusive_lock():
# Do the makedir
"""
import threading
import contextlib
import warnings
import time
from weakref import WeakValueDictionary
from fs.path import normpath, abspath, recursepath
import sys
class PathLocker(object):
    """Container for the per-path SharedLock objects.

    The locks are stored in a WeakValueDictionary keyed by path, and
    get_lock() (called by PathLock()) adds new locks to the dictionary
    in a synchronised manner.

    :param float thread_watchdog_period: passed to SharedLock().
    :param bool suppress_warnings: passed to SharedLock().
    """

    def __init__(self, thread_watchdog_period=0.1, suppress_warnings=False):
        self._path_wdict = WeakValueDictionary()
        self._thread_watchdog_period = thread_watchdog_period
        self._suppress_warnings = suppress_warnings
        # Lock to synchronise the addition of new locks to the dictionary.
        self._lock = threading.Lock()

    def get_lock(self, path):
        """Get the SharedLock for a given path.

        If the lock does not already exist, a new lock is created and
        added to the WeakValueDictionary.

        :param str path: path whose lock is retrieved. Must be an
            absolute and normalised path.
        :return SharedLock: the lock registered for ``path``.
        """
        # The lookup and the insertion happen under one lock so that two
        # threads asking for the same new path cannot each create a lock.
        with self._lock:
            shared_lock = self._path_wdict.get(path)
            if shared_lock is None:
                shared_lock = SharedLock(
                    thread_watchdog_period=self._thread_watchdog_period,
                    suppress_warnings=self._suppress_warnings)
                self._path_wdict[path] = shared_lock
            return shared_lock
class PathLock(object):
    """Shared and exclusive locks for paths.

    Acquiring a lock for a path first obtains shared locks for
    the ancestor paths, in order starting from '/'. For example,
    acquiring the exclusive lock for '/a/b/c' does

    - acquire shared lock for '/'
    - acquire shared lock for '/a'
    - acquire shared lock for '/a/b'
    - acquire exclusive lock for '/a/b/c'

    The locks are released in the reverse order.

    The PathLock object contains references to the SharedLocks for each
    path component. While the PathLock object is still in context,
    these references ensure the SharedLocks are not garbage collected
    from the PathLocker WeakValueDictionary containing all path locks.

    :param str path: The path that is locked.
    :param PathLocker path_locker: PathLocker object containing the
        WeakValueDictionary of all the SharedLocks, with absolute paths
        as the keys.
    """

    def __init__(self, path, path_locker, *args, **kwargs):
        self.path = abspath(normpath(path))
        # One lock per path component; the last entry is the lock for
        # self.path itself. Retrieved from the dictionary, or newly made.
        self._locks = [path_locker.get_lock(path_element)
                       for path_element in recursepath(self.path)]

    def acquire_shared_lock(self, blocking=True, timeout=-1):
        """Acquire the shared lock for the path.

        This involves obtaining shared locks on the ancestor paths
        as well as the path. For example, if path='/a/b/c',
        shared locks for '/', '/a', '/a/b', and '/a/b/c' are acquired
        in that sequence.

        Waits until the lock is acquired when blocking is True, or the
        timeout elapses. The same timeout value is handed to every
        component lock in turn.

        The lock is re-entrant - it can be acquired multiple times by the
        same thread, provided it is released the same number of times.

        :param bool blocking: When False, returns immediately with value
            False instead of waiting.
        :param float timeout: When set to a positive value, return False
            if the lock has not been acquired after this time (seconds)
            has elapsed.
        :return bool: True if the lock is acquired, False otherwise.
        """
        held = []
        success = False
        try:
            for lock in self._locks:
                if not lock.acquire_shared_lock(
                        blocking=blocking, timeout=timeout):
                    return False
                held.append(lock)
            success = True
            return True
        finally:
            if not success:
                # Failed (or raised) part way through, so release the
                # locks we already have, deepest first.
                for lock in reversed(held):
                    lock.release_shared_lock()

    def release_shared_lock(self):
        """Release the shared lock.

        The shared locks acquired for ancestor directories are also
        released, in the sequence starting furthest from the root.
        For example, when path='/a/b/c', the locks are released in
        order '/a/b/c', '/a/b', '/a', '/'.
        """
        for lock in reversed(self._locks):
            lock.release_shared_lock()

    def acquire_exclusive_lock(self, blocking=True, timeout=-1):
        """Acquire the exclusive lock for the path.

        This involves obtaining shared locks on the ancestor paths
        as well. For example, if path='/a/b/c', shared locks are
        acquired for '/', '/a', '/a/b', then the exclusive lock
        for '/a/b/c' is acquired.

        Waits until the lock is acquired when blocking is True, or the
        timeout elapses. The same timeout value is handed to every
        component lock in turn.

        The lock is re-entrant - it can be acquired multiple times by the
        same thread, provided it is released the same number of times.

        If acquiring any component lock raises, the ancestor shared locks
        taken so far are released before the exception propagates.

        :param bool blocking: When False, returns immediately with value
            False instead of waiting.
        :param float timeout: When set to a positive value, return False
            if the lock has not been acquired after this time (seconds)
            has elapsed.
        :return bool: True if the lock is acquired, False otherwise.
        """
        held = []
        success = False
        try:
            for lock in self._locks[:-1]:
                if not lock.acquire_shared_lock(
                        blocking=blocking, timeout=timeout):
                    return False
                held.append(lock)
            if self._locks[-1].acquire_exclusive_lock(
                    blocking=blocking, timeout=timeout):
                success = True
            return success
        finally:
            if not success:
                # Failed (or raised) part way through, so release the
                # ancestor locks we already have, deepest first.
                for lock in reversed(held):
                    lock.release_shared_lock()

    def release_exclusive_lock(self):
        """Release the exclusive lock.

        The shared locks acquired for ancestor directories are also
        released, in the sequence starting furthest from the root.
        For example, when path='/a/b/c', the locks are released in
        order '/a/b/c', '/a/b', '/a', '/'.
        """
        self._locks[-1].release_exclusive_lock()
        for lock in reversed(self._locks[:-1]):
            lock.release_shared_lock()

    @contextlib.contextmanager
    def shared_lock(self):
        """Context manager holding the shared lock for the path."""
        self.acquire_shared_lock()
        try:
            yield self
        finally:
            self.release_shared_lock()

    @contextlib.contextmanager
    def exclusive_lock(self):
        """Context manager holding the exclusive lock for the path."""
        self.acquire_exclusive_lock()
        try:
            yield self
        finally:
            self.release_exclusive_lock()

    def __pretty_print__(self, file=sys.stdout):
        """Print this object, its path and each component lock to file."""
        print('{:s}'.format(repr(self)), file=file)
        print(' path={:s}'.format(self.path), file=file)
        for lock in self._locks:
            lock.__pretty_print__(file=file)
class SharedLock(object):
"""A re-entrant shared locking scheme.
An exclusive lock is provided that may only be acquired by a single
thread when no threads have the shared lock. When the exclusive lock has
not been acquired, multiple threads may acquire the shared lock.
If the thread already holds the exclusive lock, it may also
acquire the shared lock.
The exclusive and shared locks may be acquired multiple times by the same
thread, but must release the lock the same number of times.
A context handler is provided so the locks may be used in constructs
such as
with shared_lock():
...
or
with exclusive_lock():
...
The locking scheme handles stale locks. After a short delay, locks
acquired but not released by a terminated thread are removed.
"""
def __init__(self, thread_watchdog_period=0.1, suppress_warnings=False):
"""Create a SharedLock.
:param float thread_watchdog_period: Period for checking that all
active locks have active threads. Locks acquired by threads
that have since terminated are removed.
:param bool suppress_warnings: When False, a RuntimeWarning
is raised when a stale lock is removed. These warnings are
suppressed when True.
"""
if (not isinstance(thread_watchdog_period, float) and
not isinstance(thread_watchdog_period, int)) or \
thread_watchdog_period <= 0:
raise ValueError(
'thread_watchdog_period={:s} '.format(
repr(thread_watchdog_period)) +
'argument must be a positive float or int.')
if not isinstance(suppress_warnings, bool):
raise ValueError('suppress_warnings={:s} '.format(repr(
suppress_warnings)) +
'argument must be boolean.')
self._condition = threading.Condition()
# Dictionary of the threads holding the shared lock.
# The key is the thread id, and the value is the number of times
# the lock has been acquired by the thread.
self._shared_threads = dict()
self._exclusive_thread_id = None
self._exclusive_thread_count = 0
# Timeout for wait() before checking for stale locks.
self._thread_watchdog_period = thread_watchdog_period
# Maintain our own thread ids. Cannot use threading.ident as
# the same id is often re-used for new threads, which causes problems
# when removing stale locks from threads that have terminated.
self._suppress_warnings = suppress_warnings
def _set_unique_thread_id(self):
"""Set a unique thread id.
Thread.ident is re-used by new threads, so it cannot be used
as the key for the dictionary of shared locks. So we make a
unique identifier using the lower part of Thread.ident and the
least significant bytes of the time.
"""
# The identifier is a 64 bit integer, with 48 bits from the time,
# which rolls over every 3 days, and 16 bits from Thread.ident.
time_part = int(time.perf_counter() * 1000 * 1000 * 1000) & \
0xffffffffffff
thread_id_part = threading.current_thread().ident & 0xffff00
thread_id = time_part | (thread_id_part << 40)
this_thread_obj = threading.current_thread()
if not hasattr(this_thread_obj, 'shared_lock_thread_id'):
this_thread_obj.shared_lock_thread_id = thread_id
def acquire_shared_lock(self, blocking=True, timeout=-1):
"""Acquire the shared lock.
Waits until the lock is acquired when blocking is True, or the
timeout elapses. The shared lock is acquired when the
exclusive lock is None or is held by the current thread.
The lock is re-entrant - it can be acquired multiple times by the
same thread, provided it is released the same number of times.
:param bool blocking: When False, returns immediately with value
False instead of waiting.
:param float timeout: When set to a positive value, return False
if the lock has not been acquired after this time (seconds)
has elapsed.
:return bool: True if the lock is acquired, False otherwise.
"""
if not isinstance(timeout, float) and not isinstance(timeout, int):
raise ValueError(
'timeout argument {:s} must be a float or int.'.format(
repr(timeout)))
if not isinstance(blocking, bool):
raise ValueError('blocking={:s} '.format(repr(blocking)) +
'argument must be boolean.')
end_time = time.time() + timeout
this_thread_obj = threading.current_thread()
if not hasattr(this_thread_obj, 'shared_lock_thread_id'):
self._set_unique_thread_id()
this_thread = this_thread_obj.shared_lock_thread_id
with self._condition:
if not blocking and not (self._exclusive_thread_id is None or
self._exclusive_thread_id == this_thread):
return_val = False
else:
watchdog_time = time.time() + self._thread_watchdog_period
timed_out = False
while not (self._exclusive_thread_id is None or
self._exclusive_thread_id == this_thread):
if timeout > 0 and time.time() > end_time:
timed_out = True
break
result = self._condition.wait(
timeout=self._thread_watchdog_period)
if not result or time.time() > watchdog_time:
# wait() timed out.
self._clean_stale_locks()
watchdog_time = time.time() + \
self._thread_watchdog_period
if timed_out:
return_val = False
else:
# Lock is available, so get it.
num_acquired = self._shared_threads.get(this_thread)
if num_acquired is None:
self._shared_threads[this_thread] = 1
else:
self._shared_threads[this_thread] = num_acquired + 1
return_val = True
return return_val
def release_shared_lock(self):
"""Release the shared lock."""
this_thread_obj = threading.current_thread()
this_thread = this_thread_obj.shared_lock_thread_id
num_acquired = self._shared_threads.get(this_thread)
if num_acquired is None:
raise RuntimeError(
'Thread {:d} '.format(this_thread_obj.ident) +
'attempted to release a shared lock ' +
'it had not previously acquired.')
with self._condition:
if num_acquired > 1:
self._shared_threads[this_thread] = num_acquired - 1
else:
# Release the lock.
self._shared_threads.pop(this_thread)
self._condition.notify_all()
def acquire_exclusive_lock(self, blocking=True, timeout=-1):
    """Acquire the exclusive lock.

    Waits until the lock is acquired when blocking is True, or the
    timeout elapses.

    The lock is re-entrant - it can be acquired multiple times by the
    same thread, provided it is released the same number of times.

    :param bool blocking: When False, returns immediately with value
        False instead of waiting.
    :param float timeout: When set to a positive value, return False
        if the lock has not been acquired after this time (seconds)
        has elapsed. Zero or negative values mean "no timeout".
    :return bool: True if the lock is acquired, False otherwise.
    :raises ValueError: If timeout is not a float or int, or blocking
        is not a bool.
    :raises RuntimeError: If the calling thread holds the shared lock
        without already holding the exclusive lock.
    """
    if not isinstance(timeout, (float, int)):
        raise ValueError(
            'timeout argument {:s} must be a float or int.'.format(
                repr(timeout)))
    if not isinstance(blocking, bool):
        raise ValueError('blocking={:s} '.format(repr(blocking)) +
                         'argument must be boolean.')
    end_time = time.time() + timeout
    this_thread_obj = threading.current_thread()
    if not hasattr(this_thread_obj, 'shared_lock_thread_id'):
        self._set_unique_thread_id()
    this_thread = this_thread_obj.shared_lock_thread_id
    if self._shared_threads.get(this_thread) is not None and \
            self._exclusive_thread_id != this_thread:
        raise RuntimeError(
            'Thread {:d} '.format(this_thread_obj.ident) +
            'attempted to acquire the exclusive lock ' +
            'before releasing the shared lock.')
    with self._condition:
        if this_thread == self._exclusive_thread_id:
            # This thread already has the lock.
            self._exclusive_thread_count += 1
            return True
        if not blocking and (self._exclusive_thread_id is not None or
                             self._shared_threads):
            return False
        # Wait until the other locks are released.
        period = self._thread_watchdog_period
        watchdog_time = time.time() + period
        while self._exclusive_thread_id is not None or \
                self._shared_threads:
            wait_for = period
            if timeout > 0:
                remaining = end_time - time.time()
                if remaining <= 0:
                    return False
                # Never sleep past the caller's deadline: waiting a
                # full watchdog period could overshoot a short timeout.
                wait_for = min(period, remaining)
            notified = self._condition.wait(timeout=wait_for)
            # Only a full-period wait that expired counts as a watchdog
            # tick; a wait shortened for the deadline does not.
            watchdog_fired = not notified and wait_for >= period
            if watchdog_fired or time.time() > watchdog_time:
                self._clean_stale_locks()
                watchdog_time = time.time() + period
        # Get the lock.
        self._exclusive_thread_id = this_thread
        self._exclusive_thread_count = 1
        return True
def release_exclusive_lock(self):
    """Release the exclusive lock.

    The lock is re-entrant, so each call undoes one earlier acquisition
    by the calling thread. When the count reaches zero the owner is
    cleared and all waiting threads are notified.

    :raises RuntimeError: If the calling thread does not currently hold
        the exclusive lock (including a thread that never acquired any
        lock and therefore has no ``shared_lock_thread_id``).
    """
    this_thread_obj = threading.current_thread()
    # A thread that never acquired a lock has no id attribute yet; treat
    # that as "not the owner" rather than failing with AttributeError.
    this_thread = getattr(this_thread_obj, 'shared_lock_thread_id', None)
    with self._condition:
        # The explicit None test stops an id-less thread from matching
        # an unowned lock (whose owner id is also None).
        if this_thread is None or \
                self._exclusive_thread_id != this_thread:
            raise RuntimeError(
                'Thread {:d} '.format(this_thread_obj.ident) +
                'attempted to release an exclusive lock ' +
                'it had not previously acquired.')
        if self._exclusive_thread_count > 1:
            # Still held re-entrantly: just drop one level.
            self._exclusive_thread_count -= 1
        else:
            # Release the lock and wake any waiting threads.
            self._exclusive_thread_count = 0
            self._exclusive_thread_id = None
            self._condition.notify_all()
def _clean_stale_locks(self):
    """Drop locks still registered to threads that are no longer alive.

    Entries in ``_shared_threads`` and the exclusive owner are compared
    against the ``shared_lock_thread_id`` of every live thread; anything
    without a live owner is removed. A RuntimeWarning is issued for each
    removal unless ``_suppress_warnings`` is set.
    """
    with self._condition:
        # Ids of every live thread that has been given a lock id.
        live_ids = {
            candidate.shared_lock_thread_id
            for candidate in threading.enumerate()
            if candidate.is_alive() and
            hasattr(candidate, 'shared_lock_thread_id')
        }
        # Decide what is stale before mutating the mapping.
        orphaned = [holder for holder in self._shared_threads
                    if holder not in live_ids]
        for holder in orphaned:
            self._shared_threads.pop(holder)
            if self._suppress_warnings:
                continue
            warnings.warn(
                'Removed shared lock acquired by thread with ' +
                'shared_lock_thread_id={:d} '.format(holder) +
                'that terminated without releasing.',
                RuntimeWarning, stacklevel=0)
        # The exclusive lock has at most one owner to check.
        owner = self._exclusive_thread_id
        if owner is None or owner in live_ids:
            return
        self._exclusive_thread_id = None
        self._exclusive_thread_count = 0
        if not self._suppress_warnings:
            warnings.warn(
                'Removed exclusive lock acquired by thread with ' +
                'shared_lock_thread_id={:d} '.format(owner) +
                'that terminated without releasing.',
                RuntimeWarning, stacklevel=0)
@contextlib.contextmanager
def shared_lock(self):
    """Context manager that holds the shared lock for the ``with`` body.

    Acquires the shared lock on entry, yields this object, and releases
    the lock on exit whether or not the body raised.
    """
    self.acquire_shared_lock()
    # Register the release only after a successful acquire, so a failed
    # acquire never triggers a release.
    with contextlib.ExitStack() as on_exit:
        on_exit.callback(self.release_shared_lock)
        yield self
@contextlib.contextmanager
def exclusive_lock(self):
    """Context manager that holds the exclusive lock for the ``with`` body.

    Acquires the exclusive lock on entry, yields this object, and
    releases the lock on exit whether or not the body raised.
    """
    self.acquire_exclusive_lock()
    # Register the release only after a successful acquire, so a failed
    # acquire never triggers a release.
    with contextlib.ExitStack() as on_exit:
        on_exit.callback(self.release_exclusive_lock)
        yield self
def __pretty_print__(self, file=None):
    """Print a human-readable dump of the lock's state.

    Writes the repr of this object, the exclusive owner id and count,
    and one line per entry of ``_shared_threads``, then flushes.

    :param file: Text stream to write to. When None (the default) the
        output goes to whatever ``sys.stdout`` is at call time, so
        redirected or captured stdout is honoured.
    """
    # file=None is passed straight through: print() itself resolves it
    # to the current sys.stdout, unlike a default bound at def time.
    print('SharedLock: {:s}'.format(repr(self)), file=file)
    print(' _exclusive_thread_id={:s}'.format(
        repr(self._exclusive_thread_id)), file=file)
    print(' _exclusive_thread_count={:d}'.format(
        self._exclusive_thread_count), file=file)
    print(' _shared_threads={', file=file)
    for key, val in self._shared_threads.items():
        print(' {:s}: {:s},'.format(repr(key), repr(val)), file=file)
    print(' }', flush=True, file=file)