diff options
author | Marcus Müller <marcus.mueller@ettus.com> | 2018-11-04 18:44:10 +0100 |
---|---|---|
committer | Andrej Rode <mail@andrejro.de> | 2018-11-12 12:30:42 +0100 |
commit | 9def7b29897e137f4936fbbaaae990910c0913a1 (patch) | |
tree | 0caefef7e2c8a355471b77bff0ad0a4dbe167c65 /gnuradio-runtime/python/gnuradio | |
parent | b1e0b54d02bb981eba734ea29174e6134950dde6 (diff) |
runtime: gr_threading clutch obsolete; remove it and its usage
We essentially monkeypatched Python's `threading` module inelegantly to
get around shortcomings in Python 2.3 and 2.4.
Since support for these versions is long gone: removal of this clutch
and its usage.
Diffstat (limited to 'gnuradio-runtime/python/gnuradio')
6 files changed, 7 insertions, 1573 deletions
diff --git a/gnuradio-runtime/python/gnuradio/gr/CMakeLists.txt b/gnuradio-runtime/python/gnuradio/gr/CMakeLists.txt index 1b535ef997..87631f11bd 100644 --- a/gnuradio-runtime/python/gnuradio/gr/CMakeLists.txt +++ b/gnuradio-runtime/python/gnuradio/gr/CMakeLists.txt @@ -26,9 +26,6 @@ GR_PYTHON_INSTALL(FILES tag_utils.py packet_utils.py gateway.py - gr_threading.py - gr_threading_23.py - gr_threading_24.py hier_block2.py top_block.py pubsub.py diff --git a/gnuradio-runtime/python/gnuradio/gr/gr_threading.py b/gnuradio-runtime/python/gnuradio/gr/gr_threading.py deleted file mode 100644 index cb0519f6f6..0000000000 --- a/gnuradio-runtime/python/gnuradio/gr/gr_threading.py +++ /dev/null @@ -1,37 +0,0 @@ -from __future__ import absolute_import -from __future__ import unicode_literals -# -# Copyright 2005 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# - -from sys import version_info as _version_info - -# import patched version of standard threading module - -if _version_info[0:2] == (2, 3): - #print "Importing gr_threading_23" - from .gr_threading_23 import * -elif _version_info[0:2] == (2, 4): - #print "Importing gr_threading_24" - from .gr_threading_24 import * -else: - # assume the patch was applied... - #print "Importing system provided threading" - from threading import * diff --git a/gnuradio-runtime/python/gnuradio/gr/gr_threading_23.py b/gnuradio-runtime/python/gnuradio/gr/gr_threading_23.py deleted file mode 100644 index f4f062f715..0000000000 --- a/gnuradio-runtime/python/gnuradio/gr/gr_threading_23.py +++ /dev/null @@ -1,728 +0,0 @@ -"""Thread module emulating a subset of Java's threading model.""" - -# This started life as the threading.py module of Python 2.3 -# It's been patched to fix a problem with join, where a KeyboardInterrupt -# caused a lock to be left in the acquired state. - -from __future__ import print_function -from __future__ import unicode_literals - -import sys as _sys - -try: - import _thread -except ImportError: - del _sys.modules[__name__] - raise - -from io import StringIO as _StringIO -from time import time as _time, sleep as _sleep -from traceback import print_exc as _print_exc - -# Rename some stuff so "from threading import *" is safe -__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event', - 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', - 'Timer', 'setprofile', 'settrace'] - -_start_new_thread = _thread.start_new_thread -_allocate_lock = _thread.allocate_lock -_get_ident = _thread.get_ident -ThreadError = _thread.error -del _thread - - -# Debug support (adapted from ihooks.py). -# All the major classes here derive from _Verbose. We force that to -# be a new-style class so that all the major classes here are new-style. -# This helps debugging (type(instance) is more revealing for instances -# of new-style classes). - -_VERBOSE = False - -if __debug__: - - class _Verbose(object): - - def __init__(self, verbose=None): - if verbose is None: - verbose = _VERBOSE - self.__verbose = verbose - - def _note(self, format, *args): - if self.__verbose: - format = format % args - format = "%s: %s\n" % ( - currentThread().getName(), format) - _sys.stderr.write(format) - -else: - # Disable this when using "python -O" - class _Verbose(object): - def __init__(self, verbose=None): - pass - def _note(self, *args): - pass - -# Support for profile and trace hooks - -_profile_hook = None -_trace_hook = None - -def setprofile(func): - global _profile_hook - _profile_hook = func - -def settrace(func): - global _trace_hook - _trace_hook = func - -# Synchronization classes - -Lock = _allocate_lock - -def RLock(*args, **kwargs): - return _RLock(*args, **kwargs) - -class _RLock(_Verbose): - - def __init__(self, verbose=None): - _Verbose.__init__(self, verbose) - self.__block = _allocate_lock() - self.__owner = None - self.__count = 0 - - def __repr__(self): - return "<%s(%s, %d)>" % ( - self.__class__.__name__, - self.__owner and self.__owner.getName(), - self.__count) - - def acquire(self, blocking=1): - me = currentThread() - if self.__owner is me: - self.__count = self.__count + 1 - if __debug__: - self._note("%s.acquire(%s): recursive success", self, blocking) - return 1 - rc = self.__block.acquire(blocking) - if rc: - self.__owner = me - self.__count = 1 - if __debug__: - self._note("%s.acquire(%s): initial success", self, blocking) - else: - if __debug__: - self._note("%s.acquire(%s): failure", self, blocking) - return rc - - def release(self): - me = currentThread() - assert self.__owner is me, "release() of un-acquire()d lock" - self.__count = count = self.__count - 1 - if not count: - self.__owner = None - self.__block.release() - if __debug__: - self._note("%s.release(): final release", self) - else: - if __debug__: - self._note("%s.release(): non-final release", self) - - # Internal methods used by condition variables - - def _acquire_restore(self, lock): - self.__block.acquire() - count, owner = lock - self.__count = count - self.__owner = owner - if __debug__: - self._note("%s._acquire_restore()", self) - - def _release_save(self): - if __debug__: - self._note("%s._release_save()", self) - count = self.__count - self.__count = 0 - owner = self.__owner - self.__owner = None - self.__block.release() - return (count, owner) - - def _is_owned(self): - return self.__owner is currentThread() - - -def Condition(*args, **kwargs): - return _Condition(*args, **kwargs) - -class _Condition(_Verbose): - - def __init__(self, lock=None, verbose=None): - _Verbose.__init__(self, verbose) - if lock is None: - lock = RLock() - self.__lock = lock - # Export the lock's acquire() and release() methods - self.acquire = lock.acquire - self.release = lock.release - # If the lock defines _release_save() and/or _acquire_restore(), - # these override the default implementations (which just call - # release() and acquire() on the lock). Ditto for _is_owned(). - try: - self._release_save = lock._release_save - except AttributeError: - pass - try: - self._acquire_restore = lock._acquire_restore - except AttributeError: - pass - try: - self._is_owned = lock._is_owned - except AttributeError: - pass - self.__waiters = [] - - def __repr__(self): - return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) - - def _release_save(self): - self.__lock.release() # No state to save - - def _acquire_restore(self, x): - self.__lock.acquire() # Ignore saved state - - def _is_owned(self): - # Return True if lock is owned by currentThread. - # This method is called only if __lock doesn't have _is_owned(). - if self.__lock.acquire(0): - self.__lock.release() - return False - else: - return True - - def wait(self, timeout=None): - currentThread() # for side-effect - assert self._is_owned(), "wait() of un-acquire()d lock" - waiter = _allocate_lock() - waiter.acquire() - self.__waiters.append(waiter) - saved_state = self._release_save() - try: # restore state no matter what (e.g., KeyboardInterrupt) - if timeout is None: - waiter.acquire() - if __debug__: - self._note("%s.wait(): got it", self) - else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). - endtime = _time() + timeout - delay = 0.0005 # 500 us -> initial delay of 1 ms - while True: - gotit = waiter.acquire(0) - if gotit: - break - remaining = endtime - _time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, .05) - _sleep(delay) - if not gotit: - if __debug__: - self._note("%s.wait(%s): timed out", self, timeout) - try: - self.__waiters.remove(waiter) - except ValueError: - pass - else: - if __debug__: - self._note("%s.wait(%s): got it", self, timeout) - finally: - self._acquire_restore(saved_state) - - def notify(self, n=1): - currentThread() # for side-effect - assert self._is_owned(), "notify() of un-acquire()d lock" - __waiters = self.__waiters - waiters = __waiters[:n] - if not waiters: - if __debug__: - self._note("%s.notify(): no waiters", self) - return - self._note("%s.notify(): notifying %d waiter%s", self, n, - n!=1 and "s" or "") - for waiter in waiters: - waiter.release() - try: - __waiters.remove(waiter) - except ValueError: - pass - - def notifyAll(self): - self.notify(len(self.__waiters)) - - -def Semaphore(*args, **kwargs): - return _Semaphore(*args, **kwargs) - -class _Semaphore(_Verbose): - - # After Tim Peters' semaphore class, but not quite the same (no maximum) - - def __init__(self, value=1, verbose=None): - assert value >= 0, "Semaphore initial value must be >= 0" - _Verbose.__init__(self, verbose) - self.__cond = Condition(Lock()) - self.__value = value - - def acquire(self, blocking=1): - rc = False - self.__cond.acquire() - while self.__value == 0: - if not blocking: - break - if __debug__: - self._note("%s.acquire(%s): blocked waiting, value=%s", - self, blocking, self.__value) - self.__cond.wait() - else: - self.__value = self.__value - 1 - if __debug__: - self._note("%s.acquire: success, value=%s", - self, self.__value) - rc = True - self.__cond.release() - return rc - - def release(self): - self.__cond.acquire() - self.__value = self.__value + 1 - if __debug__: - self._note("%s.release: success, value=%s", - self, self.__value) - self.__cond.notify() - self.__cond.release() - - -def BoundedSemaphore(*args, **kwargs): - return _BoundedSemaphore(*args, **kwargs) - -class _BoundedSemaphore(_Semaphore): - """Semaphore that checks that # releases is <= # acquires""" - def __init__(self, value=1, verbose=None): - _Semaphore.__init__(self, value, verbose) - self._initial_value = value - - def release(self): - if self._Semaphore__value >= self._initial_value: - raise ValueError("Semaphore released too many times") - return _Semaphore.release(self) - - -def Event(*args, **kwargs): - return _Event(*args, **kwargs) - -class _Event(_Verbose): - - # After Tim Peters' event class (without is_posted()) - - def __init__(self, verbose=None): - _Verbose.__init__(self, verbose) - self.__cond = Condition(Lock()) - self.__flag = False - - def isSet(self): - return self.__flag - - def set(self): - self.__cond.acquire() - try: - self.__flag = True - self.__cond.notifyAll() - finally: - self.__cond.release() - - def clear(self): - self.__cond.acquire() - try: - self.__flag = False - finally: - self.__cond.release() - - def wait(self, timeout=None): - self.__cond.acquire() - try: - if not self.__flag: - self.__cond.wait(timeout) - finally: - self.__cond.release() - -# Helper to generate new thread names -_counter = 0 -def _newname(template="Thread-%d"): - global _counter - _counter = _counter + 1 - return template % _counter - -# Active thread administration -_active_limbo_lock = _allocate_lock() -_active = {} -_limbo = {} - - -# Main class for threads - -class Thread(_Verbose): - - __initialized = False - - def __init__(self, group=None, target=None, name=None, - args=(), kwargs={}, verbose=None): - assert group is None, "group argument must be None for now" - _Verbose.__init__(self, verbose) - self.__target = target - self.__name = str(name or _newname()) - self.__args = args - self.__kwargs = kwargs - self.__daemonic = self._set_daemon() - self.__started = False - self.__stopped = False - self.__block = Condition(Lock()) - self.__initialized = True - - def _set_daemon(self): - # Overridden in _MainThread and _DummyThread - return currentThread().isDaemon() - - def __repr__(self): - assert self.__initialized, "Thread.__init__() was not called" - status = "initial" - if self.__started: - status = "started" - if self.__stopped: - status = "stopped" - if self.__daemonic: - status = status + " daemon" - return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) - - def start(self): - assert self.__initialized, "Thread.__init__() not called" - assert not self.__started, "thread already started" - if __debug__: - self._note("%s.start(): starting thread", self) - _active_limbo_lock.acquire() - _limbo[self] = self - _active_limbo_lock.release() - _start_new_thread(self.__bootstrap, ()) - self.__started = True - _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) - - def run(self): - if self.__target: - self.__target(*self.__args, **self.__kwargs) - - def __bootstrap(self): - try: - self.__started = True - _active_limbo_lock.acquire() - _active[_get_ident()] = self - del _limbo[self] - _active_limbo_lock.release() - if __debug__: - self._note("%s.__bootstrap(): thread started", self) - - if _trace_hook: - self._note("%s.__bootstrap(): registering trace hook", self) - _sys.settrace(_trace_hook) - if _profile_hook: - self._note("%s.__bootstrap(): registering profile hook", self) - _sys.setprofile(_profile_hook) - - try: - self.run() - except SystemExit: - if __debug__: - self._note("%s.__bootstrap(): raised SystemExit", self) - except: - if __debug__: - self._note("%s.__bootstrap(): unhandled exception", self) - s = _StringIO() - _print_exc(file=s) - _sys.stderr.write("Exception in thread %s:\n%s\n" % - (self.getName(), s.getvalue())) - else: - if __debug__: - self._note("%s.__bootstrap(): normal return", self) - finally: - self.__stop() - try: - self.__delete() - except: - pass - - def __stop(self): - self.__block.acquire() - self.__stopped = True - self.__block.notifyAll() - self.__block.release() - - def __delete(self): - _active_limbo_lock.acquire() - del _active[_get_ident()] - _active_limbo_lock.release() - - def join(self, timeout=None): - assert self.__initialized, "Thread.__init__() not called" - assert self.__started, "cannot join thread before it is started" - assert self is not currentThread(), "cannot join current thread" - if __debug__: - if not self.__stopped: - self._note("%s.join(): waiting until thread stops", self) - self.__block.acquire() - try: - if timeout is None: - while not self.__stopped: - self.__block.wait() - if __debug__: - self._note("%s.join(): thread stopped", self) - else: - deadline = _time() + timeout - while not self.__stopped: - delay = deadline - _time() - if delay <= 0: - if __debug__: - self._note("%s.join(): timed out", self) - break - self.__block.wait(delay) - else: - if __debug__: - self._note("%s.join(): thread stopped", self) - finally: - self.__block.release() - - def getName(self): - assert self.__initialized, "Thread.__init__() not called" - return self.__name - - def setName(self, name): - assert self.__initialized, "Thread.__init__() not called" - self.__name = str(name) - - def isAlive(self): - assert self.__initialized, "Thread.__init__() not called" - return self.__started and not self.__stopped - - def isDaemon(self): - assert self.__initialized, "Thread.__init__() not called" - return self.__daemonic - - def setDaemon(self, daemonic): - assert self.__initialized, "Thread.__init__() not called" - assert not self.__started, "cannot set daemon status of active thread" - self.__daemonic = daemonic - -# The timer class was contributed by Itamar Shtull-Trauring - -def Timer(*args, **kwargs): - return _Timer(*args, **kwargs) - -class _Timer(Thread): - """Call a function after a specified number of seconds: - - t = Timer(30.0, f, args=[], kwargs={}) - t.start() - t.cancel() # stop the timer's action if it's still waiting - """ - - def __init__(self, interval, function, args=[], kwargs={}): - Thread.__init__(self) - self.interval = interval - self.function = function - self.args = args - self.kwargs = kwargs - self.finished = Event() - - def cancel(self): - """Stop the timer if it hasn't finished yet""" - self.finished.set() - - def run(self): - self.finished.wait(self.interval) - if not self.finished.isSet(): - self.function(*self.args, **self.kwargs) - self.finished.set() - -# Special thread class to represent the main thread -# This is garbage collected through an exit handler - -class _MainThread(Thread): - - def __init__(self): - Thread.__init__(self, name="MainThread") - self._Thread__started = True - _active_limbo_lock.acquire() - _active[_get_ident()] = self - _active_limbo_lock.release() - import atexit - atexit.register(self.__exitfunc) - - def _set_daemon(self): - return False - - def __exitfunc(self): - self._Thread__stop() - t = _pickSomeNonDaemonThread() - if t: - if __debug__: - self._note("%s: waiting for other threads", self) - while t: - t.join() - t = _pickSomeNonDaemonThread() - if __debug__: - self._note("%s: exiting", self) - self._Thread__delete() - -def _pickSomeNonDaemonThread(): - for t in enumerate(): - if not t.isDaemon() and t.isAlive(): - return t - return None - - -# Dummy thread class to represent threads not started here. -# These aren't garbage collected when they die, -# nor can they be waited for. -# Their purpose is to return *something* from currentThread(). -# They are marked as daemon threads so we won't wait for them -# when we exit (conform previous semantics). - -class _DummyThread(Thread): - - def __init__(self): - Thread.__init__(self, name=_newname("Dummy-%d")) - self._Thread__started = True - _active_limbo_lock.acquire() - _active[_get_ident()] = self - _active_limbo_lock.release() - - def _set_daemon(self): - return True - - def join(self, timeout=None): - assert False, "cannot join a dummy thread" - - -# Global API functions - -def currentThread(): - try: - return _active[_get_ident()] - except KeyError: - ##print "currentThread(): no current thread for", _get_ident() - return _DummyThread() - -def activeCount(): - _active_limbo_lock.acquire() - count = len(_active) + len(_limbo) - _active_limbo_lock.release() - return count - -def enumerate(): - _active_limbo_lock.acquire() - active = list(_active.values()) + list(_limbo.values()) - _active_limbo_lock.release() - return active - -# Create the main thread object - -_MainThread() - - -# Self-test code - -def _test(): - - class BoundedQueue(_Verbose): - - def __init__(self, limit): - _Verbose.__init__(self) - self.mon = RLock() - self.rc = Condition(self.mon) - self.wc = Condition(self.mon) - self.limit = limit - self.queue = [] - - def put(self, item): - self.mon.acquire() - while len(self.queue) >= self.limit: - self._note("put(%s): queue full", item) - self.wc.wait() - self.queue.append(item) - self._note("put(%s): appended, length now %d", - item, len(self.queue)) - self.rc.notify() - self.mon.release() - - def get(self): - self.mon.acquire() - while not self.queue: - self._note("get(): queue empty") - self.rc.wait() - item = self.queue.pop(0) - self._note("get(): got %s, %d left", item, len(self.queue)) - self.wc.notify() - self.mon.release() - return item - - class ProducerThread(Thread): - - def __init__(self, queue, quota): - Thread.__init__(self, name="Producer") - self.queue = queue - self.quota = quota - - def run(self): - from random import random - counter = 0 - while counter < self.quota: - counter = counter + 1 - self.queue.put("%s.%d" % (self.getName(), counter)) - _sleep(random() * 0.00001) - - - class ConsumerThread(Thread): - - def __init__(self, queue, count): - Thread.__init__(self, name="Consumer") - self.queue = queue - self.count = count - - def run(self): - while self.count > 0: - item = self.queue.get() - print(item) - self.count = self.count - 1 - - NP = 3 - QL = 4 - NI = 5 - - Q = BoundedQueue(QL) - P = [] - for i in range(NP): - t = ProducerThread(Q, NI) - t.setName("Producer-%d" % (i+1)) - P.append(t) - C = ConsumerThread(Q, NI*NP) - for t in P: - t.start() - _sleep(0.000001) - C.start() - for t in P: - t.join() - C.join() - -if __name__ == '__main__': - _test() diff --git a/gnuradio-runtime/python/gnuradio/gr/gr_threading_24.py b/gnuradio-runtime/python/gnuradio/gr/gr_threading_24.py deleted file mode 100644 index 6dddce7e22..0000000000 --- a/gnuradio-runtime/python/gnuradio/gr/gr_threading_24.py +++ /dev/null @@ -1,797 +0,0 @@ -"""Thread module emulating a subset of Java's threading model.""" - -# This started life as the threading.py module of Python 2.4 -# It's been patched to fix a problem with join, where a KeyboardInterrupt -# caused a lock to be left in the acquired state. - -from __future__ import print_function -from __future__ import unicode_literals - -import sys as _sys - -try: - import _thread -except ImportError: - del _sys.modules[__name__] - raise - -from time import time as _time, sleep as _sleep -from traceback import format_exc as _format_exc -from collections import deque - -# Rename some stuff so "from threading import *" is safe -__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event', - 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', - 'Timer', 'setprofile', 'settrace', 'local'] - -_start_new_thread = _thread.start_new_thread -_allocate_lock = _thread.allocate_lock -_get_ident = _thread.get_ident -ThreadError = _thread.error -del _thread - - -# Debug support (adapted from ihooks.py). -# All the major classes here derive from _Verbose. We force that to -# be a new-style class so that all the major classes here are new-style. -# This helps debugging (type(instance) is more revealing for instances -# of new-style classes). - -_VERBOSE = False - -if __debug__: - - class _Verbose(object): - - def __init__(self, verbose=None): - if verbose is None: - verbose = _VERBOSE - self.__verbose = verbose - - def _note(self, format, *args): - if self.__verbose: - format = format % args - format = "%s: %s\n" % ( - currentThread().getName(), format) - _sys.stderr.write(format) - -else: - # Disable this when using "python -O" - class _Verbose(object): - def __init__(self, verbose=None): - pass - def _note(self, *args): - pass - -# Support for profile and trace hooks - -_profile_hook = None -_trace_hook = None - -def setprofile(func): - global _profile_hook - _profile_hook = func - -def settrace(func): - global _trace_hook - _trace_hook = func - -# Synchronization classes - -Lock = _allocate_lock - -def RLock(*args, **kwargs): - return _RLock(*args, **kwargs) - -class _RLock(_Verbose): - - def __init__(self, verbose=None): - _Verbose.__init__(self, verbose) - self.__block = _allocate_lock() - self.__owner = None - self.__count = 0 - - def __repr__(self): - return "<%s(%s, %d)>" % ( - self.__class__.__name__, - self.__owner and self.__owner.getName(), - self.__count) - - def acquire(self, blocking=1): - me = currentThread() - if self.__owner is me: - self.__count = self.__count + 1 - if __debug__: - self._note("%s.acquire(%s): recursive success", self, blocking) - return 1 - rc = self.__block.acquire(blocking) - if rc: - self.__owner = me - self.__count = 1 - if __debug__: - self._note("%s.acquire(%s): initial success", self, blocking) - else: - if __debug__: - self._note("%s.acquire(%s): failure", self, blocking) - return rc - - def release(self): - me = currentThread() - assert self.__owner is me, "release() of un-acquire()d lock" - self.__count = count = self.__count - 1 - if not count: - self.__owner = None - self.__block.release() - if __debug__: - self._note("%s.release(): final release", self) - else: - if __debug__: - self._note("%s.release(): non-final release", self) - - # Internal methods used by condition variables - - def _acquire_restore(self, lock): - self.__block.acquire() - count, owner = lock - self.__count = count - self.__owner = owner - if __debug__: - self._note("%s._acquire_restore()", self) - - def _release_save(self): - if __debug__: - self._note("%s._release_save()", self) - count = self.__count - self.__count = 0 - owner = self.__owner - self.__owner = None - self.__block.release() - return (count, owner) - - def _is_owned(self): - return self.__owner is currentThread() - - -def Condition(*args, **kwargs): - return _Condition(*args, **kwargs) - -class _Condition(_Verbose): - - def __init__(self, lock=None, verbose=None): - _Verbose.__init__(self, verbose) - if lock is None: - lock = RLock() - self.__lock = lock - # Export the lock's acquire() and release() methods - self.acquire = lock.acquire - self.release = lock.release - # If the lock defines _release_save() and/or _acquire_restore(), - # these override the default implementations (which just call - # release() and acquire() on the lock). Ditto for _is_owned(). - try: - self._release_save = lock._release_save - except AttributeError: - pass - try: - self._acquire_restore = lock._acquire_restore - except AttributeError: - pass - try: - self._is_owned = lock._is_owned - except AttributeError: - pass - self.__waiters = [] - - def __repr__(self): - return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) - - def _release_save(self): - self.__lock.release() # No state to save - - def _acquire_restore(self, x): - self.__lock.acquire() # Ignore saved state - - def _is_owned(self): - # Return True if lock is owned by currentThread. - # This method is called only if __lock doesn't have _is_owned(). - if self.__lock.acquire(0): - self.__lock.release() - return False - else: - return True - - def wait(self, timeout=None): - assert self._is_owned(), "wait() of un-acquire()d lock" - waiter = _allocate_lock() - waiter.acquire() - self.__waiters.append(waiter) - saved_state = self._release_save() - try: # restore state no matter what (e.g., KeyboardInterrupt) - if timeout is None: - waiter.acquire() - if __debug__: - self._note("%s.wait(): got it", self) - else: - # Balancing act: We can't afford a pure busy loop, so we - # have to sleep; but if we sleep the whole timeout time, - # we'll be unresponsive. The scheme here sleeps very - # little at first, longer as time goes on, but never longer - # than 20 times per second (or the timeout time remaining). - endtime = _time() + timeout - delay = 0.0005 # 500 us -> initial delay of 1 ms - while True: - gotit = waiter.acquire(0) - if gotit: - break - remaining = endtime - _time() - if remaining <= 0: - break - delay = min(delay * 2, remaining, .05) - _sleep(delay) - if not gotit: - if __debug__: - self._note("%s.wait(%s): timed out", self, timeout) - try: - self.__waiters.remove(waiter) - except ValueError: - pass - else: - if __debug__: - self._note("%s.wait(%s): got it", self, timeout) - finally: - self._acquire_restore(saved_state) - - def notify(self, n=1): - assert self._is_owned(), "notify() of un-acquire()d lock" - __waiters = self.__waiters - waiters = __waiters[:n] - if not waiters: - if __debug__: - self._note("%s.notify(): no waiters", self) - return - self._note("%s.notify(): notifying %d waiter%s", self, n, - n!=1 and "s" or "") - for waiter in waiters: - waiter.release() - try: - __waiters.remove(waiter) - except ValueError: - pass - - def notifyAll(self): - self.notify(len(self.__waiters)) - - -def Semaphore(*args, **kwargs): - return _Semaphore(*args, **kwargs) - -class _Semaphore(_Verbose): - - # After Tim Peters' semaphore class, but not quite the same (no maximum) - - def __init__(self, value=1, verbose=None): - assert value >= 0, "Semaphore initial value must be >= 0" - _Verbose.__init__(self, verbose) - self.__cond = Condition(Lock()) - self.__value = value - - def acquire(self, blocking=1): - rc = False - self.__cond.acquire() - while self.__value == 0: - if not blocking: - break - if __debug__: - self._note("%s.acquire(%s): blocked waiting, value=%s", - self, blocking, self.__value) - self.__cond.wait() - else: - self.__value = self.__value - 1 - if __debug__: - self._note("%s.acquire: success, value=%s", - self, self.__value) - rc = True - self.__cond.release() - return rc - - def release(self): - self.__cond.acquire() - self.__value = self.__value + 1 - if __debug__: - self._note("%s.release: success, value=%s", - self, self.__value) - self.__cond.notify() - self.__cond.release() - - -def BoundedSemaphore(*args, **kwargs): - return _BoundedSemaphore(*args, **kwargs) - -class _BoundedSemaphore(_Semaphore): - """Semaphore that checks that # releases is <= # acquires""" - def __init__(self, value=1, verbose=None): - _Semaphore.__init__(self, value, verbose) - self._initial_value = value - - def release(self): - if self._Semaphore__value >= self._initial_value: - raise ValueError("Semaphore released too many times") - return _Semaphore.release(self) - - -def Event(*args, **kwargs): - return _Event(*args, **kwargs) - -class _Event(_Verbose): - - # After Tim Peters' event class (without is_posted()) - - def __init__(self, verbose=None): - _Verbose.__init__(self, verbose) - self.__cond = Condition(Lock()) - self.__flag = False - - def isSet(self): - return self.__flag - - def set(self): - self.__cond.acquire() - try: - self.__flag = True - self.__cond.notifyAll() - finally: - self.__cond.release() - - def clear(self): - self.__cond.acquire() - try: - self.__flag = False - finally: - self.__cond.release() - - def wait(self, timeout=None): - self.__cond.acquire() - try: - if not self.__flag: - self.__cond.wait(timeout) - finally: - self.__cond.release() - -# Helper to generate new thread names -_counter = 0 -def _newname(template="Thread-%d"): - global _counter - _counter = _counter + 1 - return template % _counter - -# Active thread administration -_active_limbo_lock = _allocate_lock() -_active = {} -_limbo = {} - - -# Main class for threads - -class Thread(_Verbose): - - __initialized = False - # Need to store a reference to sys.exc_info for printing - # out exceptions when a thread tries to use a global var. during interp. - # shutdown and thus raises an exception about trying to perform some - # operation on/with a NoneType - __exc_info = _sys.exc_info - - def __init__(self, group=None, target=None, name=None, - args=(), kwargs={}, verbose=None): - assert group is None, "group argument must be None for now" - _Verbose.__init__(self, verbose) - self.__target = target - self.__name = str(name or _newname()) - self.__args = args - self.__kwargs = kwargs - self.__daemonic = self._set_daemon() - self.__started = False - self.__stopped = False - self.__block = Condition(Lock()) - self.__initialized = True - # sys.stderr is not stored in the class like - # sys.exc_info since it can be changed between instances - self.__stderr = _sys.stderr - - def _set_daemon(self): - # Overridden in _MainThread and _DummyThread - return currentThread().isDaemon() - - def __repr__(self): - assert self.__initialized, "Thread.__init__() was not called" - status = "initial" - if self.__started: - status = "started" - if self.__stopped: - status = "stopped" - if self.__daemonic: - status = status + " daemon" - return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) - - def start(self): - assert self.__initialized, "Thread.__init__() not called" - assert not self.__started, "thread already started" - if __debug__: - self._note("%s.start(): starting thread", self) - _active_limbo_lock.acquire() - _limbo[self] = self - _active_limbo_lock.release() - _start_new_thread(self.__bootstrap, ()) - self.__started = True - _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) - - def run(self): - if self.__target: - self.__target(*self.__args, **self.__kwargs) - - def __bootstrap(self): - try: - self.__started = True - _active_limbo_lock.acquire() - _active[_get_ident()] = self - del _limbo[self] - _active_limbo_lock.release() - if __debug__: - self._note("%s.__bootstrap(): thread started", self) - - if _trace_hook: - self._note("%s.__bootstrap(): registering trace hook", self) - _sys.settrace(_trace_hook) - if _profile_hook: - self._note("%s.__bootstrap(): registering profile hook", self) - _sys.setprofile(_profile_hook) - - try: - self.run() - except SystemExit: - if __debug__: - self._note("%s.__bootstrap(): raised SystemExit", self) - except: - if __debug__: - self._note("%s.__bootstrap(): unhandled exception", self) - # If sys.stderr is no more (most likely from interpreter - # shutdown) use self.__stderr. Otherwise still use sys (as in - # _sys) in case sys.stderr was redefined since the creation of - # self. - if _sys: - _sys.stderr.write("Exception in thread %s:\n%s\n" % - (self.getName(), _format_exc())) - else: - # Do the best job possible w/o a huge amt. of code to - # approximate a traceback (code ideas from - # Lib/traceback.py) - exc_type, exc_value, exc_tb = self.__exc_info() - try: - print>>self.__stderr, ( - "Exception in thread " + self.getName() + - " (most likely raised during interpreter shutdown):") - print>>self.__stderr, ( - "Traceback (most recent call last):") - while exc_tb: - print>>self.__stderr, ( - ' File "%s", line %s, in %s' % - (exc_tb.tb_frame.f_code.co_filename, - exc_tb.tb_lineno, - exc_tb.tb_frame.f_code.co_name)) - exc_tb = exc_tb.tb_next - print>>self.__stderr, ("%s: %s" % (exc_type, exc_value)) - # Make sure that exc_tb gets deleted since it is a memory - # hog; deleting everything else is just for thoroughness - finally: - del exc_type, exc_value, exc_tb - else: - if __debug__: - self._note("%s.__bootstrap(): normal return", self) - finally: - self.__stop() - try: - self.__delete() - except: - pass - - def __stop(self): - self.__block.acquire() - self.__stopped = True - self.__block.notifyAll() - self.__block.release() - - def __delete(self): - "Remove current thread from the dict of currently running threads." - - # Notes about running with dummy_thread: - # - # Must take care to not raise an exception if dummy_thread is being - # used (and thus this module is being used as an instance of - # dummy_threading). dummy_thread.get_ident() always returns -1 since - # there is only one thread if dummy_thread is being used. Thus - # len(_active) is always <= 1 here, and any Thread instance created - # overwrites the (if any) thread currently registered in _active. - # - # An instance of _MainThread is always created by 'threading'. This - # gets overwritten the instant an instance of Thread is created; both - # threads return -1 from dummy_thread.get_ident() and thus have the - # same key in the dict. So when the _MainThread instance created by - # 'threading' tries to clean itself up when atexit calls this method - # it gets a KeyError if another Thread instance was created. - # - # This all means that KeyError from trying to delete something from - # _active if dummy_threading is being used is a red herring. But - # since it isn't if dummy_threading is *not* being used then don't - # hide the exception. - - _active_limbo_lock.acquire() - try: - try: - del _active[_get_ident()] - except KeyError: - if 'dummy_threading' not in _sys.modules: - raise - finally: - _active_limbo_lock.release() - - def join(self, timeout=None): - assert self.__initialized, "Thread.__init__() not called" - assert self.__started, "cannot join thread before it is started" - assert self is not currentThread(), "cannot join current thread" - if __debug__: - if not self.__stopped: - self._note("%s.join(): waiting until thread stops", self) - self.__block.acquire() - try: - if timeout is None: - while not self.__stopped: - self.__block.wait() - if __debug__: - self._note("%s.join(): thread stopped", self) - else: - deadline = _time() + timeout - while not self.__stopped: - delay = deadline - _time() - if delay <= 0: - if __debug__: - self._note("%s.join(): timed out", self) - break - self.__block.wait(delay) - else: - if __debug__: - self._note("%s.join(): thread stopped", self) - finally: - self.__block.release() - - def getName(self): - assert self.__initialized, "Thread.__init__() not called" - return self.__name - - def setName(self, name): - assert self.__initialized, "Thread.__init__() not called" - self.__name = str(name) - - def isAlive(self): - assert self.__initialized, "Thread.__init__() not called" - return self.__started and not self.__stopped - - def isDaemon(self): - assert self.__initialized, "Thread.__init__() not called" - return self.__daemonic - - def setDaemon(self, daemonic): - assert self.__initialized, "Thread.__init__() not called" - assert not self.__started, "cannot set daemon status of active thread" - self.__daemonic = daemonic - -# The timer class was contributed by Itamar Shtull-Trauring - -def Timer(*args, **kwargs): - return _Timer(*args, **kwargs) - -class _Timer(Thread): - """Call a function after a specified number of seconds: - - t = Timer(30.0, f, args=[], kwargs={}) - t.start() - t.cancel() # stop the timer's action if it's still waiting - """ - - def __init__(self, interval, function, args=[], kwargs={}): - Thread.__init__(self) - self.interval = interval - self.function = function - self.args = args - self.kwargs = kwargs - self.finished = Event() - - def cancel(self): - """Stop the timer if it hasn't finished yet""" - self.finished.set() - - def run(self): - self.finished.wait(self.interval) - if not self.finished.isSet(): - self.function(*self.args, **self.kwargs) - self.finished.set() - -# Special thread class to represent the main thread -# This is garbage collected through an exit handler - -class _MainThread(Thread): - - def __init__(self): - Thread.__init__(self, name="MainThread") - self._Thread__started = True - _active_limbo_lock.acquire() - _active[_get_ident()] = self - _active_limbo_lock.release() - import atexit - atexit.register(self.__exitfunc) - - def _set_daemon(self): - return False - - def __exitfunc(self): - self._Thread__stop() - t = _pickSomeNonDaemonThread() - if t: - if __debug__: - self._note("%s: waiting for other threads", self) - while t: - t.join() - t = _pickSomeNonDaemonThread() - if __debug__: - self._note("%s: exiting", self) - self._Thread__delete() - -def _pickSomeNonDaemonThread(): - for t in enumerate(): - if not t.isDaemon() and t.isAlive(): - return t - return None - - -# Dummy thread class to represent threads not started here. -# These aren't garbage collected when they die, -# nor can they be waited for. -# Their purpose is to return *something* from currentThread(). -# They are marked as daemon threads so we won't wait for them -# when we exit (conform previous semantics). - -class _DummyThread(Thread): - - def __init__(self): - Thread.__init__(self, name=_newname("Dummy-%d")) - self._Thread__started = True - _active_limbo_lock.acquire() - _active[_get_ident()] = self - _active_limbo_lock.release() - - def _set_daemon(self): - return True - - def join(self, timeout=None): - assert False, "cannot join a dummy thread" - - -# Global API functions - -def currentThread(): - try: - return _active[_get_ident()] - except KeyError: - ##print "currentThread(): no current thread for", _get_ident() - return _DummyThread() - -def activeCount(): - _active_limbo_lock.acquire() - count = len(_active) + len(_limbo) - _active_limbo_lock.release() - return count - -def enumerate(): - _active_limbo_lock.acquire() - active = list(_active.values()) + list(_limbo.values()) - _active_limbo_lock.release() - return active - -# Create the main thread object - -_MainThread() - -# get thread-local implementation, either from the thread -# module, or from the python fallback - -try: - from _thread import _local as local -except ImportError: - from _threading_local import local - - -# Self-test code - -def _test(): - - class BoundedQueue(_Verbose): - - def __init__(self, limit): - _Verbose.__init__(self) - self.mon = RLock() - self.rc = Condition(self.mon) - self.wc = Condition(self.mon) - self.limit = limit - self.queue = deque() - - def put(self, item): - self.mon.acquire() - while len(self.queue) >= self.limit: - self._note("put(%s): queue full", item) - self.wc.wait() - self.queue.append(item) - self._note("put(%s): appended, length now %d", - item, len(self.queue)) - self.rc.notify() - self.mon.release() - - def get(self): - self.mon.acquire() - while not self.queue: - self._note("get(): queue empty") - self.rc.wait() - item = self.queue.popleft() - self._note("get(): got %s, %d left", item, len(self.queue)) - self.wc.notify() - self.mon.release() - return item - - class ProducerThread(Thread): - - def __init__(self, queue, quota): - Thread.__init__(self, name="Producer") - self.queue = queue - self.quota = quota - - def run(self): - from random import random - counter = 0 - while counter < self.quota: - counter = counter + 1 - self.queue.put("%s.%d" % (self.getName(), counter)) - _sleep(random() * 0.00001) - - - class ConsumerThread(Thread): - - def __init__(self, queue, count): - Thread.__init__(self, name="Consumer") - self.queue = queue - self.count = count - - def run(self): - while self.count > 0: - item = self.queue.get() - print(item) - self.count = self.count - 1 - - NP = 3 - QL = 4 - NI = 5 - - Q = BoundedQueue(QL) - P = [] - for i in range(NP): - t = ProducerThread(Q, NI) - t.setName("Producer-%d" % (i+1)) - P.append(t) - C = ConsumerThread(Q, NI*NP) - for t in P: - t.start() - _sleep(0.000001) - C.start() - for t in P: - t.join() - C.join() - -if __name__ == '__main__': - _test() diff --git a/gnuradio-runtime/python/gnuradio/gr/top_block.py b/gnuradio-runtime/python/gnuradio/gr/top_block.py index e7608bf5e9..afa9c3b986 100644 --- a/gnuradio-runtime/python/gnuradio/gr/top_block.py +++ b/gnuradio-runtime/python/gnuradio/gr/top_block.py @@ -27,12 +27,11 @@ from .runtime_swig import (top_block_swig, top_block_start_unlocked, top_block_stop_unlocked, top_block_unlock_unlocked, dot_graph_tb) -#import gnuradio.gr.gr_threading as _threading -from . import gr_threading as _threading +import threading from .hier_block2 import hier_block2 -class _top_block_waiter(_threading.Thread): +class _top_block_waiter(threading.Thread): """ This kludge allows ^C to interrupt top_block.run and top_block.wait @@ -56,10 +55,10 @@ class _top_block_waiter(_threading.Thread): the interruptable wait. """ def __init__(self, tb): - _threading.Thread.__init__(self) + threading.Thread.__init__(self) self.setDaemon(1) self.tb = tb - self.event = _threading.Event() + self.event = threading.Event() self.start() def run(self): diff --git a/gnuradio-runtime/python/gnuradio/gru/msgq_runner.py b/gnuradio-runtime/python/gnuradio/gru/msgq_runner.py index 2d58480f5f..4bcc06c60a 100644 --- a/gnuradio-runtime/python/gnuradio/gru/msgq_runner.py +++ b/gnuradio-runtime/python/gnuradio/gru/msgq_runner.py @@ -43,12 +43,12 @@ To determine if the runner has exited, call exited() on the object. from __future__ import unicode_literals from gnuradio import gr -import gnuradio.gr.gr_threading as _threading +import threading -class msgq_runner(_threading.Thread): +class msgq_runner(threading.Thread): def __init__(self, msgq, callback, exit_on_error=False): - _threading.Thread.__init__(self) + threading.Thread.__init__(self) self._msgq = msgq self._callback = callback |