summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/python/gnuradio/gr
diff options
context:
space:
mode:
authorjcorgan <jcorgan@221aa14e-8319-0410-a670-987f0aec2ac5>2006-08-03 04:51:51 +0000
committerjcorgan <jcorgan@221aa14e-8319-0410-a670-987f0aec2ac5>2006-08-03 04:51:51 +0000
commit5d69a524f81f234b3fbc41d49ba18d6f6886baba (patch)
treeb71312bf7f1e8d10fef0f3ac6f28784065e73e72 /gnuradio-core/src/python/gnuradio/gr
Houston, we have a trunk.
git-svn-id: http://gnuradio.org/svn/gnuradio/trunk@3122 221aa14e-8319-0410-a670-987f0aec2ac5
Diffstat (limited to 'gnuradio-core/src/python/gnuradio/gr')
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/Makefile.am77
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/__init__.py40
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/basic_flow_graph.py267
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/benchmark_filters.py55
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/exceptions.py27
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/flow_graph.py234
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/gr_threading.py35
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/gr_threading_23.py724
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/gr_threading_24.py793
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/hier_block.py132
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/prefs.py129
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_add_and_friends.py129
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_add_v_and_friends.py353
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_basic_flow_graph.py190
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_cma_equalizer.py29
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_complex_to_xxx.py116
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_constellation_decoder_cb.py53
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_correlate_access_code.py83
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_diff_encoder.py86
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_diff_phasor_cc.py50
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_feval.py92
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_fft_filter.py268
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_filter_delay_fc.py317
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_flow_graph.py356
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_frequency_modulator.py56
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_fsk_stuff.py75
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_goertzel.py64
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_head.py47
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_hilbert.py116
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_iir.py135
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_interleave.py81
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_interp_fir_filter.py54
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_kludge_copy.py89
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_kludged_imports.py43
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_message.py117
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_mute.py89
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_nlog10.py47
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_packed_to_unpacked.py405
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_pipe_fittings.py143
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_rational_resampler.py290
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_sig_source.py85
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir.py72
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir_cc.py72
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/qa_unpack_k_bits.py57
-rwxr-xr-xgnuradio-core/src/python/gnuradio/gr/run_tests.in33
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/scheduler.py70
46 files changed, 6875 insertions, 0 deletions
diff --git a/gnuradio-core/src/python/gnuradio/gr/Makefile.am b/gnuradio-core/src/python/gnuradio/gr/Makefile.am
new file mode 100644
index 0000000000..05dee29f08
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/Makefile.am
@@ -0,0 +1,77 @@
+#
+# Copyright 2004,2005,2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+include $(top_srcdir)/Makefile.common
+
+EXTRA_DIST = run_tests.in
+
+
+TESTS = \
+ run_tests
+
+
+grgrpythondir = $(grpythondir)/gr
+
+grgrpython_PYTHON = \
+ __init__.py \
+ basic_flow_graph.py \
+ exceptions.py \
+ flow_graph.py \
+ gr_threading.py \
+ gr_threading_23.py \
+ gr_threading_24.py \
+ hier_block.py \
+ prefs.py \
+ scheduler.py
+
+noinst_PYTHON = \
+ qa_add_and_friends.py \
+ qa_basic_flow_graph.py \
+ qa_complex_to_xxx.py \
+ qa_correlate_access_code.py \
+ qa_diff_encoder.py \
+ qa_diff_phasor_cc.py \
+ qa_feval.py \
+ qa_fft_filter.py \
+ qa_filter_delay_fc.py \
+ qa_flow_graph.py \
+ qa_frequency_modulator.py \
+ qa_fsk_stuff.py \
+ qa_head.py \
+ qa_hilbert.py \
+ qa_iir.py \
+ qa_interleave.py \
+ qa_interp_fir_filter.py \
+ qa_kludge_copy.py \
+ qa_kludged_imports.py \
+ qa_message.py \
+ qa_mute.py \
+ qa_nlog10.py \
+ qa_packed_to_unpacked.py \
+ qa_pipe_fittings.py \
+ qa_rational_resampler.py \
+ qa_sig_source.py \
+ qa_single_pole_iir.py \
+ qa_single_pole_iir_cc.py \
+ qa_unpack_k_bits.py
+
+
+CLEANFILES = *.pyc
diff --git a/gnuradio-core/src/python/gnuradio/gr/__init__.py b/gnuradio-core/src/python/gnuradio/gr/__init__.py
new file mode 100644
index 0000000000..5583c412a4
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/__init__.py
@@ -0,0 +1,40 @@
+#
+# Copyright 2003,2004,2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+# The presence of this file turns this directory into a Python package
+
+# This is the main GNU Radio python module.
+# We pull the swig output and the other modules into the gnuradio.gr namespace
+
+from gnuradio_swig_python import *
+from basic_flow_graph import *
+from flow_graph import *
+from exceptions import *
+from hier_block import *
+
+
+# create a couple of aliases
+serial_to_parallel = stream_to_vector
+parallel_to_serial = vector_to_stream
+
+# Force the preference database to be initialized
+from prefs import prefs
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/basic_flow_graph.py b/gnuradio-core/src/python/gnuradio/gr/basic_flow_graph.py
new file mode 100644
index 0000000000..1afc96298a
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/basic_flow_graph.py
@@ -0,0 +1,267 @@
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio_swig_python import gr_block_sptr
+import types
+import hier_block
+
+def remove_duplicates (seq):
+ new = []
+ for x in seq:
+ if not x in new:
+ new.append (x)
+ return new
+
+
+class endpoint (object):
+ __slots__ = ['block', 'port']
+ def __init__ (self, block, port):
+ self.block = block
+ self.port = port
+
+ def __cmp__ (self, other):
+ if self.block == other.block and self.port == other.port:
+ return 0
+ return 1
+
+ def __str__ (self):
+ return '<endpoint (%s, %s)>' % (self.block, self.port)
+
+def expand_src_endpoint (src_endpoint):
+ # A src_endpoint is an output of a block
+ src_endpoint = coerce_endpoint (src_endpoint)
+ if isinstance (src_endpoint.block, hier_block.hier_block_base):
+ return expand_src_endpoint (
+ coerce_endpoint (src_endpoint.block.resolve_output_port(src_endpoint.port)))
+ else:
+ return src_endpoint
+
+def expand_dst_endpoint (dst_endpoint):
+ # a dst_endpoint is the input to a block
+ dst_endpoint = coerce_endpoint (dst_endpoint)
+ if isinstance (dst_endpoint.block, hier_block.hier_block_base):
+ exp = [coerce_endpoint(x) for x in
+ dst_endpoint.block.resolve_input_port(dst_endpoint.port)]
+ return expand_dst_endpoints (exp)
+ else:
+ return [dst_endpoint]
+
+def expand_dst_endpoints (endpoint_list):
+ r = []
+ for e in endpoint_list:
+ r.extend (expand_dst_endpoint (e))
+ return r
+
+
+def coerce_endpoint (x):
+ if isinstance (x, endpoint):
+ return x
+ elif isinstance (x, types.TupleType) and len (x) == 2:
+ return endpoint (x[0], x[1])
+ elif hasattr (x, 'block'): # assume it's a block
+ return endpoint (x, 0)
+ elif isinstance(x, hier_block.hier_block_base):
+ return endpoint (x, 0)
+ else:
+ raise ValueError, "Not coercible to endpoint: %s" % (x,)
+
+
+class edge (object):
+ __slots__ = ['src', 'dst']
+ def __init__ (self, src_endpoint, dst_endpoint):
+ self.src = src_endpoint
+ self.dst = dst_endpoint
+
+ def __cmp__ (self, other):
+ if self.src == other.src and self.dst == other.dst:
+ return 0
+ return 1
+
+ def __repr__ (self):
+ return '<edge (%s, %s)>' % (self.src, self.dst)
+
+class basic_flow_graph (object):
+ '''basic_flow_graph -- describe connections between blocks'''
+ # __slots__ is incompatible with weakrefs (for some reason!)
+ # __slots__ = ['edge_list']
+ def __init__ (self):
+ self.edge_list = []
+
+ def connect (self, *points):
+ '''connect requires two or more arguments that can be coerced to endpoints.
+ If more than two arguments are provided, they are connected together successively.
+ '''
+ if len (points) < 2:
+ raise ValueError, ("connect requires at least two endpoints; %d provided." % (len (points),))
+ for i in range (1, len (points)):
+ self._connect (points[i-1], points[i])
+
+ def _connect (self, src_endpoint, dst_endpoint):
+ s = expand_src_endpoint (src_endpoint)
+ for d in expand_dst_endpoint (dst_endpoint):
+ self._connect_prim (s, d)
+
+ def _connect_prim (self, src_endpoint, dst_endpoint):
+ src_endpoint = coerce_endpoint (src_endpoint)
+ dst_endpoint = coerce_endpoint (dst_endpoint)
+ self._check_valid_src_port (src_endpoint)
+ self._check_valid_dst_port (dst_endpoint)
+ self._check_dst_in_use (dst_endpoint)
+ self._check_type_match (src_endpoint, dst_endpoint)
+ self.edge_list.append (edge (src_endpoint, dst_endpoint))
+
+ def disconnect (self, src_endpoint, dst_endpoint):
+ s = expand_src_endpoint (src_endpoint)
+ for d in expand_dst_endpoint (dst_endpoint):
+ self._disconnect_prim (s, d)
+
+ def _disconnect_prim (self, src_endpoint, dst_endpoint):
+ src_endpoint = coerce_endpoint (src_endpoint)
+ dst_endpoint = coerce_endpoint (dst_endpoint)
+ e = edge (src_endpoint, dst_endpoint)
+ self.edge_list.remove (e)
+
+ def disconnect_all (self):
+ self.edge_list = []
+
+ def validate (self):
+ # check all blocks to ensure:
+ # (1a) their input ports are contiguously assigned
+ # (1b) the number of input ports is between min and max
+ # (2a) their output ports are contiguously assigned
+ # (2b) the number of output ports is between min and max
+ # (3) check_topology returns true
+
+ for m in self.all_blocks ():
+ # print m
+
+ edges = self.in_edges (m)
+ used_ports = [e.dst.port for e in edges]
+ ninputs = self._check_contiguity (m, m.input_signature (), used_ports, "input")
+
+ edges = self.out_edges (m)
+ used_ports = [e.src.port for e in edges]
+ noutputs = self._check_contiguity (m, m.output_signature (), used_ports, "output")
+
+ if not m.check_topology (ninputs, noutputs):
+ raise ValueError, ("%s::check_topology (%d, %d) failed" % (m, ninputs, noutputs))
+
+
+ # --- public utilities ---
+
+ def all_blocks (self):
+ '''return list of all blocks in the graph'''
+ all_blocks = []
+ for edge in self.edge_list:
+ m = edge.src.block
+ if not m in all_blocks:
+ all_blocks.append (m)
+ m = edge.dst.block
+ if not m in all_blocks:
+ all_blocks.append (m)
+ return all_blocks
+
+ def in_edges (self, m):
+ '''return list of all edges that have M as a destination'''
+ return [e for e in self.edge_list if e.dst.block == m]
+
+ def out_edges (self, m):
+ '''return list of all edges that have M as a source'''
+ return [e for e in self.edge_list if e.src.block == m]
+
+ def downstream_verticies (self, m):
+ return [e.dst.block for e in self.out_edges (m)]
+
+ def downstream_verticies_port (self, m, port):
+ return [e.dst.block for e in self.out_edges(m) if e.src.port == port]
+
+ def upstream_verticies (self, m):
+ return [e.src.block for e in self.in_edges (m)]
+
+ def adjacent_verticies (self, m):
+ '''return list of all verticies adjacent to M'''
+ return self.downstream_verticies (m) + self.upstream_verticies (m)
+
+ def sink_p (self, m):
+ '''return True iff this block is a sink'''
+ e = self.out_edges (m)
+ return len (e) == 0
+
+ def source_p (self, m):
+ '''return True iff this block is a source'''
+ e = self.in_edges (m)
+ return len (e) == 0
+
+ # --- internal methods ---
+
+ def _check_dst_in_use (self, dst_endpoint):
+ '''Ensure that there is not already an endpoint that terminates at dst_endpoint.'''
+ x = [ep for ep in self.edge_list if ep.dst == dst_endpoint]
+ if x: # already in use
+ raise ValueError, ("destination endpoint already in use: %s" % (dst_endpoint))
+
+ def _check_valid_src_port (self, src_endpoint):
+ self._check_port (src_endpoint.block.output_signature(), src_endpoint.port)
+
+ def _check_valid_dst_port (self, dst_endpoint):
+ self._check_port (dst_endpoint.block.input_signature(), dst_endpoint.port)
+
+ def _check_port (self, signature, port):
+ if port < 0:
+ raise ValueError, 'port number out of range.'
+ if signature.max_streams () == -1: # infinite
+ return # OK
+ if port >= signature.max_streams ():
+ raise ValueError, 'port number out of range.'
+
+ def _check_type_match (self, src_endpoint, dst_endpoint):
+ # for now, we just ensure that the stream item sizes match
+ src_sig = src_endpoint.block.output_signature ()
+ dst_sig = dst_endpoint.block.input_signature ()
+ src_size = src_sig.sizeof_stream_item (src_endpoint.port)
+ dst_size = dst_sig.sizeof_stream_item (dst_endpoint.port)
+ if src_size != dst_size:
+ raise ValueError, 'source and destination data sizes are different'
+
+ def _check_contiguity (self, m, sig, used_ports, dir):
+ used_ports.sort ()
+ used_ports = remove_duplicates (used_ports)
+ min_s = sig.min_streams ()
+
+ l = len (used_ports)
+ if l == 0:
+ if min_s == 0:
+ return l
+ raise ValueError, ("%s requires %d %s connections. It has none." %
+ (m, min_s, dir))
+
+ if used_ports[-1] + 1 < min_s:
+ raise ValueError, ("%s requires %d %s connections. It has %d." %
+ (m, min_s, dir, used_ports[-1] + 1))
+
+ if used_ports[-1] + 1 != l:
+ for i in range (l):
+ if used_ports[i] != i:
+ raise ValueError, ("%s %s port %d is not connected" %
+ (m, dir, i))
+
+ # print "%s ports: %s" % (dir, used_ports)
+ return l
diff --git a/gnuradio-core/src/python/gnuradio/gr/benchmark_filters.py b/gnuradio-core/src/python/gnuradio/gr/benchmark_filters.py
new file mode 100755
index 0000000000..7b2f44c445
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/benchmark_filters.py
@@ -0,0 +1,55 @@
+#!/usr/bin/env python
+
+import time
+import random
+from optparse import OptionParser
+from gnuradio import gr
+from gnuradio.eng_option import eng_option
+
+def make_random_complex_tuple(L):
+ result = []
+ for x in range(L):
+ result.append(complex(random.uniform(-1000,1000),
+ random.uniform(-1000,1000)))
+ return tuple(result)
+
+def benchmark(name, creator, dec, ntaps, total_test_size, block_size):
+ block_size = 32768
+
+ fg = gr.flow_graph()
+ taps = make_random_complex_tuple(ntaps)
+ src = gr.vector_source_c(make_random_complex_tuple(block_size), True)
+ head = gr.head(gr.sizeof_gr_complex, int(total_test_size))
+ op = creator(dec, taps)
+ dst = gr.null_sink(gr.sizeof_gr_complex)
+ fg.connect(src, head, op, dst)
+ start = time.time()
+ fg.run()
+ stop = time.time()
+ delta = stop - start
+ print "%16s: taps: %4d input: %4g, time: %6.3f taps/sec: %10.4g" % (
+ name, ntaps, total_test_size, delta, ntaps*total_test_size/delta)
+
+def main():
+ parser = OptionParser(option_class=eng_option)
+ parser.add_option("-n", "--ntaps", type="int", default=256)
+ parser.add_option("-t", "--total-input-size", type="eng_float", default=40e6)
+ parser.add_option("-b", "--block-size", type="intx", default=50000)
+ parser.add_option("-d", "--decimation", type="int", default=1)
+ (options, args) = parser.parse_args()
+ if len(args) != 0:
+ parser.print_help()
+ sys.exit(1)
+
+ ntaps = options.ntaps
+ total_input_size = options.total_input_size
+ block_size = options.block_size
+ dec = options.decimation
+
+ benchmark("gr.fir_filter_ccc", gr.fir_filter_ccc,
+ dec, ntaps, total_input_size, block_size)
+ benchmark("gr.fft_filter_ccc", gr.fft_filter_ccc,
+ dec, ntaps, total_input_size, block_size)
+
+if __name__ == '__main__':
+ main()
diff --git a/gnuradio-core/src/python/gnuradio/gr/exceptions.py b/gnuradio-core/src/python/gnuradio/gr/exceptions.py
new file mode 100644
index 0000000000..0cbeb143a4
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/exceptions.py
@@ -0,0 +1,27 @@
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+
+class NotDAG (Exception):
+ """Not a directed acyclic graph"""
+ pass
+
+class CantHappen (Exception):
+ """Can't happen"""
+ pass
diff --git a/gnuradio-core/src/python/gnuradio/gr/flow_graph.py b/gnuradio-core/src/python/gnuradio/gr/flow_graph.py
new file mode 100644
index 0000000000..db9c58768a
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/flow_graph.py
@@ -0,0 +1,234 @@
+#
+# Copyright 2004,2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio.gr.basic_flow_graph import remove_duplicates, endpoint, edge, \
+ basic_flow_graph
+
+from gnuradio.gr.exceptions import *
+from gnuradio_swig_python import buffer, buffer_add_reader, block_detail, \
+ single_threaded_scheduler
+
+from gnuradio.gr.scheduler import scheduler
+
+_WHITE = 0 # graph coloring tags
+_GRAY = 1
+_BLACK = 2
+
+_flow_graph_debug = False
+
+def set_flow_graph_debug(on):
+ global _flow_graph_debug
+ _flow_graph_debug = on
+
+
+class buffer_sizes (object):
+ """compute buffer sizes to use"""
+ def __init__ (self, flow_graph):
+ # We could scan the graph here and determine individual sizes
+ # based on relative_rate, number of readers, etc.
+
+ # The simplest thing that could possibly work: all buffers same size
+ self.flow_graph = flow_graph
+ self.fixed_buffer_size = 32*1024
+
+ def allocate (self, m, index):
+ """allocate buffer for output index of block m"""
+ item_size = m.output_signature().sizeof_stream_item (index)
+ nitems = self.fixed_buffer_size / item_size
+ if nitems < 2 * m.output_multiple ():
+ nitems = 2 * m.output_multiple ()
+
+ # if any downstream blocks is a decimator and/or has a large output_multiple,
+ # ensure that we have a buffer at least 2 * their decimation_factor*output_multiple
+ for mdown in self.flow_graph.downstream_verticies_port(m, index):
+ decimation = int(1.0 / mdown.relative_rate())
+ nitems = max(nitems, 2 * (decimation * mdown.output_multiple() + mdown.history()))
+
+ return buffer (nitems, item_size)
+
+
+class flow_graph (basic_flow_graph):
+ """add physical connection info to simple_flow_graph
+ """
+ # __slots__ is incompatible with weakrefs (for some reason!)
+ # __slots__ = ['blocks', 'scheduler']
+
+ def __init__ (self):
+ basic_flow_graph.__init__ (self);
+ self.blocks = None
+ self.scheduler = None
+
+ def __del__(self):
+ # print "\nflow_graph.__del__"
+ # this ensures that i/o devices such as the USRP get shutdown gracefully
+ self.stop()
+
+ def start (self):
+ '''start graph, forking thread(s), return immediately'''
+ if self.scheduler:
+ raise RuntimeError, "Scheduler already running"
+ self._setup_connections ()
+
+ # cast down to gr_module_sptr
+ # t = [x.block () for x in self.topological_sort (self.blocks)]
+ self.scheduler = scheduler (self)
+ self.scheduler.start ()
+
+ def stop (self):
+ '''tells scheduler to stop and waits for it to happen'''
+ if self.scheduler:
+ self.scheduler.stop ()
+ self.scheduler = None
+
+ def wait (self):
+ '''waits for scheduler to stop'''
+ if self.scheduler:
+ self.scheduler.wait ()
+ self.scheduler = None
+
+ def is_running (self):
+ return not not self.scheduler
+
+ def run (self):
+ '''start graph, wait for completion'''
+ self.start ()
+ self.wait ()
+
+ def _setup_connections (self):
+ """given the basic flow graph, setup all the physical connections"""
+ self.validate ()
+ self.blocks = self.all_blocks ()
+ self._assign_details ()
+ self._assign_buffers ()
+ self._connect_inputs ()
+
+ def _assign_details (self):
+ for m in self.blocks:
+ edges = self.in_edges (m)
+ used_ports = remove_duplicates ([e.dst.port for e in edges])
+ ninputs = len (used_ports)
+
+ edges = self.out_edges (m)
+ used_ports = remove_duplicates ([e.src.port for e in edges])
+ noutputs = len (used_ports)
+
+ m.set_detail (block_detail (ninputs, noutputs))
+
+ def _assign_buffers (self):
+ """determine the buffer sizes to use, allocate them and attach to detail"""
+ sizes = buffer_sizes (self)
+ for m in self.blocks:
+ d = m.detail ()
+ for index in range (d.noutputs ()):
+ d.set_output (index, sizes.allocate (m, index))
+
+ def _connect_inputs (self):
+ """connect all block inputs to appropriate upstream buffers"""
+ for m in self.blocks:
+ d = m.detail ()
+ # print "%r history = %d" % (m, m.history())
+ for e in self.in_edges(m):
+ # FYI, sources don't have any in_edges
+ our_port = e.dst.port
+ upstream_block = e.src.block
+ upstream_port = e.src.port
+ upstream_buffer = upstream_block.detail().output(upstream_port)
+ d.set_input(our_port, buffer_add_reader(upstream_buffer, m.history()))
+
+
+ def topological_sort (self, all_v):
+ '''
+ Return a topologically sorted list of vertices.
+ This is basically a depth-first search with checks
+ for back edges (the non-DAG condition)
+
+ '''
+
+ # it's correct without this sort, but this
+ # should give better ordering for cache utilization
+ all_v = self._sort_sources_first (all_v)
+
+ output = []
+ for v in all_v:
+ v.ts_color = _WHITE
+ for v in all_v:
+ if v.ts_color == _WHITE:
+ self._dfs_visit (v, output)
+ output.reverse ()
+ return output
+
+ def _dfs_visit (self, u, output):
+ # print "dfs_visit (enter): ", u
+ u.ts_color = _GRAY
+ for v in self.downstream_verticies (u):
+ if v.ts_color == _WHITE: # (u, v) is a tree edge
+ self._dfs_visit (v, output)
+ elif v.ts_color == _GRAY: # (u, v) is a back edge
+ raise NotDAG, "The graph is not an acyclic graph (It's got a loop)"
+ elif v.ts_color == _BLACK: # (u, v) is a cross or forward edge
+ pass
+ else:
+ raise CantHappen, "Invalid color on vertex"
+ u.ts_color = _BLACK
+ output.append (u)
+ # print "dfs_visit (exit): ", u, output
+
+ def _sort_sources_first (self, all_v):
+ # the sort function is not guaranteed to be stable.
+ # We add the unique_id in to the key so we're guaranteed
+ # of reproducible results. This is important for the test
+ # code. There is often more than one valid topological sort.
+ # We want to force a reproducible choice.
+ x = [(not self.source_p(v), v.unique_id(), v) for v in all_v]
+ x.sort ()
+ x = [v[2] for v in x]
+ # print "sorted: ", x
+ return x
+
+ def partition_graph (self, all_v):
+ '''Return a list of lists of nodes that are connected.
+ The result is a list of disjoint graphs.
+ The sublists are topologically sorted.
+ '''
+ result = []
+ working_v = all_v[:] # make copy
+ while working_v:
+ rv = self._reachable_verticies (working_v[0], working_v)
+ result.append (self.topological_sort (rv))
+ for v in rv:
+ working_v.remove (v)
+ if _flow_graph_debug:
+ print "partition_graph:", result
+ return result
+
+ def _reachable_verticies (self, start, all_v):
+ for v in all_v:
+ v.ts_color = _WHITE
+
+ self._reachable_dfs_visit (start)
+ return [v for v in all_v if v.ts_color == _BLACK]
+
+ def _reachable_dfs_visit (self, u):
+ u.ts_color = _BLACK
+ for v in self.adjacent_verticies (u):
+ if v.ts_color == _WHITE:
+ self._reachable_dfs_visit (v)
+ return None
diff --git a/gnuradio-core/src/python/gnuradio/gr/gr_threading.py b/gnuradio-core/src/python/gnuradio/gr/gr_threading.py
new file mode 100644
index 0000000000..6a09a3239f
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/gr_threading.py
@@ -0,0 +1,35 @@
+#
+# Copyright 2005 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from sys import version_info as _version_info
+
+# import patched version of standard threading module
+
+if _version_info[0:2] == (2, 3):
+ #print "Importing gr_threading_23"
+ from gr_threading_23 import *
+elif _version_info[0:2] == (2, 4):
+ #print "Importing gr_threading_24"
+ from gr_threading_24 import *
+else:
+ # assume the patch was applied...
+ #print "Importing system provided threading"
+ from threading import *
diff --git a/gnuradio-core/src/python/gnuradio/gr/gr_threading_23.py b/gnuradio-core/src/python/gnuradio/gr/gr_threading_23.py
new file mode 100644
index 0000000000..dee8034c1c
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/gr_threading_23.py
@@ -0,0 +1,724 @@
+"""Thread module emulating a subset of Java's threading model."""
+
+# This started life as the threading.py module of Python 2.3
+# It's been patched to fix a problem with join, where a KeyboardInterrupt
+# caused a lock to be left in the acquired state.
+
+import sys as _sys
+
+try:
+ import thread
+except ImportError:
+ del _sys.modules[__name__]
+ raise
+
+from StringIO import StringIO as _StringIO
+from time import time as _time, sleep as _sleep
+from traceback import print_exc as _print_exc
+
+# Rename some stuff so "from threading import *" is safe
+__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
+ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
+ 'Timer', 'setprofile', 'settrace']
+
+_start_new_thread = thread.start_new_thread
+_allocate_lock = thread.allocate_lock
+_get_ident = thread.get_ident
+ThreadError = thread.error
+del thread
+
+
+# Debug support (adapted from ihooks.py).
+# All the major classes here derive from _Verbose. We force that to
+# be a new-style class so that all the major classes here are new-style.
+# This helps debugging (type(instance) is more revealing for instances
+# of new-style classes).
+
+_VERBOSE = False
+
+if __debug__:
+
+ class _Verbose(object):
+
+ def __init__(self, verbose=None):
+ if verbose is None:
+ verbose = _VERBOSE
+ self.__verbose = verbose
+
+ def _note(self, format, *args):
+ if self.__verbose:
+ format = format % args
+ format = "%s: %s\n" % (
+ currentThread().getName(), format)
+ _sys.stderr.write(format)
+
+else:
+ # Disable this when using "python -O"
+ class _Verbose(object):
+ def __init__(self, verbose=None):
+ pass
+ def _note(self, *args):
+ pass
+
+# Support for profile and trace hooks
+
+_profile_hook = None
+_trace_hook = None
+
+def setprofile(func):
+ global _profile_hook
+ _profile_hook = func
+
+def settrace(func):
+ global _trace_hook
+ _trace_hook = func
+
+# Synchronization classes
+
+Lock = _allocate_lock
+
+def RLock(*args, **kwargs):
+ return _RLock(*args, **kwargs)
+
+class _RLock(_Verbose):
+
+ def __init__(self, verbose=None):
+ _Verbose.__init__(self, verbose)
+ self.__block = _allocate_lock()
+ self.__owner = None
+ self.__count = 0
+
+ def __repr__(self):
+ return "<%s(%s, %d)>" % (
+ self.__class__.__name__,
+ self.__owner and self.__owner.getName(),
+ self.__count)
+
+ def acquire(self, blocking=1):
+ me = currentThread()
+ if self.__owner is me:
+ self.__count = self.__count + 1
+ if __debug__:
+ self._note("%s.acquire(%s): recursive success", self, blocking)
+ return 1
+ rc = self.__block.acquire(blocking)
+ if rc:
+ self.__owner = me
+ self.__count = 1
+ if __debug__:
+ self._note("%s.acquire(%s): initial succes", self, blocking)
+ else:
+ if __debug__:
+ self._note("%s.acquire(%s): failure", self, blocking)
+ return rc
+
+ def release(self):
+ me = currentThread()
+ assert self.__owner is me, "release() of un-acquire()d lock"
+ self.__count = count = self.__count - 1
+ if not count:
+ self.__owner = None
+ self.__block.release()
+ if __debug__:
+ self._note("%s.release(): final release", self)
+ else:
+ if __debug__:
+ self._note("%s.release(): non-final release", self)
+
+ # Internal methods used by condition variables
+
+ def _acquire_restore(self, (count, owner)):
+ self.__block.acquire()
+ self.__count = count
+ self.__owner = owner
+ if __debug__:
+ self._note("%s._acquire_restore()", self)
+
+ def _release_save(self):
+ if __debug__:
+ self._note("%s._release_save()", self)
+ count = self.__count
+ self.__count = 0
+ owner = self.__owner
+ self.__owner = None
+ self.__block.release()
+ return (count, owner)
+
+ def _is_owned(self):
+ return self.__owner is currentThread()
+
+
+def Condition(*args, **kwargs):
+ return _Condition(*args, **kwargs)
+
+class _Condition(_Verbose):
+
+ def __init__(self, lock=None, verbose=None):
+ _Verbose.__init__(self, verbose)
+ if lock is None:
+ lock = RLock()
+ self.__lock = lock
+ # Export the lock's acquire() and release() methods
+ self.acquire = lock.acquire
+ self.release = lock.release
+ # If the lock defines _release_save() and/or _acquire_restore(),
+ # these override the default implementations (which just call
+ # release() and acquire() on the lock). Ditto for _is_owned().
+ try:
+ self._release_save = lock._release_save
+ except AttributeError:
+ pass
+ try:
+ self._acquire_restore = lock._acquire_restore
+ except AttributeError:
+ pass
+ try:
+ self._is_owned = lock._is_owned
+ except AttributeError:
+ pass
+ self.__waiters = []
+
+ def __repr__(self):
+ return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
+
+ def _release_save(self):
+ self.__lock.release() # No state to save
+
+ def _acquire_restore(self, x):
+ self.__lock.acquire() # Ignore saved state
+
+ def _is_owned(self):
+ # Return True if lock is owned by currentThread.
+ # This method is called only if __lock doesn't have _is_owned().
+ if self.__lock.acquire(0):
+ self.__lock.release()
+ return False
+ else:
+ return True
+
+ def wait(self, timeout=None):
+ currentThread() # for side-effect
+ assert self._is_owned(), "wait() of un-acquire()d lock"
+ waiter = _allocate_lock()
+ waiter.acquire()
+ self.__waiters.append(waiter)
+ saved_state = self._release_save()
+ try: # restore state no matter what (e.g., KeyboardInterrupt)
+ if timeout is None:
+ waiter.acquire()
+ if __debug__:
+ self._note("%s.wait(): got it", self)
+ else:
+ # Balancing act: We can't afford a pure busy loop, so we
+ # have to sleep; but if we sleep the whole timeout time,
+ # we'll be unresponsive. The scheme here sleeps very
+ # little at first, longer as time goes on, but never longer
+ # than 20 times per second (or the timeout time remaining).
+ endtime = _time() + timeout
+ delay = 0.0005 # 500 us -> initial delay of 1 ms
+ while True:
+ gotit = waiter.acquire(0)
+ if gotit:
+ break
+ remaining = endtime - _time()
+ if remaining <= 0:
+ break
+ delay = min(delay * 2, remaining, .05)
+ _sleep(delay)
+ if not gotit:
+ if __debug__:
+ self._note("%s.wait(%s): timed out", self, timeout)
+ try:
+ self.__waiters.remove(waiter)
+ except ValueError:
+ pass
+ else:
+ if __debug__:
+ self._note("%s.wait(%s): got it", self, timeout)
+ finally:
+ self._acquire_restore(saved_state)
+
+ def notify(self, n=1):
+ currentThread() # for side-effect
+ assert self._is_owned(), "notify() of un-acquire()d lock"
+ __waiters = self.__waiters
+ waiters = __waiters[:n]
+ if not waiters:
+ if __debug__:
+ self._note("%s.notify(): no waiters", self)
+ return
+ self._note("%s.notify(): notifying %d waiter%s", self, n,
+ n!=1 and "s" or "")
+ for waiter in waiters:
+ waiter.release()
+ try:
+ __waiters.remove(waiter)
+ except ValueError:
+ pass
+
+ def notifyAll(self):
+ self.notify(len(self.__waiters))
+
+
+def Semaphore(*args, **kwargs):
+ return _Semaphore(*args, **kwargs)
+
+class _Semaphore(_Verbose):
+
+ # After Tim Peters' semaphore class, but not quite the same (no maximum)
+
+ def __init__(self, value=1, verbose=None):
+ assert value >= 0, "Semaphore initial value must be >= 0"
+ _Verbose.__init__(self, verbose)
+ self.__cond = Condition(Lock())
+ self.__value = value
+
+ def acquire(self, blocking=1):
+ rc = False
+ self.__cond.acquire()
+ while self.__value == 0:
+ if not blocking:
+ break
+ if __debug__:
+ self._note("%s.acquire(%s): blocked waiting, value=%s",
+ self, blocking, self.__value)
+ self.__cond.wait()
+ else:
+ self.__value = self.__value - 1
+ if __debug__:
+ self._note("%s.acquire: success, value=%s",
+ self, self.__value)
+ rc = True
+ self.__cond.release()
+ return rc
+
+ def release(self):
+ self.__cond.acquire()
+ self.__value = self.__value + 1
+ if __debug__:
+ self._note("%s.release: success, value=%s",
+ self, self.__value)
+ self.__cond.notify()
+ self.__cond.release()
+
+
+def BoundedSemaphore(*args, **kwargs):
+ return _BoundedSemaphore(*args, **kwargs)
+
+class _BoundedSemaphore(_Semaphore):
+ """Semaphore that checks that # releases is <= # acquires"""
+ def __init__(self, value=1, verbose=None):
+ _Semaphore.__init__(self, value, verbose)
+ self._initial_value = value
+
+ def release(self):
+ if self._Semaphore__value >= self._initial_value:
+ raise ValueError, "Semaphore released too many times"
+ return _Semaphore.release(self)
+
+
+def Event(*args, **kwargs):
+ return _Event(*args, **kwargs)
+
+class _Event(_Verbose):
+
+ # After Tim Peters' event class (without is_posted())
+
+ def __init__(self, verbose=None):
+ _Verbose.__init__(self, verbose)
+ self.__cond = Condition(Lock())
+ self.__flag = False
+
+ def isSet(self):
+ return self.__flag
+
+ def set(self):
+ self.__cond.acquire()
+ try:
+ self.__flag = True
+ self.__cond.notifyAll()
+ finally:
+ self.__cond.release()
+
+ def clear(self):
+ self.__cond.acquire()
+ try:
+ self.__flag = False
+ finally:
+ self.__cond.release()
+
+ def wait(self, timeout=None):
+ self.__cond.acquire()
+ try:
+ if not self.__flag:
+ self.__cond.wait(timeout)
+ finally:
+ self.__cond.release()
+
+# Helper to generate new thread names
+_counter = 0
+def _newname(template="Thread-%d"):
+ global _counter
+ _counter = _counter + 1
+ return template % _counter
+
+# Active thread administration
+_active_limbo_lock = _allocate_lock()
+_active = {}
+_limbo = {}
+
+
+# Main class for threads
+
+class Thread(_Verbose):
+
+ __initialized = False
+
+ def __init__(self, group=None, target=None, name=None,
+ args=(), kwargs={}, verbose=None):
+ assert group is None, "group argument must be None for now"
+ _Verbose.__init__(self, verbose)
+ self.__target = target
+ self.__name = str(name or _newname())
+ self.__args = args
+ self.__kwargs = kwargs
+ self.__daemonic = self._set_daemon()
+ self.__started = False
+ self.__stopped = False
+ self.__block = Condition(Lock())
+ self.__initialized = True
+
+ def _set_daemon(self):
+ # Overridden in _MainThread and _DummyThread
+ return currentThread().isDaemon()
+
+ def __repr__(self):
+ assert self.__initialized, "Thread.__init__() was not called"
+ status = "initial"
+ if self.__started:
+ status = "started"
+ if self.__stopped:
+ status = "stopped"
+ if self.__daemonic:
+ status = status + " daemon"
+ return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
+
+ def start(self):
+ assert self.__initialized, "Thread.__init__() not called"
+ assert not self.__started, "thread already started"
+ if __debug__:
+ self._note("%s.start(): starting thread", self)
+ _active_limbo_lock.acquire()
+ _limbo[self] = self
+ _active_limbo_lock.release()
+ _start_new_thread(self.__bootstrap, ())
+ self.__started = True
+ _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack)
+
+ def run(self):
+ if self.__target:
+ self.__target(*self.__args, **self.__kwargs)
+
+ def __bootstrap(self):
+ try:
+ self.__started = True
+ _active_limbo_lock.acquire()
+ _active[_get_ident()] = self
+ del _limbo[self]
+ _active_limbo_lock.release()
+ if __debug__:
+ self._note("%s.__bootstrap(): thread started", self)
+
+ if _trace_hook:
+ self._note("%s.__bootstrap(): registering trace hook", self)
+ _sys.settrace(_trace_hook)
+ if _profile_hook:
+ self._note("%s.__bootstrap(): registering profile hook", self)
+ _sys.setprofile(_profile_hook)
+
+ try:
+ self.run()
+ except SystemExit:
+ if __debug__:
+ self._note("%s.__bootstrap(): raised SystemExit", self)
+ except:
+ if __debug__:
+ self._note("%s.__bootstrap(): unhandled exception", self)
+ s = _StringIO()
+ _print_exc(file=s)
+ _sys.stderr.write("Exception in thread %s:\n%s\n" %
+ (self.getName(), s.getvalue()))
+ else:
+ if __debug__:
+ self._note("%s.__bootstrap(): normal return", self)
+ finally:
+ self.__stop()
+ try:
+ self.__delete()
+ except:
+ pass
+
+ def __stop(self):
+ self.__block.acquire()
+ self.__stopped = True
+ self.__block.notifyAll()
+ self.__block.release()
+
+ def __delete(self):
+ _active_limbo_lock.acquire()
+ del _active[_get_ident()]
+ _active_limbo_lock.release()
+
+ def join(self, timeout=None):
+ assert self.__initialized, "Thread.__init__() not called"
+ assert self.__started, "cannot join thread before it is started"
+ assert self is not currentThread(), "cannot join current thread"
+ if __debug__:
+ if not self.__stopped:
+ self._note("%s.join(): waiting until thread stops", self)
+ self.__block.acquire()
+ try:
+ if timeout is None:
+ while not self.__stopped:
+ self.__block.wait()
+ if __debug__:
+ self._note("%s.join(): thread stopped", self)
+ else:
+ deadline = _time() + timeout
+ while not self.__stopped:
+ delay = deadline - _time()
+ if delay <= 0:
+ if __debug__:
+ self._note("%s.join(): timed out", self)
+ break
+ self.__block.wait(delay)
+ else:
+ if __debug__:
+ self._note("%s.join(): thread stopped", self)
+ finally:
+ self.__block.release()
+
+ def getName(self):
+ assert self.__initialized, "Thread.__init__() not called"
+ return self.__name
+
+ def setName(self, name):
+ assert self.__initialized, "Thread.__init__() not called"
+ self.__name = str(name)
+
+ def isAlive(self):
+ assert self.__initialized, "Thread.__init__() not called"
+ return self.__started and not self.__stopped
+
+ def isDaemon(self):
+ assert self.__initialized, "Thread.__init__() not called"
+ return self.__daemonic
+
+ def setDaemon(self, daemonic):
+ assert self.__initialized, "Thread.__init__() not called"
+ assert not self.__started, "cannot set daemon status of active thread"
+ self.__daemonic = daemonic
+
+# The timer class was contributed by Itamar Shtull-Trauring
+
+def Timer(*args, **kwargs):
+ return _Timer(*args, **kwargs)
+
+class _Timer(Thread):
+ """Call a function after a specified number of seconds:
+
+ t = Timer(30.0, f, args=[], kwargs={})
+ t.start()
+ t.cancel() # stop the timer's action if it's still waiting
+ """
+
+ def __init__(self, interval, function, args=[], kwargs={}):
+ Thread.__init__(self)
+ self.interval = interval
+ self.function = function
+ self.args = args
+ self.kwargs = kwargs
+ self.finished = Event()
+
+ def cancel(self):
+ """Stop the timer if it hasn't finished yet"""
+ self.finished.set()
+
+ def run(self):
+ self.finished.wait(self.interval)
+ if not self.finished.isSet():
+ self.function(*self.args, **self.kwargs)
+ self.finished.set()
+
+# Special thread class to represent the main thread
+# This is garbage collected through an exit handler
+
+class _MainThread(Thread):
+
+ def __init__(self):
+ Thread.__init__(self, name="MainThread")
+ self._Thread__started = True
+ _active_limbo_lock.acquire()
+ _active[_get_ident()] = self
+ _active_limbo_lock.release()
+ import atexit
+ atexit.register(self.__exitfunc)
+
+ def _set_daemon(self):
+ return False
+
+ def __exitfunc(self):
+ self._Thread__stop()
+ t = _pickSomeNonDaemonThread()
+ if t:
+ if __debug__:
+ self._note("%s: waiting for other threads", self)
+ while t:
+ t.join()
+ t = _pickSomeNonDaemonThread()
+ if __debug__:
+ self._note("%s: exiting", self)
+ self._Thread__delete()
+
+def _pickSomeNonDaemonThread():
+ for t in enumerate():
+ if not t.isDaemon() and t.isAlive():
+ return t
+ return None
+
+
+# Dummy thread class to represent threads not started here.
+# These aren't garbage collected when they die,
+# nor can they be waited for.
+# Their purpose is to return *something* from currentThread().
+# They are marked as daemon threads so we won't wait for them
+# when we exit (conform previous semantics).
+
+class _DummyThread(Thread):
+
+ def __init__(self):
+ Thread.__init__(self, name=_newname("Dummy-%d"))
+ self._Thread__started = True
+ _active_limbo_lock.acquire()
+ _active[_get_ident()] = self
+ _active_limbo_lock.release()
+
+ def _set_daemon(self):
+ return True
+
+ def join(self, timeout=None):
+ assert False, "cannot join a dummy thread"
+
+
+# Global API functions
+
+def currentThread():
+ try:
+ return _active[_get_ident()]
+ except KeyError:
+ ##print "currentThread(): no current thread for", _get_ident()
+ return _DummyThread()
+
+def activeCount():
+ _active_limbo_lock.acquire()
+ count = len(_active) + len(_limbo)
+ _active_limbo_lock.release()
+ return count
+
+def enumerate():
+ _active_limbo_lock.acquire()
+ active = _active.values() + _limbo.values()
+ _active_limbo_lock.release()
+ return active
+
+# Create the main thread object
+
+_MainThread()
+
+
+# Self-test code
+
+def _test():
+
+ class BoundedQueue(_Verbose):
+
+ def __init__(self, limit):
+ _Verbose.__init__(self)
+ self.mon = RLock()
+ self.rc = Condition(self.mon)
+ self.wc = Condition(self.mon)
+ self.limit = limit
+ self.queue = []
+
+ def put(self, item):
+ self.mon.acquire()
+ while len(self.queue) >= self.limit:
+ self._note("put(%s): queue full", item)
+ self.wc.wait()
+ self.queue.append(item)
+ self._note("put(%s): appended, length now %d",
+ item, len(self.queue))
+ self.rc.notify()
+ self.mon.release()
+
+ def get(self):
+ self.mon.acquire()
+ while not self.queue:
+ self._note("get(): queue empty")
+ self.rc.wait()
+ item = self.queue.pop(0)
+ self._note("get(): got %s, %d left", item, len(self.queue))
+ self.wc.notify()
+ self.mon.release()
+ return item
+
+ class ProducerThread(Thread):
+
+ def __init__(self, queue, quota):
+ Thread.__init__(self, name="Producer")
+ self.queue = queue
+ self.quota = quota
+
+ def run(self):
+ from random import random
+ counter = 0
+ while counter < self.quota:
+ counter = counter + 1
+ self.queue.put("%s.%d" % (self.getName(), counter))
+ _sleep(random() * 0.00001)
+
+
+ class ConsumerThread(Thread):
+
+ def __init__(self, queue, count):
+ Thread.__init__(self, name="Consumer")
+ self.queue = queue
+ self.count = count
+
+ def run(self):
+ while self.count > 0:
+ item = self.queue.get()
+ print item
+ self.count = self.count - 1
+
+ NP = 3
+ QL = 4
+ NI = 5
+
+ Q = BoundedQueue(QL)
+ P = []
+ for i in range(NP):
+ t = ProducerThread(Q, NI)
+ t.setName("Producer-%d" % (i+1))
+ P.append(t)
+ C = ConsumerThread(Q, NI*NP)
+ for t in P:
+ t.start()
+ _sleep(0.000001)
+ C.start()
+ for t in P:
+ t.join()
+ C.join()
+
+if __name__ == '__main__':
+ _test()
diff --git a/gnuradio-core/src/python/gnuradio/gr/gr_threading_24.py b/gnuradio-core/src/python/gnuradio/gr/gr_threading_24.py
new file mode 100644
index 0000000000..8539bfc047
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/gr_threading_24.py
@@ -0,0 +1,793 @@
+"""Thread module emulating a subset of Java's threading model."""
+
+# This started life as the threading.py module of Python 2.4
+# It's been patched to fix a problem with join, where a KeyboardInterrupt
+# caused a lock to be left in the acquired state.
+
+import sys as _sys
+
+try:
+ import thread
+except ImportError:
+ del _sys.modules[__name__]
+ raise
+
+from time import time as _time, sleep as _sleep
+from traceback import format_exc as _format_exc
+from collections import deque
+
+# Rename some stuff so "from threading import *" is safe
+__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event',
+ 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread',
+ 'Timer', 'setprofile', 'settrace', 'local']
+
+_start_new_thread = thread.start_new_thread
+_allocate_lock = thread.allocate_lock
+_get_ident = thread.get_ident
+ThreadError = thread.error
+del thread
+
+
+# Debug support (adapted from ihooks.py).
+# All the major classes here derive from _Verbose. We force that to
+# be a new-style class so that all the major classes here are new-style.
+# This helps debugging (type(instance) is more revealing for instances
+# of new-style classes).
+
+_VERBOSE = False
+
+if __debug__:
+
+ class _Verbose(object):
+
+ def __init__(self, verbose=None):
+ if verbose is None:
+ verbose = _VERBOSE
+ self.__verbose = verbose
+
+ def _note(self, format, *args):
+ if self.__verbose:
+ format = format % args
+ format = "%s: %s\n" % (
+ currentThread().getName(), format)
+ _sys.stderr.write(format)
+
+else:
+ # Disable this when using "python -O"
+ class _Verbose(object):
+ def __init__(self, verbose=None):
+ pass
+ def _note(self, *args):
+ pass
+
+# Support for profile and trace hooks
+
+_profile_hook = None
+_trace_hook = None
+
+def setprofile(func):
+ global _profile_hook
+ _profile_hook = func
+
+def settrace(func):
+ global _trace_hook
+ _trace_hook = func
+
+# Synchronization classes
+
+Lock = _allocate_lock
+
+def RLock(*args, **kwargs):
+ return _RLock(*args, **kwargs)
+
+class _RLock(_Verbose):
+
+ def __init__(self, verbose=None):
+ _Verbose.__init__(self, verbose)
+ self.__block = _allocate_lock()
+ self.__owner = None
+ self.__count = 0
+
+ def __repr__(self):
+ return "<%s(%s, %d)>" % (
+ self.__class__.__name__,
+ self.__owner and self.__owner.getName(),
+ self.__count)
+
+ def acquire(self, blocking=1):
+ me = currentThread()
+ if self.__owner is me:
+ self.__count = self.__count + 1
+ if __debug__:
+ self._note("%s.acquire(%s): recursive success", self, blocking)
+ return 1
+ rc = self.__block.acquire(blocking)
+ if rc:
+ self.__owner = me
+ self.__count = 1
+ if __debug__:
+ self._note("%s.acquire(%s): initial succes", self, blocking)
+ else:
+ if __debug__:
+ self._note("%s.acquire(%s): failure", self, blocking)
+ return rc
+
+ def release(self):
+ me = currentThread()
+ assert self.__owner is me, "release() of un-acquire()d lock"
+ self.__count = count = self.__count - 1
+ if not count:
+ self.__owner = None
+ self.__block.release()
+ if __debug__:
+ self._note("%s.release(): final release", self)
+ else:
+ if __debug__:
+ self._note("%s.release(): non-final release", self)
+
+ # Internal methods used by condition variables
+
+ def _acquire_restore(self, (count, owner)):
+ self.__block.acquire()
+ self.__count = count
+ self.__owner = owner
+ if __debug__:
+ self._note("%s._acquire_restore()", self)
+
+ def _release_save(self):
+ if __debug__:
+ self._note("%s._release_save()", self)
+ count = self.__count
+ self.__count = 0
+ owner = self.__owner
+ self.__owner = None
+ self.__block.release()
+ return (count, owner)
+
+ def _is_owned(self):
+ return self.__owner is currentThread()
+
+
+def Condition(*args, **kwargs):
+ return _Condition(*args, **kwargs)
+
+class _Condition(_Verbose):
+
+ def __init__(self, lock=None, verbose=None):
+ _Verbose.__init__(self, verbose)
+ if lock is None:
+ lock = RLock()
+ self.__lock = lock
+ # Export the lock's acquire() and release() methods
+ self.acquire = lock.acquire
+ self.release = lock.release
+ # If the lock defines _release_save() and/or _acquire_restore(),
+ # these override the default implementations (which just call
+ # release() and acquire() on the lock). Ditto for _is_owned().
+ try:
+ self._release_save = lock._release_save
+ except AttributeError:
+ pass
+ try:
+ self._acquire_restore = lock._acquire_restore
+ except AttributeError:
+ pass
+ try:
+ self._is_owned = lock._is_owned
+ except AttributeError:
+ pass
+ self.__waiters = []
+
+ def __repr__(self):
+ return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters))
+
+ def _release_save(self):
+ self.__lock.release() # No state to save
+
+ def _acquire_restore(self, x):
+ self.__lock.acquire() # Ignore saved state
+
+ def _is_owned(self):
+ # Return True if lock is owned by currentThread.
+ # This method is called only if __lock doesn't have _is_owned().
+ if self.__lock.acquire(0):
+ self.__lock.release()
+ return False
+ else:
+ return True
+
+ def wait(self, timeout=None):
+ assert self._is_owned(), "wait() of un-acquire()d lock"
+ waiter = _allocate_lock()
+ waiter.acquire()
+ self.__waiters.append(waiter)
+ saved_state = self._release_save()
+ try: # restore state no matter what (e.g., KeyboardInterrupt)
+ if timeout is None:
+ waiter.acquire()
+ if __debug__:
+ self._note("%s.wait(): got it", self)
+ else:
+ # Balancing act: We can't afford a pure busy loop, so we
+ # have to sleep; but if we sleep the whole timeout time,
+ # we'll be unresponsive. The scheme here sleeps very
+ # little at first, longer as time goes on, but never longer
+ # than 20 times per second (or the timeout time remaining).
+ endtime = _time() + timeout
+ delay = 0.0005 # 500 us -> initial delay of 1 ms
+ while True:
+ gotit = waiter.acquire(0)
+ if gotit:
+ break
+ remaining = endtime - _time()
+ if remaining <= 0:
+ break
+ delay = min(delay * 2, remaining, .05)
+ _sleep(delay)
+ if not gotit:
+ if __debug__:
+ self._note("%s.wait(%s): timed out", self, timeout)
+ try:
+ self.__waiters.remove(waiter)
+ except ValueError:
+ pass
+ else:
+ if __debug__:
+ self._note("%s.wait(%s): got it", self, timeout)
+ finally:
+ self._acquire_restore(saved_state)
+
+ def notify(self, n=1):
+ assert self._is_owned(), "notify() of un-acquire()d lock"
+ __waiters = self.__waiters
+ waiters = __waiters[:n]
+ if not waiters:
+ if __debug__:
+ self._note("%s.notify(): no waiters", self)
+ return
+ self._note("%s.notify(): notifying %d waiter%s", self, n,
+ n!=1 and "s" or "")
+ for waiter in waiters:
+ waiter.release()
+ try:
+ __waiters.remove(waiter)
+ except ValueError:
+ pass
+
+ def notifyAll(self):
+ self.notify(len(self.__waiters))
+
+
+def Semaphore(*args, **kwargs):
+ return _Semaphore(*args, **kwargs)
+
+class _Semaphore(_Verbose):
+
+ # After Tim Peters' semaphore class, but not quite the same (no maximum)
+
+ def __init__(self, value=1, verbose=None):
+ assert value >= 0, "Semaphore initial value must be >= 0"
+ _Verbose.__init__(self, verbose)
+ self.__cond = Condition(Lock())
+ self.__value = value
+
+ def acquire(self, blocking=1):
+ rc = False
+ self.__cond.acquire()
+ while self.__value == 0:
+ if not blocking:
+ break
+ if __debug__:
+ self._note("%s.acquire(%s): blocked waiting, value=%s",
+ self, blocking, self.__value)
+ self.__cond.wait()
+ else:
+ self.__value = self.__value - 1
+ if __debug__:
+ self._note("%s.acquire: success, value=%s",
+ self, self.__value)
+ rc = True
+ self.__cond.release()
+ return rc
+
+ def release(self):
+ self.__cond.acquire()
+ self.__value = self.__value + 1
+ if __debug__:
+ self._note("%s.release: success, value=%s",
+ self, self.__value)
+ self.__cond.notify()
+ self.__cond.release()
+
+
+def BoundedSemaphore(*args, **kwargs):
+ return _BoundedSemaphore(*args, **kwargs)
+
+class _BoundedSemaphore(_Semaphore):
+ """Semaphore that checks that # releases is <= # acquires"""
+ def __init__(self, value=1, verbose=None):
+ _Semaphore.__init__(self, value, verbose)
+ self._initial_value = value
+
+ def release(self):
+ if self._Semaphore__value >= self._initial_value:
+ raise ValueError, "Semaphore released too many times"
+ return _Semaphore.release(self)
+
+
+def Event(*args, **kwargs):
+ return _Event(*args, **kwargs)
+
+class _Event(_Verbose):
+
+ # After Tim Peters' event class (without is_posted())
+
+ def __init__(self, verbose=None):
+ _Verbose.__init__(self, verbose)
+ self.__cond = Condition(Lock())
+ self.__flag = False
+
+ def isSet(self):
+ return self.__flag
+
+ def set(self):
+ self.__cond.acquire()
+ try:
+ self.__flag = True
+ self.__cond.notifyAll()
+ finally:
+ self.__cond.release()
+
+ def clear(self):
+ self.__cond.acquire()
+ try:
+ self.__flag = False
+ finally:
+ self.__cond.release()
+
+ def wait(self, timeout=None):
+ self.__cond.acquire()
+ try:
+ if not self.__flag:
+ self.__cond.wait(timeout)
+ finally:
+ self.__cond.release()
+
+# Helper to generate new thread names
+_counter = 0
+def _newname(template="Thread-%d"):
+ global _counter
+ _counter = _counter + 1
+ return template % _counter
+
+# Active thread administration
+_active_limbo_lock = _allocate_lock()
+_active = {}
+_limbo = {}
+
+
+# Main class for threads
+
+class Thread(_Verbose):
+
+ __initialized = False
+ # Need to store a reference to sys.exc_info for printing
+ # out exceptions when a thread tries to use a global var. during interp.
+ # shutdown and thus raises an exception about trying to perform some
+ # operation on/with a NoneType
+ __exc_info = _sys.exc_info
+
+ def __init__(self, group=None, target=None, name=None,
+ args=(), kwargs={}, verbose=None):
+ assert group is None, "group argument must be None for now"
+ _Verbose.__init__(self, verbose)
+ self.__target = target
+ self.__name = str(name or _newname())
+ self.__args = args
+ self.__kwargs = kwargs
+ self.__daemonic = self._set_daemon()
+ self.__started = False
+ self.__stopped = False
+ self.__block = Condition(Lock())
+ self.__initialized = True
+ # sys.stderr is not stored in the class like
+ # sys.exc_info since it can be changed between instances
+ self.__stderr = _sys.stderr
+
+ def _set_daemon(self):
+ # Overridden in _MainThread and _DummyThread
+ return currentThread().isDaemon()
+
+ def __repr__(self):
+ assert self.__initialized, "Thread.__init__() was not called"
+ status = "initial"
+ if self.__started:
+ status = "started"
+ if self.__stopped:
+ status = "stopped"
+ if self.__daemonic:
+ status = status + " daemon"
+ return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status)
+
+ def start(self):
+ assert self.__initialized, "Thread.__init__() not called"
+ assert not self.__started, "thread already started"
+ if __debug__:
+ self._note("%s.start(): starting thread", self)
+ _active_limbo_lock.acquire()
+ _limbo[self] = self
+ _active_limbo_lock.release()
+ _start_new_thread(self.__bootstrap, ())
+ self.__started = True
+ _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack)
+
+ def run(self):
+ if self.__target:
+ self.__target(*self.__args, **self.__kwargs)
+
+ def __bootstrap(self):
+ try:
+ self.__started = True
+ _active_limbo_lock.acquire()
+ _active[_get_ident()] = self
+ del _limbo[self]
+ _active_limbo_lock.release()
+ if __debug__:
+ self._note("%s.__bootstrap(): thread started", self)
+
+ if _trace_hook:
+ self._note("%s.__bootstrap(): registering trace hook", self)
+ _sys.settrace(_trace_hook)
+ if _profile_hook:
+ self._note("%s.__bootstrap(): registering profile hook", self)
+ _sys.setprofile(_profile_hook)
+
+ try:
+ self.run()
+ except SystemExit:
+ if __debug__:
+ self._note("%s.__bootstrap(): raised SystemExit", self)
+ except:
+ if __debug__:
+ self._note("%s.__bootstrap(): unhandled exception", self)
+ # If sys.stderr is no more (most likely from interpreter
+ # shutdown) use self.__stderr. Otherwise still use sys (as in
+ # _sys) in case sys.stderr was redefined since the creation of
+ # self.
+ if _sys:
+ _sys.stderr.write("Exception in thread %s:\n%s\n" %
+ (self.getName(), _format_exc()))
+ else:
+ # Do the best job possible w/o a huge amt. of code to
+ # approximate a traceback (code ideas from
+ # Lib/traceback.py)
+ exc_type, exc_value, exc_tb = self.__exc_info()
+ try:
+ print>>self.__stderr, (
+ "Exception in thread " + self.getName() +
+ " (most likely raised during interpreter shutdown):")
+ print>>self.__stderr, (
+ "Traceback (most recent call last):")
+ while exc_tb:
+ print>>self.__stderr, (
+ ' File "%s", line %s, in %s' %
+ (exc_tb.tb_frame.f_code.co_filename,
+ exc_tb.tb_lineno,
+ exc_tb.tb_frame.f_code.co_name))
+ exc_tb = exc_tb.tb_next
+ print>>self.__stderr, ("%s: %s" % (exc_type, exc_value))
+ # Make sure that exc_tb gets deleted since it is a memory
+ # hog; deleting everything else is just for thoroughness
+ finally:
+ del exc_type, exc_value, exc_tb
+ else:
+ if __debug__:
+ self._note("%s.__bootstrap(): normal return", self)
+ finally:
+ self.__stop()
+ try:
+ self.__delete()
+ except:
+ pass
+
+ def __stop(self):
+ self.__block.acquire()
+ self.__stopped = True
+ self.__block.notifyAll()
+ self.__block.release()
+
+ def __delete(self):
+ "Remove current thread from the dict of currently running threads."
+
+ # Notes about running with dummy_thread:
+ #
+ # Must take care to not raise an exception if dummy_thread is being
+ # used (and thus this module is being used as an instance of
+ # dummy_threading). dummy_thread.get_ident() always returns -1 since
+ # there is only one thread if dummy_thread is being used. Thus
+ # len(_active) is always <= 1 here, and any Thread instance created
+ # overwrites the (if any) thread currently registered in _active.
+ #
+ # An instance of _MainThread is always created by 'threading'. This
+ # gets overwritten the instant an instance of Thread is created; both
+ # threads return -1 from dummy_thread.get_ident() and thus have the
+ # same key in the dict. So when the _MainThread instance created by
+ # 'threading' tries to clean itself up when atexit calls this method
+ # it gets a KeyError if another Thread instance was created.
+ #
+ # This all means that KeyError from trying to delete something from
+ # _active if dummy_threading is being used is a red herring. But
+ # since it isn't if dummy_threading is *not* being used then don't
+ # hide the exception.
+
+ _active_limbo_lock.acquire()
+ try:
+ try:
+ del _active[_get_ident()]
+ except KeyError:
+ if 'dummy_threading' not in _sys.modules:
+ raise
+ finally:
+ _active_limbo_lock.release()
+
+ def join(self, timeout=None):
+ assert self.__initialized, "Thread.__init__() not called"
+ assert self.__started, "cannot join thread before it is started"
+ assert self is not currentThread(), "cannot join current thread"
+ if __debug__:
+ if not self.__stopped:
+ self._note("%s.join(): waiting until thread stops", self)
+ self.__block.acquire()
+ try:
+ if timeout is None:
+ while not self.__stopped:
+ self.__block.wait()
+ if __debug__:
+ self._note("%s.join(): thread stopped", self)
+ else:
+ deadline = _time() + timeout
+ while not self.__stopped:
+ delay = deadline - _time()
+ if delay <= 0:
+ if __debug__:
+ self._note("%s.join(): timed out", self)
+ break
+ self.__block.wait(delay)
+ else:
+ if __debug__:
+ self._note("%s.join(): thread stopped", self)
+ finally:
+ self.__block.release()
+
+ def getName(self):
+ assert self.__initialized, "Thread.__init__() not called"
+ return self.__name
+
+ def setName(self, name):
+ assert self.__initialized, "Thread.__init__() not called"
+ self.__name = str(name)
+
+ def isAlive(self):
+ assert self.__initialized, "Thread.__init__() not called"
+ return self.__started and not self.__stopped
+
+ def isDaemon(self):
+ assert self.__initialized, "Thread.__init__() not called"
+ return self.__daemonic
+
+ def setDaemon(self, daemonic):
+ assert self.__initialized, "Thread.__init__() not called"
+ assert not self.__started, "cannot set daemon status of active thread"
+ self.__daemonic = daemonic
+
+# The timer class was contributed by Itamar Shtull-Trauring
+
+def Timer(*args, **kwargs):
+ return _Timer(*args, **kwargs)
+
+class _Timer(Thread):
+ """Call a function after a specified number of seconds:
+
+ t = Timer(30.0, f, args=[], kwargs={})
+ t.start()
+ t.cancel() # stop the timer's action if it's still waiting
+ """
+
+ def __init__(self, interval, function, args=[], kwargs={}):
+ Thread.__init__(self)
+ self.interval = interval
+ self.function = function
+ self.args = args
+ self.kwargs = kwargs
+ self.finished = Event()
+
+ def cancel(self):
+ """Stop the timer if it hasn't finished yet"""
+ self.finished.set()
+
+ def run(self):
+ self.finished.wait(self.interval)
+ if not self.finished.isSet():
+ self.function(*self.args, **self.kwargs)
+ self.finished.set()
+
+# Special thread class to represent the main thread
+# This is garbage collected through an exit handler
+
+class _MainThread(Thread):
+
+ def __init__(self):
+ Thread.__init__(self, name="MainThread")
+ self._Thread__started = True
+ _active_limbo_lock.acquire()
+ _active[_get_ident()] = self
+ _active_limbo_lock.release()
+ import atexit
+ atexit.register(self.__exitfunc)
+
+ def _set_daemon(self):
+ return False
+
+ def __exitfunc(self):
+ self._Thread__stop()
+ t = _pickSomeNonDaemonThread()
+ if t:
+ if __debug__:
+ self._note("%s: waiting for other threads", self)
+ while t:
+ t.join()
+ t = _pickSomeNonDaemonThread()
+ if __debug__:
+ self._note("%s: exiting", self)
+ self._Thread__delete()
+
+def _pickSomeNonDaemonThread():
+ for t in enumerate():
+ if not t.isDaemon() and t.isAlive():
+ return t
+ return None
+
+
+# Dummy thread class to represent threads not started here.
+# These aren't garbage collected when they die,
+# nor can they be waited for.
+# Their purpose is to return *something* from currentThread().
+# They are marked as daemon threads so we won't wait for them
+# when we exit (conform previous semantics).
+
+class _DummyThread(Thread):
+
+ def __init__(self):
+ Thread.__init__(self, name=_newname("Dummy-%d"))
+ self._Thread__started = True
+ _active_limbo_lock.acquire()
+ _active[_get_ident()] = self
+ _active_limbo_lock.release()
+
+ def _set_daemon(self):
+ return True
+
+ def join(self, timeout=None):
+ assert False, "cannot join a dummy thread"
+
+
+# Global API functions
+
+def currentThread():
+ try:
+ return _active[_get_ident()]
+ except KeyError:
+ ##print "currentThread(): no current thread for", _get_ident()
+ return _DummyThread()
+
+def activeCount():
+ _active_limbo_lock.acquire()
+ count = len(_active) + len(_limbo)
+ _active_limbo_lock.release()
+ return count
+
+def enumerate():
+ _active_limbo_lock.acquire()
+ active = _active.values() + _limbo.values()
+ _active_limbo_lock.release()
+ return active
+
+# Create the main thread object
+
+_MainThread()
+
+# get thread-local implementation, either from the thread
+# module, or from the python fallback
+
+try:
+ from thread import _local as local
+except ImportError:
+ from _threading_local import local
+
+
+# Self-test code
+
+def _test():
+
+ class BoundedQueue(_Verbose):
+
+ def __init__(self, limit):
+ _Verbose.__init__(self)
+ self.mon = RLock()
+ self.rc = Condition(self.mon)
+ self.wc = Condition(self.mon)
+ self.limit = limit
+ self.queue = deque()
+
+ def put(self, item):
+ self.mon.acquire()
+ while len(self.queue) >= self.limit:
+ self._note("put(%s): queue full", item)
+ self.wc.wait()
+ self.queue.append(item)
+ self._note("put(%s): appended, length now %d",
+ item, len(self.queue))
+ self.rc.notify()
+ self.mon.release()
+
+ def get(self):
+ self.mon.acquire()
+ while not self.queue:
+ self._note("get(): queue empty")
+ self.rc.wait()
+ item = self.queue.popleft()
+ self._note("get(): got %s, %d left", item, len(self.queue))
+ self.wc.notify()
+ self.mon.release()
+ return item
+
+ class ProducerThread(Thread):
+
+ def __init__(self, queue, quota):
+ Thread.__init__(self, name="Producer")
+ self.queue = queue
+ self.quota = quota
+
+ def run(self):
+ from random import random
+ counter = 0
+ while counter < self.quota:
+ counter = counter + 1
+ self.queue.put("%s.%d" % (self.getName(), counter))
+ _sleep(random() * 0.00001)
+
+
+ class ConsumerThread(Thread):
+
+ def __init__(self, queue, count):
+ Thread.__init__(self, name="Consumer")
+ self.queue = queue
+ self.count = count
+
+ def run(self):
+ while self.count > 0:
+ item = self.queue.get()
+ print item
+ self.count = self.count - 1
+
+ NP = 3
+ QL = 4
+ NI = 5
+
+ Q = BoundedQueue(QL)
+ P = []
+ for i in range(NP):
+ t = ProducerThread(Q, NI)
+ t.setName("Producer-%d" % (i+1))
+ P.append(t)
+ C = ConsumerThread(Q, NI*NP)
+ for t in P:
+ t.start()
+ _sleep(0.000001)
+ C.start()
+ for t in P:
+ t.join()
+ C.join()
+
+if __name__ == '__main__':
+ _test()
diff --git a/gnuradio-core/src/python/gnuradio/gr/hier_block.py b/gnuradio-core/src/python/gnuradio/gr/hier_block.py
new file mode 100644
index 0000000000..d669d71786
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/hier_block.py
@@ -0,0 +1,132 @@
+#
+# Copyright 2005 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio_swig_python import io_signature
+import weakref
+
+class hier_block_base(object):
+ """
+ Abstract base class for hierarchical signal processing blocks.
+ """
+ def __init__(self, fg):
+ """
+ @param fg: The flow graph that contains this hierarchical block.
+ @type fg: gr.flow_graph
+ """
+ self.fg = weakref.proxy(fg)
+
+ def input_signature(self):
+ """
+ @return input signature of hierarchical block.
+ @rtype gr.io_signature
+ """
+ raise NotImplemented
+
+ def output_signature(self):
+ """
+ @return output signature of hierarchical block.
+ @rtype gr.io_signature
+ """
+ raise NotImplemented
+
+ def resolve_input_port(self, port_number):
+ """
+ @param port_number: which input port number to resolve to an endpoint.
+ @type port_number: int
+ @return: sequence of endpoints
+ @rtype: sequence of endpoint
+
+ Note that an input port can resolve to more than one endpoint.
+ """
+ raise NotImplemented
+
+ def resolve_output_port(self, port_number):
+ """
+ @param port_number: which output port number to resolve to an endpoint.
+ @type port_number: int
+ @return: endpoint
+ @rtype: endpoint
+
+ Output ports resolve to a single endpoint.
+ """
+ raise NotImplemented
+
+
+class hier_block(hier_block_base):
+ """
+ Simple concrete class for building hierarchical blocks.
+
+ This class assumes that there is at most a single block at the
+ head of the chain and a single block at the end of the chain.
+ Either head or tail may be None indicating a sink or source respectively.
+
+ If you needs something more elaborate than this, derive a new class from
+ hier_block_base.
+ """
+ def __init__(self, fg, head_block, tail_block):
+ """
+ @param fg: The flow graph that contains this hierarchical block.
+ @type fg: flow_graph
+ @param head_block: the first block in the signal processing chain.
+ @type head_block: None or subclass of gr.block or gr.hier_block_base
+ @param tail_block: the last block in the signal processing chain.
+ @type tail_block: None or subclass of gr.block or gr.hier_block_base
+ """
+ hier_block_base.__init__(self, fg)
+ # FIXME add type checks here for easier debugging of misuse
+ self.head = head_block
+ self.tail = tail_block
+
+ def input_signature(self):
+ if self.head:
+ return self.head.input_signature()
+ else:
+ return io_signature(0,0,0)
+
+ def output_signature(self):
+ if self.tail:
+ return self.tail.output_signature()
+ else:
+ return io_signature(0,0,0)
+
+ def resolve_input_port(self, port_number):
+ return ((self.head, port_number),)
+
+ def resolve_output_port(self, port_number):
+ return (self.tail, port_number)
+
+
+class compose(hier_block):
+ """
+ Compose one or more blocks (primitive or hierarchical) into a new hierarchical block.
+ """
+ def __init__(self, fg, *blocks):
+ """
+ @param fg: flow graph
+ @type fg: gr.flow_graph
+ @param *blocks: list of blocks
+ @type *blocks: list of blocks
+ """
+ if len(blocks) < 1:
+ raise ValueError, ("compose requires at least one block; none provided.")
+ if len(blocks) > 1:
+ fg.connect(*blocks)
+ hier_block.__init__(self, fg, blocks[0], blocks[-1])
diff --git a/gnuradio-core/src/python/gnuradio/gr/prefs.py b/gnuradio-core/src/python/gnuradio/gr/prefs.py
new file mode 100644
index 0000000000..fa12912717
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/prefs.py
@@ -0,0 +1,129 @@
+#
+# Copyright 2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+import gnuradio_swig_python as gsp
+_prefs_base = gsp.gr_prefs
+
+
+import ConfigParser
+import os
+import os.path
+import sys
+
+
+def _user_prefs_filename():
+ return os.path.expanduser('~/.gnuradio/config.conf')
+
+def _sys_prefs_dirname():
+ return os.path.join(gsp.prefix(), 'etc/gnuradio/conf.d')
+
+def _bool(x):
+ """
+ Try to coerce obj to a True or False
+ """
+ if isinstance(x, bool):
+ return x
+ if isinstance(x, (float, int)):
+ return bool(x)
+ raise TypeError, x
+
+
+class _prefs(_prefs_base):
+ """
+ Derive our 'real class' from the stubbed out base class that has support
+ for SWIG directors. This allows C++ code to magically and transparently
+ invoke the methods in this python class.
+ """
+ def __init__(self):
+ _prefs_base.__init__(self)
+ self.cp = ConfigParser.RawConfigParser()
+
+ def _sys_prefs_filenames(self):
+ dir = _sys_prefs_dirname()
+ try:
+ fnames = os.listdir(dir)
+ except (IOError, OSError):
+ return []
+ fnames.sort()
+ return [os.path.join(dir, f) for f in fnames]
+
+
+ def _read_files(self):
+ filenames = self._sys_prefs_filenames()
+ filenames.append(_user_prefs_filename())
+ #print "filenames: ", filenames
+ self.cp.read(filenames)
+
+ def __getattr__(self, name):
+ return getattr(self.cp, name)
+
+ # ----------------------------------------------------------------
+ # These methods override the C++ virtual methods of the same name
+ # ----------------------------------------------------------------
+ def has_section(self, section):
+ return self.cp.has_section(section)
+
+ def has_option(self, section, option):
+ return self.cp.has_option(section, option)
+
+ def get_string(self, section, option, default_val):
+ try:
+ return self.cp.get(section, option)
+ except:
+ return default_val
+
+ def get_bool(self, section, option, default_val):
+ try:
+ return self.cp.getboolean(section, option)
+ except:
+ return default_val
+
+ def get_long(self, section, option, default_val):
+ try:
+ return self.cp.getint(section, option)
+ except:
+ return default_val
+
+ def get_double(self, section, option, default_val):
+ try:
+ return self.cp.getfloat(section, option)
+ except:
+ return default_val
+ # ----------------------------------------------------------------
+ # End override of C++ virtual methods
+ # ----------------------------------------------------------------
+
+
+_prefs_db = _prefs()
+
+# if GR_DONT_LOAD_PREFS is set, don't load them.
+# (make check uses this to avoid interactions.)
+if os.getenv("GR_DONT_LOAD_PREFS", None) is None:
+ _prefs_db._read_files()
+
+
+_prefs_base.set_singleton(_prefs_db) # tell C++ what instance to use
+
+def prefs():
+ """
+ Return the global preference data base
+ """
+ return _prefs_db
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_add_and_friends.py b/gnuradio-core/src/python/gnuradio/gr/qa_add_and_friends.py
new file mode 100755
index 0000000000..6cb74e9f52
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_add_and_friends.py
@@ -0,0 +1,129 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+class test_head (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def help_ii (self, src_data, exp_data, op):
+ for s in zip (range (len (src_data)), src_data):
+ src = gr.vector_source_i (s[1])
+ self.fg.connect (src, (op, s[0]))
+ dst = gr.vector_sink_i ()
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+ def help_ff (self, src_data, exp_data, op):
+ for s in zip (range (len (src_data)), src_data):
+ src = gr.vector_source_f (s[1])
+ self.fg.connect (src, (op, s[0]))
+ dst = gr.vector_sink_f ()
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+ def help_cc (self, src_data, exp_data, op):
+ for s in zip (range (len (src_data)), src_data):
+ src = gr.vector_source_c (s[1])
+ self.fg.connect (src, (op, s[0]))
+ dst = gr.vector_sink_c ()
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+ def test_add_const_ii (self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_result = (6, 7, 8, 9, 10)
+ op = gr.add_const_ii (5)
+ self.help_ii ((src_data,), expected_result, op)
+
+ def test_add_const_cc (self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_result = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j)
+ op = gr.add_const_cc (5j)
+ self.help_cc ((src_data,), expected_result, op)
+
+ def test_mult_const_ii (self):
+ src_data = (-1, 0, 1, 2, 3)
+ expected_result = (-5, 0, 5, 10, 15)
+ op = gr.multiply_const_ii (5)
+ self.help_ii ((src_data,), expected_result, op)
+
+ def test_add_ii (self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (9, -1, 7, 12, 7)
+ op = gr.add_ii ()
+ self.help_ii ((src1_data, src2_data),
+ expected_result, op)
+
+ def test_mult_ii (self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (8, -6, 12, 32, 10)
+ op = gr.multiply_ii ()
+ self.help_ii ((src1_data, src2_data),
+ expected_result, op)
+
+ def test_sub_ii_1 (self):
+ src1_data = (1, 2, 3, 4, 5)
+ expected_result = (-1, -2, -3, -4, -5)
+ op = gr.sub_ii ()
+ self.help_ii ((src1_data,),
+ expected_result, op)
+
+ def test_sub_ii_2 (self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (8, -3, 4, 8, 2)
+ expected_result = (-7, 5, -1, -4, 3)
+ op = gr.sub_ii ()
+ self.help_ii ((src1_data, src2_data),
+ expected_result, op)
+
+ def test_div_ff_1 (self):
+ src1_data = (1, 2, 4, -8)
+ expected_result = (1, 0.5, 0.25, -.125)
+ op = gr.divide_ff ()
+ self.help_ff ((src1_data,),
+ expected_result, op)
+
+ def test_div_ff_2 (self):
+ src1_data = ( 5, 9, -15, 1024)
+ src2_data = (10, 3, -5, 64)
+ expected_result = (0.5, 3, 3, 16)
+ op = gr.divide_ff ()
+ self.help_ff ((src1_data, src2_data),
+ expected_result, op)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_add_v_and_friends.py b/gnuradio-core/src/python/gnuradio/gr/qa_add_v_and_friends.py
new file mode 100755
index 0000000000..11e69e3733
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_add_v_and_friends.py
@@ -0,0 +1,353 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+class test_add_v_and_friends(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.fg = gr.flow_graph()
+
+ def tearDown(self):
+ self.fg = None
+
+ def help_ss(self, size, src_data, exp_data, op):
+ for s in zip(range (len (src_data)), src_data):
+ src = gr.vector_source_s(s[1])
+ srcv = gr.stream_to_vector(gr.sizeof_short, size)
+ self.fg.connect(src, srcv)
+ self.fg.connect(srcv, (op, s[0]))
+ rhs = gr.vector_to_stream(gr.sizeof_short, size)
+ dst = gr.vector_sink_s()
+ self.fg.connect(op, rhs, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_ii(self, size, src_data, exp_data, op):
+ for s in zip(range (len (src_data)), src_data):
+ src = gr.vector_source_i(s[1])
+ srcv = gr.stream_to_vector(gr.sizeof_int, size)
+ self.fg.connect(src, srcv)
+ self.fg.connect(srcv, (op, s[0]))
+ rhs = gr.vector_to_stream(gr.sizeof_int, size)
+ dst = gr.vector_sink_i()
+ self.fg.connect(op, rhs, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_ff(self, size, src_data, exp_data, op):
+ for s in zip(range (len (src_data)), src_data):
+ src = gr.vector_source_f(s[1])
+ srcv = gr.stream_to_vector(gr.sizeof_float, size)
+ self.fg.connect(src, srcv)
+ self.fg.connect(srcv, (op, s[0]))
+ rhs = gr.vector_to_stream(gr.sizeof_float, size)
+ dst = gr.vector_sink_f()
+ self.fg.connect(op, rhs, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_cc(self, size, src_data, exp_data, op):
+ for s in zip(range (len (src_data)), src_data):
+ src = gr.vector_source_c(s[1])
+ srcv = gr.stream_to_vector(gr.sizeof_gr_complex, size)
+ self.fg.connect(src, srcv)
+ self.fg.connect(srcv, (op, s[0]))
+ rhs = gr.vector_to_stream(gr.sizeof_gr_complex, size)
+ dst = gr.vector_sink_c()
+ self.fg.connect(op, rhs, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_const_ss(self, src_data, exp_data, op):
+ src = gr.vector_source_s(src_data)
+ srcv = gr.stream_to_vector(gr.sizeof_short, len(src_data))
+ rhs = gr.vector_to_stream(gr.sizeof_short, len(src_data))
+ dst = gr.vector_sink_s()
+ self.fg.connect(src, srcv, op, rhs, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_const_ii(self, src_data, exp_data, op):
+ src = gr.vector_source_i(src_data)
+ srcv = gr.stream_to_vector(gr.sizeof_int, len(src_data))
+ rhs = gr.vector_to_stream(gr.sizeof_int, len(src_data))
+ dst = gr.vector_sink_i()
+ self.fg.connect(src, srcv, op, rhs, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_const_ff(self, src_data, exp_data, op):
+ src = gr.vector_source_f(src_data)
+ srcv = gr.stream_to_vector(gr.sizeof_float, len(src_data))
+ rhs = gr.vector_to_stream(gr.sizeof_float, len(src_data))
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, srcv, op, rhs, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+ def help_const_cc(self, src_data, exp_data, op):
+ src = gr.vector_source_c(src_data)
+ srcv = gr.stream_to_vector(gr.sizeof_gr_complex, len(src_data))
+ rhs = gr.vector_to_stream(gr.sizeof_gr_complex, len(src_data))
+ dst = gr.vector_sink_c()
+ self.fg.connect(src, srcv, op, rhs, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(exp_data, result_data)
+
+
+ def test_add_vss_one(self):
+ src1_data = (1,)
+ src2_data = (2,)
+ src3_data = (3,)
+ expected_result = (6,)
+ op = gr.add_vss(1)
+ self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vss_five(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (6, 7, 8, 9, 10)
+ src3_data = (11, 12, 13, 14, 15)
+ expected_result = (18, 21, 24, 27, 30)
+ op = gr.add_vss(5)
+ self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vii_one(self):
+ src1_data = (1,)
+ src2_data = (2,)
+ src3_data = (3,)
+ expected_result = (6,)
+ op = gr.add_vii(1)
+ self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vii_five(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (6, 7, 8, 9, 10)
+ src3_data = (11, 12, 13, 14, 15)
+ expected_result = (18, 21, 24, 27, 30)
+ op = gr.add_vii(5)
+ self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vff_one(self):
+ src1_data = (1.0,)
+ src2_data = (2.0,)
+ src3_data = (3.0,)
+ expected_result = (6.0,)
+ op = gr.add_vff(1)
+ self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vff_five(self):
+ src1_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ src2_data = (6.0, 7.0, 8.0, 9.0, 10.0)
+ src3_data = (11.0, 12.0, 13.0, 14.0, 15.0)
+ expected_result = (18.0, 21.0, 24.0, 27.0, 30.0)
+ op = gr.add_vff(5)
+ self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vcc_one(self):
+ src1_data = (1.0+2.0j,)
+ src2_data = (3.0+4.0j,)
+ src3_data = (5.0+6.0j,)
+ expected_result = (9.0+12j,)
+ op = gr.add_vcc(1)
+ self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_vcc_five(self):
+ src1_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j)
+ src2_data = (11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)
+ src3_data = (21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j)
+ expected_result = (33.0+36.0j, 39.0+42.0j, 45.0+48.0j, 51.0+54.0j, 57.0+60.0j)
+ op = gr.add_vcc(5)
+ self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_add_const_vss_one(self):
+ src_data = (1,)
+ op = gr.add_const_vss((2,))
+ exp_data = (3,)
+ self.help_const_ss(src_data, exp_data, op)
+
+ def test_add_const_vss_five(self):
+ src_data = (1, 2, 3, 4, 5)
+ op = gr.add_const_vss((6, 7, 8, 9, 10))
+ exp_data = (7, 9, 11, 13, 15)
+ self.help_const_ss(src_data, exp_data, op)
+
+ def test_add_const_vii_one(self):
+ src_data = (1,)
+ op = gr.add_const_vii((2,))
+ exp_data = (3,)
+ self.help_const_ii(src_data, exp_data, op)
+
+ def test_add_const_vii_five(self):
+ src_data = (1, 2, 3, 4, 5)
+ op = gr.add_const_vii((6, 7, 8, 9, 10))
+ exp_data = (7, 9, 11, 13, 15)
+ self.help_const_ii(src_data, exp_data, op)
+
+ def test_add_const_vff_one(self):
+ src_data = (1.0,)
+ op = gr.add_const_vff((2.0,))
+ exp_data = (3.0,)
+ self.help_const_ff(src_data, exp_data, op)
+
+ def test_add_const_vff_five(self):
+ src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ op = gr.add_const_vff((6.0, 7.0, 8.0, 9.0, 10.0))
+ exp_data = (7.0, 9.0, 11.0, 13.0, 15.0)
+ self.help_const_ff(src_data, exp_data, op)
+
+ def test_add_const_vcc_one(self):
+ src_data = (1.0+2.0j,)
+ op = gr.add_const_vcc((2.0+3.0j,))
+ exp_data = (3.0+5.0j,)
+ self.help_const_cc(src_data, exp_data, op)
+
+ def test_add_const_vcc_five(self):
+ src_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j)
+ op = gr.add_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j))
+ exp_data = (12.0+14.0j, 16.0+18.0j, 20.0+22.0j, 24.0+26.0j, 28.0+30.0j)
+ self.help_const_cc(src_data, exp_data, op)
+
+
+ def test_multiply_vss_one(self):
+ src1_data = (1,)
+ src2_data = (2,)
+ src3_data = (3,)
+ expected_result = (6,)
+ op = gr.multiply_vss(1)
+ self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vss_five(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (6, 7, 8, 9, 10)
+ src3_data = (11, 12, 13, 14, 15)
+ expected_result = (66, 168, 312, 504, 750)
+ op = gr.multiply_vss(5)
+ self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vii_one(self):
+ src1_data = (1,)
+ src2_data = (2,)
+ src3_data = (3,)
+ expected_result = (6,)
+ op = gr.multiply_vii(1)
+ self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vii_five(self):
+ src1_data = (1, 2, 3, 4, 5)
+ src2_data = (6, 7, 8, 9, 10)
+ src3_data = (11, 12, 13, 14, 15)
+ expected_result = (66, 168, 312, 504, 750)
+ op = gr.multiply_vii(5)
+ self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vff_one(self):
+ src1_data = (1.0,)
+ src2_data = (2.0,)
+ src3_data = (3.0,)
+ expected_result = (6.0,)
+ op = gr.multiply_vff(1)
+ self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vff_five(self):
+ src1_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ src2_data = (6.0, 7.0, 8.0, 9.0, 10.0)
+ src3_data = (11.0, 12.0, 13.0, 14.0, 15.0)
+ expected_result = (66.0, 168.0, 312.0, 504.0, 750.0)
+ op = gr.multiply_vff(5)
+ self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vcc_one(self):
+ src1_data = (1.0+2.0j,)
+ src2_data = (3.0+4.0j,)
+ src3_data = (5.0+6.0j,)
+ expected_result = (-85+20j,)
+ op = gr.multiply_vcc(1)
+ self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_vcc_five(self):
+ src1_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j)
+ src2_data = (11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)
+ src3_data = (21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j)
+ expected_result = (-1021.0+428.0j, -2647.0+1754.0j, -4945.0+3704.0j, -8011.0+6374.0j, -11941.0+9860.0j)
+ op = gr.multiply_vcc(5)
+ self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op)
+
+ def test_multiply_const_vss_one(self):
+ src_data = (2,)
+ op = gr.multiply_const_vss((3,))
+ exp_data = (6,)
+ self.help_const_ss(src_data, exp_data, op)
+
+ def test_multiply_const_vss_five(self):
+ src_data = (1, 2, 3, 4, 5)
+ op = gr.multiply_const_vss((6, 7, 8, 9, 10))
+ exp_data = (6, 14, 24, 36, 50)
+ self.help_const_ss(src_data, exp_data, op)
+
+ def test_multiply_const_vii_one(self):
+ src_data = (2,)
+ op = gr.multiply_const_vii((3,))
+ exp_data = (6,)
+ self.help_const_ii(src_data, exp_data, op)
+
+ def test_multiply_const_vii_five(self):
+ src_data = (1, 2, 3, 4, 5)
+ op = gr.multiply_const_vii((6, 7, 8, 9, 10))
+ exp_data = (6, 14, 24, 36, 50)
+ self.help_const_ii(src_data, exp_data, op)
+
+ def test_multiply_const_vff_one(self):
+ src_data = (2.0,)
+ op = gr.multiply_const_vff((3.0,))
+ exp_data = (6.0,)
+ self.help_const_ff(src_data, exp_data, op)
+
+ def test_multiply_const_vff_five(self):
+ src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
+ op = gr.multiply_const_vff((6.0, 7.0, 8.0, 9.0, 10.0))
+ exp_data = (6.0, 14.0, 24.0, 36.0, 50.0)
+ self.help_const_ff(src_data, exp_data, op)
+
+ def test_multiply_const_vcc_one(self):
+ src_data = (1.0+2.0j,)
+ op = gr.multiply_const_vcc((2.0+3.0j,))
+ exp_data = (-4.0+7.0j,)
+ self.help_const_cc(src_data, exp_data, op)
+
+ def test_multiply_const_vcc_five(self):
+ src_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j)
+ op = gr.multiply_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j))
+ exp_data = (-13.0+34.0j, -17.0+94.0j, -21.0+170.0j, -25.0+262.0j, -29.0+370.0j)
+ self.help_const_cc(src_data, exp_data, op)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_basic_flow_graph.py b/gnuradio-core/src/python/gnuradio/gr/qa_basic_flow_graph.py
new file mode 100755
index 0000000000..0799c72e48
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_basic_flow_graph.py
@@ -0,0 +1,190 @@
+#!/usr/bin/env python
+
+from gnuradio import gr, gr_unittest
+
+
+# ----------------------------------------------------------------
+
+
+class test_basic_flow_graph (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.basic_flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_000_create_delete (self):
+ pass
+
+ def test_001a_insert_1 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect (gr.endpoint (src1, 0), gr.endpoint (dst1, 0))
+
+ def test_001b_insert_1 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect (src1, gr.endpoint (dst1, 0))
+
+ def test_001c_insert_1 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect (gr.endpoint (src1, 0), dst1)
+
+ def test_001d_insert_1 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect (src1, dst1)
+
+ def test_001e_insert_1 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect ((src1, 0), (dst1, 0))
+
+ def test_002_dst_in_use (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ src2 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect ((src1, 0), (dst1, 0))
+ self.assertRaises (ValueError,
+ lambda : fg.connect ((src2, 0),
+ (dst1, 0)))
+
+ def test_003_no_such_src_port (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ self.assertRaises (ValueError,
+ lambda : fg.connect ((src1, 1),
+ (dst1, 0)))
+
+ def test_004_no_such_dst_port (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ self.assertRaises (ValueError,
+ lambda : fg.connect ((src1, 0),
+ (dst1, 1)))
+
+ def test_005_one_src_two_dst (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ dst2 = gr.null_sink (gr.sizeof_int)
+ fg.connect ((src1, 0), (dst1, 0))
+ fg.connect ((src1, 0), (dst2, 0))
+
+ def test_006_check_item_sizes (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_char)
+ self.assertRaises (ValueError,
+ lambda : fg.connect ((src1, 0),
+ (dst1, 0)))
+
+ def test_007_validate (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ dst2 = gr.null_sink (gr.sizeof_int)
+ fg.connect ((src1, 0), (dst1, 0))
+ fg.connect ((src1, 0), (dst2, 0))
+ fg.validate ()
+
+ def test_008_validate (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ dst2 = gr.null_sink (gr.sizeof_int)
+ fg.connect ((src1, 0), (nop1, 0))
+ fg.connect ((src1, 0), (nop1, 1))
+ fg.connect ((nop1, 0), (dst1, 0))
+ fg.connect ((nop1, 1), (dst2, 0))
+ fg.validate ()
+
+ def test_009_validate (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ dst2 = gr.null_sink (gr.sizeof_int)
+ fg.connect ((src1, 0), (nop1, 0))
+ fg.connect ((src1, 0), (nop1, 2))
+ fg.connect ((nop1, 0), (dst1, 0))
+ fg.connect ((nop1, 1), (dst2, 0))
+ self.assertRaises (ValueError,
+ lambda : fg.validate ())
+
+
+ def test_010_validate (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ dst2 = gr.null_sink (gr.sizeof_int)
+ fg.connect ((src1, 0), (nop1, 0))
+ fg.connect ((src1, 0), (nop1, 1))
+ fg.connect ((nop1, 0), (dst1, 0))
+ fg.connect ((nop1, 2), (dst2, 0))
+ self.assertRaises (ValueError,
+ lambda : fg.validate ())
+
+
+ def test_011_disconnect (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ dst2 = gr.null_sink (gr.sizeof_int)
+ fg.connect ((src1, 0), (nop1, 0))
+ fg.connect ((src1, 0), (nop1, 1))
+ fg.connect ((nop1, 0), (dst1, 0))
+ fg.connect ((nop1, 1), (dst2, 0))
+ fg.validate ()
+ fg.disconnect ((src1, 0), (nop1, 1))
+ fg.validate ()
+ self.assertRaises (ValueError,
+ lambda : fg.disconnect ((src1, 0),
+ (nop1, 1)))
+
+ def test_012_connect (self):
+ fg = self.fg
+ src_data = (0, 1, 2, 3)
+ expected_result = (2, 3, 4, 5)
+ src1 = gr.vector_source_i (src_data)
+ op = gr.add_const_ii (2)
+ dst1 = gr.vector_sink_i ()
+ fg.connect (src1, op)
+ fg.connect (op, dst1)
+ # print "edge_list:", fg.edge_list
+ fg.validate ()
+
+ def test_013_connect_varargs (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ self.assertRaises (ValueError,
+ lambda : fg.connect ())
+ self.assertRaises (ValueError,
+ lambda : fg.connect (src1))
+
+ def test_014_connect_varargs (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect (src1, nop1, dst1)
+ fg.validate ()
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_cma_equalizer.py b/gnuradio-core/src/python/gnuradio/gr/qa_cma_equalizer.py
new file mode 100755
index 0000000000..6b3e9aa9e3
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_cma_equalizer.py
@@ -0,0 +1,29 @@
+#!/usr/bin/env python
+
+from gnuradio import gr, gr_unittest
+
+class test_cma_equalizer_fir(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.fg = gr.flow_graph()
+
+ def tearDown(self):
+ self.fg = None
+
+ def transform(self, src_data):
+ SRC = gr.vector_source_c(src_data, False)
+ EQU = gr.cma_equalizer_cc(4, 1.0, .001)
+ DST = gr.vector_sink_c()
+ self.fg.connect(SRC, EQU, DST)
+ self.fg.run()
+ return DST.data()
+
+ def test_001_identity(self):
+ # Constant modulus signal so no adjustments
+ src_data = (1+0j, 0+1j, -1+0j, 0-1j)*1000
+ expected_data = src_data
+ result = self.transform(src_data)
+ self.assertComplexTuplesAlmostEqual(expected_data, result)
+
+if __name__ == "__main__":
+ gr_unittest.main() \ No newline at end of file
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_complex_to_xxx.py b/gnuradio-core/src/python/gnuradio/gr/qa_complex_to_xxx.py
new file mode 100755
index 0000000000..4bc1933500
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_complex_to_xxx.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+class test_complex_ops (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_complex_to_float_1 (self):
+ src_data = (0, 1, -1, 3+4j, -3-4j, -3+4j)
+ expected_result = (0, 1, -1, 3, -3, -3)
+ src = gr.vector_source_c (src_data)
+ op = gr.complex_to_float ()
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run () # run the graph and wait for it to finish
+ actual_result = dst.data () # fetch the contents of the sink
+ self.assertFloatTuplesAlmostEqual (expected_result, actual_result)
+
+ def test_complex_to_float_2 (self):
+ src_data = (0, 1, -1, 3+4j, -3-4j, -3+4j)
+ expected_result0 = (0, 1, -1, 3, -3, -3)
+ expected_result1 = (0, 0, 0, 4, -4, 4)
+ src = gr.vector_source_c (src_data)
+ op = gr.complex_to_float ()
+ dst0 = gr.vector_sink_f ()
+ dst1 = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect ((op, 0), dst0)
+ self.fg.connect ((op, 1), dst1)
+ self.fg.run ()
+ actual_result = dst0.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result0, actual_result)
+ actual_result = dst1.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result1, actual_result)
+
+ def test_complex_to_real (self):
+ src_data = (0, 1, -1, 3+4j, -3-4j, -3+4j)
+ expected_result = (0, 1, -1, 3, -3, -3)
+ src = gr.vector_source_c (src_data)
+ op = gr.complex_to_real ()
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ actual_result = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, actual_result)
+
+ def test_complex_to_imag (self):
+ src_data = (0, 1, -1, 3+4j, -3-4j, -3+4j)
+ expected_result = (0, 0, 0, 4, -4, 4)
+ src = gr.vector_source_c (src_data)
+ op = gr.complex_to_imag ()
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ actual_result = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, actual_result,5)
+
+ def test_complex_to_mag (self):
+ src_data = (0, 1, -1, 3+4j, -3-4j, -3+4j)
+ expected_result = (0, 1, 1, 5, 5, 5)
+ src = gr.vector_source_c (src_data)
+ op = gr.complex_to_mag ()
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ actual_result = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, actual_result,5)
+
+ def test_complex_to_arg (self):
+ pi = math.pi
+ expected_result = (0, pi/6, pi/4, pi/2, 3*pi/4, 7*pi/8,
+ -pi/6, -pi/4, -pi/2, -3*pi/4, -7*pi/8)
+ src_data = tuple ([math.cos (x) + math.sin (x) * 1j for x in expected_result])
+ src = gr.vector_source_c (src_data)
+ op = gr.complex_to_arg ()
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ actual_result = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 5)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_constellation_decoder_cb.py b/gnuradio-core/src/python/gnuradio/gr/qa_constellation_decoder_cb.py
new file mode 100755
index 0000000000..9564122283
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_constellation_decoder_cb.py
@@ -0,0 +1,53 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+class test_head (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_constellation_decoder_cb (self):
+ symbol_positions = [1 + 0j, 0 + 1j , -1 + 0j, 0 - 1j]
+ symbol_values_out = [0, 1, 2, 3]
+ expected_result = ( 0, 3, 2, 1, 0, 0, 3)
+ src_data = (0.5 + 0j, 0.1 - 1.2j, -0.8 - 0.1j, -0.45 + 0.8j, 0.8 - 0j, 0.5 + 0j, 0.1 - 1.2j)
+ src = gr.vector_source_c (src_data)
+ op = gr.constellation_decoder_cb (symbol_positions, symbol_values_out)
+ dst = gr.vector_sink_b ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run () # run the graph and wait for it to finish
+ actual_result = dst.data () # fetch the contents of the sink
+ #print "actual result", actual_result
+ #print "expected result", expected_result
+ self.assertFloatTuplesAlmostEqual (expected_result, actual_result)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_correlate_access_code.py b/gnuradio-core/src/python/gnuradio/gr/qa_correlate_access_code.py
new file mode 100755
index 0000000000..89b4909ebd
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_correlate_access_code.py
@@ -0,0 +1,83 @@
+#!/usr/bin/env python
+#
+# Copyright 2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+default_access_code = '\xAC\xDD\xA4\xE2\xF2\x8C\x20\xFC'
+
+def string_to_1_0_list(s):
+ r = []
+ for ch in s:
+ x = ord(ch)
+ for i in range(8):
+ t = (x >> i) & 0x1
+ r.append(t)
+
+ return r
+
+def to_1_0_string(L):
+ return ''.join(map(lambda x: chr(x + ord('0')), L))
+
+class test_correlate_access_code(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.fg = gr.flow_graph()
+
+ def tearDown(self):
+ self.fg = None
+
+ def test_001(self):
+ pad = (0,) * 64
+ # 0 0 0 1 0 0 0 1
+ src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7
+ expected_result = pad + (1, 0, 1, 1, 3, 1, 0, 1, 1, 2) + (0,) * 6
+ src = gr.vector_source_b (src_data)
+ op = gr.correlate_access_code_bb("1011", 0)
+ dst = gr.vector_sink_b ()
+ self.fg.connect (src, op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertEqual (expected_result, result_data)
+
+
+ def test_002(self):
+ code = tuple(string_to_1_0_list(default_access_code))
+ access_code = to_1_0_string(code)
+ pad = (0,) * 64
+ #print code
+ #print access_code
+ src_data = code + (1, 0, 1, 1) + pad
+ expected_result = pad + code + (3, 0, 1, 1)
+ src = gr.vector_source_b (src_data)
+ op = gr.correlate_access_code_bb(access_code, 0)
+ dst = gr.vector_sink_b ()
+ self.fg.connect (src, op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertEqual (expected_result, result_data)
+
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_diff_encoder.py b/gnuradio-core/src/python/gnuradio/gr/qa_diff_encoder.py
new file mode 100755
index 0000000000..44840709be
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_diff_encoder.py
@@ -0,0 +1,86 @@
+#!/usr/bin/env python
+#
+# Copyright 2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+import random
+
+def make_random_int_tuple(L, min, max):
+ result = []
+ for x in range(L):
+ result.append(random.randint(min, max))
+ return tuple(result)
+
+
+class test_encoder (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_diff_encdec_000(self):
+ random.seed(0)
+ modulus = 2
+ src_data = make_random_int_tuple(1000, 0, modulus-1)
+ expected_result = src_data
+ src = gr.vector_source_b(src_data)
+ enc = gr.diff_encoder_bb(modulus)
+ dec = gr.diff_decoder_bb(modulus)
+ dst = gr.vector_sink_b()
+ self.fg.connect(src, enc, dec, dst)
+ self.fg.run() # run the graph and wait for it to finish
+ actual_result = dst.data() # fetch the contents of the sink
+ self.assertEqual(expected_result, actual_result)
+
+ def test_diff_encdec_001(self):
+ random.seed(0)
+ modulus = 4
+ src_data = make_random_int_tuple(1000, 0, modulus-1)
+ expected_result = src_data
+ src = gr.vector_source_b(src_data)
+ enc = gr.diff_encoder_bb(modulus)
+ dec = gr.diff_decoder_bb(modulus)
+ dst = gr.vector_sink_b()
+ self.fg.connect(src, enc, dec, dst)
+ self.fg.run() # run the graph and wait for it to finish
+ actual_result = dst.data() # fetch the contents of the sink
+ self.assertEqual(expected_result, actual_result)
+
+ def test_diff_encdec_002(self):
+ random.seed(0)
+ modulus = 8
+ src_data = make_random_int_tuple(40000, 0, modulus-1)
+ expected_result = src_data
+ src = gr.vector_source_b(src_data)
+ enc = gr.diff_encoder_bb(modulus)
+ dec = gr.diff_decoder_bb(modulus)
+ dst = gr.vector_sink_b()
+ self.fg.connect(src, enc, dec, dst)
+ self.fg.run() # run the graph and wait for it to finish
+ actual_result = dst.data() # fetch the contents of the sink
+ self.assertEqual(expected_result, actual_result)
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_diff_phasor_cc.py b/gnuradio-core/src/python/gnuradio/gr/qa_diff_phasor_cc.py
new file mode 100755
index 0000000000..0b71602021
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_diff_phasor_cc.py
@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+class test_complex_ops (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_diff_phasor_cc (self):
+ src_data = (0+0j, 1+0j, -1+0j, 3+4j, -3-4j, -3+4j)
+ expected_result = (0+0j, 0+0j, -1+0j, -3-4j, -25+0j, -7-24j)
+ src = gr.vector_source_c (src_data)
+ op = gr.diff_phasor_cc ()
+ dst = gr.vector_sink_c ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run () # run the graph and wait for it to finish
+ actual_result = dst.data () # fetch the contents of the sink
+ self.assertComplexTuplesAlmostEqual (expected_result, actual_result)
+
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_feval.py b/gnuradio-core/src/python/gnuradio/gr/qa_feval.py
new file mode 100755
index 0000000000..1de49369a3
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_feval.py
@@ -0,0 +1,92 @@
+#!/usr/bin/env python
+#
+# Copyright 2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+class my_add2_dd(gr.feval_dd):
+ def eval(self, x):
+ return x + 2
+
+class my_add2_ll(gr.feval_ll):
+ def eval(self, x):
+ return x + 2
+
+class my_add2_cc(gr.feval_cc):
+ def eval(self, x):
+ return x + (2 - 2j)
+
+
+class test_feval(gr_unittest.TestCase):
+
+ def test_dd_1(self):
+ f = my_add2_dd()
+ src_data = (0.0, 1.0, 2.0, 3.0, 4.0)
+ expected_result = (2.0, 3.0, 4.0, 5.0, 6.0)
+ # this is all in python...
+ actual_result = tuple([f.eval(x) for x in src_data])
+ self.assertEqual(expected_result, actual_result)
+
+ def test_dd_2(self):
+ f = my_add2_dd()
+ src_data = (0.0, 1.0, 2.0, 3.0, 4.0)
+ expected_result = (2.0, 3.0, 4.0, 5.0, 6.0)
+ # this is python -> C++ -> python and back again...
+ actual_result = tuple([gr.feval_dd_example(f, x) for x in src_data])
+ self.assertEqual(expected_result, actual_result)
+
+
+ def test_ll_1(self):
+ f = my_add2_ll()
+ src_data = (0, 1, 2, 3, 4)
+ expected_result = (2, 3, 4, 5, 6)
+ # this is all in python...
+ actual_result = tuple([f.eval(x) for x in src_data])
+ self.assertEqual(expected_result, actual_result)
+
+ def test_ll_2(self):
+ f = my_add2_ll()
+ src_data = (0, 1, 2, 3, 4)
+ expected_result = (2, 3, 4, 5, 6)
+ # this is python -> C++ -> python and back again...
+ actual_result = tuple([gr.feval_ll_example(f, x) for x in src_data])
+ self.assertEqual(expected_result, actual_result)
+
+
+ def test_cc_1(self):
+ f = my_add2_cc()
+ src_data = (0+1j, 2+3j, 4+5j, 6+7j)
+ expected_result = (2-1j, 4+1j, 6+3j, 8+5j)
+ # this is all in python...
+ actual_result = tuple([f.eval(x) for x in src_data])
+ self.assertEqual(expected_result, actual_result)
+
+ def test_cc_2(self):
+ f = my_add2_cc()
+ src_data = (0+1j, 2+3j, 4+5j, 6+7j)
+ expected_result = (2-1j, 4+1j, 6+3j, 8+5j)
+ # this is python -> C++ -> python and back again...
+ actual_result = tuple([gr.feval_cc_example(f, x) for x in src_data])
+ self.assertEqual(expected_result, actual_result)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_fft_filter.py b/gnuradio-core/src/python/gnuradio/gr/qa_fft_filter.py
new file mode 100755
index 0000000000..cb2e76b181
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_fft_filter.py
@@ -0,0 +1,268 @@
+#!/usr/bin/env python
+#
+# Copyright 2004,2005 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import sys
+import random
+
+def make_random_complex_tuple(L):
+ result = []
+ for x in range(L):
+ result.append(complex(random.uniform(-1000,1000),
+ random.uniform(-1000,1000)))
+ return tuple(result)
+
+def make_random_float_tuple(L):
+ result = []
+ for x in range(L):
+ result.append(float(int(random.uniform(-1000,1000))))
+ return tuple(result)
+
+
+def reference_filter_ccc(dec, taps, input):
+ """
+ compute result using conventional fir filter
+ """
+ fg = gr.flow_graph()
+ #src = gr.vector_source_c(((0,) * (len(taps) - 1)) + input)
+ src = gr.vector_source_c(input)
+ op = gr.fir_filter_ccc(dec, taps)
+ dst = gr.vector_sink_c()
+ fg.connect(src, op, dst)
+ fg.run()
+ return dst.data()
+
+def reference_filter_fff(dec, taps, input):
+ """
+ compute result using conventional fir filter
+ """
+ fg = gr.flow_graph()
+ #src = gr.vector_source_f(((0,) * (len(taps) - 1)) + input)
+ src = gr.vector_source_f(input)
+ op = gr.fir_filter_fff(dec, taps)
+ dst = gr.vector_sink_f()
+ fg.connect(src, op, dst)
+ fg.run()
+ return dst.data()
+
+
+def print_complex(x):
+ for i in x:
+ i = complex(i)
+ sys.stdout.write("(%6.3f,%6.3fj), " % (i.real, i.imag))
+ sys.stdout.write('\n')
+
+
+class test_fft_filter(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown(self):
+ self.fg = None
+
+ def assert_fft_ok2(self, expected_result, result_data):
+ expected_result = expected_result[:len(result_data)]
+ self.assertComplexTuplesAlmostEqual2 (expected_result, result_data,
+ abs_eps=1e-9, rel_eps=4e-4)
+
+ def assert_fft_float_ok2(self, expected_result, result_data, abs_eps=1e-9, rel_eps=4e-4):
+ expected_result = expected_result[:len(result_data)]
+ self.assertFloatTuplesAlmostEqual2 (expected_result, result_data,
+ abs_eps, rel_eps)
+
+ #def test_ccc_000(self):
+ # self.assertRaises (RuntimeError, gr.fft_filter_ccc, 2, (1,))
+
+ def test_ccc_001(self):
+ src_data = (0,1,2,3,4,5,6,7)
+ taps = (1,)
+ expected_result = tuple([complex(x) for x in (0,1,2,3,4,5,6,7)])
+ src = gr.vector_source_c(src_data)
+ op = gr.fft_filter_ccc(1, taps)
+ dst = gr.vector_sink_c()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ #print 'expected:', expected_result
+ #print 'results: ', result_data
+ self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5)
+
+
+ def test_ccc_002(self):
+ src_data = (0,1,2,3,4,5,6,7)
+ taps = (2,)
+ expected_result = tuple([2 * complex(x) for x in (0,1,2,3,4,5,6,7)])
+ src = gr.vector_source_c(src_data)
+ op = gr.fft_filter_ccc(1, taps)
+ dst = gr.vector_sink_c()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ #print 'expected:', expected_result
+ #print 'results: ', result_data
+ self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5)
+
+ def test_ccc_004(self):
+ random.seed(0)
+ for i in xrange(25):
+ # sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+ src_len = 4*1024
+ src_data = make_random_complex_tuple(src_len)
+ ntaps = int(random.uniform(2, 1000))
+ taps = make_random_complex_tuple(ntaps)
+ expected_result = reference_filter_ccc(1, taps, src_data)
+
+ src = gr.vector_source_c(src_data)
+ op = gr.fft_filter_ccc(1, taps)
+ dst = gr.vector_sink_c()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+
+ self.assert_fft_ok2(expected_result, result_data)
+
+ def test_ccc_005(self):
+ random.seed(0)
+ for i in xrange(25):
+ # sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+ dec = i + 1
+ src_len = 4*1024
+ src_data = make_random_complex_tuple(src_len)
+ ntaps = int(random.uniform(2, 100))
+ taps = make_random_complex_tuple(ntaps)
+ expected_result = reference_filter_ccc(dec, taps, src_data)
+
+ src = gr.vector_source_c(src_data)
+ op = gr.fft_filter_ccc(dec, taps)
+ dst = gr.vector_sink_c()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+
+ self.assert_fft_ok2(expected_result, result_data)
+
+ # ----------------------------------------------------------------
+ # test _fff version
+ # ----------------------------------------------------------------
+
+ def test_fff_001(self):
+ src_data = (0,1,2,3,4,5,6,7)
+ taps = (1,)
+ expected_result = tuple([float(x) for x in (0,1,2,3,4,5,6,7)])
+ src = gr.vector_source_f(src_data)
+ op = gr.fft_filter_fff(1, taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ #print 'expected:', expected_result
+ #print 'results: ', result_data
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5)
+
+
+ def test_fff_002(self):
+ src_data = (0,1,2,3,4,5,6,7)
+ taps = (2,)
+ expected_result = tuple([2 * float(x) for x in (0,1,2,3,4,5,6,7)])
+ src = gr.vector_source_f(src_data)
+ op = gr.fft_filter_fff(1, taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ #print 'expected:', expected_result
+ #print 'results: ', result_data
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5)
+
+ def xtest_fff_003(self):
+ random.seed(0)
+ for i in xrange(25):
+ sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+ src_len = 4096
+ src_data = make_random_float_tuple(src_len)
+ ntaps = int(random.uniform(2, 1000))
+ taps = make_random_float_tuple(ntaps)
+ expected_result = reference_filter_fff(1, taps, src_data)
+
+ src = gr.vector_source_f(src_data)
+ op = gr.fft_filter_fff(1, taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+
+ #print "src_len =", src_len, " ntaps =", ntaps
+ try:
+ self.assert_fft_float_ok2(expected_result, result_data, abs_eps=1.0)
+ except:
+ expected = open('expected', 'w')
+ for x in expected_result:
+ expected.write(`x` + '\n')
+ actual = open('actual', 'w')
+ for x in result_data:
+ actual.write(`x` + '\n')
+ raise
+
+ def xtest_fff_004(self):
+ random.seed(0)
+ for i in xrange(25):
+ sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+ src_len = 4*1024
+ src_data = make_random_float_tuple(src_len)
+ ntaps = int(random.uniform(2, 1000))
+ taps = make_random_float_tuple(ntaps)
+ expected_result = reference_filter_fff(1, taps, src_data)
+
+ src = gr.vector_source_f(src_data)
+ op = gr.fft_filter_fff(1, taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+
+ self.assert_fft_float_ok2(expected_result, result_data, abs_eps=2.0)
+
+ def xtest_fff_005(self):
+ random.seed(0)
+ for i in xrange(25):
+ sys.stderr.write("\n>>> Loop = %d\n" % (i,))
+ dec = i + 1
+ src_len = 4*1024
+ src_data = make_random_float_tuple(src_len)
+ ntaps = int(random.uniform(2, 100))
+ taps = make_random_float_tuple(ntaps)
+ expected_result = reference_filter_fff(dec, taps, src_data)
+
+ src = gr.vector_source_f(src_data)
+ op = gr.fft_filter_fff(dec, taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+
+ self.assert_fft_float_ok2(expected_result, result_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_filter_delay_fc.py b/gnuradio-core/src/python/gnuradio/gr/qa_filter_delay_fc.py
new file mode 100755
index 0000000000..191552ca27
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_filter_delay_fc.py
@@ -0,0 +1,317 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+class qa_filter_delay_fc (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_001_filter_delay_one_input (self):
+
+ # expected result
+ expected_result = ( -1.4678005338941702e-11j,
+ -0.0011950774351134896j,
+ -0.0019336787518113852j,
+ -0.0034673355985432863j,
+ -0.0036765895783901215j,
+ -0.004916108213365078j,
+ -0.0042778430506587029j,
+ -0.006028641015291214j,
+ -0.005476709920912981j,
+ -0.0092810001224279404j,
+ -0.0095402700826525688j,
+ -0.016060983762145042j,
+ -0.016446959227323532j,
+ -0.02523401565849781j,
+ -0.024382550269365311j,
+ -0.035477779805660248j,
+ -0.033021725714206696j,
+ -0.048487484455108643j,
+ -0.04543270543217659j,
+ -0.069477587938308716j,
+ -0.066984444856643677j,
+ -0.10703597217798233j,
+ -0.10620346665382385j,
+ -0.1852707713842392j,
+ -0.19357112050056458j,
+ (7.2191945754696007e-09 -0.50004088878631592j),
+ (0.58778399229049683 -0.6155126690864563j),
+ (0.95105588436126709 -0.12377222627401352j),
+ (0.95105588436126709 +0.41524654626846313j),
+ (0.5877838134765625 +0.91611981391906738j),
+ (5.8516356205018383e-09 +1.0670661926269531j),
+ (-0.5877840518951416 +0.87856143712997437j),
+ (-0.95105588436126709 +0.35447561740875244j),
+ (-0.95105588436126709 -0.26055556535720825j),
+ (-0.5877838134765625 -0.77606213092803955j),
+ (-8.7774534307527574e-09 -0.96460390090942383j),
+ (0.58778399229049683 -0.78470128774642944j),
+ (0.95105588436126709 -0.28380891680717468j),
+ (0.95105588436126709 +0.32548999786376953j),
+ (0.5877838134765625 +0.82514488697052002j),
+ (1.4629089051254596e-08 +1.0096219778060913j),
+ (-0.5877840518951416 +0.81836479902267456j),
+ (-0.95105588436126709 +0.31451958417892456j),
+ (-0.95105588436126709 -0.3030143678188324j),
+ (-0.5877838134765625 -0.80480599403381348j),
+ (-1.7554906861505515e-08 -0.99516552686691284j),
+ (0.58778399229049683 -0.80540722608566284j),
+ (0.95105582475662231 -0.30557557940483093j),
+ (0.95105588436126709 +0.31097668409347534j),
+ (0.5877838134765625 +0.81027895212173462j),
+ (2.3406542482007353e-08 +1.0000816583633423j),
+ (-0.5877840518951416 +0.80908381938934326j),
+ (-0.95105588436126709 +0.30904293060302734j),
+ (-0.95105588436126709 -0.30904296040534973j),
+ (-0.5877838134765625 -0.80908387899398804j),
+ (-2.6332360292258272e-08 -1.0000815391540527j),
+ (0.58778399229049683 -0.80908381938934326j),
+ (0.95105582475662231 -0.30904299020767212j),
+ (0.95105588436126709 +0.30904293060302734j),
+ (0.5877838134765625 +0.80908381938934326j),
+ (3.218399768911695e-08 +1.0000815391540527j))
+
+ fg = self.fg
+
+ sampling_freq = 100
+
+ ntaps = 51
+ src1 = gr.sig_source_f (sampling_freq, gr.GR_SIN_WAVE,
+ sampling_freq * 0.10, 1.0)
+ head = gr.head (gr.sizeof_float, int (ntaps + sampling_freq * 0.10))
+ dst2 = gr.vector_sink_c ()
+
+ # calculate taps
+ taps = gr.firdes_hilbert (ntaps)
+ hd = gr.filter_delay_fc (taps)
+
+ fg.connect (src1, head)
+ fg.connect (head, hd)
+ fg.connect (hd,dst2)
+
+ fg.run ()
+
+ # get output
+ result_data = dst2.data ()
+ self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5)
+
+ def test_002_filter_delay_two_inputs (self):
+
+ # giving the same signal to both the inputs should fetch the same results
+ # as above
+
+ # expected result
+ expected_result = ( -1.4678005338941702e-11j,
+ -0.0011950774351134896j,
+ -0.0019336787518113852j,
+ -0.0034673355985432863j,
+ -0.0036765895783901215j,
+ -0.004916108213365078j,
+ -0.0042778430506587029j,
+ -0.006028641015291214j,
+ -0.005476709920912981j,
+ -0.0092810001224279404j,
+ -0.0095402700826525688j,
+ -0.016060983762145042j,
+ -0.016446959227323532j,
+ -0.02523401565849781j,
+ -0.024382550269365311j,
+ -0.035477779805660248j,
+ -0.033021725714206696j,
+ -0.048487484455108643j,
+ -0.04543270543217659j,
+ -0.069477587938308716j,
+ -0.066984444856643677j,
+ -0.10703597217798233j,
+ -0.10620346665382385j,
+ -0.1852707713842392j,
+ -0.19357112050056458j,
+ (7.2191945754696007e-09 -0.50004088878631592j),
+ (0.58778399229049683 -0.6155126690864563j),
+ (0.95105588436126709 -0.12377222627401352j),
+ (0.95105588436126709 +0.41524654626846313j),
+ (0.5877838134765625 +0.91611981391906738j),
+ (5.8516356205018383e-09 +1.0670661926269531j),
+ (-0.5877840518951416 +0.87856143712997437j),
+ (-0.95105588436126709 +0.35447561740875244j),
+ (-0.95105588436126709 -0.26055556535720825j),
+ (-0.5877838134765625 -0.77606213092803955j),
+ (-8.7774534307527574e-09 -0.96460390090942383j),
+ (0.58778399229049683 -0.78470128774642944j),
+ (0.95105588436126709 -0.28380891680717468j),
+ (0.95105588436126709 +0.32548999786376953j),
+ (0.5877838134765625 +0.82514488697052002j),
+ (1.4629089051254596e-08 +1.0096219778060913j),
+ (-0.5877840518951416 +0.81836479902267456j),
+ (-0.95105588436126709 +0.31451958417892456j),
+ (-0.95105588436126709 -0.3030143678188324j),
+ (-0.5877838134765625 -0.80480599403381348j),
+ (-1.7554906861505515e-08 -0.99516552686691284j),
+ (0.58778399229049683 -0.80540722608566284j),
+ (0.95105582475662231 -0.30557557940483093j),
+ (0.95105588436126709 +0.31097668409347534j),
+ (0.5877838134765625 +0.81027895212173462j),
+ (2.3406542482007353e-08 +1.0000816583633423j),
+ (-0.5877840518951416 +0.80908381938934326j),
+ (-0.95105588436126709 +0.30904293060302734j),
+ (-0.95105588436126709 -0.30904296040534973j),
+ (-0.5877838134765625 -0.80908387899398804j),
+ (-2.6332360292258272e-08 -1.0000815391540527j),
+ (0.58778399229049683 -0.80908381938934326j),
+ (0.95105582475662231 -0.30904299020767212j),
+ (0.95105588436126709 +0.30904293060302734j),
+ (0.5877838134765625 +0.80908381938934326j),
+ (3.218399768911695e-08 +1.0000815391540527j))
+
+
+ fg = self.fg
+
+ sampling_freq = 100
+ ntaps = 51
+ src1 = gr.sig_source_f (sampling_freq, gr.GR_SIN_WAVE,
+ sampling_freq * 0.10, 1.0)
+ head = gr.head (gr.sizeof_float, int (ntaps + sampling_freq * 0.10))
+ dst2 = gr.vector_sink_c ()
+
+
+ # calculate taps
+ taps = gr.firdes_hilbert (ntaps)
+ hd = gr.filter_delay_fc (taps)
+
+ fg.connect (src1, head)
+ fg.connect (head, (hd,0))
+ fg.connect (head, (hd,1))
+ fg.connect (hd,dst2)
+ fg.run ()
+
+ # get output
+ result_data = dst2.data ()
+
+ self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5)
+
+
+ def test_003_filter_delay_two_inputs (self):
+
+ # give two different inputs
+
+ # expected result
+ expected_result = ( -0.0020331963896751404j,
+ -0.0016448829555884004j,
+ -0.0032375147566199303j,
+ -0.0014826074475422502j,
+ -0.0033034090884029865j,
+ -0.00051144487224519253j,
+ -0.0043686260469257832j,
+ -0.0010198024101555347j,
+ -0.0082517862319946289j,
+ -0.003456643782556057j,
+ -0.014193611219525337j,
+ -0.005875137634575367j,
+ -0.020293503999710083j,
+ -0.0067503536120057106j,
+ -0.026798896491527557j,
+ -0.0073488112539052963j,
+ -0.037041611969470978j,
+ -0.010557252913713455j,
+ -0.055669989436864853j,
+ -0.018332764506340027j,
+ -0.089904911816120148j,
+ -0.033361352980136871j,
+ -0.16902604699134827j,
+ -0.074318811297416687j,
+ -0.58429563045501709j,
+ (7.2191945754696007e-09 -0.35892376303672791j),
+ (0.58778399229049683 +0.63660913705825806j),
+ (0.95105588436126709 +0.87681591510772705j),
+ (0.95105588436126709 +0.98705857992172241j),
+ (0.5877838134765625 +0.55447429418563843j),
+ (5.8516356205018383e-09 +0.026006083935499191j),
+ (-0.5877840518951416 -0.60616838932037354j),
+ (-0.95105588436126709 -0.9311758279800415j),
+ (-0.95105588436126709 -0.96169203519821167j),
+ (-0.5877838134765625 -0.57292771339416504j),
+ (-8.7774534307527574e-09 -0.0073488391935825348j),
+ (0.58778399229049683 +0.59720659255981445j),
+ (0.95105588436126709 +0.94438445568084717j),
+ (0.95105588436126709 +0.95582199096679688j),
+ (0.5877838134765625 +0.58196049928665161j),
+ (1.4629089051254596e-08 +0.0026587247848510742j),
+ (-0.5877840518951416 -0.59129220247268677j),
+ (-0.95105588436126709 -0.94841635227203369j),
+ (-0.95105588436126709 -0.95215457677841187j),
+ (-0.5877838134765625 -0.58535969257354736j),
+ (-1.7554906861505515e-08 -0.00051158666610717773j),
+ (0.58778399229049683 +0.58867418766021729j),
+ (0.95105582475662231 +0.94965213537216187j),
+ (0.95105588436126709 +0.95050644874572754j),
+ (0.5877838134765625 +0.58619076013565063j),
+ (2.3406542482007353e-08 +1.1920928955078125e-07j),
+ (-0.5877840518951416 -0.58783555030822754j),
+ (-0.95105588436126709 -0.95113480091094971j),
+ (-0.95105588436126709 -0.95113474130630493j),
+ (-0.5877838134765625 -0.58783555030822754j),
+ (-2.6332360292258272e-08 -8.1956386566162109e-08j),
+ (0.58778399229049683 +0.58783555030822754j),
+ (0.95105582475662231 +0.95113474130630493j),
+ (0.95105588436126709 +0.95113474130630493j),
+ (0.5877838134765625 +0.58783560991287231j),
+ (3.218399768911695e-08 +1.1920928955078125e-07j))
+
+ fg = self.fg
+
+ sampling_freq = 100
+ ntaps = 51
+
+ src1 = gr.sig_source_f (sampling_freq, gr.GR_SIN_WAVE,sampling_freq * 0.10, 1.0)
+ src2 = gr.sig_source_f (sampling_freq, gr.GR_COS_WAVE,sampling_freq * 0.10, 1.0)
+
+ head1 = gr.head (gr.sizeof_float, int (ntaps + sampling_freq * 0.10))
+ head2 = gr.head (gr.sizeof_float, int (ntaps + sampling_freq * 0.10))
+
+ taps = gr.firdes_hilbert (ntaps)
+ hd = gr.filter_delay_fc (taps)
+
+ dst2 = gr.vector_sink_c ()
+
+ fg.connect (src1, head1)
+ fg.connect (src2, head2)
+
+ fg.connect (head1, (hd,0))
+ fg.connect (head2, (hd,1))
+ fg.connect (hd, dst2)
+
+ fg.run ()
+
+ # get output
+ result_data = dst2.data ()
+
+ self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_flow_graph.py b/gnuradio-core/src/python/gnuradio/gr/qa_flow_graph.py
new file mode 100755
index 0000000000..455cc13590
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_flow_graph.py
@@ -0,0 +1,356 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import qa_basic_flow_graph
+
+
+def all_counts ():
+ return (gr.block_ncurrently_allocated (),
+ gr.block_detail_ncurrently_allocated (),
+ gr.buffer_ncurrently_allocated (),
+ gr.buffer_reader_ncurrently_allocated ())
+
+
+class wrap_add(gr.hier_block):
+ def __init__(self, fg, a):
+ add = gr.add_const_ii (a)
+ gr.hier_block.__init__ (self, fg, add, add)
+
+class mult_add(gr.hier_block):
+ def __init__(self, fg, m, a):
+ mult = gr.multiply_const_ii (m)
+ add = gr.add_const_ii (a)
+ fg.connect (mult, add)
+ gr.hier_block.__init__ (self, fg, mult, add)
+
+
+class test_flow_graph (qa_basic_flow_graph.test_basic_flow_graph):
+
+ def setUp (self):
+ ''' override qa_basic_flow_graph setUp in order to use our class'''
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ qa_basic_flow_graph.test_basic_flow_graph.tearDown (self)
+
+
+ # we inherit all their tests, so we can be sure we don't break
+ # any of the underlying code
+
+
+ def leak_check (self, fct):
+ begin = all_counts ()
+ fct ()
+ # tear down early so we can check for leaks
+ self.tearDown ()
+ end = all_counts ()
+ self.assertEqual (begin, end)
+
+ def test_100_tsort_null (self):
+ self.assertEqual ([], self.fg.topological_sort (self.fg.all_blocks ()))
+
+ def test_101_tsort_two (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect ((src1, 0), (dst1, 0))
+ fg.validate ()
+ self.assertEqual ([src1, dst1], fg.topological_sort (fg.all_blocks ()))
+
+ def test_102_tsort_three_a (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect (src1, nop1)
+ fg.connect (nop1, dst1)
+ fg.validate ()
+ self.assertEqual ([src1, nop1, dst1], fg.topological_sort (fg.all_blocks ()))
+
+ def test_103_tsort_three_b (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect (nop1, dst1)
+ fg.connect (src1, nop1)
+ fg.validate ()
+ self.assertEqual ([src1, nop1, dst1], fg.topological_sort (fg.all_blocks ()))
+
+ def test_104_trivial_dag_check (self):
+ fg = self.fg
+ nop1 = gr.nop (gr.sizeof_int)
+ fg.connect (nop1, nop1)
+ fg.validate ()
+ self.assertRaises (gr.NotDAG,
+ lambda : fg.topological_sort (fg.all_blocks ()))
+
+ def test_105 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ src2 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ nop2 = gr.nop (gr.sizeof_int)
+ nop3 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+
+ fg.connect (src1, nop1)
+ fg.connect (src2, nop2)
+ fg.connect (nop1, (nop3, 0))
+ fg.connect (nop2, (nop3, 1))
+ fg.connect (nop3, dst1)
+ fg.validate ()
+ ts = fg.topological_sort (fg.all_blocks ())
+ self.assertEqual ([src2, nop2, src1, nop1, nop3, dst1], ts)
+
+
+ def test_106 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ src2 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ nop2 = gr.nop (gr.sizeof_int)
+ nop3 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+
+ fg.connect (nop3, dst1)
+ fg.connect (nop2, (nop3, 1))
+ fg.connect (nop1, (nop3, 0))
+ fg.connect (src2, nop2)
+ fg.connect (src1, nop1)
+ fg.validate ()
+ ts = fg.topological_sort (fg.all_blocks ())
+ self.assertEqual ([src2, nop2, src1, nop1, nop3, dst1], ts)
+
+ def test_107 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ src2 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ nop2 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ dst2 = gr.null_sink (gr.sizeof_int)
+
+ fg.connect (src1, nop1)
+ fg.connect (nop1, dst1)
+ fg.connect (src2, nop2)
+ fg.connect (nop2, dst2)
+ fg.validate ()
+ ts = fg.topological_sort (fg.all_blocks ())
+ self.assertEqual ([src2, nop2, dst2, src1, nop1, dst1], ts)
+
+ def test_108 (self):
+ self.leak_check (self.body_108)
+
+ def body_108 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ src2 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ nop2 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ dst2 = gr.null_sink (gr.sizeof_int)
+
+ fg.connect (nop2, dst2)
+ fg.connect (src1, nop1)
+ fg.connect (src2, nop2)
+ fg.connect (nop1, dst1)
+ fg.validate ()
+ ts = fg.topological_sort (fg.all_blocks ())
+ self.assertEqual ([src2, nop2, dst2, src1, nop1, dst1], ts)
+ self.assertEqual ((6,0,0,0), all_counts ())
+
+ def test_109__setup_connections (self):
+ self.leak_check (self.body_109)
+
+ def body_109 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect (src1, dst1)
+ fg._setup_connections ()
+ self.assertEqual ((2,2,1,1), all_counts ())
+
+ def test_110_scheduler (self):
+ self.leak_check (self.body_110)
+
+ def body_110 (self):
+ fg = self.fg
+ src_data = (0, 1, 2, 3)
+ src1 = gr.vector_source_i (src_data)
+ dst1 = gr.vector_sink_i ()
+ fg.connect ((src1, 0), (dst1, 0))
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertEqual (src_data, dst_data)
+
+ def test_111_scheduler (self):
+ self.leak_check (self.body_111)
+
+ def body_111 (self):
+ fg = self.fg
+ src_data = (0, 1, 2, 3)
+ expected_result = (2, 3, 4, 5)
+ src1 = gr.vector_source_i (src_data)
+ op = gr.add_const_ii (2)
+ dst1 = gr.vector_sink_i ()
+ fg.connect (src1, op)
+ fg.connect (op, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertEqual (expected_result, dst_data)
+
+ def test_111v_scheduler (self):
+ self.leak_check (self.body_111v)
+
+ def body_111v (self):
+ fg = self.fg
+ src_data = (0, 1, 2, 3)
+ expected_result = (2, 3, 4, 5)
+ src1 = gr.vector_source_i (src_data)
+ op = gr.add_const_ii (2)
+ dst1 = gr.vector_sink_i ()
+ fg.connect (src1, op, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertEqual (expected_result, dst_data)
+
+ def test_112 (self):
+ fg = self.fg
+ nop1 = gr.nop (gr.sizeof_int)
+ nop2 = gr.nop (gr.sizeof_int)
+ nop3 = gr.nop (gr.sizeof_int)
+ fg.connect ((nop1, 0), (nop2, 0))
+ fg.connect ((nop1, 1), (nop3, 0))
+ fg._setup_connections ()
+ self.assertEqual (2, nop1.detail().noutputs())
+
+ def test_113 (self):
+ fg = self.fg
+ nop1 = gr.nop (gr.sizeof_int)
+ nop2 = gr.nop (gr.sizeof_int)
+ nop3 = gr.nop (gr.sizeof_int)
+ fg.connect ((nop1, 0), (nop2, 0))
+ fg.connect ((nop1, 0), (nop3, 0))
+ fg._setup_connections ()
+ self.assertEqual (1, nop1.detail().noutputs())
+
+ def test_200_partition (self):
+ self.leak_check (self.body_200)
+
+ def body_200 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ fg.connect (nop1, dst1)
+ fg.connect (src1, nop1)
+ fg.validate ()
+ p = fg.partition_graph (fg.all_blocks ())
+ self.assertEqual ([[src1, nop1, dst1]], p)
+ self.assertEqual ((3,0,0,0), all_counts ())
+
+ def test_201_partition (self):
+ self.leak_check (self.body_201)
+
+ def body_201 (self):
+ fg = self.fg
+ src1 = gr.null_source (gr.sizeof_int)
+ src2 = gr.null_source (gr.sizeof_int)
+ nop1 = gr.nop (gr.sizeof_int)
+ nop2 = gr.nop (gr.sizeof_int)
+ dst1 = gr.null_sink (gr.sizeof_int)
+ dst2 = gr.null_sink (gr.sizeof_int)
+
+ fg.connect (nop2, dst2)
+ fg.connect (src1, nop1)
+ fg.connect (src2, nop2)
+ fg.connect (nop1, dst1)
+ fg.validate ()
+ p = fg.partition_graph (fg.all_blocks ())
+ self.assertEqual ([[src2, nop2, dst2], [src1, nop1, dst1]], p)
+ self.assertEqual ((6,0,0,0), all_counts ())
+
+ def test_300_hier (self):
+ self.leak_check (self.body_300_hier)
+
+ def body_300_hier (self):
+ fg = self.fg
+ src_data = (0, 1, 2, 3)
+ expected_result = (10, 11, 12, 13)
+ src1 = gr.vector_source_i (src_data)
+ op = wrap_add (fg, 10)
+ dst1 = gr.vector_sink_i ()
+ fg.connect (src1, op, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertEqual (expected_result, dst_data)
+
+ def test_301_hier (self):
+ self.leak_check (self.body_301_hier)
+
+ def body_301_hier (self):
+ fg = self.fg
+ src_data = (0, 1, 2, 3)
+ expected_result = (5, 8, 11, 14)
+ src1 = gr.vector_source_i (src_data)
+ op = mult_add (fg, 3, 5)
+ dst1 = gr.vector_sink_i ()
+ fg.connect (src1, op, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertEqual (expected_result, dst_data)
+
+ def test_302_hier (self):
+ self.leak_check (self.body_302_hier)
+
+ def body_302_hier (self):
+ fg = self.fg
+ src_data = (0, 1, 2, 3)
+ expected_result = (10, 11, 12, 13)
+ src1 = gr.vector_source_i (src_data)
+ op = gr.compose (fg, gr.add_const_ii (10))
+ dst1 = gr.vector_sink_i ()
+ fg.connect (src1, op, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertEqual (expected_result, dst_data)
+
+ def test_303_hier (self):
+ self.leak_check (self.body_303_hier)
+
+ def body_303_hier (self):
+ fg = self.fg
+ src_data = (0, 1, 2, 3)
+ expected_result = (35, 38, 41, 44)
+ src1 = gr.vector_source_i (src_data)
+ op = gr.compose (fg, gr.add_const_ii (10), mult_add (fg, 3, 5))
+ dst1 = gr.vector_sink_i ()
+ fg.connect (src1, op, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertEqual (expected_result, dst_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_frequency_modulator.py b/gnuradio-core/src/python/gnuradio/gr/qa_frequency_modulator.py
new file mode 100755
index 0000000000..cfa34830d6
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_frequency_modulator.py
@@ -0,0 +1,56 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+def sincos(x):
+ return math.cos(x) + math.sin(x) * 1j
+
+
+class test_frequency_modulator (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_fm_001 (self):
+ pi = math.pi
+ sensitivity = pi/4
+ src_data = (1.0/4, 1.0/2, 1.0/4, -1.0/4, -1.0/2, -1/4.0)
+ running_sum = (pi/16, 3*pi/16, pi/4, 3*pi/16, pi/16, 0)
+ expected_result = tuple ([sincos (x) for x in running_sum])
+ src = gr.vector_source_f (src_data)
+ op = gr.frequency_modulator_fc (sensitivity)
+ dst = gr.vector_sink_c ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertComplexTuplesAlmostEqual (expected_result, result_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_fsk_stuff.py b/gnuradio-core/src/python/gnuradio/gr/qa_fsk_stuff.py
new file mode 100755
index 0000000000..d61ddb1a05
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_fsk_stuff.py
@@ -0,0 +1,75 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+def sincos(x):
+ return math.cos(x) + math.sin(x) * 1j
+
+class test_bytes_to_syms (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_bytes_to_syms_001 (self):
+ src_data = (0x01, 0x80, 0x03)
+ expected_result = (-1, -1, -1, -1, -1, -1, -1, +1,
+ +1, -1, -1, -1, -1, -1, -1, -1,
+ -1, -1, -1, -1, -1, -1, +1, +1)
+ src = gr.vector_source_b (src_data)
+ op = gr.bytes_to_syms ()
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertEqual (expected_result, result_data)
+
+ def test_simple_framer (self):
+ src_data = (0x00, 0x11, 0x22, 0x33,
+ 0x44, 0x55, 0x66, 0x77,
+ 0x88, 0x99, 0xaa, 0xbb,
+ 0xcc, 0xdd, 0xee, 0xff)
+
+ expected_result = (
+ 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x00, 0x00, 0x11, 0x22, 0x33, 0x55,
+ 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x01, 0x44, 0x55, 0x66, 0x77, 0x55,
+ 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x02, 0x88, 0x99, 0xaa, 0xbb, 0x55,
+ 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x03, 0xcc, 0xdd, 0xee, 0xff, 0x55)
+
+ src = gr.vector_source_b (src_data)
+ op = gr.simple_framer (4)
+ dst = gr.vector_sink_b ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertEqual (expected_result, result_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_goertzel.py b/gnuradio-core/src/python/gnuradio/gr/qa_goertzel.py
new file mode 100755
index 0000000000..da9d65f625
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_goertzel.py
@@ -0,0 +1,64 @@
+#!/usr/bin/env python
+#
+# Copyright 2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+from math import pi, cos
+
+class test_goertzel(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.fg = gr.flow_graph()
+
+ def tearDown(self):
+ self.fg = None
+
+ def make_tone_data(self, rate, freq):
+ return [cos(2*pi*x*freq/rate) for x in range(rate)]
+
+ def transform(self, src_data, rate, freq):
+ src = gr.vector_source_f(src_data, False)
+ dft = gr.goertzel_fc(rate, rate, freq)
+ dst = gr.vector_sink_c()
+ self.fg.connect(src, dft, dst)
+ self.fg.run()
+ return dst.data()
+
+ def test_001(self): # Measure single tone magnitude
+ rate = 8000
+ freq = 100
+ bin = freq
+ src_data = self.make_tone_data(rate, freq)
+ expected_result = 0.5
+ actual_result = abs(self.transform(src_data, rate, bin)[0])
+ self.assertAlmostEqual(expected_result, actual_result, places=5)
+
+ def test_002(self): # Measure off frequency magnitude
+ rate = 8000
+ freq = 100
+ bin = freq/2
+ src_data = self.make_tone_data(rate, freq)
+ expected_result = 0.0
+ actual_result = abs(self.transform(src_data, rate, bin)[0])
+ self.assertAlmostEqual(expected_result, actual_result, places=5)
+
+if __name__ == '__main__':
+ gr_unittest.main()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_head.py b/gnuradio-core/src/python/gnuradio/gr/qa_head.py
new file mode 100755
index 0000000000..a6fcd7f983
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_head.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+class test_head (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_head (self):
+ src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+ expected_result = (1, 2, 3, 4)
+ src1 = gr.vector_source_i (src_data)
+ op = gr.head (gr.sizeof_int, 4)
+ dst1 = gr.vector_sink_i ()
+ self.fg.connect (src1, op)
+ self.fg.connect (op, dst1)
+ self.fg.run ()
+ dst_data = dst1.data ()
+ self.assertEqual (expected_result, dst_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_hilbert.py b/gnuradio-core/src/python/gnuradio/gr/qa_hilbert.py
new file mode 100755
index 0000000000..a8f39dd6d9
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_hilbert.py
@@ -0,0 +1,116 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+class test_sig_source (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_hilbert (self):
+ fg = self.fg
+ ntaps = 51
+ sampling_freq = 100
+
+ expected_result = ( -1.4678005338941702e-11j,
+ -0.0011950774351134896j,
+ -0.0019336787518113852j,
+ -0.0034673355985432863j,
+ -0.0036765895783901215j,
+ -0.004916108213365078j,
+ -0.0042778430506587029j,
+ -0.006028641015291214j,
+ -0.005476709920912981j,
+ -0.0092810001224279404j,
+ -0.0095402700826525688j,
+ -0.016060983762145042j,
+ -0.016446959227323532j,
+ -0.02523401565849781j,
+ -0.024382550269365311j,
+ -0.035477779805660248j,
+ -0.033021725714206696j,
+ -0.048487484455108643j,
+ -0.04543270543217659j,
+ -0.069477587938308716j,
+ -0.066984444856643677j,
+ -0.10703597217798233j,
+ -0.10620346665382385j,
+ -0.1852707713842392j,
+ -0.19357112050056458j,
+ (7.2191945754696007e-09 -0.50004088878631592j),
+ (0.58778399229049683 -0.6155126690864563j),
+ (0.95105588436126709 -0.12377222627401352j),
+ (0.95105588436126709 +0.41524654626846313j),
+ (0.5877838134765625 +0.91611981391906738j),
+ (5.8516356205018383e-09 +1.0670661926269531j),
+ (-0.5877840518951416 +0.87856143712997437j),
+ (-0.95105588436126709 +0.35447561740875244j),
+ (-0.95105588436126709 -0.26055556535720825j),
+ (-0.5877838134765625 -0.77606213092803955j),
+ (-8.7774534307527574e-09 -0.96460390090942383j),
+ (0.58778399229049683 -0.78470128774642944j),
+ (0.95105588436126709 -0.28380891680717468j),
+ (0.95105588436126709 +0.32548999786376953j),
+ (0.5877838134765625 +0.82514488697052002j),
+ (1.4629089051254596e-08 +1.0096219778060913j),
+ (-0.5877840518951416 +0.81836479902267456j),
+ (-0.95105588436126709 +0.31451958417892456j),
+ (-0.95105588436126709 -0.3030143678188324j),
+ (-0.5877838134765625 -0.80480599403381348j),
+ (-1.7554906861505515e-08 -0.99516552686691284j),
+ (0.58778399229049683 -0.80540722608566284j),
+ (0.95105582475662231 -0.30557557940483093j),
+ (0.95105588436126709 +0.31097668409347534j),
+ (0.5877838134765625 +0.81027895212173462j),
+ (2.3406542482007353e-08 +1.0000816583633423j),
+ (-0.5877840518951416 +0.80908381938934326j),
+ (-0.95105588436126709 +0.30904293060302734j),
+ (-0.95105588436126709 -0.30904296040534973j),
+ (-0.5877838134765625 -0.80908387899398804j),
+ (-2.6332360292258272e-08 -1.0000815391540527j),
+ (0.58778399229049683 -0.80908381938934326j),
+ (0.95105582475662231 -0.30904299020767212j),
+ (0.95105588436126709 +0.30904293060302734j),
+ (0.5877838134765625 +0.80908381938934326j),
+ (3.218399768911695e-08 +1.0000815391540527j))
+
+
+ src1 = gr.sig_source_f (sampling_freq, gr.GR_SIN_WAVE,
+ sampling_freq * 0.10, 1.0)
+
+ head = gr.head (gr.sizeof_float, int (ntaps + sampling_freq * 0.10))
+ hilb = gr.hilbert_fc (ntaps)
+ dst1 = gr.vector_sink_c ()
+ fg.connect (src1, head)
+ fg.connect (head, hilb)
+ fg.connect (hilb, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5)
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_iir.py b/gnuradio-core/src/python/gnuradio/gr/qa_iir.py
new file mode 100755
index 0000000000..a1f2aa0772
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_iir.py
@@ -0,0 +1,135 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+class test_iir (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_iir_direct_001 (self):
+ src_data = (1, 2, 3, 4, 5, 6, 7, 8)
+ fftaps = ()
+ fbtaps = ()
+ expected_result = (0, 0, 0, 0, 0, 0, 0, 0)
+ src = gr.vector_source_f (src_data)
+ op = gr.iir_filter_ffd (fftaps, fbtaps)
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+ def test_iir_direct_002 (self):
+ src_data = (1, 2, 3, 4, 5, 6, 7, 8)
+ fftaps = (2,)
+ fbtaps = (0,)
+ expected_result = (2, 4, 6, 8, 10, 12, 14, 16)
+ src = gr.vector_source_f (src_data)
+ op = gr.iir_filter_ffd (fftaps, fbtaps)
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+ def test_iir_direct_003 (self):
+ src_data = (1, 2, 3, 4, 5, 6, 7, 8)
+ fftaps = (2, 11)
+ fbtaps = (0, 0)
+ expected_result = (2, 15, 28, 41, 54, 67, 80, 93)
+ src = gr.vector_source_f (src_data)
+ op = gr.iir_filter_ffd (fftaps, fbtaps)
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+ def test_iir_direct_004 (self):
+ src_data = (1, 2, 3, 4, 5, 6, 7, 8)
+ fftaps = (2, 11)
+ fbtaps = (0, -1)
+ expected_result = (2, 13, 15, 26, 28, 39, 41, 52)
+ src = gr.vector_source_f (src_data)
+ op = gr.iir_filter_ffd (fftaps, fbtaps)
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+ def test_iir_direct_005 (self):
+ src_data = (1, 2, 3, 4, 5, 6, 7, 8)
+ fftaps = (2, 11, 0)
+ fbtaps = (0, -1, 3)
+ expected_result = (2, 13, 21, 59, 58, 186, 68, 583)
+ src = gr.vector_source_f (src_data)
+ op = gr.iir_filter_ffd (fftaps, fbtaps)
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+ def test_iir_direct_006 (self):
+ fftaps = (2, 11, 0)
+ fbtaps = (0, -1)
+ self.assertRaises ((RuntimeError, ValueError), gr.iir_filter_ffd, fftaps, fbtaps)
+
+ def test_iir_direct_007 (self):
+ src_data = (1, 2, 3, 4, 5, 6, 7, 8)
+ expected_result = (2, 13, 21, 59, 58, 186, 68, 583)
+ fftaps = (2, 1)
+ fbtaps = (0, -1)
+ src = gr.vector_source_f (src_data)
+ op = gr.iir_filter_ffd (fftaps, fbtaps)
+ fftaps = (2, 11, 0)
+ fbtaps = (0, -1, 3)
+ op.set_taps (fftaps, fbtaps)
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+ def test_iir_direct_008 (self):
+ fftaps = (2, 1)
+ fbtaps = (0, -1)
+ op = gr.iir_filter_ffd (fftaps, fbtaps)
+ fftaps = (2, 11)
+ fbtaps = (0, -1, 3)
+ self.assertRaises ((RuntimeError, ValueError), op.set_taps, fftaps, fbtaps)
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_interleave.py b/gnuradio-core/src/python/gnuradio/gr/qa_interleave.py
new file mode 100755
index 0000000000..003d12e645
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_interleave.py
@@ -0,0 +1,81 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+class test_interleave (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_int_001 (self):
+ lenx = 64
+ src0 = gr.vector_source_f (range (0, lenx, 4))
+ src1 = gr.vector_source_f (range (1, lenx, 4))
+ src2 = gr.vector_source_f (range (2, lenx, 4))
+ src3 = gr.vector_source_f (range (3, lenx, 4))
+ op = gr.interleave (gr.sizeof_float)
+ dst = gr.vector_sink_f ()
+
+ self.fg.connect (src0, (op, 0))
+ self.fg.connect (src1, (op, 1))
+ self.fg.connect (src2, (op, 2))
+ self.fg.connect (src3, (op, 3))
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ expected_result = tuple (range (lenx))
+ result_data = dst.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+ def test_deint_001 (self):
+ lenx = 64
+ src = gr.vector_source_f (range (lenx))
+ op = gr.deinterleave (gr.sizeof_float)
+ dst0 = gr.vector_sink_f ()
+ dst1 = gr.vector_sink_f ()
+ dst2 = gr.vector_sink_f ()
+ dst3 = gr.vector_sink_f ()
+
+ self.fg.connect (src, op)
+ self.fg.connect ((op, 0), dst0)
+ self.fg.connect ((op, 1), dst1)
+ self.fg.connect ((op, 2), dst2)
+ self.fg.connect ((op, 3), dst3)
+ self.fg.run ()
+
+ expected_result0 = tuple (range (0, lenx, 4))
+ expected_result1 = tuple (range (1, lenx, 4))
+ expected_result2 = tuple (range (2, lenx, 4))
+ expected_result3 = tuple (range (3, lenx, 4))
+
+ self.assertFloatTuplesAlmostEqual (expected_result0, dst0.data ())
+ self.assertFloatTuplesAlmostEqual (expected_result1, dst1.data ())
+ self.assertFloatTuplesAlmostEqual (expected_result2, dst2.data ())
+ self.assertFloatTuplesAlmostEqual (expected_result3, dst3.data ())
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_interp_fir_filter.py b/gnuradio-core/src/python/gnuradio/gr/qa_interp_fir_filter.py
new file mode 100755
index 0000000000..4119e3e2d6
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_interp_fir_filter.py
@@ -0,0 +1,54 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+class test_interp_fir_filter (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_fff (self):
+ taps = [1, 10, 100, 1000, 10000]
+ src_data = (0, 2, 3, 5, 7, 11, 13, 17)
+ interpolation = 3
+ xr = (0,0,0,0,2,20,200,2003,20030,300,3005,30050,500,5007,50070,700,7011,70110,1100,11013,110130,1300,13017,130170)
+ expected_result = tuple ([float (x) for x in xr])
+
+ src = gr.vector_source_f (src_data)
+ op = gr.interp_fir_filter_fff (interpolation, taps)
+ dst = gr.vector_sink_f ()
+ self.fg.connect (src, op)
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ L = min(len(result_data), len(expected_result))
+ self.assertEqual (expected_result[0:L], result_data[0:L])
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_kludge_copy.py b/gnuradio-core/src/python/gnuradio/gr/qa_kludge_copy.py
new file mode 100755
index 0000000000..f5dee15286
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_kludge_copy.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+#
+# Copyright 2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+import random
+
+
+class test_kludge_copy(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.fg = gr.flow_graph()
+ self.rng = random.Random()
+ self.rng.seed(0)
+
+ def tearDown(self):
+ self.fg = None
+ self.rng = None
+
+ def make_random_int_tuple(self, L):
+ result = []
+ for x in range(L):
+ result.append(self.rng.randint(int(-1e9), int(+1e9)))
+ return tuple(result)
+
+
+ def test_001(self):
+ # 1 input stream; 1 output stream
+ src0_data = self.make_random_int_tuple(16000)
+ src0 = gr.vector_source_i(src0_data)
+ op = gr.kludge_copy(gr.sizeof_int)
+ dst0 = gr.vector_sink_i()
+ self.fg.connect(src0, op, dst0)
+ self.fg.run()
+ dst0_data = dst0.data()
+ self.assertEqual(src0_data, dst0_data)
+
+ def test_002(self):
+ # 2 input streams; 2 output streams
+ src0_data = self.make_random_int_tuple(16000)
+ src1_data = self.make_random_int_tuple(16000)
+ src0 = gr.vector_source_i(src0_data)
+ src1 = gr.vector_source_i(src1_data)
+ op = gr.kludge_copy(gr.sizeof_int)
+ dst0 = gr.vector_sink_i()
+ dst1 = gr.vector_sink_i()
+ self.fg.connect(src0, (op, 0), dst0)
+ self.fg.connect(src1, (op, 1), dst1)
+ self.fg.run()
+ dst0_data = dst0.data()
+ dst1_data = dst1.data()
+ self.assertEqual(src0_data, dst0_data)
+ self.assertEqual(src1_data, dst1_data)
+
+ def test_003(self):
+ # number of input streams != number of output streams
+ src0_data = self.make_random_int_tuple(16000)
+ src1_data = self.make_random_int_tuple(16000)
+ src0 = gr.vector_source_i(src0_data)
+ src1 = gr.vector_source_i(src1_data)
+ op = gr.kludge_copy(gr.sizeof_int)
+ dst0 = gr.vector_sink_i()
+ dst1 = gr.vector_sink_i()
+ self.fg.connect(src0, (op, 0), dst0)
+ self.fg.connect(src1, (op, 1))
+ self.assertRaises(ValueError, self.fg.run)
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_kludged_imports.py b/gnuradio-core/src/python/gnuradio/gr/qa_kludged_imports.py
new file mode 100755
index 0000000000..b208677226
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_kludged_imports.py
@@ -0,0 +1,43 @@
+#!/usr/bin/env python
+#
+# Copyright 2005 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+class test_head (gr_unittest.TestCase):
+
+ def setUp(self):
+ pass
+
+ def tearDown(self):
+ pass
+
+ def test_blks_import(self):
+ # make sure that this somewhat magic import works
+ from gnuradio import blks
+
+ def test_gru_import(self):
+ # make sure that this somewhat magic import works
+ from gnuradio import gru
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_message.py b/gnuradio-core/src/python/gnuradio/gr/qa_message.py
new file mode 100755
index 0000000000..d5d5111495
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_message.py
@@ -0,0 +1,117 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import qa_basic_flow_graph
+
+
+def all_counts ():
+ return (gr.block_ncurrently_allocated (),
+ gr.block_detail_ncurrently_allocated (),
+ gr.buffer_ncurrently_allocated (),
+ gr.buffer_reader_ncurrently_allocated (),
+ gr.message_ncurrently_allocated ())
+
+
+class test_message (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph()
+ self.msgq = gr.msg_queue ()
+
+ def tearDown (self):
+ self.fg = None
+ self.msgq = None
+
+ def leak_check (self, fct):
+ begin = all_counts ()
+ fct ()
+ # tear down early so we can check for leaks
+ self.tearDown ()
+ end = all_counts ()
+ self.assertEqual (begin, end)
+
+ def test_100 (self):
+ msg = gr.message (0, 1.5, 2.3)
+ self.assertEquals (0, msg.type())
+ self.assertAlmostEqual (1.5, msg.arg1())
+ self.assertAlmostEqual (2.3, msg.arg2())
+ self.assertEquals (0, msg.length())
+
+ def test_101 (self):
+ s = 'This is a test'
+ msg = gr.message_from_string(s)
+ self.assertEquals(s, msg.to_string())
+
+ def test_200 (self):
+ self.leak_check (self.body_200)
+
+ def body_200 (self):
+ self.msgq.insert_tail (gr.message (0))
+ self.assertEquals (1, self.msgq.count())
+ self.msgq.insert_tail (gr.message (1))
+ self.assertEquals (2, self.msgq.count())
+ msg0 = self.msgq.delete_head ()
+ self.assertEquals (0, msg0.type())
+ msg1 = self.msgq.delete_head ()
+ self.assertEquals (1, msg1.type())
+ self.assertEquals (0, self.msgq.count())
+
+ def test_201 (self):
+ self.leak_check (self.body_201)
+
+ def body_201 (self):
+ self.msgq.insert_tail (gr.message (0))
+ self.assertEquals (1, self.msgq.count())
+ self.msgq.insert_tail (gr.message (1))
+ self.assertEquals (2, self.msgq.count())
+
+ def test_202 (self):
+ self.leak_check (self.body_202)
+
+ def body_202 (self):
+ # global msg
+ msg = gr.message (666)
+
+ def test_300(self):
+ input_data = (0,1,2,3,4,5,6,7,8,9)
+ src = gr.vector_source_b(input_data)
+ dst = gr.vector_sink_b()
+ self.fg.connect(src, dst)
+ self.fg.run()
+ self.assertEquals(input_data, dst.data())
+
+ def test_301(self):
+ src = gr.message_source(gr.sizeof_char)
+ dst = gr.vector_sink_b()
+ self.fg.connect(src, dst)
+ src.msgq().insert_tail(gr.message_from_string('01234'))
+ src.msgq().insert_tail(gr.message_from_string('5'))
+ src.msgq().insert_tail(gr.message_from_string(''))
+ src.msgq().insert_tail(gr.message_from_string('6789'))
+ src.msgq().insert_tail(gr.message(1)) # send EOF
+ self.fg.run()
+ self.assertEquals(tuple(map(ord, '0123456789')), dst.data())
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_mute.py b/gnuradio-core/src/python/gnuradio/gr/qa_mute.py
new file mode 100755
index 0000000000..863c308a4a
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_mute.py
@@ -0,0 +1,89 @@
+#!/usr/bin/env python
+#
+# Copyright 2004,2005 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+class test_head (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def help_ii (self, src_data, exp_data, op):
+ for s in zip (range (len (src_data)), src_data):
+ src = gr.vector_source_i (s[1])
+ self.fg.connect (src, (op, s[0]))
+ dst = gr.vector_sink_i ()
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+ def help_ff (self, src_data, exp_data, op):
+ for s in zip (range (len (src_data)), src_data):
+ src = gr.vector_source_f (s[1])
+ self.fg.connect (src, (op, s[0]))
+ dst = gr.vector_sink_f ()
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+ def help_cc (self, src_data, exp_data, op):
+ for s in zip (range (len (src_data)), src_data):
+ src = gr.vector_source_c (s[1])
+ self.fg.connect (src, (op, s[0]))
+ dst = gr.vector_sink_c ()
+ self.fg.connect (op, dst)
+ self.fg.run ()
+ result_data = dst.data ()
+ self.assertEqual (exp_data, result_data)
+
+ def test_unmute_ii(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_result = (1, 2, 3, 4, 5)
+ op = gr.mute_ii (False)
+ self.help_ii ((src_data,), expected_result, op)
+
+ def test_mute_ii(self):
+ src_data = (1, 2, 3, 4, 5)
+ expected_result = (0, 0, 0, 0, 0)
+ op = gr.mute_ii (True)
+ self.help_ii ((src_data,), expected_result, op)
+
+ def test_unmute_cc (self):
+ src_data = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j)
+ expected_result = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j)
+ op = gr.mute_cc (False)
+ self.help_cc ((src_data,), expected_result, op)
+
+ def test_unmute_cc (self):
+ src_data = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j)
+ expected_result = (0+0j, 0+0j, 0+0j, 0+0j, 0+0j)
+ op = gr.mute_cc (True)
+ self.help_cc ((src_data,), expected_result, op)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_nlog10.py b/gnuradio-core/src/python/gnuradio/gr/qa_nlog10.py
new file mode 100755
index 0000000000..f056d1100b
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_nlog10.py
@@ -0,0 +1,47 @@
+#!/usr/bin/env python
+#
+# Copyright 2005 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+class test_single_pole_iir(gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_001(self):
+ src_data = (-10, 0, 10, 100, 1000, 10000, 100000)
+ expected_result = (-180, -180, 10, 20, 30, 40, 50)
+ src = gr.vector_source_f(src_data)
+ op = gr.nlog10_ff(10)
+ dst = gr.vector_sink_f()
+ self.fg.connect (src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_packed_to_unpacked.py b/gnuradio-core/src/python/gnuradio/gr/qa_packed_to_unpacked.py
new file mode 100755
index 0000000000..d5472c8d72
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_packed_to_unpacked.py
@@ -0,0 +1,405 @@
+#!/usr/bin/env python
+#
+# Copyright 2005 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import random
+
+class test_packing(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown(self):
+ self.fg = None
+
+ def test_001(self):
+ """
+ Test stream_to_streams.
+ """
+ src_data = (0x80,)
+ expected_results = (1,0,0,0,0,0,0,0)
+ src = gr.vector_source_b(src_data,False)
+ op = gr.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST)
+ self.fg.connect(src, op)
+
+ dst = gr.vector_sink_b()
+ self.fg.connect(op, dst)
+
+ self.fg.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_002(self):
+ """
+ Test stream_to_streams.
+ """
+ src_data = (0x80,)
+ expected_results = (0,0,0,0,0,0,0, 1)
+ src = gr.vector_source_b(src_data,False)
+ op = gr.packed_to_unpacked_bb(1, gr.GR_LSB_FIRST)
+ self.fg.connect(src, op)
+
+ dst = gr.vector_sink_b()
+ self.fg.connect(op, dst)
+
+ self.fg.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_003(self):
+ """
+ Test stream_to_streams.
+ """
+ src_data = (0x11,)
+ expected_results = (4, 2)
+ src = gr.vector_source_b(src_data,False)
+ op = gr.packed_to_unpacked_bb(3, gr.GR_LSB_FIRST)
+ self.fg.connect(src, op)
+
+ dst = gr.vector_sink_b()
+ self.fg.connect(op, dst)
+
+ self.fg.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_004(self):
+ """
+ Test stream_to_streams.
+ """
+ src_data = (0x11,)
+ expected_results = (0, 4)
+ src = gr.vector_source_b(src_data,False)
+ op = gr.packed_to_unpacked_bb(3, gr.GR_MSB_FIRST)
+ self.fg.connect(src, op)
+
+ dst = gr.vector_sink_b()
+ self.fg.connect(op, dst)
+
+ self.fg.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_005(self):
+ """
+ Test stream_to_streams.
+ """
+ src_data = (1,0,0,0,0,0,1,0,0,1,0,1,1,0,1,0)
+ expected_results = (0x82,0x5a)
+ src = gr.vector_source_b(src_data,False)
+ op = gr.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST)
+ self.fg.connect(src, op)
+
+ dst = gr.vector_sink_b()
+ self.fg.connect(op, dst)
+
+ self.fg.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_006(self):
+ """
+ Test stream_to_streams.
+ """
+ src_data = (0,1,0,0,0,0,0,1,0,1,0,1,1,0,1,0)
+ expected_results = (0x82,0x5a)
+ src = gr.vector_source_b(src_data,False)
+ op = gr.unpacked_to_packed_bb(1, gr.GR_LSB_FIRST)
+ self.fg.connect(src, op)
+
+ dst = gr.vector_sink_b()
+ self.fg.connect(op, dst)
+
+ self.fg.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+
+ def test_007(self):
+ """
+ Test stream_to_streams.
+ """
+ src_data = (4, 2, 0,0,0)
+ expected_results = (0x11,)
+ src = gr.vector_source_b(src_data,False)
+ op = gr.unpacked_to_packed_bb(3, gr.GR_LSB_FIRST)
+ self.fg.connect(src, op)
+
+ dst = gr.vector_sink_b()
+ self.fg.connect(op, dst)
+
+ self.fg.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_008(self):
+ """
+ Test stream_to_streams.
+ """
+ src_data = (0, 4, 2,0,0)
+ expected_results = (0x11,)
+ src = gr.vector_source_b(src_data,False)
+ op = gr.unpacked_to_packed_bb(3, gr.GR_MSB_FIRST)
+ self.fg.connect(src, op)
+
+ dst = gr.vector_sink_b()
+ self.fg.connect(op, dst)
+
+ self.fg.run()
+
+ self.assertEqual(expected_results, dst.data())
+
+ def test_009(self):
+ """
+ Test stream_to_streams.
+ """
+
+ random.seed(0)
+ src_data = []
+ for i in xrange(202):
+ src_data.append((random.randint(0,255)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+
+ src = gr.vector_source_b(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_bb(3, gr.GR_MSB_FIRST)
+ op2 = gr.unpacked_to_packed_bb(3, gr.GR_MSB_FIRST)
+ self.fg.connect(src, op1, op2)
+
+ dst = gr.vector_sink_b()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+
+ self.assertEqual(expected_results[0:201], dst.data())
+
+ def test_010(self):
+ """
+ Test stream_to_streams.
+ """
+
+ random.seed(0)
+ src_data = []
+ for i in xrange(56):
+ src_data.append((random.randint(0,255)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_b(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_bb(7, gr.GR_MSB_FIRST)
+ op2 = gr.unpacked_to_packed_bb(7, gr.GR_MSB_FIRST)
+ self.fg.connect(src, op1, op2)
+ dst = gr.vector_sink_b()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results[0:201], dst.data())
+
+ def test_011(self):
+ """
+ Test stream_to_streams.
+ """
+
+ random.seed(0)
+ src_data = []
+ for i in xrange(56):
+ src_data.append((random.randint(0,255)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_b(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_bb(7, gr.GR_LSB_FIRST)
+ op2 = gr.unpacked_to_packed_bb(7, gr.GR_LSB_FIRST)
+ self.fg.connect(src, op1, op2)
+ dst = gr.vector_sink_b()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results[0:201], dst.data())
+
+
+ # tests on shorts
+
+ def test_100a(self):
+ """
+ test short version
+ """
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**15,2**15-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_s(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_ss(1, gr.GR_MSB_FIRST)
+ op2 = gr.unpacked_to_packed_ss(1, gr.GR_MSB_FIRST)
+ self.fg.connect(src, op1, op2)
+ dst = gr.vector_sink_s()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_100b(self):
+ """
+ test short version
+ """
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**15,2**15-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_s(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_ss(1, gr.GR_LSB_FIRST)
+ op2 = gr.unpacked_to_packed_ss(1, gr.GR_LSB_FIRST)
+ self.fg.connect(src, op1, op2)
+ dst = gr.vector_sink_s()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_101a(self):
+ """
+ test short version
+ """
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**15,2**15-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_s(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_ss(8, gr.GR_MSB_FIRST)
+ op2 = gr.unpacked_to_packed_ss(8, gr.GR_MSB_FIRST)
+ self.fg.connect(src, op1, op2)
+ dst = gr.vector_sink_s()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_101b(self):
+ """
+ test short version
+ """
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**15,2**15-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_s(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_ss(8, gr.GR_LSB_FIRST)
+ op2 = gr.unpacked_to_packed_ss(8, gr.GR_LSB_FIRST)
+ self.fg.connect(src, op1, op2)
+ dst = gr.vector_sink_s()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+ # tests on ints
+
+ def test_200a(self):
+ """
+ test int version
+ """
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**31,2**31-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_i(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_ii(1, gr.GR_MSB_FIRST)
+ op2 = gr.unpacked_to_packed_ii(1, gr.GR_MSB_FIRST)
+ self.fg.connect(src, op1, op2)
+ dst = gr.vector_sink_i()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_200b(self):
+ """
+ test int version
+ """
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**31,2**31-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_i(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_ii(1, gr.GR_LSB_FIRST)
+ op2 = gr.unpacked_to_packed_ii(1, gr.GR_LSB_FIRST)
+ self.fg.connect(src, op1, op2)
+ dst = gr.vector_sink_i()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_201a(self):
+ """
+ test int version
+ """
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**31,2**31-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_i(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_ii(8, gr.GR_MSB_FIRST)
+ op2 = gr.unpacked_to_packed_ii(8, gr.GR_MSB_FIRST)
+ self.fg.connect(src, op1, op2)
+ dst = gr.vector_sink_i()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_201b(self):
+ """
+ test int version
+ """
+ random.seed(0)
+ src_data = []
+ for i in xrange(100):
+ src_data.append((random.randint(-2**31,2**31-1)))
+ src_data = tuple(src_data)
+ expected_results = src_data
+ src = gr.vector_source_i(tuple(src_data),False)
+ op1 = gr.packed_to_unpacked_ii(8, gr.GR_LSB_FIRST)
+ op2 = gr.unpacked_to_packed_ii(8, gr.GR_LSB_FIRST)
+ self.fg.connect(src, op1, op2)
+ dst = gr.vector_sink_i()
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_pipe_fittings.py b/gnuradio-core/src/python/gnuradio/gr/qa_pipe_fittings.py
new file mode 100755
index 0000000000..dca18c8ecc
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_pipe_fittings.py
@@ -0,0 +1,143 @@
+#!/usr/bin/env python
+#
+# Copyright 2005 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+if 0:
+ import os
+ print "pid =", os.getpid()
+ raw_input("Attach, then press Enter to continue")
+
+
+def calc_expected_result(src_data, n):
+ assert (len(src_data) % n) == 0
+ result = [list() for x in range(n)]
+ #print "len(result) =", len(result)
+ for i in xrange(len(src_data)):
+ (result[i % n]).append(src_data[i])
+ return [tuple(x) for x in result]
+
+
+class test_pipe_fittings(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown(self):
+ self.fg = None
+
+ def test_001(self):
+ """
+ Test stream_to_streams.
+ """
+ n = 8
+ src_len = n * 8
+ src_data = range(src_len)
+
+ expected_results = calc_expected_result(src_data, n)
+ #print "expected results: ", expected_results
+ src = gr.vector_source_i(src_data)
+ op = gr.stream_to_streams(gr.sizeof_int, n)
+ self.fg.connect(src, op)
+
+ dsts = []
+ for i in range(n):
+ dst = gr.vector_sink_i()
+ self.fg.connect((op, i), (dst, 0))
+ dsts.append(dst)
+
+ self.fg.run()
+
+ for d in range(n):
+ self.assertEqual(expected_results[d], dsts[d].data())
+
+ def test_002(self):
+ """
+ Test streams_to_stream (using stream_to_streams).
+ """
+ n = 8
+ src_len = n * 8
+ src_data = tuple(range(src_len))
+ expected_results = src_data
+
+ src = gr.vector_source_i(src_data)
+ op1 = gr.stream_to_streams(gr.sizeof_int, n)
+ op2 = gr.streams_to_stream(gr.sizeof_int, n)
+ dst = gr.vector_sink_i()
+
+ self.fg.connect(src, op1)
+ for i in range(n):
+ self.fg.connect((op1, i), (op2, i))
+ self.fg.connect(op2, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_003(self):
+ """
+ Test streams_to_vector (using stream_to_streams & vector_to_stream).
+ """
+ n = 8
+ src_len = n * 8
+ src_data = tuple(range(src_len))
+ expected_results = src_data
+
+ src = gr.vector_source_i(src_data)
+ op1 = gr.stream_to_streams(gr.sizeof_int, n)
+ op2 = gr.streams_to_vector(gr.sizeof_int, n)
+ op3 = gr.vector_to_stream(gr.sizeof_int, n)
+ dst = gr.vector_sink_i()
+
+ self.fg.connect(src, op1)
+ for i in range(n):
+ self.fg.connect((op1, i), (op2, i))
+ self.fg.connect(op2, op3, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_004(self):
+ """
+ Test vector_to_streams.
+ """
+ n = 8
+ src_len = n * 8
+ src_data = tuple(range(src_len))
+ expected_results = src_data
+
+ src = gr.vector_source_i(src_data)
+ op1 = gr.stream_to_vector(gr.sizeof_int, n)
+ op2 = gr.vector_to_streams(gr.sizeof_int, n)
+ op3 = gr.streams_to_stream(gr.sizeof_int, n)
+ dst = gr.vector_sink_i()
+
+ self.fg.connect(src, op1, op2)
+ for i in range(n):
+ self.fg.connect((op2, i), (op3, i))
+ self.fg.connect(op3, dst)
+
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_rational_resampler.py b/gnuradio-core/src/python/gnuradio/gr/qa_rational_resampler.py
new file mode 100755
index 0000000000..2505482d51
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_rational_resampler.py
@@ -0,0 +1,290 @@
+#!/usr/bin/env python
+#
+# Copyright 2005,2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+from gnuradio import blks
+import math
+import random
+import sys
+
+#import os
+#print os.getpid()
+#raw_input('Attach with gdb, then press Enter: ')
+
+
+def random_floats(n):
+ r = []
+ for x in xrange(n):
+ r.append(float(random.randint(-32768, 32768)))
+ return tuple(r)
+
+
+def reference_dec_filter(src_data, decim, taps):
+ fg = gr.flow_graph()
+ src = gr.vector_source_f(src_data)
+ op = gr.fir_filter_fff(decim, taps)
+ dst = gr.vector_sink_f()
+ fg.connect(src, op, dst)
+ fg.run()
+ result_data = dst.data()
+ fg = None
+ return result_data
+
+def reference_interp_filter(src_data, interp, taps):
+ fg = gr.flow_graph()
+ src = gr.vector_source_f(src_data)
+ op = gr.interp_fir_filter_fff(interp, taps)
+ dst = gr.vector_sink_f()
+ fg.connect(src, op, dst)
+ fg.run()
+ result_data = dst.data()
+ fg = None
+ return result_data
+
+def reference_interp_dec_filter(src_data, interp, decim, taps):
+ fg = gr.flow_graph()
+ src = gr.vector_source_f(src_data)
+ up = gr.interp_fir_filter_fff(interp, (1,))
+ dn = gr.fir_filter_fff(decim, taps)
+ dst = gr.vector_sink_f()
+ fg.connect(src, up, dn, dst)
+ fg.run()
+ result_data = dst.data()
+ fg = None
+ return result_data
+
+
+class test_rational_resampler (gr_unittest.TestCase):
+
+ def setUp(self):
+ self.fg = gr.flow_graph()
+
+ def tearDown(self):
+ self.fg = None
+
+ #
+ # test the gr.rational_resampler_base primitives...
+ #
+
+ def test_000_1_to_1(self):
+ taps = (-4, 5)
+ src_data = (234, -4, 23, -56, 45, 98, -23, -7)
+ xr = (-936, 1186, -112, 339, -460, -167, 582)
+ expected_result = tuple([float(x) for x in xr])
+
+ src = gr.vector_source_f(src_data)
+ op = gr.rational_resampler_base_fff(1, 1, taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op)
+ self.fg.connect(op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def test_001_interp(self):
+ taps = [1, 10, 100, 1000, 10000]
+ src_data = (0, 2, 3, 5, 7, 11, 13, 17)
+ interpolation = 3
+ xr = (0,0,0,0,2,20,200,2003,20030,300,3005,30050,500,5007,50070,700,7011,70110,1100,11013,110130,1300,13017,130170,1700.0,17000.0,170000.0)
+ expected_result = tuple([float(x) for x in xr])
+
+ src = gr.vector_source_f(src_data)
+ op = gr.rational_resampler_base_fff(interpolation, 1, taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op)
+ self.fg.connect(op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def test_002_interp(self):
+ taps = random_floats(31)
+ #src_data = random_floats(10000) # FIXME the 10k case fails!
+ src_data = random_floats(1000)
+ interpolation = 3
+
+ expected_result = reference_interp_filter(src_data, interpolation, taps)
+
+ src = gr.vector_source_f(src_data)
+ op = gr.rational_resampler_base_fff(interpolation, 1, taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op)
+ self.fg.connect(op, dst)
+ self.fg.run()
+ result_data = dst.data()
+
+ L1 = len(result_data)
+ L2 = len(expected_result)
+ L = min(L1, L2)
+ if False:
+ sys.stderr.write('delta = %2d: ntaps = %d interp = %d ilen = %d\n' %
+ (L2 - L1, len(taps), interpolation, len(src_data)))
+ sys.stderr.write(' len(result_data) = %d len(expected_result) = %d\n' %
+ (len(result_data), len(expected_result)))
+ #self.assertEqual(expected_result[0:L], result_data[0:L])
+ # FIXME check first 3 answers
+ self.assertEqual(expected_result[3:L], result_data[3:L])
+
+ def test_003_interp(self):
+ taps = random_floats(31)
+ src_data = random_floats(10000)
+ decimation = 3
+
+ expected_result = reference_dec_filter(src_data, decimation, taps)
+
+ src = gr.vector_source_f(src_data)
+ op = gr.rational_resampler_base_fff(1, decimation, taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op)
+ self.fg.connect(op, dst)
+ self.fg.run()
+ result_data = dst.data()
+
+ L1 = len(result_data)
+ L2 = len(expected_result)
+ L = min(L1, L2)
+ if False:
+ sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' %
+ (L2 - L1, len(taps), decimation, len(src_data)))
+ sys.stderr.write(' len(result_data) = %d len(expected_result) = %d\n' %
+ (len(result_data), len(expected_result)))
+ self.assertEqual(expected_result[0:L], result_data[0:L])
+
+ # FIXME disabled. Triggers hang on SuSE 10.0
+ def xtest_004_decim_random_vals(self):
+ MAX_TAPS = 9
+ MAX_DECIM = 7
+ OUTPUT_LEN = 9
+
+ random.seed(0) # we want reproducibility
+
+ for ntaps in xrange(1, MAX_TAPS + 1):
+ for decim in xrange(1, MAX_DECIM+1):
+ for ilen in xrange(ntaps + decim, ntaps + OUTPUT_LEN*decim):
+ src_data = random_floats(ilen)
+ taps = random_floats(ntaps)
+ expected_result = reference_dec_filter(src_data, decim, taps)
+
+ fg = gr.flow_graph()
+ src = gr.vector_source_f(src_data)
+ op = gr.rational_resampler_base_fff(1, decim, taps)
+ dst = gr.vector_sink_f()
+ fg.connect(src, op, dst)
+ fg.run()
+ fg = None
+ result_data = dst.data()
+ L1 = len(result_data)
+ L2 = len(expected_result)
+ L = min(L1, L2)
+ if False:
+ sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' % (L2 - L1, ntaps, decim, ilen))
+ sys.stderr.write(' len(result_data) = %d len(expected_result) = %d\n' %
+ (len(result_data), len(expected_result)))
+ self.assertEqual(expected_result[0:L], result_data[0:L])
+
+
+ # FIXME disabled. Triggers hang on SuSE 10.0
+ def xtest_005_interp_random_vals(self):
+ MAX_TAPS = 9
+ MAX_INTERP = 7
+ INPUT_LEN = 9
+
+ random.seed(0) # we want reproducibility
+
+ for ntaps in xrange(1, MAX_TAPS + 1):
+ for interp in xrange(1, MAX_INTERP+1):
+ for ilen in xrange(ntaps, ntaps + INPUT_LEN):
+ src_data = random_floats(ilen)
+ taps = random_floats(ntaps)
+ expected_result = reference_interp_filter(src_data, interp, taps)
+
+ fg = gr.flow_graph()
+ src = gr.vector_source_f(src_data)
+ op = gr.rational_resampler_base_fff(interp, 1, taps)
+ dst = gr.vector_sink_f()
+ fg.connect(src, op, dst)
+ fg.run()
+ fg = None
+ result_data = dst.data()
+ L1 = len(result_data)
+ L2 = len(expected_result)
+ L = min(L1, L2)
+ #if True or abs(L1-L2) > 1:
+ if False:
+ sys.stderr.write('delta = %2d: ntaps = %d interp = %d ilen = %d\n' % (L2 - L1, ntaps, interp, ilen))
+ #sys.stderr.write(' len(result_data) = %d len(expected_result) = %d\n' %
+ # (len(result_data), len(expected_result)))
+ #self.assertEqual(expected_result[0:L], result_data[0:L])
+ # FIXME check first ntaps+1 answers
+ self.assertEqual(expected_result[ntaps+1:L], result_data[ntaps+1:L])
+
+
+ def test_006_interp_decim(self):
+ taps = (0,1,0,0)
+ src_data = range(10000)
+ interp = 3
+ decimation = 2
+
+ expected_result = reference_interp_dec_filter(src_data, interp, decimation, taps)
+
+ src = gr.vector_source_f(src_data)
+ op = gr.rational_resampler_base_fff(interp, decimation, taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op)
+ self.fg.connect(op, dst)
+ self.fg.run()
+ result_data = dst.data()
+
+ L1 = len(result_data)
+ L2 = len(expected_result)
+ L = min(L1, L2)
+ if False:
+ sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' %
+ (L2 - L1, len(taps), decimation, len(src_data)))
+ sys.stderr.write(' len(result_data) = %d len(expected_result) = %d\n' %
+ (len(result_data), len(expected_result)))
+ self.assertEqual(expected_result[1:L], result_data[1:L])
+
+ #
+ # test the blks.rational_resampler_??? primitives...
+ #
+
+ def test_101_interp(self):
+ taps = [1, 10, 100, 1000, 10000]
+ src_data = (0, 2, 3, 5, 7, 11, 13, 17)
+ interpolation = 3
+ xr = (0,0,0,0,2,20,200,2003,20030,300,3005,30050,500,5007,50070,700,7011,70110,1100,11013,110130,1300,13017,130170,1700.0,17000.0,170000.0)
+ expected_result = tuple([float(x) for x in xr])
+
+ src = gr.vector_source_f(src_data)
+ op = blks.rational_resampler_fff(self.fg, interpolation, 1, taps=taps)
+ dst = gr.vector_sink_f()
+ self.fg.connect(src, op)
+ self.fg.connect(op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+
+if __name__ == '__main__':
+ gr_unittest.main()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_sig_source.py b/gnuradio-core/src/python/gnuradio/gr/qa_sig_source.py
new file mode 100755
index 0000000000..39866ac43b
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_sig_source.py
@@ -0,0 +1,85 @@
+#!/usr/bin/env python
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import math
+
+class test_sig_source (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_const_f (self):
+ fg = self.fg
+ expected_result = (1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5)
+ src1 = gr.sig_source_f (1e6, gr.GR_CONST_WAVE, 0, 1.5)
+ op = gr.head (gr.sizeof_float, 10)
+ dst1 = gr.vector_sink_f ()
+ fg.connect (src1, op)
+ fg.connect (op, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertEqual (expected_result, dst_data)
+
+ def test_const_i (self):
+ fg = self.fg
+ expected_result = (1, 1, 1, 1)
+ src1 = gr.sig_source_i (1e6, gr.GR_CONST_WAVE, 0, 1)
+ op = gr.head (gr.sizeof_int, 4)
+ dst1 = gr.vector_sink_i ()
+ fg.connect (src1, op)
+ fg.connect (op, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertEqual (expected_result, dst_data)
+
+ def test_sine_f (self):
+ fg = self.fg
+ sqrt2 = math.sqrt(2) / 2
+ expected_result = (0, sqrt2, 1, sqrt2, 0, -sqrt2, -1, -sqrt2, 0)
+ src1 = gr.sig_source_f (8, gr.GR_SIN_WAVE, 1.0, 1.0)
+ op = gr.head (gr.sizeof_float, 9)
+ dst1 = gr.vector_sink_f ()
+ fg.connect (src1, op)
+ fg.connect (op, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 5)
+
+ def test_cosine_f (self):
+ fg = self.fg
+ sqrt2 = math.sqrt(2) / 2
+ expected_result = (1, sqrt2, 0, -sqrt2, -1, -sqrt2, 0, sqrt2, 1)
+ src1 = gr.sig_source_f (8, gr.GR_COS_WAVE, 1.0, 1.0)
+ op = gr.head (gr.sizeof_float, 9)
+ dst1 = gr.vector_sink_f ()
+ fg.connect (src1, op)
+ fg.connect (op, dst1)
+ fg.run ()
+ dst_data = dst1.data ()
+ self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 5)
+
+if __name__ == '__main__':
+ gr_unittest.main ()
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir.py b/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir.py
new file mode 100755
index 0000000000..5898188f82
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+#
+# Copyright 2005 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+class test_single_pole_iir(gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_001(self):
+ src_data = (0, 1000, 2000, 3000, 4000, 5000)
+ expected_result = src_data
+ src = gr.vector_source_f(src_data)
+ op = gr.single_pole_iir_filter_ff (1.0)
+ dst = gr.vector_sink_f()
+ self.fg.connect (src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
+ def test_002(self):
+ src_data = (0, 1000, 2000, 3000, 4000, 5000)
+ expected_result = (0, 125, 359.375, 689.453125, 1103.271484, 1590.36255)
+ src = gr.vector_source_f(src_data)
+ op = gr.single_pole_iir_filter_ff (0.125)
+ dst = gr.vector_sink_f()
+ self.fg.connect (src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data, 3)
+
+ def test_003(self):
+ block_size = 2
+ src_data = (0, 1000, 2000, 3000, 4000, 5000)
+ expected_result = (0, 125, 250, 484.375, 718.75, 1048.828125)
+ src = gr.vector_source_f(src_data)
+ s2p = gr.serial_to_parallel(gr.sizeof_float, block_size)
+ op = gr.single_pole_iir_filter_ff (0.125, block_size)
+ p2s = gr.parallel_to_serial(gr.sizeof_float, block_size)
+ dst = gr.vector_sink_f()
+ self.fg.connect (src, s2p, op, p2s, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data, 3)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir_cc.py b/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir_cc.py
new file mode 100755
index 0000000000..a7889d177d
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir_cc.py
@@ -0,0 +1,72 @@
+#!/usr/bin/env python
+#
+# Copyright 2005,2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+
+class test_single_pole_iir_cc(gr_unittest.TestCase):
+
+ def setUp (self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown (self):
+ self.fg = None
+
+ def test_001(self):
+ src_data = (0+0j, 1000+1000j, 2000+2000j, 3000+3000j, 4000+4000j, 5000+5000j)
+ expected_result = src_data
+ src = gr.vector_source_c(src_data)
+ op = gr.single_pole_iir_filter_cc (1.0)
+ dst = gr.vector_sink_c()
+ self.fg.connect (src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertComplexTuplesAlmostEqual (expected_result, result_data)
+
+ def test_002(self):
+ src_data = (complex(0,0), complex(1000,-1000), complex(2000,-2000), complex(3000,-3000), complex(4000,-4000), complex(5000,-5000))
+ expected_result = (complex(0,0), complex(125,-125), complex(359.375,-359.375), complex(689.453125,-689.453125), complex(1103.271484,-1103.271484), complex(1590.36255,-1590.36255))
+ src = gr.vector_source_c(src_data)
+ op = gr.single_pole_iir_filter_cc (0.125)
+ dst = gr.vector_sink_c()
+ self.fg.connect (src, op, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertComplexTuplesAlmostEqual (expected_result, result_data, 3)
+
+ def test_003(self):
+ block_size = 2
+ src_data = (complex(0,0), complex(1000,-1000), complex(2000,-2000), complex(3000,-3000), complex(4000,-4000), complex(5000,-5000))
+ expected_result = (complex(0,0), complex(125,-125), complex(250,-250), complex(484.375,-484.375), complex(718.75,-718.75), complex(1048.828125,-1048.828125))
+ src = gr.vector_source_c(src_data)
+ s2p = gr.serial_to_parallel(gr.sizeof_gr_complex, block_size)
+ op = gr.single_pole_iir_filter_cc (0.125, block_size)
+ p2s = gr.parallel_to_serial(gr.sizeof_gr_complex, block_size)
+ dst = gr.vector_sink_c()
+ self.fg.connect (src, s2p, op, p2s, dst)
+ self.fg.run()
+ result_data = dst.data()
+ self.assertComplexTuplesAlmostEqual (expected_result, result_data, 3)
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_unpack_k_bits.py b/gnuradio-core/src/python/gnuradio/gr/qa_unpack_k_bits.py
new file mode 100755
index 0000000000..59f838a4df
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/qa_unpack_k_bits.py
@@ -0,0 +1,57 @@
+#!/usr/bin/env python
+#
+# Copyright 2006 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import random
+
+class test_unpack(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.fg = gr.flow_graph ()
+
+ def tearDown(self):
+ self.fg = None
+
+ def test_001(self):
+ src_data = (1,0,1,1,0,1,1,0)
+ expected_results = (1,0,1,1,0,1,1,0)
+ src = gr.vector_source_b(src_data,False)
+ op = gr.unpack_k_bits_bb(1)
+ dst = gr.vector_sink_b()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+ def test_002(self):
+ src_data = ( 2, 3, 0, 1)
+ expected_results = (1,0,1,1,0,0,0,1)
+ src = gr.vector_source_b(src_data,False)
+ op = gr.unpack_k_bits_bb(2)
+ dst = gr.vector_sink_b()
+ self.fg.connect(src, op, dst)
+ self.fg.run()
+ self.assertEqual(expected_results, dst.data())
+
+
+if __name__ == '__main__':
+ gr_unittest.main ()
+
diff --git a/gnuradio-core/src/python/gnuradio/gr/run_tests.in b/gnuradio-core/src/python/gnuradio/gr/run_tests.in
new file mode 100755
index 0000000000..87d0afdafd
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/run_tests.in
@@ -0,0 +1,33 @@
+#!/bin/sh
+
+swigbld=@abs_top_builddir@/gnuradio-core/src/lib/swig
+swigsrc=@abs_top_srcdir@/gnuradio-core/src/lib/swig
+py=@abs_top_srcdir@/gnuradio-core/src/python
+
+PYTHONPATH="$swigbld:$swigbld/.libs:$swigsrc:$py"
+export PYTHONPATH
+
+# for OS/X
+DYLD_LIBRARY_PATH="@abs_top_builddir@/gnuradio-core/src/lib/.libs"
+export DYLD_LIBRARY_PATH
+
+# Don't load user or system prefs
+GR_DONT_LOAD_PREFS=1
+export GR_DONT_LOAD_PREFS
+
+ok=yes
+for file in @srcdir@/qa_*.py
+do
+ echo $file
+ if ! $file
+ then
+ ok=no
+ fi
+done
+
+if [ $ok = yes ]
+then
+ exit 0
+else
+ exit 1
+fi
diff --git a/gnuradio-core/src/python/gnuradio/gr/scheduler.py b/gnuradio-core/src/python/gnuradio/gr/scheduler.py
new file mode 100644
index 0000000000..919d07f0a4
--- /dev/null
+++ b/gnuradio-core/src/python/gnuradio/gr/scheduler.py
@@ -0,0 +1,70 @@
+#
+# Copyright 2004 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 59 Temple Place - Suite 330,
+# Boston, MA 02111-1307, USA.
+#
+
+from gnuradio.gr.exceptions import *
+from gnuradio_swig_python import single_threaded_scheduler, sts_pyrun
+import gr_threading as _threading
+#import threading as _threading
+
+class scheduler_thread(_threading.Thread):
+ def __init__(self, sts):
+ _threading.Thread.__init__(self)
+ self.sts = sts
+ def run(self):
+ # Invoke the single threaded scheduler's run method
+ #
+ # Note that we're in a new thread, and that sts_pyrun
+ # releases the global interpreter lock. This has the
+ # effect of evaluating the graph in parallel to the
+ # main line control code.
+ sts_pyrun(self.sts)
+ self.sts = None
+
+class scheduler(object):
+ def __init__(self, fg):
+ graphs = fg.partition_graph(fg.blocks)
+ # print "@@@ # graphs = %d" % (len(graphs))
+
+ self.state = []
+
+ for g in graphs:
+ list_of_blocks = [x.block() for x in g]
+ sts = single_threaded_scheduler(list_of_blocks)
+ thread = scheduler_thread(sts)
+ thread.setDaemon(1)
+ self.state.append((sts, thread))
+
+ def start(self):
+ for (sts, thread) in self.state:
+ thread.start()
+
+ def stop(self):
+ for (sts, thread) in self.state:
+ sts.stop()
+ self.wait()
+
+ def wait(self):
+ for (sts, thread) in self.state:
+ timeout = 0.100
+ while True:
+ thread.join(timeout)
+ if not thread.isAlive():
+ break