diff options
Diffstat (limited to 'gnuradio-runtime/python/gnuradio/gr')
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/CMakeLists.txt | 46 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/__init__.py | 39 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/exceptions.py | 27 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/gateway.py | 243 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/gr_threading.py | 35 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/gr_threading_23.py | 724 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/gr_threading_24.py | 793 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/hier_block2.py | 132 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/prefs.py | 127 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/pubsub.py | 153 | ||||
-rwxr-xr-x | gnuradio-runtime/python/gnuradio/gr/qa_feval.py | 110 | ||||
-rwxr-xr-x | gnuradio-runtime/python/gnuradio/gr/qa_kludged_imports.py | 39 | ||||
-rwxr-xr-x | gnuradio-runtime/python/gnuradio/gr/qa_tag_utils.py | 55 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/tag_utils.py | 57 | ||||
-rw-r--r-- | gnuradio-runtime/python/gnuradio/gr/top_block.py | 170 |
15 files changed, 2750 insertions, 0 deletions
diff --git a/gnuradio-runtime/python/gnuradio/gr/CMakeLists.txt b/gnuradio-runtime/python/gnuradio/gr/CMakeLists.txt new file mode 100644 index 0000000000..cd57704930 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/CMakeLists.txt @@ -0,0 +1,46 @@ +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. + +######################################################################## +include(GrPython) + +GR_PYTHON_INSTALL(FILES + __init__.py + gateway.py + gr_threading.py + gr_threading_23.py + gr_threading_24.py + hier_block2.py + tag_utils.py + top_block.py + DESTINATION ${GR_PYTHON_DIR}/gnuradio/gr + COMPONENT "runtime_python" +) + +######################################################################## +# Handle the unit tests +######################################################################## +if(ENABLE_TESTING) +include(GrTest) +file(GLOB py_qa_test_files "qa_*.py") +foreach(py_qa_test_file ${py_qa_test_files}) + get_filename_component(py_qa_test_name ${py_qa_test_file} NAME_WE) + GR_ADD_TEST(${py_qa_test_name} ${PYTHON_EXECUTABLE} ${PYTHON_DASH_B} ${py_qa_test_file}) +endforeach(py_qa_test_file) +endif(ENABLE_TESTING) diff --git a/gnuradio-runtime/python/gnuradio/gr/__init__.py b/gnuradio-runtime/python/gnuradio/gr/__init__.py new file mode 100644 index 0000000000..c1d6c87629 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/__init__.py @@ -0,0 +1,39 @@ +# +# Copyright 2003-2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +# The presence of this file turns this directory into a Python package + +""" +Core contents. +""" + +# This is the main GNU Radio python module. +# We pull the swig output and the other modules into the gnuradio.gr namespace + +from runtime_swig import * +from exceptions import * +from top_block import * +from hier_block2 import * +from tag_utils import * +from gateway import basic_block, sync_block, decim_block, interp_block + +# Force the preference database to be initialized +prefs = gr_prefs.singleton diff --git a/gnuradio-runtime/python/gnuradio/gr/exceptions.py b/gnuradio-runtime/python/gnuradio/gr/exceptions.py new file mode 100644 index 0000000000..dba04750bc --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/exceptions.py @@ -0,0 +1,27 @@ +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. + +class NotDAG (Exception): + """Not a directed acyclic graph""" + pass + +class CantHappen (Exception): + """Can't happen""" + pass diff --git a/gnuradio-runtime/python/gnuradio/gr/gateway.py b/gnuradio-runtime/python/gnuradio/gr/gateway.py new file mode 100644 index 0000000000..b595959494 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/gateway.py @@ -0,0 +1,243 @@ +# +# Copyright 2011-2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import runtime_swig as gr +from runtime_swig import io_signature, io_signaturev +from runtime_swig import gr_block_gw_message_type +from runtime_swig import block_gateway +import numpy + +######################################################################## +# Magic to turn pointers into numpy arrays +# http://docs.scipy.org/doc/numpy/reference/arrays.interface.html +######################################################################## +def pointer_to_ndarray(addr, dtype, nitems): + class array_like: + __array_interface__ = { + 'data' : (int(addr), False), + 'typestr' : dtype.base.str, + 'descr' : dtype.base.descr, + 'shape' : (nitems,) + dtype.shape, + 'strides' : None, + 'version' : 3 + } + return numpy.asarray(array_like()).view(dtype.base) + +######################################################################## +# Handler that does callbacks from C++ +######################################################################## +class gateway_handler(gr.feval_ll): + + #dont put a constructor, it wont work + + def init(self, callback): + self._callback = callback + + def eval(self, arg): + try: self._callback() + except Exception as ex: + print("handler caught exception: %s"%ex) + import traceback; traceback.print_exc() + raise ex + return 0 + +######################################################################## +# Handler that does callbacks from C++ +######################################################################## +class msg_handler(gr.feval_p): + + #dont put a constructor, it wont work + + def init(self, callback): + self._callback = callback + + def eval(self, arg): + try: self._callback(arg) + except Exception as ex: + print("handler caught exception: %s"%ex) + import traceback; traceback.print_exc() + raise ex + return 0 + +######################################################################## +# The guts that make this into a gr block +######################################################################## +class gateway_block(object): + + def __init__(self, name, in_sig, out_sig, work_type, factor): + + #ensure that the sigs are iterable dtypes + def sig_to_dtype_sig(sig): + if sig is None: sig = () + return map(numpy.dtype, sig) + self.__in_sig = sig_to_dtype_sig(in_sig) + self.__out_sig = sig_to_dtype_sig(out_sig) + + #cache the ranges to iterate when dispatching work + self.__in_indexes = range(len(self.__in_sig)) + self.__out_indexes = range(len(self.__out_sig)) + + #convert the signatures into gr.io_signatures + def sig_to_gr_io_sigv(sig): + if not len(sig): return io_signature(0, 0, 0) + return io_signaturev(len(sig), len(sig), [s.itemsize for s in sig]) + gr_in_sig = sig_to_gr_io_sigv(self.__in_sig) + gr_out_sig = sig_to_gr_io_sigv(self.__out_sig) + + #create internal gateway block + self.__handler = gateway_handler() + self.__handler.init(self.__gr_block_handle) + self.__gateway = block_gateway( + self.__handler, name, gr_in_sig, gr_out_sig, work_type, factor) + self.__message = self.__gateway.gr_block_message() + + #dict to keep references to all message handlers + self.__msg_handlers = {} + + #register gr_block functions + prefix = 'gr_block__' + for attr in [x for x in dir(self.__gateway) if x.startswith(prefix)]: + setattr(self, attr.replace(prefix, ''), getattr(self.__gateway, attr)) + self.pop_msg_queue = lambda: gr.gr_block_gw_pop_msg_queue_safe(self.__gateway) + + def to_basic_block(self): + """ + Makes this block connectable by hier/top block python + """ + return self.__gateway.to_basic_block() + + def __gr_block_handle(self): + """ + Dispatch tasks according to the action type specified in the message. + """ + if self.__message.action == gr_block_gw_message_type.ACTION_GENERAL_WORK: + self.__message.general_work_args_return_value = self.general_work( + + input_items=[pointer_to_ndarray( + self.__message.general_work_args_input_items[i], + self.__in_sig[i], + self.__message.general_work_args_ninput_items[i] + ) for i in self.__in_indexes], + + output_items=[pointer_to_ndarray( + self.__message.general_work_args_output_items[i], + self.__out_sig[i], + self.__message.general_work_args_noutput_items + ) for i in self.__out_indexes], + ) + + elif self.__message.action == gr_block_gw_message_type.ACTION_WORK: + self.__message.work_args_return_value = self.work( + + input_items=[pointer_to_ndarray( + self.__message.work_args_input_items[i], + self.__in_sig[i], + self.__message.work_args_ninput_items + ) for i in self.__in_indexes], + + output_items=[pointer_to_ndarray( + self.__message.work_args_output_items[i], + self.__out_sig[i], + self.__message.work_args_noutput_items + ) for i in self.__out_indexes], + ) + + elif self.__message.action == gr_block_gw_message_type.ACTION_FORECAST: + self.forecast( + noutput_items=self.__message.forecast_args_noutput_items, + ninput_items_required=self.__message.forecast_args_ninput_items_required, + ) + + elif self.__message.action == gr_block_gw_message_type.ACTION_START: + self.__message.start_args_return_value = self.start() + + elif self.__message.action == gr_block_gw_message_type.ACTION_STOP: + self.__message.stop_args_return_value = self.stop() + + def forecast(self, noutput_items, ninput_items_required): + """ + forecast is only called from a general block + this is the default implementation + """ + for ninput_item in ninput_items_required: + ninput_item = noutput_items + self.history() - 1; + return + + def general_work(self, *args, **kwargs): + """general work to be overloaded in a derived class""" + raise NotImplementedError("general work not implemented") + + def work(self, *args, **kwargs): + """work to be overloaded in a derived class""" + raise NotImplementedError("work not implemented") + + def start(self): return True + def stop(self): return True + + def set_msg_handler(self, which_port, handler_func): + handler = msg_handler() + handler.init(handler_func) + self.__gateway.set_msg_handler_feval(which_port, handler) + # Save handler object in class so it's not garbage collected + self.__msg_handlers[which_port] = handler + +######################################################################## +# Wrappers for the user to inherit from +######################################################################## +class basic_block(gateway_block): + def __init__(self, name, in_sig, out_sig): + gateway_block.__init__(self, + name=name, + in_sig=in_sig, + out_sig=out_sig, + work_type=gr.GR_BLOCK_GW_WORK_GENERAL, + factor=1, #not relevant factor + ) + +class sync_block(gateway_block): + def __init__(self, name, in_sig, out_sig): + gateway_block.__init__(self, + name=name, + in_sig=in_sig, + out_sig=out_sig, + work_type=gr.GR_BLOCK_GW_WORK_SYNC, + factor=1, + ) + +class decim_block(gateway_block): + def __init__(self, name, in_sig, out_sig, decim): + gateway_block.__init__(self, + name=name, + in_sig=in_sig, + out_sig=out_sig, + work_type=gr.GR_BLOCK_GW_WORK_DECIM, + factor=decim, + ) + +class interp_block(gateway_block): + def __init__(self, name, in_sig, out_sig, interp): + gateway_block.__init__(self, + name=name, + in_sig=in_sig, + out_sig=out_sig, + work_type=gr.GR_BLOCK_GW_WORK_INTERP, + factor=interp, + ) diff --git a/gnuradio-runtime/python/gnuradio/gr/gr_threading.py b/gnuradio-runtime/python/gnuradio/gr/gr_threading.py new file mode 100644 index 0000000000..5d6f0fdaf9 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/gr_threading.py @@ -0,0 +1,35 @@ +# +# Copyright 2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from sys import version_info as _version_info + +# import patched version of standard threading module + +if _version_info[0:2] == (2, 3): + #print "Importing gr_threading_23" + from gr_threading_23 import * +elif _version_info[0:2] == (2, 4): + #print "Importing gr_threading_24" + from gr_threading_24 import * +else: + # assume the patch was applied... + #print "Importing system provided threading" + from threading import * diff --git a/gnuradio-runtime/python/gnuradio/gr/gr_threading_23.py b/gnuradio-runtime/python/gnuradio/gr/gr_threading_23.py new file mode 100644 index 0000000000..dee8034c1c --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/gr_threading_23.py @@ -0,0 +1,724 @@ +"""Thread module emulating a subset of Java's threading model.""" + +# This started life as the threading.py module of Python 2.3 +# It's been patched to fix a problem with join, where a KeyboardInterrupt +# caused a lock to be left in the acquired state. + +import sys as _sys + +try: + import thread +except ImportError: + del _sys.modules[__name__] + raise + +from StringIO import StringIO as _StringIO +from time import time as _time, sleep as _sleep +from traceback import print_exc as _print_exc + +# Rename some stuff so "from threading import *" is safe +__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event', + 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', + 'Timer', 'setprofile', 'settrace'] + +_start_new_thread = thread.start_new_thread +_allocate_lock = thread.allocate_lock +_get_ident = thread.get_ident +ThreadError = thread.error +del thread + + +# Debug support (adapted from ihooks.py). +# All the major classes here derive from _Verbose. We force that to +# be a new-style class so that all the major classes here are new-style. +# This helps debugging (type(instance) is more revealing for instances +# of new-style classes). + +_VERBOSE = False + +if __debug__: + + class _Verbose(object): + + def __init__(self, verbose=None): + if verbose is None: + verbose = _VERBOSE + self.__verbose = verbose + + def _note(self, format, *args): + if self.__verbose: + format = format % args + format = "%s: %s\n" % ( + currentThread().getName(), format) + _sys.stderr.write(format) + +else: + # Disable this when using "python -O" + class _Verbose(object): + def __init__(self, verbose=None): + pass + def _note(self, *args): + pass + +# Support for profile and trace hooks + +_profile_hook = None +_trace_hook = None + +def setprofile(func): + global _profile_hook + _profile_hook = func + +def settrace(func): + global _trace_hook + _trace_hook = func + +# Synchronization classes + +Lock = _allocate_lock + +def RLock(*args, **kwargs): + return _RLock(*args, **kwargs) + +class _RLock(_Verbose): + + def __init__(self, verbose=None): + _Verbose.__init__(self, verbose) + self.__block = _allocate_lock() + self.__owner = None + self.__count = 0 + + def __repr__(self): + return "<%s(%s, %d)>" % ( + self.__class__.__name__, + self.__owner and self.__owner.getName(), + self.__count) + + def acquire(self, blocking=1): + me = currentThread() + if self.__owner is me: + self.__count = self.__count + 1 + if __debug__: + self._note("%s.acquire(%s): recursive success", self, blocking) + return 1 + rc = self.__block.acquire(blocking) + if rc: + self.__owner = me + self.__count = 1 + if __debug__: + self._note("%s.acquire(%s): initial succes", self, blocking) + else: + if __debug__: + self._note("%s.acquire(%s): failure", self, blocking) + return rc + + def release(self): + me = currentThread() + assert self.__owner is me, "release() of un-acquire()d lock" + self.__count = count = self.__count - 1 + if not count: + self.__owner = None + self.__block.release() + if __debug__: + self._note("%s.release(): final release", self) + else: + if __debug__: + self._note("%s.release(): non-final release", self) + + # Internal methods used by condition variables + + def _acquire_restore(self, (count, owner)): + self.__block.acquire() + self.__count = count + self.__owner = owner + if __debug__: + self._note("%s._acquire_restore()", self) + + def _release_save(self): + if __debug__: + self._note("%s._release_save()", self) + count = self.__count + self.__count = 0 + owner = self.__owner + self.__owner = None + self.__block.release() + return (count, owner) + + def _is_owned(self): + return self.__owner is currentThread() + + +def Condition(*args, **kwargs): + return _Condition(*args, **kwargs) + +class _Condition(_Verbose): + + def __init__(self, lock=None, verbose=None): + _Verbose.__init__(self, verbose) + if lock is None: + lock = RLock() + self.__lock = lock + # Export the lock's acquire() and release() methods + self.acquire = lock.acquire + self.release = lock.release + # If the lock defines _release_save() and/or _acquire_restore(), + # these override the default implementations (which just call + # release() and acquire() on the lock). Ditto for _is_owned(). + try: + self._release_save = lock._release_save + except AttributeError: + pass + try: + self._acquire_restore = lock._acquire_restore + except AttributeError: + pass + try: + self._is_owned = lock._is_owned + except AttributeError: + pass + self.__waiters = [] + + def __repr__(self): + return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) + + def _release_save(self): + self.__lock.release() # No state to save + + def _acquire_restore(self, x): + self.__lock.acquire() # Ignore saved state + + def _is_owned(self): + # Return True if lock is owned by currentThread. + # This method is called only if __lock doesn't have _is_owned(). + if self.__lock.acquire(0): + self.__lock.release() + return False + else: + return True + + def wait(self, timeout=None): + currentThread() # for side-effect + assert self._is_owned(), "wait() of un-acquire()d lock" + waiter = _allocate_lock() + waiter.acquire() + self.__waiters.append(waiter) + saved_state = self._release_save() + try: # restore state no matter what (e.g., KeyboardInterrupt) + if timeout is None: + waiter.acquire() + if __debug__: + self._note("%s.wait(): got it", self) + else: + # Balancing act: We can't afford a pure busy loop, so we + # have to sleep; but if we sleep the whole timeout time, + # we'll be unresponsive. The scheme here sleeps very + # little at first, longer as time goes on, but never longer + # than 20 times per second (or the timeout time remaining). + endtime = _time() + timeout + delay = 0.0005 # 500 us -> initial delay of 1 ms + while True: + gotit = waiter.acquire(0) + if gotit: + break + remaining = endtime - _time() + if remaining <= 0: + break + delay = min(delay * 2, remaining, .05) + _sleep(delay) + if not gotit: + if __debug__: + self._note("%s.wait(%s): timed out", self, timeout) + try: + self.__waiters.remove(waiter) + except ValueError: + pass + else: + if __debug__: + self._note("%s.wait(%s): got it", self, timeout) + finally: + self._acquire_restore(saved_state) + + def notify(self, n=1): + currentThread() # for side-effect + assert self._is_owned(), "notify() of un-acquire()d lock" + __waiters = self.__waiters + waiters = __waiters[:n] + if not waiters: + if __debug__: + self._note("%s.notify(): no waiters", self) + return + self._note("%s.notify(): notifying %d waiter%s", self, n, + n!=1 and "s" or "") + for waiter in waiters: + waiter.release() + try: + __waiters.remove(waiter) + except ValueError: + pass + + def notifyAll(self): + self.notify(len(self.__waiters)) + + +def Semaphore(*args, **kwargs): + return _Semaphore(*args, **kwargs) + +class _Semaphore(_Verbose): + + # After Tim Peters' semaphore class, but not quite the same (no maximum) + + def __init__(self, value=1, verbose=None): + assert value >= 0, "Semaphore initial value must be >= 0" + _Verbose.__init__(self, verbose) + self.__cond = Condition(Lock()) + self.__value = value + + def acquire(self, blocking=1): + rc = False + self.__cond.acquire() + while self.__value == 0: + if not blocking: + break + if __debug__: + self._note("%s.acquire(%s): blocked waiting, value=%s", + self, blocking, self.__value) + self.__cond.wait() + else: + self.__value = self.__value - 1 + if __debug__: + self._note("%s.acquire: success, value=%s", + self, self.__value) + rc = True + self.__cond.release() + return rc + + def release(self): + self.__cond.acquire() + self.__value = self.__value + 1 + if __debug__: + self._note("%s.release: success, value=%s", + self, self.__value) + self.__cond.notify() + self.__cond.release() + + +def BoundedSemaphore(*args, **kwargs): + return _BoundedSemaphore(*args, **kwargs) + +class _BoundedSemaphore(_Semaphore): + """Semaphore that checks that # releases is <= # acquires""" + def __init__(self, value=1, verbose=None): + _Semaphore.__init__(self, value, verbose) + self._initial_value = value + + def release(self): + if self._Semaphore__value >= self._initial_value: + raise ValueError, "Semaphore released too many times" + return _Semaphore.release(self) + + +def Event(*args, **kwargs): + return _Event(*args, **kwargs) + +class _Event(_Verbose): + + # After Tim Peters' event class (without is_posted()) + + def __init__(self, verbose=None): + _Verbose.__init__(self, verbose) + self.__cond = Condition(Lock()) + self.__flag = False + + def isSet(self): + return self.__flag + + def set(self): + self.__cond.acquire() + try: + self.__flag = True + self.__cond.notifyAll() + finally: + self.__cond.release() + + def clear(self): + self.__cond.acquire() + try: + self.__flag = False + finally: + self.__cond.release() + + def wait(self, timeout=None): + self.__cond.acquire() + try: + if not self.__flag: + self.__cond.wait(timeout) + finally: + self.__cond.release() + +# Helper to generate new thread names +_counter = 0 +def _newname(template="Thread-%d"): + global _counter + _counter = _counter + 1 + return template % _counter + +# Active thread administration +_active_limbo_lock = _allocate_lock() +_active = {} +_limbo = {} + + +# Main class for threads + +class Thread(_Verbose): + + __initialized = False + + def __init__(self, group=None, target=None, name=None, + args=(), kwargs={}, verbose=None): + assert group is None, "group argument must be None for now" + _Verbose.__init__(self, verbose) + self.__target = target + self.__name = str(name or _newname()) + self.__args = args + self.__kwargs = kwargs + self.__daemonic = self._set_daemon() + self.__started = False + self.__stopped = False + self.__block = Condition(Lock()) + self.__initialized = True + + def _set_daemon(self): + # Overridden in _MainThread and _DummyThread + return currentThread().isDaemon() + + def __repr__(self): + assert self.__initialized, "Thread.__init__() was not called" + status = "initial" + if self.__started: + status = "started" + if self.__stopped: + status = "stopped" + if self.__daemonic: + status = status + " daemon" + return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) + + def start(self): + assert self.__initialized, "Thread.__init__() not called" + assert not self.__started, "thread already started" + if __debug__: + self._note("%s.start(): starting thread", self) + _active_limbo_lock.acquire() + _limbo[self] = self + _active_limbo_lock.release() + _start_new_thread(self.__bootstrap, ()) + self.__started = True + _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) + + def run(self): + if self.__target: + self.__target(*self.__args, **self.__kwargs) + + def __bootstrap(self): + try: + self.__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + del _limbo[self] + _active_limbo_lock.release() + if __debug__: + self._note("%s.__bootstrap(): thread started", self) + + if _trace_hook: + self._note("%s.__bootstrap(): registering trace hook", self) + _sys.settrace(_trace_hook) + if _profile_hook: + self._note("%s.__bootstrap(): registering profile hook", self) + _sys.setprofile(_profile_hook) + + try: + self.run() + except SystemExit: + if __debug__: + self._note("%s.__bootstrap(): raised SystemExit", self) + except: + if __debug__: + self._note("%s.__bootstrap(): unhandled exception", self) + s = _StringIO() + _print_exc(file=s) + _sys.stderr.write("Exception in thread %s:\n%s\n" % + (self.getName(), s.getvalue())) + else: + if __debug__: + self._note("%s.__bootstrap(): normal return", self) + finally: + self.__stop() + try: + self.__delete() + except: + pass + + def __stop(self): + self.__block.acquire() + self.__stopped = True + self.__block.notifyAll() + self.__block.release() + + def __delete(self): + _active_limbo_lock.acquire() + del _active[_get_ident()] + _active_limbo_lock.release() + + def join(self, timeout=None): + assert self.__initialized, "Thread.__init__() not called" + assert self.__started, "cannot join thread before it is started" + assert self is not currentThread(), "cannot join current thread" + if __debug__: + if not self.__stopped: + self._note("%s.join(): waiting until thread stops", self) + self.__block.acquire() + try: + if timeout is None: + while not self.__stopped: + self.__block.wait() + if __debug__: + self._note("%s.join(): thread stopped", self) + else: + deadline = _time() + timeout + while not self.__stopped: + delay = deadline - _time() + if delay <= 0: + if __debug__: + self._note("%s.join(): timed out", self) + break + self.__block.wait(delay) + else: + if __debug__: + self._note("%s.join(): thread stopped", self) + finally: + self.__block.release() + + def getName(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__name + + def setName(self, name): + assert self.__initialized, "Thread.__init__() not called" + self.__name = str(name) + + def isAlive(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__started and not self.__stopped + + def isDaemon(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__daemonic + + def setDaemon(self, daemonic): + assert self.__initialized, "Thread.__init__() not called" + assert not self.__started, "cannot set daemon status of active thread" + self.__daemonic = daemonic + +# The timer class was contributed by Itamar Shtull-Trauring + +def Timer(*args, **kwargs): + return _Timer(*args, **kwargs) + +class _Timer(Thread): + """Call a function after a specified number of seconds: + + t = Timer(30.0, f, args=[], kwargs={}) + t.start() + t.cancel() # stop the timer's action if it's still waiting + """ + + def __init__(self, interval, function, args=[], kwargs={}): + Thread.__init__(self) + self.interval = interval + self.function = function + self.args = args + self.kwargs = kwargs + self.finished = Event() + + def cancel(self): + """Stop the timer if it hasn't finished yet""" + self.finished.set() + + def run(self): + self.finished.wait(self.interval) + if not self.finished.isSet(): + self.function(*self.args, **self.kwargs) + self.finished.set() + +# Special thread class to represent the main thread +# This is garbage collected through an exit handler + +class _MainThread(Thread): + + def __init__(self): + Thread.__init__(self, name="MainThread") + self._Thread__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + _active_limbo_lock.release() + import atexit + atexit.register(self.__exitfunc) + + def _set_daemon(self): + return False + + def __exitfunc(self): + self._Thread__stop() + t = _pickSomeNonDaemonThread() + if t: + if __debug__: + self._note("%s: waiting for other threads", self) + while t: + t.join() + t = _pickSomeNonDaemonThread() + if __debug__: + self._note("%s: exiting", self) + self._Thread__delete() + +def _pickSomeNonDaemonThread(): + for t in enumerate(): + if not t.isDaemon() and t.isAlive(): + return t + return None + + +# Dummy thread class to represent threads not started here. +# These aren't garbage collected when they die, +# nor can they be waited for. +# Their purpose is to return *something* from currentThread(). +# They are marked as daemon threads so we won't wait for them +# when we exit (conform previous semantics). + +class _DummyThread(Thread): + + def __init__(self): + Thread.__init__(self, name=_newname("Dummy-%d")) + self._Thread__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + _active_limbo_lock.release() + + def _set_daemon(self): + return True + + def join(self, timeout=None): + assert False, "cannot join a dummy thread" + + +# Global API functions + +def currentThread(): + try: + return _active[_get_ident()] + except KeyError: + ##print "currentThread(): no current thread for", _get_ident() + return _DummyThread() + +def activeCount(): + _active_limbo_lock.acquire() + count = len(_active) + len(_limbo) + _active_limbo_lock.release() + return count + +def enumerate(): + _active_limbo_lock.acquire() + active = _active.values() + _limbo.values() + _active_limbo_lock.release() + return active + +# Create the main thread object + +_MainThread() + + +# Self-test code + +def _test(): + + class BoundedQueue(_Verbose): + + def __init__(self, limit): + _Verbose.__init__(self) + self.mon = RLock() + self.rc = Condition(self.mon) + self.wc = Condition(self.mon) + self.limit = limit + self.queue = [] + + def put(self, item): + self.mon.acquire() + while len(self.queue) >= self.limit: + self._note("put(%s): queue full", item) + self.wc.wait() + self.queue.append(item) + self._note("put(%s): appended, length now %d", + item, len(self.queue)) + self.rc.notify() + self.mon.release() + + def get(self): + self.mon.acquire() + while not self.queue: + self._note("get(): queue empty") + self.rc.wait() + item = self.queue.pop(0) + self._note("get(): got %s, %d left", item, len(self.queue)) + self.wc.notify() + self.mon.release() + return item + + class ProducerThread(Thread): + + def __init__(self, queue, quota): + Thread.__init__(self, name="Producer") + self.queue = queue + self.quota = quota + + def run(self): + from random import random + counter = 0 + while counter < self.quota: + counter = counter + 1 + self.queue.put("%s.%d" % (self.getName(), counter)) + _sleep(random() * 0.00001) + + + class ConsumerThread(Thread): + + def __init__(self, queue, count): + Thread.__init__(self, name="Consumer") + self.queue = queue + self.count = count + + def run(self): + while self.count > 0: + item = self.queue.get() + print item + self.count = self.count - 1 + + NP = 3 + QL = 4 + NI = 5 + + Q = BoundedQueue(QL) + P = [] + for i in range(NP): + t = ProducerThread(Q, NI) + t.setName("Producer-%d" % (i+1)) + P.append(t) + C = ConsumerThread(Q, NI*NP) + for t in P: + t.start() + _sleep(0.000001) + C.start() + for t in P: + t.join() + C.join() + +if __name__ == '__main__': + _test() diff --git a/gnuradio-runtime/python/gnuradio/gr/gr_threading_24.py b/gnuradio-runtime/python/gnuradio/gr/gr_threading_24.py new file mode 100644 index 0000000000..8539bfc047 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/gr_threading_24.py @@ -0,0 +1,793 @@ +"""Thread module emulating a subset of Java's threading model.""" + +# This started life as the threading.py module of Python 2.4 +# It's been patched to fix a problem with join, where a KeyboardInterrupt +# caused a lock to be left in the acquired state. + +import sys as _sys + +try: + import thread +except ImportError: + del _sys.modules[__name__] + raise + +from time import time as _time, sleep as _sleep +from traceback import format_exc as _format_exc +from collections import deque + +# Rename some stuff so "from threading import *" is safe +__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event', + 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', + 'Timer', 'setprofile', 'settrace', 'local'] + +_start_new_thread = thread.start_new_thread +_allocate_lock = thread.allocate_lock +_get_ident = thread.get_ident +ThreadError = thread.error +del thread + + +# Debug support (adapted from ihooks.py). +# All the major classes here derive from _Verbose. We force that to +# be a new-style class so that all the major classes here are new-style. +# This helps debugging (type(instance) is more revealing for instances +# of new-style classes). + +_VERBOSE = False + +if __debug__: + + class _Verbose(object): + + def __init__(self, verbose=None): + if verbose is None: + verbose = _VERBOSE + self.__verbose = verbose + + def _note(self, format, *args): + if self.__verbose: + format = format % args + format = "%s: %s\n" % ( + currentThread().getName(), format) + _sys.stderr.write(format) + +else: + # Disable this when using "python -O" + class _Verbose(object): + def __init__(self, verbose=None): + pass + def _note(self, *args): + pass + +# Support for profile and trace hooks + +_profile_hook = None +_trace_hook = None + +def setprofile(func): + global _profile_hook + _profile_hook = func + +def settrace(func): + global _trace_hook + _trace_hook = func + +# Synchronization classes + +Lock = _allocate_lock + +def RLock(*args, **kwargs): + return _RLock(*args, **kwargs) + +class _RLock(_Verbose): + + def __init__(self, verbose=None): + _Verbose.__init__(self, verbose) + self.__block = _allocate_lock() + self.__owner = None + self.__count = 0 + + def __repr__(self): + return "<%s(%s, %d)>" % ( + self.__class__.__name__, + self.__owner and self.__owner.getName(), + self.__count) + + def acquire(self, blocking=1): + me = currentThread() + if self.__owner is me: + self.__count = self.__count + 1 + if __debug__: + self._note("%s.acquire(%s): recursive success", self, blocking) + return 1 + rc = self.__block.acquire(blocking) + if rc: + self.__owner = me + self.__count = 1 + if __debug__: + self._note("%s.acquire(%s): initial succes", self, blocking) + else: + if __debug__: + self._note("%s.acquire(%s): failure", self, blocking) + return rc + + def release(self): + me = currentThread() + assert self.__owner is me, "release() of un-acquire()d lock" + self.__count = count = self.__count - 1 + if not count: + self.__owner = None + self.__block.release() + if __debug__: + self._note("%s.release(): final release", self) + else: + if __debug__: + self._note("%s.release(): non-final release", self) + + # Internal methods used by condition variables + + def _acquire_restore(self, (count, owner)): + self.__block.acquire() + self.__count = count + self.__owner = owner + if __debug__: + self._note("%s._acquire_restore()", self) + + def _release_save(self): + if __debug__: + self._note("%s._release_save()", self) + count = self.__count + self.__count = 0 + owner = self.__owner + self.__owner = None + self.__block.release() + return (count, owner) + + def _is_owned(self): + return self.__owner is currentThread() + + +def Condition(*args, **kwargs): + return _Condition(*args, **kwargs) + +class _Condition(_Verbose): + + def __init__(self, lock=None, verbose=None): + _Verbose.__init__(self, verbose) + if lock is None: + lock = RLock() + self.__lock = lock + # Export the lock's acquire() and release() methods + self.acquire = lock.acquire + self.release = lock.release + # If the lock defines _release_save() and/or _acquire_restore(), + # these override the default implementations (which just call + # release() and acquire() on the lock). Ditto for _is_owned(). + try: + self._release_save = lock._release_save + except AttributeError: + pass + try: + self._acquire_restore = lock._acquire_restore + except AttributeError: + pass + try: + self._is_owned = lock._is_owned + except AttributeError: + pass + self.__waiters = [] + + def __repr__(self): + return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) + + def _release_save(self): + self.__lock.release() # No state to save + + def _acquire_restore(self, x): + self.__lock.acquire() # Ignore saved state + + def _is_owned(self): + # Return True if lock is owned by currentThread. + # This method is called only if __lock doesn't have _is_owned(). + if self.__lock.acquire(0): + self.__lock.release() + return False + else: + return True + + def wait(self, timeout=None): + assert self._is_owned(), "wait() of un-acquire()d lock" + waiter = _allocate_lock() + waiter.acquire() + self.__waiters.append(waiter) + saved_state = self._release_save() + try: # restore state no matter what (e.g., KeyboardInterrupt) + if timeout is None: + waiter.acquire() + if __debug__: + self._note("%s.wait(): got it", self) + else: + # Balancing act: We can't afford a pure busy loop, so we + # have to sleep; but if we sleep the whole timeout time, + # we'll be unresponsive. The scheme here sleeps very + # little at first, longer as time goes on, but never longer + # than 20 times per second (or the timeout time remaining). + endtime = _time() + timeout + delay = 0.0005 # 500 us -> initial delay of 1 ms + while True: + gotit = waiter.acquire(0) + if gotit: + break + remaining = endtime - _time() + if remaining <= 0: + break + delay = min(delay * 2, remaining, .05) + _sleep(delay) + if not gotit: + if __debug__: + self._note("%s.wait(%s): timed out", self, timeout) + try: + self.__waiters.remove(waiter) + except ValueError: + pass + else: + if __debug__: + self._note("%s.wait(%s): got it", self, timeout) + finally: + self._acquire_restore(saved_state) + + def notify(self, n=1): + assert self._is_owned(), "notify() of un-acquire()d lock" + __waiters = self.__waiters + waiters = __waiters[:n] + if not waiters: + if __debug__: + self._note("%s.notify(): no waiters", self) + return + self._note("%s.notify(): notifying %d waiter%s", self, n, + n!=1 and "s" or "") + for waiter in waiters: + waiter.release() + try: + __waiters.remove(waiter) + except ValueError: + pass + + def notifyAll(self): + self.notify(len(self.__waiters)) + + +def Semaphore(*args, **kwargs): + return _Semaphore(*args, **kwargs) + +class _Semaphore(_Verbose): + + # After Tim Peters' semaphore class, but not quite the same (no maximum) + + def __init__(self, value=1, verbose=None): + assert value >= 0, "Semaphore initial value must be >= 0" + _Verbose.__init__(self, verbose) + self.__cond = Condition(Lock()) + self.__value = value + + def acquire(self, blocking=1): + rc = False + self.__cond.acquire() + while self.__value == 0: + if not blocking: + break + if __debug__: + self._note("%s.acquire(%s): blocked waiting, value=%s", + self, blocking, self.__value) + self.__cond.wait() + else: + self.__value = self.__value - 1 + if __debug__: + self._note("%s.acquire: success, value=%s", + self, self.__value) + rc = True + self.__cond.release() + return rc + + def release(self): + self.__cond.acquire() + self.__value = self.__value + 1 + if __debug__: + self._note("%s.release: success, value=%s", + self, self.__value) + self.__cond.notify() + self.__cond.release() + + +def BoundedSemaphore(*args, **kwargs): + return _BoundedSemaphore(*args, **kwargs) + +class _BoundedSemaphore(_Semaphore): + """Semaphore that checks that # releases is <= # acquires""" + def __init__(self, value=1, verbose=None): + _Semaphore.__init__(self, value, verbose) + self._initial_value = value + + def release(self): + if self._Semaphore__value >= self._initial_value: + raise ValueError, "Semaphore released too many times" + return _Semaphore.release(self) + + +def Event(*args, **kwargs): + return _Event(*args, **kwargs) + +class _Event(_Verbose): + + # After Tim Peters' event class (without is_posted()) + + def __init__(self, verbose=None): + _Verbose.__init__(self, verbose) + self.__cond = Condition(Lock()) + self.__flag = False + + def isSet(self): + return self.__flag + + def set(self): + self.__cond.acquire() + try: + self.__flag = True + self.__cond.notifyAll() + finally: + self.__cond.release() + + def clear(self): + self.__cond.acquire() + try: + self.__flag = False + finally: + self.__cond.release() + + def wait(self, timeout=None): + self.__cond.acquire() + try: + if not self.__flag: + self.__cond.wait(timeout) + finally: + self.__cond.release() + +# Helper to generate new thread names +_counter = 0 +def _newname(template="Thread-%d"): + global _counter + _counter = _counter + 1 + return template % _counter + +# Active thread administration +_active_limbo_lock = _allocate_lock() +_active = {} +_limbo = {} + + +# Main class for threads + +class Thread(_Verbose): + + __initialized = False + # Need to store a reference to sys.exc_info for printing + # out exceptions when a thread tries to use a global var. during interp. + # shutdown and thus raises an exception about trying to perform some + # operation on/with a NoneType + __exc_info = _sys.exc_info + + def __init__(self, group=None, target=None, name=None, + args=(), kwargs={}, verbose=None): + assert group is None, "group argument must be None for now" + _Verbose.__init__(self, verbose) + self.__target = target + self.__name = str(name or _newname()) + self.__args = args + self.__kwargs = kwargs + self.__daemonic = self._set_daemon() + self.__started = False + self.__stopped = False + self.__block = Condition(Lock()) + self.__initialized = True + # sys.stderr is not stored in the class like + # sys.exc_info since it can be changed between instances + self.__stderr = _sys.stderr + + def _set_daemon(self): + # Overridden in _MainThread and _DummyThread + return currentThread().isDaemon() + + def __repr__(self): + assert self.__initialized, "Thread.__init__() was not called" + status = "initial" + if self.__started: + status = "started" + if self.__stopped: + status = "stopped" + if self.__daemonic: + status = status + " daemon" + return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) + + def start(self): + assert self.__initialized, "Thread.__init__() not called" + assert not self.__started, "thread already started" + if __debug__: + self._note("%s.start(): starting thread", self) + _active_limbo_lock.acquire() + _limbo[self] = self + _active_limbo_lock.release() + _start_new_thread(self.__bootstrap, ()) + self.__started = True + _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) + + def run(self): + if self.__target: + self.__target(*self.__args, **self.__kwargs) + + def __bootstrap(self): + try: + self.__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + del _limbo[self] + _active_limbo_lock.release() + if __debug__: + self._note("%s.__bootstrap(): thread started", self) + + if _trace_hook: + self._note("%s.__bootstrap(): registering trace hook", self) + _sys.settrace(_trace_hook) + if _profile_hook: + self._note("%s.__bootstrap(): registering profile hook", self) + _sys.setprofile(_profile_hook) + + try: + self.run() + except SystemExit: + if __debug__: + self._note("%s.__bootstrap(): raised SystemExit", self) + except: + if __debug__: + self._note("%s.__bootstrap(): unhandled exception", self) + # If sys.stderr is no more (most likely from interpreter + # shutdown) use self.__stderr. Otherwise still use sys (as in + # _sys) in case sys.stderr was redefined since the creation of + # self. + if _sys: + _sys.stderr.write("Exception in thread %s:\n%s\n" % + (self.getName(), _format_exc())) + else: + # Do the best job possible w/o a huge amt. of code to + # approximate a traceback (code ideas from + # Lib/traceback.py) + exc_type, exc_value, exc_tb = self.__exc_info() + try: + print>>self.__stderr, ( + "Exception in thread " + self.getName() + + " (most likely raised during interpreter shutdown):") + print>>self.__stderr, ( + "Traceback (most recent call last):") + while exc_tb: + print>>self.__stderr, ( + ' File "%s", line %s, in %s' % + (exc_tb.tb_frame.f_code.co_filename, + exc_tb.tb_lineno, + exc_tb.tb_frame.f_code.co_name)) + exc_tb = exc_tb.tb_next + print>>self.__stderr, ("%s: %s" % (exc_type, exc_value)) + # Make sure that exc_tb gets deleted since it is a memory + # hog; deleting everything else is just for thoroughness + finally: + del exc_type, exc_value, exc_tb + else: + if __debug__: + self._note("%s.__bootstrap(): normal return", self) + finally: + self.__stop() + try: + self.__delete() + except: + pass + + def __stop(self): + self.__block.acquire() + self.__stopped = True + self.__block.notifyAll() + self.__block.release() + + def __delete(self): + "Remove current thread from the dict of currently running threads." + + # Notes about running with dummy_thread: + # + # Must take care to not raise an exception if dummy_thread is being + # used (and thus this module is being used as an instance of + # dummy_threading). dummy_thread.get_ident() always returns -1 since + # there is only one thread if dummy_thread is being used. Thus + # len(_active) is always <= 1 here, and any Thread instance created + # overwrites the (if any) thread currently registered in _active. + # + # An instance of _MainThread is always created by 'threading'. This + # gets overwritten the instant an instance of Thread is created; both + # threads return -1 from dummy_thread.get_ident() and thus have the + # same key in the dict. So when the _MainThread instance created by + # 'threading' tries to clean itself up when atexit calls this method + # it gets a KeyError if another Thread instance was created. + # + # This all means that KeyError from trying to delete something from + # _active if dummy_threading is being used is a red herring. But + # since it isn't if dummy_threading is *not* being used then don't + # hide the exception. + + _active_limbo_lock.acquire() + try: + try: + del _active[_get_ident()] + except KeyError: + if 'dummy_threading' not in _sys.modules: + raise + finally: + _active_limbo_lock.release() + + def join(self, timeout=None): + assert self.__initialized, "Thread.__init__() not called" + assert self.__started, "cannot join thread before it is started" + assert self is not currentThread(), "cannot join current thread" + if __debug__: + if not self.__stopped: + self._note("%s.join(): waiting until thread stops", self) + self.__block.acquire() + try: + if timeout is None: + while not self.__stopped: + self.__block.wait() + if __debug__: + self._note("%s.join(): thread stopped", self) + else: + deadline = _time() + timeout + while not self.__stopped: + delay = deadline - _time() + if delay <= 0: + if __debug__: + self._note("%s.join(): timed out", self) + break + self.__block.wait(delay) + else: + if __debug__: + self._note("%s.join(): thread stopped", self) + finally: + self.__block.release() + + def getName(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__name + + def setName(self, name): + assert self.__initialized, "Thread.__init__() not called" + self.__name = str(name) + + def isAlive(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__started and not self.__stopped + + def isDaemon(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__daemonic + + def setDaemon(self, daemonic): + assert self.__initialized, "Thread.__init__() not called" + assert not self.__started, "cannot set daemon status of active thread" + self.__daemonic = daemonic + +# The timer class was contributed by Itamar Shtull-Trauring + +def Timer(*args, **kwargs): + return _Timer(*args, **kwargs) + +class _Timer(Thread): + """Call a function after a specified number of seconds: + + t = Timer(30.0, f, args=[], kwargs={}) + t.start() + t.cancel() # stop the timer's action if it's still waiting + """ + + def __init__(self, interval, function, args=[], kwargs={}): + Thread.__init__(self) + self.interval = interval + self.function = function + self.args = args + self.kwargs = kwargs + self.finished = Event() + + def cancel(self): + """Stop the timer if it hasn't finished yet""" + self.finished.set() + + def run(self): + self.finished.wait(self.interval) + if not self.finished.isSet(): + self.function(*self.args, **self.kwargs) + self.finished.set() + +# Special thread class to represent the main thread +# This is garbage collected through an exit handler + +class _MainThread(Thread): + + def __init__(self): + Thread.__init__(self, name="MainThread") + self._Thread__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + _active_limbo_lock.release() + import atexit + atexit.register(self.__exitfunc) + + def _set_daemon(self): + return False + + def __exitfunc(self): + self._Thread__stop() + t = _pickSomeNonDaemonThread() + if t: + if __debug__: + self._note("%s: waiting for other threads", self) + while t: + t.join() + t = _pickSomeNonDaemonThread() + if __debug__: + self._note("%s: exiting", self) + self._Thread__delete() + +def _pickSomeNonDaemonThread(): + for t in enumerate(): + if not t.isDaemon() and t.isAlive(): + return t + return None + + +# Dummy thread class to represent threads not started here. +# These aren't garbage collected when they die, +# nor can they be waited for. +# Their purpose is to return *something* from currentThread(). +# They are marked as daemon threads so we won't wait for them +# when we exit (conform previous semantics). + +class _DummyThread(Thread): + + def __init__(self): + Thread.__init__(self, name=_newname("Dummy-%d")) + self._Thread__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + _active_limbo_lock.release() + + def _set_daemon(self): + return True + + def join(self, timeout=None): + assert False, "cannot join a dummy thread" + + +# Global API functions + +def currentThread(): + try: + return _active[_get_ident()] + except KeyError: + ##print "currentThread(): no current thread for", _get_ident() + return _DummyThread() + +def activeCount(): + _active_limbo_lock.acquire() + count = len(_active) + len(_limbo) + _active_limbo_lock.release() + return count + +def enumerate(): + _active_limbo_lock.acquire() + active = _active.values() + _limbo.values() + _active_limbo_lock.release() + return active + +# Create the main thread object + +_MainThread() + +# get thread-local implementation, either from the thread +# module, or from the python fallback + +try: + from thread import _local as local +except ImportError: + from _threading_local import local + + +# Self-test code + +def _test(): + + class BoundedQueue(_Verbose): + + def __init__(self, limit): + _Verbose.__init__(self) + self.mon = RLock() + self.rc = Condition(self.mon) + self.wc = Condition(self.mon) + self.limit = limit + self.queue = deque() + + def put(self, item): + self.mon.acquire() + while len(self.queue) >= self.limit: + self._note("put(%s): queue full", item) + self.wc.wait() + self.queue.append(item) + self._note("put(%s): appended, length now %d", + item, len(self.queue)) + self.rc.notify() + self.mon.release() + + def get(self): + self.mon.acquire() + while not self.queue: + self._note("get(): queue empty") + self.rc.wait() + item = self.queue.popleft() + self._note("get(): got %s, %d left", item, len(self.queue)) + self.wc.notify() + self.mon.release() + return item + + class ProducerThread(Thread): + + def __init__(self, queue, quota): + Thread.__init__(self, name="Producer") + self.queue = queue + self.quota = quota + + def run(self): + from random import random + counter = 0 + while counter < self.quota: + counter = counter + 1 + self.queue.put("%s.%d" % (self.getName(), counter)) + _sleep(random() * 0.00001) + + + class ConsumerThread(Thread): + + def __init__(self, queue, count): + Thread.__init__(self, name="Consumer") + self.queue = queue + self.count = count + + def run(self): + while self.count > 0: + item = self.queue.get() + print item + self.count = self.count - 1 + + NP = 3 + QL = 4 + NI = 5 + + Q = BoundedQueue(QL) + P = [] + for i in range(NP): + t = ProducerThread(Q, NI) + t.setName("Producer-%d" % (i+1)) + P.append(t) + C = ConsumerThread(Q, NI*NP) + for t in P: + t.start() + _sleep(0.000001) + C.start() + for t in P: + t.join() + C.join() + +if __name__ == '__main__': + _test() diff --git a/gnuradio-runtime/python/gnuradio/gr/hier_block2.py b/gnuradio-runtime/python/gnuradio/gr/hier_block2.py new file mode 100644 index 0000000000..31e4065a25 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/hier_block2.py @@ -0,0 +1,132 @@ +# +# Copyright 2006,2007 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from runtime_swig import hier_block2_swig + +try: + import pmt +except ImportError: + from gruel import pmt + +# +# This hack forces a 'has-a' relationship to look like an 'is-a' one. +# +# It allows Python classes to subclass this one, while passing through +# method calls to the C++ class shared pointer from SWIG. +# +# It also allows us to intercept method calls if needed +# +class hier_block2(object): + """ + Subclass this to create a python hierarchical block. + + This is a python wrapper around the C++ hierarchical block implementation. + Provides convenience functions and allows proper Python subclassing. + """ + + def __init__(self, name, input_signature, output_signature): + """ + Create a hierarchical block with a given name and I/O signatures. + """ + self._hb = hier_block2_swig(name, input_signature, output_signature) + + def __getattr__(self, name): + """ + Pass-through member requests to the C++ object. + """ + if not hasattr(self, "_hb"): + raise RuntimeError("hier_block2: invalid state--did you forget to call gr.hier_block2.__init__ in a derived class?") + return getattr(self._hb, name) + + def connect(self, *points): + """ + Connect two or more block endpoints. An endpoint is either a (block, port) + tuple or a block instance. In the latter case, the port number is assumed + to be zero. + + To connect the hierarchical block external inputs or outputs to internal block + inputs or outputs, use 'self' in the connect call. + + If multiple arguments are provided, connect will attempt to wire them in series, + interpreting the endpoints as inputs or outputs as appropriate. + """ + + if len (points) < 1: + raise ValueError, ("connect requires at least one endpoint; %d provided." % (len (points),)) + else: + if len(points) == 1: + self._hb.primitive_connect(points[0].to_basic_block()) + else: + for i in range (1, len (points)): + self._connect(points[i-1], points[i]) + + def _connect(self, src, dst): + (src_block, src_port) = self._coerce_endpoint(src) + (dst_block, dst_port) = self._coerce_endpoint(dst) + self._hb.primitive_connect(src_block.to_basic_block(), src_port, + dst_block.to_basic_block(), dst_port) + + def _coerce_endpoint(self, endp): + if hasattr(endp, 'to_basic_block'): + return (endp, 0) + else: + if hasattr(endp, "__getitem__") and len(endp) == 2: + return endp # Assume user put (block, port) + else: + raise ValueError("unable to coerce endpoint") + + def disconnect(self, *points): + """ + Disconnect two endpoints in the flowgraph. + + To disconnect the hierarchical block external inputs or outputs to internal block + inputs or outputs, use 'self' in the connect call. + + If more than two arguments are provided, they are disconnected successively. + """ + + if len (points) < 1: + raise ValueError, ("disconnect requires at least one endpoint; %d provided." % (len (points),)) + else: + if len (points) == 1: + self._hb.primitive_disconnect(points[0].to_basic_block()) + else: + for i in range (1, len (points)): + self._disconnect(points[i-1], points[i]) + + def _disconnect(self, src, dst): + (src_block, src_port) = self._coerce_endpoint(src) + (dst_block, dst_port) = self._coerce_endpoint(dst) + self._hb.primitive_disconnect(src_block.to_basic_block(), src_port, + dst_block.to_basic_block(), dst_port) + + def msg_connect(self, src, srcport, dst, dstport): + self.primitive_msg_connect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport); + + def msg_disconnect(self, src, srcport, dst, dstport): + self.primitive_msg_disconnect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport); + + def message_port_register_hier_in(self, portname): + self.primitive_message_port_register_hier_in(pmt.intern(portname)); + + def message_port_register_hier_out(self, portname): + self.primitive_message_port_register_hier_out(pmt.intern(portname)); + diff --git a/gnuradio-runtime/python/gnuradio/gr/prefs.py b/gnuradio-runtime/python/gnuradio/gr/prefs.py new file mode 100644 index 0000000000..25fa8cd6ae --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/prefs.py @@ -0,0 +1,127 @@ +# +# Copyright 2006,2009 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import gnuradio_core as gsp +_prefs_base = gsp.gr_prefs + + +import ConfigParser +import os +import os.path +import sys +import glob + + +def _user_prefs_filename(): + return os.path.expanduser('~/.gnuradio/config.conf') + +def _sys_prefs_dirname(): + return gsp.prefsdir() + +def _bool(x): + """ + Try to coerce obj to a True or False + """ + if isinstance(x, bool): + return x + if isinstance(x, (float, int)): + return bool(x) + raise TypeError, x + + +class _prefs(_prefs_base): + """ + Derive our 'real class' from the stubbed out base class that has support + for SWIG directors. This allows C++ code to magically and transparently + invoke the methods in this python class. + """ + def __init__(self): + _prefs_base.__init__(self) + self.cp = ConfigParser.RawConfigParser() + self.__getattr__ = lambda self, name: getattr(self.cp, name) + + def _sys_prefs_filenames(self): + dir = _sys_prefs_dirname() + try: + fnames = glob.glob(os.path.join(dir, '*.conf')) + except (IOError, OSError): + return [] + fnames.sort() + return fnames + + def _read_files(self): + filenames = self._sys_prefs_filenames() + filenames.append(_user_prefs_filename()) + #print "filenames: ", filenames + self.cp.read(filenames) + + # ---------------------------------------------------------------- + # These methods override the C++ virtual methods of the same name + # ---------------------------------------------------------------- + def has_section(self, section): + return self.cp.has_section(section) + + def has_option(self, section, option): + return self.cp.has_option(section, option) + + def get_string(self, section, option, default_val): + try: + return self.cp.get(section, option) + except: + return default_val + + def get_bool(self, section, option, default_val): + try: + return self.cp.getboolean(section, option) + except: + return default_val + + def get_long(self, section, option, default_val): + try: + return self.cp.getint(section, option) + except: + return default_val + + def get_double(self, section, option, default_val): + try: + return self.cp.getfloat(section, option) + except: + return default_val + # ---------------------------------------------------------------- + # End override of C++ virtual methods + # ---------------------------------------------------------------- + + +_prefs_db = _prefs() + +# if GR_DONT_LOAD_PREFS is set, don't load them. +# (make check uses this to avoid interactions.) +if os.getenv("GR_DONT_LOAD_PREFS", None) is None: + _prefs_db._read_files() + + +_prefs_base.set_singleton(_prefs_db) # tell C++ what instance to use + +def prefs(): + """ + Return the global preference data base + """ + return _prefs_db diff --git a/gnuradio-runtime/python/gnuradio/gr/pubsub.py b/gnuradio-runtime/python/gnuradio/gr/pubsub.py new file mode 100644 index 0000000000..90568418fc --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/pubsub.py @@ -0,0 +1,153 @@ +#!/usr/bin/env python +# +# Copyright 2008,2009 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +""" +Abstract GNU Radio publisher/subscriber interface + +This is a proof of concept implementation, will likely change significantly. +""" + +class pubsub(dict): + def __init__(self): + self._publishers = { } + self._subscribers = { } + self._proxies = { } + + def __missing__(self, key, value=None): + dict.__setitem__(self, key, value) + self._publishers[key] = None + self._subscribers[key] = [] + self._proxies[key] = None + + def __setitem__(self, key, val): + if not self.has_key(key): + self.__missing__(key, val) + elif self._proxies[key] is not None: + (p, newkey) = self._proxies[key] + p[newkey] = val + else: + dict.__setitem__(self, key, val) + for sub in self._subscribers[key]: + # Note this means subscribers will get called in the thread + # context of the 'set' caller. + sub(val) + + def __getitem__(self, key): + if not self.has_key(key): self.__missing__(key) + if self._proxies[key] is not None: + (p, newkey) = self._proxies[key] + return p[newkey] + elif self._publishers[key] is not None: + return self._publishers[key]() + else: + return dict.__getitem__(self, key) + + def publish(self, key, publisher): + if not self.has_key(key): self.__missing__(key) + if self._proxies[key] is not None: + (p, newkey) = self._proxies[key] + p.publish(newkey, publisher) + else: + self._publishers[key] = publisher + + def subscribe(self, key, subscriber): + if not self.has_key(key): self.__missing__(key) + if self._proxies[key] is not None: + (p, newkey) = self._proxies[key] + p.subscribe(newkey, subscriber) + else: + self._subscribers[key].append(subscriber) + + def unpublish(self, key): + if self._proxies[key] is not None: + (p, newkey) = self._proxies[key] + p.unpublish(newkey) + else: + self._publishers[key] = None + + def unsubscribe(self, key, subscriber): + if self._proxies[key] is not None: + (p, newkey) = self._proxies[key] + p.unsubscribe(newkey, subscriber) + else: + self._subscribers[key].remove(subscriber) + + def proxy(self, key, p, newkey=None): + if not self.has_key(key): self.__missing__(key) + if newkey is None: newkey = key + self._proxies[key] = (p, newkey) + + def unproxy(self, key): + self._proxies[key] = None + +# Test code +if __name__ == "__main__": + import sys + o = pubsub() + + # Non-existent key gets auto-created with None value + print "Auto-created key 'foo' value:", o['foo'] + + # Add some subscribers + # First is a bare function + def print_len(x): + print "len=%i" % (len(x), ) + o.subscribe('foo', print_len) + + # The second is a class member function + class subber(object): + def __init__(self, param): + self._param = param + def printer(self, x): + print self._param, `x` + s = subber('param') + o.subscribe('foo', s.printer) + + # The third is a lambda function + o.subscribe('foo', lambda x: sys.stdout.write('val='+`x`+'\n')) + + # Update key 'foo', will notify subscribers + print "Updating 'foo' with three subscribers:" + o['foo'] = 'bar'; + + # Remove first subscriber + o.unsubscribe('foo', print_len) + + # Update now will only trigger second and third subscriber + print "Updating 'foo' after removing a subscriber:" + o['foo'] = 'bar2'; + + # Publish a key as a function, in this case, a lambda function + o.publish('baz', lambda : 42) + print "Published value of 'baz':", o['baz'] + + # Unpublish the key + o.unpublish('baz') + + # This will return None, as there is no publisher + print "Value of 'baz' with no publisher:", o['baz'] + + # Set 'baz' key, it gets cached + o['baz'] = 'bazzz' + + # Now will return cached value, since no provider + print "Cached value of 'baz' after being set:", o['baz'] diff --git a/gnuradio-runtime/python/gnuradio/gr/qa_feval.py b/gnuradio-runtime/python/gnuradio/gr/qa_feval.py new file mode 100755 index 0000000000..9018e12f36 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/qa_feval.py @@ -0,0 +1,110 @@ +#!/usr/bin/env python +# +# Copyright 2006,2007,2010 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest + +class my_add2_dd(gr.feval_dd): + def eval(self, x): + return x + 2 + +class my_add2_ll(gr.feval_ll): + def eval(self, x): + return x + 2 + +class my_add2_cc(gr.feval_cc): + def eval(self, x): + return x + (2 - 2j) + +class my_feval(gr.feval): + def __init__(self): + gr.feval.__init__(self) + self.fired = False + def eval(self): + self.fired = True + +class test_feval(gr_unittest.TestCase): + + def test_dd_1(self): + f = my_add2_dd() + src_data = (0.0, 1.0, 2.0, 3.0, 4.0) + expected_result = (2.0, 3.0, 4.0, 5.0, 6.0) + # this is all in python... + actual_result = tuple([f.eval(x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + def test_dd_2(self): + f = my_add2_dd() + src_data = (0.0, 1.0, 2.0, 3.0, 4.0) + expected_result = (2.0, 3.0, 4.0, 5.0, 6.0) + # this is python -> C++ -> python and back again... + actual_result = tuple([gr.feval_dd_example(f, x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + + def test_ll_1(self): + f = my_add2_ll() + src_data = (0, 1, 2, 3, 4) + expected_result = (2, 3, 4, 5, 6) + # this is all in python... + actual_result = tuple([f.eval(x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + def test_ll_2(self): + f = my_add2_ll() + src_data = (0, 1, 2, 3, 4) + expected_result = (2, 3, 4, 5, 6) + # this is python -> C++ -> python and back again... + actual_result = tuple([gr.feval_ll_example(f, x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + + def test_cc_1(self): + f = my_add2_cc() + src_data = (0+1j, 2+3j, 4+5j, 6+7j) + expected_result = (2-1j, 4+1j, 6+3j, 8+5j) + # this is all in python... + actual_result = tuple([f.eval(x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + def test_cc_2(self): + f = my_add2_cc() + src_data = (0+1j, 2+3j, 4+5j, 6+7j) + expected_result = (2-1j, 4+1j, 6+3j, 8+5j) + # this is python -> C++ -> python and back again... + actual_result = tuple([gr.feval_cc_example(f, x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + def test_void_1(self): + # this is all in python + f = my_feval() + f.eval() + self.assertEqual(True, f.fired) + + def test_void_2(self): + # this is python -> C++ -> python and back again + f = my_feval() + gr.feval_example(f) + self.assertEqual(True, f.fired) + + +if __name__ == '__main__': + gr_unittest.run(test_feval, "test_feval.xml") diff --git a/gnuradio-runtime/python/gnuradio/gr/qa_kludged_imports.py b/gnuradio-runtime/python/gnuradio/gr/qa_kludged_imports.py new file mode 100755 index 0000000000..f80188c9fc --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/qa_kludged_imports.py @@ -0,0 +1,39 @@ +#!/usr/bin/env python +# +# Copyright 2005,2008,2010 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest + +class test_kludged_imports (gr_unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_gru_import(self): + # make sure that this somewhat magic import works + from gnuradio import gru + + +if __name__ == '__main__': + gr_unittest.run(test_kludged_imports, "test_kludged_imports.xml") diff --git a/gnuradio-runtime/python/gnuradio/gr/qa_tag_utils.py b/gnuradio-runtime/python/gnuradio/gr/qa_tag_utils.py new file mode 100755 index 0000000000..de1b5aa002 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/qa_tag_utils.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python +# +# Copyright 2007,2010 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from gnuradio import gr, gr_unittest +import tag_utils + +try: + import pmt_swig as pmt +except ImportError: + import pmt + +class test_tag_utils (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + + def tearDown (self): + self.tb = None + + def test_001(self): + t = gr.gr_tag_t() + t.offset = 10 + t.key = pmt.string_to_symbol('key') + t.value = pmt.from_long(23) + t.srcid = pmt.from_bool(False) + pt = tag_utils.tag_to_python(t) + self.assertEqual(pt.key, 'key') + self.assertEqual(pt.value, 23) + self.assertEqual(pt.offset, 10) + + +if __name__ == '__main__': + print 'hi' + gr_unittest.run(test_tag_utils, "test_tag_utils.xml") + diff --git a/gnuradio-runtime/python/gnuradio/gr/tag_utils.py b/gnuradio-runtime/python/gnuradio/gr/tag_utils.py new file mode 100644 index 0000000000..1c9594d6d0 --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/tag_utils.py @@ -0,0 +1,57 @@ +# +# Copyright 2003-2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# +""" Conversion tools between stream tags and Python objects """ + +try: import pmt +except: from gruel import pmt + +try: + from gnuradio import gr +except ImportError: + from runtime_swig import gr_tag_t + +class PythonTag(object): + " Python container for tags " + def __init__(self): + self.offset = None + self.key = None + self.value = None + self.srcid = None + +def tag_to_python(tag): + """ Convert a stream tag to a Python-readable object """ + newtag = PythonTag() + newtag.offset = tag.offset + newtag.key = pmt.to_python(tag.key) + newtag.value = pmt.to_python(tag.value) + newtag.srcid = pmt.to_python(tag.srcid) + return newtag + +def tag_to_pmt(tag): + """ Convert a Python-readable object to a stream tag """ + newtag = gr_tag_t() + newtag.offset = tag.offset + newtag.key = pmt.to_python(tag.key) + newtag.value = pmt.from_python(tag.value) + newtag.srcid = pmt.from_python(tag.srcid) + return newtag + + diff --git a/gnuradio-runtime/python/gnuradio/gr/top_block.py b/gnuradio-runtime/python/gnuradio/gr/top_block.py new file mode 100644 index 0000000000..944e95e5ae --- /dev/null +++ b/gnuradio-runtime/python/gnuradio/gr/top_block.py @@ -0,0 +1,170 @@ +# +# Copyright 2007 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +from runtime_swig import top_block_swig, \ + top_block_wait_unlocked, top_block_run_unlocked + +#import gnuradio.gr.gr_threading as _threading +import gr_threading as _threading + +# +# There is no problem that can't be solved with an additional +# level of indirection... +# +# This kludge allows ^C to interrupt top_block.run and top_block.wait +# +# The problem that we are working around is that Python only services +# signals (e.g., KeyboardInterrupt) in its main thread. If the main +# thread is blocked in our C++ version of wait, even though Python's +# SIGINT handler fires, and even though there may be other python +# threads running, no one will know. Thus instead of directly waiting +# in the thread that calls wait (which is likely to be the Python main +# thread), we create a separate thread that does the blocking wait, +# and then use the thread that called wait to do a slow poll of an +# event queue. That thread, which is executing "wait" below is +# interruptable, and if it sees a KeyboardInterrupt, executes a stop +# on the top_block, then goes back to waiting for it to complete. +# This ensures that the unlocked wait that was in progress (in the +# _top_block_waiter thread) can complete, release its mutex and back +# out. If we don't do that, we are never able to clean up, and nasty +# things occur like leaving the USRP transmitter sending a carrier. +# +# See also top_block.wait (below), which uses this class to implement +# the interruptable wait. +# +class _top_block_waiter(_threading.Thread): + def __init__(self, tb): + _threading.Thread.__init__(self) + self.setDaemon(1) + self.tb = tb + self.event = _threading.Event() + self.start() + + def run(self): + top_block_wait_unlocked(self.tb) + self.event.set() + + def wait(self): + try: + while not self.event.isSet(): + self.event.wait(0.100) + except KeyboardInterrupt: + self.tb.stop() + self.wait() + + +# +# This hack forces a 'has-a' relationship to look like an 'is-a' one. +# +# It allows Python classes to subclass this one, while passing through +# method calls to the C++ class shared pointer from SWIG. +# +# It also allows us to intercept method calls if needed. +# +# This allows the 'run_locked' methods, which are defined in gr_top_block.i, +# to release the Python global interpreter lock before calling the actual +# method in gr_top_block +# +class top_block(object): + """ + Top-level hierarchical block representing a flow-graph. + + This is a python wrapper around the C++ implementation to allow + python subclassing. + """ + def __init__(self, name="top_block"): + self._tb = top_block_swig(name) + + def __getattr__(self, name): + if not hasattr(self, "_tb"): + raise RuntimeError("top_block: invalid state--did you forget to call gr.top_block.__init__ in a derived class?") + return getattr(self._tb, name) + + def start(self, max_noutput_items=10000000): + self._tb.start(max_noutput_items) + + def stop(self): + self._tb.stop() + + def run(self, max_noutput_items=10000000): + self.start(max_noutput_items) + self.wait() + + def wait(self): + _top_block_waiter(self._tb).wait() + + + # FIXME: these are duplicated from hier_block2.py; they should really be implemented + # in the original C++ class (gr_hier_block2), then they would all be inherited here + + def connect(self, *points): + '''connect requires one or more arguments that can be coerced to endpoints. + If more than two arguments are provided, they are connected together successively. + ''' + if len (points) < 1: + raise ValueError, ("connect requires at least one endpoint; %d provided." % (len (points),)) + else: + if len(points) == 1: + self._tb.primitive_connect(points[0].to_basic_block()) + else: + for i in range (1, len (points)): + self._connect(points[i-1], points[i]) + + def msg_connect(self, src, srcport, dst, dstport): + self.primitive_msg_connect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport); + + def msg_disconnect(self, src, srcport, dst, dstport): + self.primitive_msg_disconnect(src.to_basic_block(), srcport, dst.to_basic_block(), dstport); + + def _connect(self, src, dst): + (src_block, src_port) = self._coerce_endpoint(src) + (dst_block, dst_port) = self._coerce_endpoint(dst) + self._tb.primitive_connect(src_block.to_basic_block(), src_port, + dst_block.to_basic_block(), dst_port) + + def _coerce_endpoint(self, endp): + if hasattr(endp, 'to_basic_block'): + return (endp, 0) + else: + if hasattr(endp, "__getitem__") and len(endp) == 2: + return endp # Assume user put (block, port) + else: + raise ValueError("unable to coerce endpoint") + + def disconnect(self, *points): + '''disconnect requires one or more arguments that can be coerced to endpoints. + If more than two arguments are provided, they are disconnected successively. + ''' + if len (points) < 1: + raise ValueError, ("disconnect requires at least one endpoint; %d provided." % (len (points),)) + else: + if len(points) == 1: + self._tb.primitive_disconnect(points[0].to_basic_block()) + else: + for i in range (1, len (points)): + self._disconnect(points[i-1], points[i]) + + def _disconnect(self, src, dst): + (src_block, src_port) = self._coerce_endpoint(src) + (dst_block, dst_port) = self._coerce_endpoint(dst) + self._tb.primitive_disconnect(src_block.to_basic_block(), src_port, + dst_block.to_basic_block(), dst_port) + |