diff options
author | jcorgan <jcorgan@221aa14e-8319-0410-a670-987f0aec2ac5> | 2006-08-03 04:51:51 +0000 |
---|---|---|
committer | jcorgan <jcorgan@221aa14e-8319-0410-a670-987f0aec2ac5> | 2006-08-03 04:51:51 +0000 |
commit | 5d69a524f81f234b3fbc41d49ba18d6f6886baba (patch) | |
tree | b71312bf7f1e8d10fef0f3ac6f28784065e73e72 /gnuradio-core/src/python/gnuradio/gr |
Houston, we have a trunk.
git-svn-id: http://gnuradio.org/svn/gnuradio/trunk@3122 221aa14e-8319-0410-a670-987f0aec2ac5
Diffstat (limited to 'gnuradio-core/src/python/gnuradio/gr')
46 files changed, 6875 insertions, 0 deletions
diff --git a/gnuradio-core/src/python/gnuradio/gr/Makefile.am b/gnuradio-core/src/python/gnuradio/gr/Makefile.am new file mode 100644 index 0000000000..05dee29f08 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/Makefile.am @@ -0,0 +1,77 @@ +# +# Copyright 2004,2005,2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +include $(top_srcdir)/Makefile.common + +EXTRA_DIST = run_tests.in + + +TESTS = \ + run_tests + + +grgrpythondir = $(grpythondir)/gr + +grgrpython_PYTHON = \ + __init__.py \ + basic_flow_graph.py \ + exceptions.py \ + flow_graph.py \ + gr_threading.py \ + gr_threading_23.py \ + gr_threading_24.py \ + hier_block.py \ + prefs.py \ + scheduler.py + +noinst_PYTHON = \ + qa_add_and_friends.py \ + qa_basic_flow_graph.py \ + qa_complex_to_xxx.py \ + qa_correlate_access_code.py \ + qa_diff_encoder.py \ + qa_diff_phasor_cc.py \ + qa_feval.py \ + qa_fft_filter.py \ + qa_filter_delay_fc.py \ + qa_flow_graph.py \ + qa_frequency_modulator.py \ + qa_fsk_stuff.py \ + qa_head.py \ + qa_hilbert.py \ + qa_iir.py \ + qa_interleave.py \ + qa_interp_fir_filter.py \ + qa_kludge_copy.py \ + qa_kludged_imports.py \ + qa_message.py \ + qa_mute.py \ + qa_nlog10.py \ + qa_packed_to_unpacked.py \ + qa_pipe_fittings.py \ + qa_rational_resampler.py \ + qa_sig_source.py \ + qa_single_pole_iir.py \ + qa_single_pole_iir_cc.py \ + qa_unpack_k_bits.py + + +CLEANFILES = *.pyc diff --git a/gnuradio-core/src/python/gnuradio/gr/__init__.py b/gnuradio-core/src/python/gnuradio/gr/__init__.py new file mode 100644 index 0000000000..5583c412a4 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/__init__.py @@ -0,0 +1,40 @@ +# +# Copyright 2003,2004,2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +# The presence of this file turns this directory into a Python package + +# This is the main GNU Radio python module. +# We pull the swig output and the other modules into the gnuradio.gr namespace + +from gnuradio_swig_python import * +from basic_flow_graph import * +from flow_graph import * +from exceptions import * +from hier_block import * + + +# create a couple of aliases +serial_to_parallel = stream_to_vector +parallel_to_serial = vector_to_stream + +# Force the preference database to be initialized +from prefs import prefs + diff --git a/gnuradio-core/src/python/gnuradio/gr/basic_flow_graph.py b/gnuradio-core/src/python/gnuradio/gr/basic_flow_graph.py new file mode 100644 index 0000000000..1afc96298a --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/basic_flow_graph.py @@ -0,0 +1,267 @@ +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio_swig_python import gr_block_sptr +import types +import hier_block + +def remove_duplicates (seq): + new = [] + for x in seq: + if not x in new: + new.append (x) + return new + + +class endpoint (object): + __slots__ = ['block', 'port'] + def __init__ (self, block, port): + self.block = block + self.port = port + + def __cmp__ (self, other): + if self.block == other.block and self.port == other.port: + return 0 + return 1 + + def __str__ (self): + return '<endpoint (%s, %s)>' % (self.block, self.port) + +def expand_src_endpoint (src_endpoint): + # A src_endpoint is an output of a block + src_endpoint = coerce_endpoint (src_endpoint) + if isinstance (src_endpoint.block, hier_block.hier_block_base): + return expand_src_endpoint ( + coerce_endpoint (src_endpoint.block.resolve_output_port(src_endpoint.port))) + else: + return src_endpoint + +def expand_dst_endpoint (dst_endpoint): + # a dst_endpoint is the input to a block + dst_endpoint = coerce_endpoint (dst_endpoint) + if isinstance (dst_endpoint.block, hier_block.hier_block_base): + exp = [coerce_endpoint(x) for x in + dst_endpoint.block.resolve_input_port(dst_endpoint.port)] + return expand_dst_endpoints (exp) + else: + return [dst_endpoint] + +def expand_dst_endpoints (endpoint_list): + r = [] + for e in endpoint_list: + r.extend (expand_dst_endpoint (e)) + return r + + +def coerce_endpoint (x): + if isinstance (x, endpoint): + return x + elif isinstance (x, types.TupleType) and len (x) == 2: + return endpoint (x[0], x[1]) + elif hasattr (x, 'block'): # assume it's a block + return endpoint (x, 0) + elif isinstance(x, hier_block.hier_block_base): + return endpoint (x, 0) + else: + raise ValueError, "Not coercible to endpoint: %s" % (x,) + + +class edge (object): + __slots__ = ['src', 'dst'] + def __init__ (self, src_endpoint, dst_endpoint): + self.src = src_endpoint + self.dst = dst_endpoint + + def __cmp__ (self, other): + if self.src == other.src and self.dst == other.dst: + return 0 + return 1 + + def __repr__ (self): + return '<edge (%s, %s)>' % (self.src, self.dst) + +class basic_flow_graph (object): + '''basic_flow_graph -- describe connections between blocks''' + # __slots__ is incompatible with weakrefs (for some reason!) + # __slots__ = ['edge_list'] + def __init__ (self): + self.edge_list = [] + + def connect (self, *points): + '''connect requires two or more arguments that can be coerced to endpoints. + If more than two arguments are provided, they are connected together successively. + ''' + if len (points) < 2: + raise ValueError, ("connect requires at least two endpoints; %d provided." % (len (points),)) + for i in range (1, len (points)): + self._connect (points[i-1], points[i]) + + def _connect (self, src_endpoint, dst_endpoint): + s = expand_src_endpoint (src_endpoint) + for d in expand_dst_endpoint (dst_endpoint): + self._connect_prim (s, d) + + def _connect_prim (self, src_endpoint, dst_endpoint): + src_endpoint = coerce_endpoint (src_endpoint) + dst_endpoint = coerce_endpoint (dst_endpoint) + self._check_valid_src_port (src_endpoint) + self._check_valid_dst_port (dst_endpoint) + self._check_dst_in_use (dst_endpoint) + self._check_type_match (src_endpoint, dst_endpoint) + self.edge_list.append (edge (src_endpoint, dst_endpoint)) + + def disconnect (self, src_endpoint, dst_endpoint): + s = expand_src_endpoint (src_endpoint) + for d in expand_dst_endpoint (dst_endpoint): + self._disconnect_prim (s, d) + + def _disconnect_prim (self, src_endpoint, dst_endpoint): + src_endpoint = coerce_endpoint (src_endpoint) + dst_endpoint = coerce_endpoint (dst_endpoint) + e = edge (src_endpoint, dst_endpoint) + self.edge_list.remove (e) + + def disconnect_all (self): + self.edge_list = [] + + def validate (self): + # check all blocks to ensure: + # (1a) their input ports are contiguously assigned + # (1b) the number of input ports is between min and max + # (2a) their output ports are contiguously assigned + # (2b) the number of output ports is between min and max + # (3) check_topology returns true + + for m in self.all_blocks (): + # print m + + edges = self.in_edges (m) + used_ports = [e.dst.port for e in edges] + ninputs = self._check_contiguity (m, m.input_signature (), used_ports, "input") + + edges = self.out_edges (m) + used_ports = [e.src.port for e in edges] + noutputs = self._check_contiguity (m, m.output_signature (), used_ports, "output") + + if not m.check_topology (ninputs, noutputs): + raise ValueError, ("%s::check_topology (%d, %d) failed" % (m, ninputs, noutputs)) + + + # --- public utilities --- + + def all_blocks (self): + '''return list of all blocks in the graph''' + all_blocks = [] + for edge in self.edge_list: + m = edge.src.block + if not m in all_blocks: + all_blocks.append (m) + m = edge.dst.block + if not m in all_blocks: + all_blocks.append (m) + return all_blocks + + def in_edges (self, m): + '''return list of all edges that have M as a destination''' + return [e for e in self.edge_list if e.dst.block == m] + + def out_edges (self, m): + '''return list of all edges that have M as a source''' + return [e for e in self.edge_list if e.src.block == m] + + def downstream_verticies (self, m): + return [e.dst.block for e in self.out_edges (m)] + + def downstream_verticies_port (self, m, port): + return [e.dst.block for e in self.out_edges(m) if e.src.port == port] + + def upstream_verticies (self, m): + return [e.src.block for e in self.in_edges (m)] + + def adjacent_verticies (self, m): + '''return list of all verticies adjacent to M''' + return self.downstream_verticies (m) + self.upstream_verticies (m) + + def sink_p (self, m): + '''return True iff this block is a sink''' + e = self.out_edges (m) + return len (e) == 0 + + def source_p (self, m): + '''return True iff this block is a source''' + e = self.in_edges (m) + return len (e) == 0 + + # --- internal methods --- + + def _check_dst_in_use (self, dst_endpoint): + '''Ensure that there is not already an endpoint that terminates at dst_endpoint.''' + x = [ep for ep in self.edge_list if ep.dst == dst_endpoint] + if x: # already in use + raise ValueError, ("destination endpoint already in use: %s" % (dst_endpoint)) + + def _check_valid_src_port (self, src_endpoint): + self._check_port (src_endpoint.block.output_signature(), src_endpoint.port) + + def _check_valid_dst_port (self, dst_endpoint): + self._check_port (dst_endpoint.block.input_signature(), dst_endpoint.port) + + def _check_port (self, signature, port): + if port < 0: + raise ValueError, 'port number out of range.' + if signature.max_streams () == -1: # infinite + return # OK + if port >= signature.max_streams (): + raise ValueError, 'port number out of range.' + + def _check_type_match (self, src_endpoint, dst_endpoint): + # for now, we just ensure that the stream item sizes match + src_sig = src_endpoint.block.output_signature () + dst_sig = dst_endpoint.block.input_signature () + src_size = src_sig.sizeof_stream_item (src_endpoint.port) + dst_size = dst_sig.sizeof_stream_item (dst_endpoint.port) + if src_size != dst_size: + raise ValueError, 'source and destination data sizes are different' + + def _check_contiguity (self, m, sig, used_ports, dir): + used_ports.sort () + used_ports = remove_duplicates (used_ports) + min_s = sig.min_streams () + + l = len (used_ports) + if l == 0: + if min_s == 0: + return l + raise ValueError, ("%s requires %d %s connections. It has none." % + (m, min_s, dir)) + + if used_ports[-1] + 1 < min_s: + raise ValueError, ("%s requires %d %s connections. It has %d." % + (m, min_s, dir, used_ports[-1] + 1)) + + if used_ports[-1] + 1 != l: + for i in range (l): + if used_ports[i] != i: + raise ValueError, ("%s %s port %d is not connected" % + (m, dir, i)) + + # print "%s ports: %s" % (dir, used_ports) + return l diff --git a/gnuradio-core/src/python/gnuradio/gr/benchmark_filters.py b/gnuradio-core/src/python/gnuradio/gr/benchmark_filters.py new file mode 100755 index 0000000000..7b2f44c445 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/benchmark_filters.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +import time +import random +from optparse import OptionParser +from gnuradio import gr +from gnuradio.eng_option import eng_option + +def make_random_complex_tuple(L): + result = [] + for x in range(L): + result.append(complex(random.uniform(-1000,1000), + random.uniform(-1000,1000))) + return tuple(result) + +def benchmark(name, creator, dec, ntaps, total_test_size, block_size): + block_size = 32768 + + fg = gr.flow_graph() + taps = make_random_complex_tuple(ntaps) + src = gr.vector_source_c(make_random_complex_tuple(block_size), True) + head = gr.head(gr.sizeof_gr_complex, int(total_test_size)) + op = creator(dec, taps) + dst = gr.null_sink(gr.sizeof_gr_complex) + fg.connect(src, head, op, dst) + start = time.time() + fg.run() + stop = time.time() + delta = stop - start + print "%16s: taps: %4d input: %4g, time: %6.3f taps/sec: %10.4g" % ( + name, ntaps, total_test_size, delta, ntaps*total_test_size/delta) + +def main(): + parser = OptionParser(option_class=eng_option) + parser.add_option("-n", "--ntaps", type="int", default=256) + parser.add_option("-t", "--total-input-size", type="eng_float", default=40e6) + parser.add_option("-b", "--block-size", type="intx", default=50000) + parser.add_option("-d", "--decimation", type="int", default=1) + (options, args) = parser.parse_args() + if len(args) != 0: + parser.print_help() + sys.exit(1) + + ntaps = options.ntaps + total_input_size = options.total_input_size + block_size = options.block_size + dec = options.decimation + + benchmark("gr.fir_filter_ccc", gr.fir_filter_ccc, + dec, ntaps, total_input_size, block_size) + benchmark("gr.fft_filter_ccc", gr.fft_filter_ccc, + dec, ntaps, total_input_size, block_size) + +if __name__ == '__main__': + main() diff --git a/gnuradio-core/src/python/gnuradio/gr/exceptions.py b/gnuradio-core/src/python/gnuradio/gr/exceptions.py new file mode 100644 index 0000000000..0cbeb143a4 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/exceptions.py @@ -0,0 +1,27 @@ +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. + +class NotDAG (Exception): + """Not a directed acyclic graph""" + pass + +class CantHappen (Exception): + """Can't happen""" + pass diff --git a/gnuradio-core/src/python/gnuradio/gr/flow_graph.py b/gnuradio-core/src/python/gnuradio/gr/flow_graph.py new file mode 100644 index 0000000000..db9c58768a --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/flow_graph.py @@ -0,0 +1,234 @@ +# +# Copyright 2004,2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio.gr.basic_flow_graph import remove_duplicates, endpoint, edge, \ + basic_flow_graph + +from gnuradio.gr.exceptions import * +from gnuradio_swig_python import buffer, buffer_add_reader, block_detail, \ + single_threaded_scheduler + +from gnuradio.gr.scheduler import scheduler + +_WHITE = 0 # graph coloring tags +_GRAY = 1 +_BLACK = 2 + +_flow_graph_debug = False + +def set_flow_graph_debug(on): + global _flow_graph_debug + _flow_graph_debug = on + + +class buffer_sizes (object): + """compute buffer sizes to use""" + def __init__ (self, flow_graph): + # We could scan the graph here and determine individual sizes + # based on relative_rate, number of readers, etc. + + # The simplest thing that could possibly work: all buffers same size + self.flow_graph = flow_graph + self.fixed_buffer_size = 32*1024 + + def allocate (self, m, index): + """allocate buffer for output index of block m""" + item_size = m.output_signature().sizeof_stream_item (index) + nitems = self.fixed_buffer_size / item_size + if nitems < 2 * m.output_multiple (): + nitems = 2 * m.output_multiple () + + # if any downstream blocks is a decimator and/or has a large output_multiple, + # ensure that we have a buffer at least 2 * their decimation_factor*output_multiple + for mdown in self.flow_graph.downstream_verticies_port(m, index): + decimation = int(1.0 / mdown.relative_rate()) + nitems = max(nitems, 2 * (decimation * mdown.output_multiple() + mdown.history())) + + return buffer (nitems, item_size) + + +class flow_graph (basic_flow_graph): + """add physical connection info to simple_flow_graph + """ + # __slots__ is incompatible with weakrefs (for some reason!) + # __slots__ = ['blocks', 'scheduler'] + + def __init__ (self): + basic_flow_graph.__init__ (self); + self.blocks = None + self.scheduler = None + + def __del__(self): + # print "\nflow_graph.__del__" + # this ensures that i/o devices such as the USRP get shutdown gracefully + self.stop() + + def start (self): + '''start graph, forking thread(s), return immediately''' + if self.scheduler: + raise RuntimeError, "Scheduler already running" + self._setup_connections () + + # cast down to gr_module_sptr + # t = [x.block () for x in self.topological_sort (self.blocks)] + self.scheduler = scheduler (self) + self.scheduler.start () + + def stop (self): + '''tells scheduler to stop and waits for it to happen''' + if self.scheduler: + self.scheduler.stop () + self.scheduler = None + + def wait (self): + '''waits for scheduler to stop''' + if self.scheduler: + self.scheduler.wait () + self.scheduler = None + + def is_running (self): + return not not self.scheduler + + def run (self): + '''start graph, wait for completion''' + self.start () + self.wait () + + def _setup_connections (self): + """given the basic flow graph, setup all the physical connections""" + self.validate () + self.blocks = self.all_blocks () + self._assign_details () + self._assign_buffers () + self._connect_inputs () + + def _assign_details (self): + for m in self.blocks: + edges = self.in_edges (m) + used_ports = remove_duplicates ([e.dst.port for e in edges]) + ninputs = len (used_ports) + + edges = self.out_edges (m) + used_ports = remove_duplicates ([e.src.port for e in edges]) + noutputs = len (used_ports) + + m.set_detail (block_detail (ninputs, noutputs)) + + def _assign_buffers (self): + """determine the buffer sizes to use, allocate them and attach to detail""" + sizes = buffer_sizes (self) + for m in self.blocks: + d = m.detail () + for index in range (d.noutputs ()): + d.set_output (index, sizes.allocate (m, index)) + + def _connect_inputs (self): + """connect all block inputs to appropriate upstream buffers""" + for m in self.blocks: + d = m.detail () + # print "%r history = %d" % (m, m.history()) + for e in self.in_edges(m): + # FYI, sources don't have any in_edges + our_port = e.dst.port + upstream_block = e.src.block + upstream_port = e.src.port + upstream_buffer = upstream_block.detail().output(upstream_port) + d.set_input(our_port, buffer_add_reader(upstream_buffer, m.history())) + + + def topological_sort (self, all_v): + ''' + Return a topologically sorted list of vertices. + This is basically a depth-first search with checks + for back edges (the non-DAG condition) + + ''' + + # it's correct without this sort, but this + # should give better ordering for cache utilization + all_v = self._sort_sources_first (all_v) + + output = [] + for v in all_v: + v.ts_color = _WHITE + for v in all_v: + if v.ts_color == _WHITE: + self._dfs_visit (v, output) + output.reverse () + return output + + def _dfs_visit (self, u, output): + # print "dfs_visit (enter): ", u + u.ts_color = _GRAY + for v in self.downstream_verticies (u): + if v.ts_color == _WHITE: # (u, v) is a tree edge + self._dfs_visit (v, output) + elif v.ts_color == _GRAY: # (u, v) is a back edge + raise NotDAG, "The graph is not an acyclic graph (It's got a loop)" + elif v.ts_color == _BLACK: # (u, v) is a cross or forward edge + pass + else: + raise CantHappen, "Invalid color on vertex" + u.ts_color = _BLACK + output.append (u) + # print "dfs_visit (exit): ", u, output + + def _sort_sources_first (self, all_v): + # the sort function is not guaranteed to be stable. + # We add the unique_id in to the key so we're guaranteed + # of reproducible results. This is important for the test + # code. There is often more than one valid topological sort. + # We want to force a reproducible choice. + x = [(not self.source_p(v), v.unique_id(), v) for v in all_v] + x.sort () + x = [v[2] for v in x] + # print "sorted: ", x + return x + + def partition_graph (self, all_v): + '''Return a list of lists of nodes that are connected. + The result is a list of disjoint graphs. + The sublists are topologically sorted. + ''' + result = [] + working_v = all_v[:] # make copy + while working_v: + rv = self._reachable_verticies (working_v[0], working_v) + result.append (self.topological_sort (rv)) + for v in rv: + working_v.remove (v) + if _flow_graph_debug: + print "partition_graph:", result + return result + + def _reachable_verticies (self, start, all_v): + for v in all_v: + v.ts_color = _WHITE + + self._reachable_dfs_visit (start) + return [v for v in all_v if v.ts_color == _BLACK] + + def _reachable_dfs_visit (self, u): + u.ts_color = _BLACK + for v in self.adjacent_verticies (u): + if v.ts_color == _WHITE: + self._reachable_dfs_visit (v) + return None diff --git a/gnuradio-core/src/python/gnuradio/gr/gr_threading.py b/gnuradio-core/src/python/gnuradio/gr/gr_threading.py new file mode 100644 index 0000000000..6a09a3239f --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/gr_threading.py @@ -0,0 +1,35 @@ +# +# Copyright 2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from sys import version_info as _version_info + +# import patched version of standard threading module + +if _version_info[0:2] == (2, 3): + #print "Importing gr_threading_23" + from gr_threading_23 import * +elif _version_info[0:2] == (2, 4): + #print "Importing gr_threading_24" + from gr_threading_24 import * +else: + # assume the patch was applied... + #print "Importing system provided threading" + from threading import * diff --git a/gnuradio-core/src/python/gnuradio/gr/gr_threading_23.py b/gnuradio-core/src/python/gnuradio/gr/gr_threading_23.py new file mode 100644 index 0000000000..dee8034c1c --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/gr_threading_23.py @@ -0,0 +1,724 @@ +"""Thread module emulating a subset of Java's threading model.""" + +# This started life as the threading.py module of Python 2.3 +# It's been patched to fix a problem with join, where a KeyboardInterrupt +# caused a lock to be left in the acquired state. + +import sys as _sys + +try: + import thread +except ImportError: + del _sys.modules[__name__] + raise + +from StringIO import StringIO as _StringIO +from time import time as _time, sleep as _sleep +from traceback import print_exc as _print_exc + +# Rename some stuff so "from threading import *" is safe +__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event', + 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', + 'Timer', 'setprofile', 'settrace'] + +_start_new_thread = thread.start_new_thread +_allocate_lock = thread.allocate_lock +_get_ident = thread.get_ident +ThreadError = thread.error +del thread + + +# Debug support (adapted from ihooks.py). +# All the major classes here derive from _Verbose. We force that to +# be a new-style class so that all the major classes here are new-style. +# This helps debugging (type(instance) is more revealing for instances +# of new-style classes). + +_VERBOSE = False + +if __debug__: + + class _Verbose(object): + + def __init__(self, verbose=None): + if verbose is None: + verbose = _VERBOSE + self.__verbose = verbose + + def _note(self, format, *args): + if self.__verbose: + format = format % args + format = "%s: %s\n" % ( + currentThread().getName(), format) + _sys.stderr.write(format) + +else: + # Disable this when using "python -O" + class _Verbose(object): + def __init__(self, verbose=None): + pass + def _note(self, *args): + pass + +# Support for profile and trace hooks + +_profile_hook = None +_trace_hook = None + +def setprofile(func): + global _profile_hook + _profile_hook = func + +def settrace(func): + global _trace_hook + _trace_hook = func + +# Synchronization classes + +Lock = _allocate_lock + +def RLock(*args, **kwargs): + return _RLock(*args, **kwargs) + +class _RLock(_Verbose): + + def __init__(self, verbose=None): + _Verbose.__init__(self, verbose) + self.__block = _allocate_lock() + self.__owner = None + self.__count = 0 + + def __repr__(self): + return "<%s(%s, %d)>" % ( + self.__class__.__name__, + self.__owner and self.__owner.getName(), + self.__count) + + def acquire(self, blocking=1): + me = currentThread() + if self.__owner is me: + self.__count = self.__count + 1 + if __debug__: + self._note("%s.acquire(%s): recursive success", self, blocking) + return 1 + rc = self.__block.acquire(blocking) + if rc: + self.__owner = me + self.__count = 1 + if __debug__: + self._note("%s.acquire(%s): initial succes", self, blocking) + else: + if __debug__: + self._note("%s.acquire(%s): failure", self, blocking) + return rc + + def release(self): + me = currentThread() + assert self.__owner is me, "release() of un-acquire()d lock" + self.__count = count = self.__count - 1 + if not count: + self.__owner = None + self.__block.release() + if __debug__: + self._note("%s.release(): final release", self) + else: + if __debug__: + self._note("%s.release(): non-final release", self) + + # Internal methods used by condition variables + + def _acquire_restore(self, (count, owner)): + self.__block.acquire() + self.__count = count + self.__owner = owner + if __debug__: + self._note("%s._acquire_restore()", self) + + def _release_save(self): + if __debug__: + self._note("%s._release_save()", self) + count = self.__count + self.__count = 0 + owner = self.__owner + self.__owner = None + self.__block.release() + return (count, owner) + + def _is_owned(self): + return self.__owner is currentThread() + + +def Condition(*args, **kwargs): + return _Condition(*args, **kwargs) + +class _Condition(_Verbose): + + def __init__(self, lock=None, verbose=None): + _Verbose.__init__(self, verbose) + if lock is None: + lock = RLock() + self.__lock = lock + # Export the lock's acquire() and release() methods + self.acquire = lock.acquire + self.release = lock.release + # If the lock defines _release_save() and/or _acquire_restore(), + # these override the default implementations (which just call + # release() and acquire() on the lock). Ditto for _is_owned(). + try: + self._release_save = lock._release_save + except AttributeError: + pass + try: + self._acquire_restore = lock._acquire_restore + except AttributeError: + pass + try: + self._is_owned = lock._is_owned + except AttributeError: + pass + self.__waiters = [] + + def __repr__(self): + return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) + + def _release_save(self): + self.__lock.release() # No state to save + + def _acquire_restore(self, x): + self.__lock.acquire() # Ignore saved state + + def _is_owned(self): + # Return True if lock is owned by currentThread. + # This method is called only if __lock doesn't have _is_owned(). + if self.__lock.acquire(0): + self.__lock.release() + return False + else: + return True + + def wait(self, timeout=None): + currentThread() # for side-effect + assert self._is_owned(), "wait() of un-acquire()d lock" + waiter = _allocate_lock() + waiter.acquire() + self.__waiters.append(waiter) + saved_state = self._release_save() + try: # restore state no matter what (e.g., KeyboardInterrupt) + if timeout is None: + waiter.acquire() + if __debug__: + self._note("%s.wait(): got it", self) + else: + # Balancing act: We can't afford a pure busy loop, so we + # have to sleep; but if we sleep the whole timeout time, + # we'll be unresponsive. The scheme here sleeps very + # little at first, longer as time goes on, but never longer + # than 20 times per second (or the timeout time remaining). + endtime = _time() + timeout + delay = 0.0005 # 500 us -> initial delay of 1 ms + while True: + gotit = waiter.acquire(0) + if gotit: + break + remaining = endtime - _time() + if remaining <= 0: + break + delay = min(delay * 2, remaining, .05) + _sleep(delay) + if not gotit: + if __debug__: + self._note("%s.wait(%s): timed out", self, timeout) + try: + self.__waiters.remove(waiter) + except ValueError: + pass + else: + if __debug__: + self._note("%s.wait(%s): got it", self, timeout) + finally: + self._acquire_restore(saved_state) + + def notify(self, n=1): + currentThread() # for side-effect + assert self._is_owned(), "notify() of un-acquire()d lock" + __waiters = self.__waiters + waiters = __waiters[:n] + if not waiters: + if __debug__: + self._note("%s.notify(): no waiters", self) + return + self._note("%s.notify(): notifying %d waiter%s", self, n, + n!=1 and "s" or "") + for waiter in waiters: + waiter.release() + try: + __waiters.remove(waiter) + except ValueError: + pass + + def notifyAll(self): + self.notify(len(self.__waiters)) + + +def Semaphore(*args, **kwargs): + return _Semaphore(*args, **kwargs) + +class _Semaphore(_Verbose): + + # After Tim Peters' semaphore class, but not quite the same (no maximum) + + def __init__(self, value=1, verbose=None): + assert value >= 0, "Semaphore initial value must be >= 0" + _Verbose.__init__(self, verbose) + self.__cond = Condition(Lock()) + self.__value = value + + def acquire(self, blocking=1): + rc = False + self.__cond.acquire() + while self.__value == 0: + if not blocking: + break + if __debug__: + self._note("%s.acquire(%s): blocked waiting, value=%s", + self, blocking, self.__value) + self.__cond.wait() + else: + self.__value = self.__value - 1 + if __debug__: + self._note("%s.acquire: success, value=%s", + self, self.__value) + rc = True + self.__cond.release() + return rc + + def release(self): + self.__cond.acquire() + self.__value = self.__value + 1 + if __debug__: + self._note("%s.release: success, value=%s", + self, self.__value) + self.__cond.notify() + self.__cond.release() + + +def BoundedSemaphore(*args, **kwargs): + return _BoundedSemaphore(*args, **kwargs) + +class _BoundedSemaphore(_Semaphore): + """Semaphore that checks that # releases is <= # acquires""" + def __init__(self, value=1, verbose=None): + _Semaphore.__init__(self, value, verbose) + self._initial_value = value + + def release(self): + if self._Semaphore__value >= self._initial_value: + raise ValueError, "Semaphore released too many times" + return _Semaphore.release(self) + + +def Event(*args, **kwargs): + return _Event(*args, **kwargs) + +class _Event(_Verbose): + + # After Tim Peters' event class (without is_posted()) + + def __init__(self, verbose=None): + _Verbose.__init__(self, verbose) + self.__cond = Condition(Lock()) + self.__flag = False + + def isSet(self): + return self.__flag + + def set(self): + self.__cond.acquire() + try: + self.__flag = True + self.__cond.notifyAll() + finally: + self.__cond.release() + + def clear(self): + self.__cond.acquire() + try: + self.__flag = False + finally: + self.__cond.release() + + def wait(self, timeout=None): + self.__cond.acquire() + try: + if not self.__flag: + self.__cond.wait(timeout) + finally: + self.__cond.release() + +# Helper to generate new thread names +_counter = 0 +def _newname(template="Thread-%d"): + global _counter + _counter = _counter + 1 + return template % _counter + +# Active thread administration +_active_limbo_lock = _allocate_lock() +_active = {} +_limbo = {} + + +# Main class for threads + +class Thread(_Verbose): + + __initialized = False + + def __init__(self, group=None, target=None, name=None, + args=(), kwargs={}, verbose=None): + assert group is None, "group argument must be None for now" + _Verbose.__init__(self, verbose) + self.__target = target + self.__name = str(name or _newname()) + self.__args = args + self.__kwargs = kwargs + self.__daemonic = self._set_daemon() + self.__started = False + self.__stopped = False + self.__block = Condition(Lock()) + self.__initialized = True + + def _set_daemon(self): + # Overridden in _MainThread and _DummyThread + return currentThread().isDaemon() + + def __repr__(self): + assert self.__initialized, "Thread.__init__() was not called" + status = "initial" + if self.__started: + status = "started" + if self.__stopped: + status = "stopped" + if self.__daemonic: + status = status + " daemon" + return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) + + def start(self): + assert self.__initialized, "Thread.__init__() not called" + assert not self.__started, "thread already started" + if __debug__: + self._note("%s.start(): starting thread", self) + _active_limbo_lock.acquire() + _limbo[self] = self + _active_limbo_lock.release() + _start_new_thread(self.__bootstrap, ()) + self.__started = True + _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) + + def run(self): + if self.__target: + self.__target(*self.__args, **self.__kwargs) + + def __bootstrap(self): + try: + self.__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + del _limbo[self] + _active_limbo_lock.release() + if __debug__: + self._note("%s.__bootstrap(): thread started", self) + + if _trace_hook: + self._note("%s.__bootstrap(): registering trace hook", self) + _sys.settrace(_trace_hook) + if _profile_hook: + self._note("%s.__bootstrap(): registering profile hook", self) + _sys.setprofile(_profile_hook) + + try: + self.run() + except SystemExit: + if __debug__: + self._note("%s.__bootstrap(): raised SystemExit", self) + except: + if __debug__: + self._note("%s.__bootstrap(): unhandled exception", self) + s = _StringIO() + _print_exc(file=s) + _sys.stderr.write("Exception in thread %s:\n%s\n" % + (self.getName(), s.getvalue())) + else: + if __debug__: + self._note("%s.__bootstrap(): normal return", self) + finally: + self.__stop() + try: + self.__delete() + except: + pass + + def __stop(self): + self.__block.acquire() + self.__stopped = True + self.__block.notifyAll() + self.__block.release() + + def __delete(self): + _active_limbo_lock.acquire() + del _active[_get_ident()] + _active_limbo_lock.release() + + def join(self, timeout=None): + assert self.__initialized, "Thread.__init__() not called" + assert self.__started, "cannot join thread before it is started" + assert self is not currentThread(), "cannot join current thread" + if __debug__: + if not self.__stopped: + self._note("%s.join(): waiting until thread stops", self) + self.__block.acquire() + try: + if timeout is None: + while not self.__stopped: + self.__block.wait() + if __debug__: + self._note("%s.join(): thread stopped", self) + else: + deadline = _time() + timeout + while not self.__stopped: + delay = deadline - _time() + if delay <= 0: + if __debug__: + self._note("%s.join(): timed out", self) + break + self.__block.wait(delay) + else: + if __debug__: + self._note("%s.join(): thread stopped", self) + finally: + self.__block.release() + + def getName(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__name + + def setName(self, name): + assert self.__initialized, "Thread.__init__() not called" + self.__name = str(name) + + def isAlive(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__started and not self.__stopped + + def isDaemon(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__daemonic + + def setDaemon(self, daemonic): + assert self.__initialized, "Thread.__init__() not called" + assert not self.__started, "cannot set daemon status of active thread" + self.__daemonic = daemonic + +# The timer class was contributed by Itamar Shtull-Trauring + +def Timer(*args, **kwargs): + return _Timer(*args, **kwargs) + +class _Timer(Thread): + """Call a function after a specified number of seconds: + + t = Timer(30.0, f, args=[], kwargs={}) + t.start() + t.cancel() # stop the timer's action if it's still waiting + """ + + def __init__(self, interval, function, args=[], kwargs={}): + Thread.__init__(self) + self.interval = interval + self.function = function + self.args = args + self.kwargs = kwargs + self.finished = Event() + + def cancel(self): + """Stop the timer if it hasn't finished yet""" + self.finished.set() + + def run(self): + self.finished.wait(self.interval) + if not self.finished.isSet(): + self.function(*self.args, **self.kwargs) + self.finished.set() + +# Special thread class to represent the main thread +# This is garbage collected through an exit handler + +class _MainThread(Thread): + + def __init__(self): + Thread.__init__(self, name="MainThread") + self._Thread__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + _active_limbo_lock.release() + import atexit + atexit.register(self.__exitfunc) + + def _set_daemon(self): + return False + + def __exitfunc(self): + self._Thread__stop() + t = _pickSomeNonDaemonThread() + if t: + if __debug__: + self._note("%s: waiting for other threads", self) + while t: + t.join() + t = _pickSomeNonDaemonThread() + if __debug__: + self._note("%s: exiting", self) + self._Thread__delete() + +def _pickSomeNonDaemonThread(): + for t in enumerate(): + if not t.isDaemon() and t.isAlive(): + return t + return None + + +# Dummy thread class to represent threads not started here. +# These aren't garbage collected when they die, +# nor can they be waited for. +# Their purpose is to return *something* from currentThread(). +# They are marked as daemon threads so we won't wait for them +# when we exit (conform previous semantics). + +class _DummyThread(Thread): + + def __init__(self): + Thread.__init__(self, name=_newname("Dummy-%d")) + self._Thread__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + _active_limbo_lock.release() + + def _set_daemon(self): + return True + + def join(self, timeout=None): + assert False, "cannot join a dummy thread" + + +# Global API functions + +def currentThread(): + try: + return _active[_get_ident()] + except KeyError: + ##print "currentThread(): no current thread for", _get_ident() + return _DummyThread() + +def activeCount(): + _active_limbo_lock.acquire() + count = len(_active) + len(_limbo) + _active_limbo_lock.release() + return count + +def enumerate(): + _active_limbo_lock.acquire() + active = _active.values() + _limbo.values() + _active_limbo_lock.release() + return active + +# Create the main thread object + +_MainThread() + + +# Self-test code + +def _test(): + + class BoundedQueue(_Verbose): + + def __init__(self, limit): + _Verbose.__init__(self) + self.mon = RLock() + self.rc = Condition(self.mon) + self.wc = Condition(self.mon) + self.limit = limit + self.queue = [] + + def put(self, item): + self.mon.acquire() + while len(self.queue) >= self.limit: + self._note("put(%s): queue full", item) + self.wc.wait() + self.queue.append(item) + self._note("put(%s): appended, length now %d", + item, len(self.queue)) + self.rc.notify() + self.mon.release() + + def get(self): + self.mon.acquire() + while not self.queue: + self._note("get(): queue empty") + self.rc.wait() + item = self.queue.pop(0) + self._note("get(): got %s, %d left", item, len(self.queue)) + self.wc.notify() + self.mon.release() + return item + + class ProducerThread(Thread): + + def __init__(self, queue, quota): + Thread.__init__(self, name="Producer") + self.queue = queue + self.quota = quota + + def run(self): + from random import random + counter = 0 + while counter < self.quota: + counter = counter + 1 + self.queue.put("%s.%d" % (self.getName(), counter)) + _sleep(random() * 0.00001) + + + class ConsumerThread(Thread): + + def __init__(self, queue, count): + Thread.__init__(self, name="Consumer") + self.queue = queue + self.count = count + + def run(self): + while self.count > 0: + item = self.queue.get() + print item + self.count = self.count - 1 + + NP = 3 + QL = 4 + NI = 5 + + Q = BoundedQueue(QL) + P = [] + for i in range(NP): + t = ProducerThread(Q, NI) + t.setName("Producer-%d" % (i+1)) + P.append(t) + C = ConsumerThread(Q, NI*NP) + for t in P: + t.start() + _sleep(0.000001) + C.start() + for t in P: + t.join() + C.join() + +if __name__ == '__main__': + _test() diff --git a/gnuradio-core/src/python/gnuradio/gr/gr_threading_24.py b/gnuradio-core/src/python/gnuradio/gr/gr_threading_24.py new file mode 100644 index 0000000000..8539bfc047 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/gr_threading_24.py @@ -0,0 +1,793 @@ +"""Thread module emulating a subset of Java's threading model.""" + +# This started life as the threading.py module of Python 2.4 +# It's been patched to fix a problem with join, where a KeyboardInterrupt +# caused a lock to be left in the acquired state. + +import sys as _sys + +try: + import thread +except ImportError: + del _sys.modules[__name__] + raise + +from time import time as _time, sleep as _sleep +from traceback import format_exc as _format_exc +from collections import deque + +# Rename some stuff so "from threading import *" is safe +__all__ = ['activeCount', 'Condition', 'currentThread', 'enumerate', 'Event', + 'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Thread', + 'Timer', 'setprofile', 'settrace', 'local'] + +_start_new_thread = thread.start_new_thread +_allocate_lock = thread.allocate_lock +_get_ident = thread.get_ident +ThreadError = thread.error +del thread + + +# Debug support (adapted from ihooks.py). +# All the major classes here derive from _Verbose. We force that to +# be a new-style class so that all the major classes here are new-style. +# This helps debugging (type(instance) is more revealing for instances +# of new-style classes). + +_VERBOSE = False + +if __debug__: + + class _Verbose(object): + + def __init__(self, verbose=None): + if verbose is None: + verbose = _VERBOSE + self.__verbose = verbose + + def _note(self, format, *args): + if self.__verbose: + format = format % args + format = "%s: %s\n" % ( + currentThread().getName(), format) + _sys.stderr.write(format) + +else: + # Disable this when using "python -O" + class _Verbose(object): + def __init__(self, verbose=None): + pass + def _note(self, *args): + pass + +# Support for profile and trace hooks + +_profile_hook = None +_trace_hook = None + +def setprofile(func): + global _profile_hook + _profile_hook = func + +def settrace(func): + global _trace_hook + _trace_hook = func + +# Synchronization classes + +Lock = _allocate_lock + +def RLock(*args, **kwargs): + return _RLock(*args, **kwargs) + +class _RLock(_Verbose): + + def __init__(self, verbose=None): + _Verbose.__init__(self, verbose) + self.__block = _allocate_lock() + self.__owner = None + self.__count = 0 + + def __repr__(self): + return "<%s(%s, %d)>" % ( + self.__class__.__name__, + self.__owner and self.__owner.getName(), + self.__count) + + def acquire(self, blocking=1): + me = currentThread() + if self.__owner is me: + self.__count = self.__count + 1 + if __debug__: + self._note("%s.acquire(%s): recursive success", self, blocking) + return 1 + rc = self.__block.acquire(blocking) + if rc: + self.__owner = me + self.__count = 1 + if __debug__: + self._note("%s.acquire(%s): initial succes", self, blocking) + else: + if __debug__: + self._note("%s.acquire(%s): failure", self, blocking) + return rc + + def release(self): + me = currentThread() + assert self.__owner is me, "release() of un-acquire()d lock" + self.__count = count = self.__count - 1 + if not count: + self.__owner = None + self.__block.release() + if __debug__: + self._note("%s.release(): final release", self) + else: + if __debug__: + self._note("%s.release(): non-final release", self) + + # Internal methods used by condition variables + + def _acquire_restore(self, (count, owner)): + self.__block.acquire() + self.__count = count + self.__owner = owner + if __debug__: + self._note("%s._acquire_restore()", self) + + def _release_save(self): + if __debug__: + self._note("%s._release_save()", self) + count = self.__count + self.__count = 0 + owner = self.__owner + self.__owner = None + self.__block.release() + return (count, owner) + + def _is_owned(self): + return self.__owner is currentThread() + + +def Condition(*args, **kwargs): + return _Condition(*args, **kwargs) + +class _Condition(_Verbose): + + def __init__(self, lock=None, verbose=None): + _Verbose.__init__(self, verbose) + if lock is None: + lock = RLock() + self.__lock = lock + # Export the lock's acquire() and release() methods + self.acquire = lock.acquire + self.release = lock.release + # If the lock defines _release_save() and/or _acquire_restore(), + # these override the default implementations (which just call + # release() and acquire() on the lock). Ditto for _is_owned(). + try: + self._release_save = lock._release_save + except AttributeError: + pass + try: + self._acquire_restore = lock._acquire_restore + except AttributeError: + pass + try: + self._is_owned = lock._is_owned + except AttributeError: + pass + self.__waiters = [] + + def __repr__(self): + return "<Condition(%s, %d)>" % (self.__lock, len(self.__waiters)) + + def _release_save(self): + self.__lock.release() # No state to save + + def _acquire_restore(self, x): + self.__lock.acquire() # Ignore saved state + + def _is_owned(self): + # Return True if lock is owned by currentThread. + # This method is called only if __lock doesn't have _is_owned(). + if self.__lock.acquire(0): + self.__lock.release() + return False + else: + return True + + def wait(self, timeout=None): + assert self._is_owned(), "wait() of un-acquire()d lock" + waiter = _allocate_lock() + waiter.acquire() + self.__waiters.append(waiter) + saved_state = self._release_save() + try: # restore state no matter what (e.g., KeyboardInterrupt) + if timeout is None: + waiter.acquire() + if __debug__: + self._note("%s.wait(): got it", self) + else: + # Balancing act: We can't afford a pure busy loop, so we + # have to sleep; but if we sleep the whole timeout time, + # we'll be unresponsive. The scheme here sleeps very + # little at first, longer as time goes on, but never longer + # than 20 times per second (or the timeout time remaining). + endtime = _time() + timeout + delay = 0.0005 # 500 us -> initial delay of 1 ms + while True: + gotit = waiter.acquire(0) + if gotit: + break + remaining = endtime - _time() + if remaining <= 0: + break + delay = min(delay * 2, remaining, .05) + _sleep(delay) + if not gotit: + if __debug__: + self._note("%s.wait(%s): timed out", self, timeout) + try: + self.__waiters.remove(waiter) + except ValueError: + pass + else: + if __debug__: + self._note("%s.wait(%s): got it", self, timeout) + finally: + self._acquire_restore(saved_state) + + def notify(self, n=1): + assert self._is_owned(), "notify() of un-acquire()d lock" + __waiters = self.__waiters + waiters = __waiters[:n] + if not waiters: + if __debug__: + self._note("%s.notify(): no waiters", self) + return + self._note("%s.notify(): notifying %d waiter%s", self, n, + n!=1 and "s" or "") + for waiter in waiters: + waiter.release() + try: + __waiters.remove(waiter) + except ValueError: + pass + + def notifyAll(self): + self.notify(len(self.__waiters)) + + +def Semaphore(*args, **kwargs): + return _Semaphore(*args, **kwargs) + +class _Semaphore(_Verbose): + + # After Tim Peters' semaphore class, but not quite the same (no maximum) + + def __init__(self, value=1, verbose=None): + assert value >= 0, "Semaphore initial value must be >= 0" + _Verbose.__init__(self, verbose) + self.__cond = Condition(Lock()) + self.__value = value + + def acquire(self, blocking=1): + rc = False + self.__cond.acquire() + while self.__value == 0: + if not blocking: + break + if __debug__: + self._note("%s.acquire(%s): blocked waiting, value=%s", + self, blocking, self.__value) + self.__cond.wait() + else: + self.__value = self.__value - 1 + if __debug__: + self._note("%s.acquire: success, value=%s", + self, self.__value) + rc = True + self.__cond.release() + return rc + + def release(self): + self.__cond.acquire() + self.__value = self.__value + 1 + if __debug__: + self._note("%s.release: success, value=%s", + self, self.__value) + self.__cond.notify() + self.__cond.release() + + +def BoundedSemaphore(*args, **kwargs): + return _BoundedSemaphore(*args, **kwargs) + +class _BoundedSemaphore(_Semaphore): + """Semaphore that checks that # releases is <= # acquires""" + def __init__(self, value=1, verbose=None): + _Semaphore.__init__(self, value, verbose) + self._initial_value = value + + def release(self): + if self._Semaphore__value >= self._initial_value: + raise ValueError, "Semaphore released too many times" + return _Semaphore.release(self) + + +def Event(*args, **kwargs): + return _Event(*args, **kwargs) + +class _Event(_Verbose): + + # After Tim Peters' event class (without is_posted()) + + def __init__(self, verbose=None): + _Verbose.__init__(self, verbose) + self.__cond = Condition(Lock()) + self.__flag = False + + def isSet(self): + return self.__flag + + def set(self): + self.__cond.acquire() + try: + self.__flag = True + self.__cond.notifyAll() + finally: + self.__cond.release() + + def clear(self): + self.__cond.acquire() + try: + self.__flag = False + finally: + self.__cond.release() + + def wait(self, timeout=None): + self.__cond.acquire() + try: + if not self.__flag: + self.__cond.wait(timeout) + finally: + self.__cond.release() + +# Helper to generate new thread names +_counter = 0 +def _newname(template="Thread-%d"): + global _counter + _counter = _counter + 1 + return template % _counter + +# Active thread administration +_active_limbo_lock = _allocate_lock() +_active = {} +_limbo = {} + + +# Main class for threads + +class Thread(_Verbose): + + __initialized = False + # Need to store a reference to sys.exc_info for printing + # out exceptions when a thread tries to use a global var. during interp. + # shutdown and thus raises an exception about trying to perform some + # operation on/with a NoneType + __exc_info = _sys.exc_info + + def __init__(self, group=None, target=None, name=None, + args=(), kwargs={}, verbose=None): + assert group is None, "group argument must be None for now" + _Verbose.__init__(self, verbose) + self.__target = target + self.__name = str(name or _newname()) + self.__args = args + self.__kwargs = kwargs + self.__daemonic = self._set_daemon() + self.__started = False + self.__stopped = False + self.__block = Condition(Lock()) + self.__initialized = True + # sys.stderr is not stored in the class like + # sys.exc_info since it can be changed between instances + self.__stderr = _sys.stderr + + def _set_daemon(self): + # Overridden in _MainThread and _DummyThread + return currentThread().isDaemon() + + def __repr__(self): + assert self.__initialized, "Thread.__init__() was not called" + status = "initial" + if self.__started: + status = "started" + if self.__stopped: + status = "stopped" + if self.__daemonic: + status = status + " daemon" + return "<%s(%s, %s)>" % (self.__class__.__name__, self.__name, status) + + def start(self): + assert self.__initialized, "Thread.__init__() not called" + assert not self.__started, "thread already started" + if __debug__: + self._note("%s.start(): starting thread", self) + _active_limbo_lock.acquire() + _limbo[self] = self + _active_limbo_lock.release() + _start_new_thread(self.__bootstrap, ()) + self.__started = True + _sleep(0.000001) # 1 usec, to let the thread run (Solaris hack) + + def run(self): + if self.__target: + self.__target(*self.__args, **self.__kwargs) + + def __bootstrap(self): + try: + self.__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + del _limbo[self] + _active_limbo_lock.release() + if __debug__: + self._note("%s.__bootstrap(): thread started", self) + + if _trace_hook: + self._note("%s.__bootstrap(): registering trace hook", self) + _sys.settrace(_trace_hook) + if _profile_hook: + self._note("%s.__bootstrap(): registering profile hook", self) + _sys.setprofile(_profile_hook) + + try: + self.run() + except SystemExit: + if __debug__: + self._note("%s.__bootstrap(): raised SystemExit", self) + except: + if __debug__: + self._note("%s.__bootstrap(): unhandled exception", self) + # If sys.stderr is no more (most likely from interpreter + # shutdown) use self.__stderr. Otherwise still use sys (as in + # _sys) in case sys.stderr was redefined since the creation of + # self. + if _sys: + _sys.stderr.write("Exception in thread %s:\n%s\n" % + (self.getName(), _format_exc())) + else: + # Do the best job possible w/o a huge amt. of code to + # approximate a traceback (code ideas from + # Lib/traceback.py) + exc_type, exc_value, exc_tb = self.__exc_info() + try: + print>>self.__stderr, ( + "Exception in thread " + self.getName() + + " (most likely raised during interpreter shutdown):") + print>>self.__stderr, ( + "Traceback (most recent call last):") + while exc_tb: + print>>self.__stderr, ( + ' File "%s", line %s, in %s' % + (exc_tb.tb_frame.f_code.co_filename, + exc_tb.tb_lineno, + exc_tb.tb_frame.f_code.co_name)) + exc_tb = exc_tb.tb_next + print>>self.__stderr, ("%s: %s" % (exc_type, exc_value)) + # Make sure that exc_tb gets deleted since it is a memory + # hog; deleting everything else is just for thoroughness + finally: + del exc_type, exc_value, exc_tb + else: + if __debug__: + self._note("%s.__bootstrap(): normal return", self) + finally: + self.__stop() + try: + self.__delete() + except: + pass + + def __stop(self): + self.__block.acquire() + self.__stopped = True + self.__block.notifyAll() + self.__block.release() + + def __delete(self): + "Remove current thread from the dict of currently running threads." + + # Notes about running with dummy_thread: + # + # Must take care to not raise an exception if dummy_thread is being + # used (and thus this module is being used as an instance of + # dummy_threading). dummy_thread.get_ident() always returns -1 since + # there is only one thread if dummy_thread is being used. Thus + # len(_active) is always <= 1 here, and any Thread instance created + # overwrites the (if any) thread currently registered in _active. + # + # An instance of _MainThread is always created by 'threading'. This + # gets overwritten the instant an instance of Thread is created; both + # threads return -1 from dummy_thread.get_ident() and thus have the + # same key in the dict. So when the _MainThread instance created by + # 'threading' tries to clean itself up when atexit calls this method + # it gets a KeyError if another Thread instance was created. + # + # This all means that KeyError from trying to delete something from + # _active if dummy_threading is being used is a red herring. But + # since it isn't if dummy_threading is *not* being used then don't + # hide the exception. + + _active_limbo_lock.acquire() + try: + try: + del _active[_get_ident()] + except KeyError: + if 'dummy_threading' not in _sys.modules: + raise + finally: + _active_limbo_lock.release() + + def join(self, timeout=None): + assert self.__initialized, "Thread.__init__() not called" + assert self.__started, "cannot join thread before it is started" + assert self is not currentThread(), "cannot join current thread" + if __debug__: + if not self.__stopped: + self._note("%s.join(): waiting until thread stops", self) + self.__block.acquire() + try: + if timeout is None: + while not self.__stopped: + self.__block.wait() + if __debug__: + self._note("%s.join(): thread stopped", self) + else: + deadline = _time() + timeout + while not self.__stopped: + delay = deadline - _time() + if delay <= 0: + if __debug__: + self._note("%s.join(): timed out", self) + break + self.__block.wait(delay) + else: + if __debug__: + self._note("%s.join(): thread stopped", self) + finally: + self.__block.release() + + def getName(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__name + + def setName(self, name): + assert self.__initialized, "Thread.__init__() not called" + self.__name = str(name) + + def isAlive(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__started and not self.__stopped + + def isDaemon(self): + assert self.__initialized, "Thread.__init__() not called" + return self.__daemonic + + def setDaemon(self, daemonic): + assert self.__initialized, "Thread.__init__() not called" + assert not self.__started, "cannot set daemon status of active thread" + self.__daemonic = daemonic + +# The timer class was contributed by Itamar Shtull-Trauring + +def Timer(*args, **kwargs): + return _Timer(*args, **kwargs) + +class _Timer(Thread): + """Call a function after a specified number of seconds: + + t = Timer(30.0, f, args=[], kwargs={}) + t.start() + t.cancel() # stop the timer's action if it's still waiting + """ + + def __init__(self, interval, function, args=[], kwargs={}): + Thread.__init__(self) + self.interval = interval + self.function = function + self.args = args + self.kwargs = kwargs + self.finished = Event() + + def cancel(self): + """Stop the timer if it hasn't finished yet""" + self.finished.set() + + def run(self): + self.finished.wait(self.interval) + if not self.finished.isSet(): + self.function(*self.args, **self.kwargs) + self.finished.set() + +# Special thread class to represent the main thread +# This is garbage collected through an exit handler + +class _MainThread(Thread): + + def __init__(self): + Thread.__init__(self, name="MainThread") + self._Thread__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + _active_limbo_lock.release() + import atexit + atexit.register(self.__exitfunc) + + def _set_daemon(self): + return False + + def __exitfunc(self): + self._Thread__stop() + t = _pickSomeNonDaemonThread() + if t: + if __debug__: + self._note("%s: waiting for other threads", self) + while t: + t.join() + t = _pickSomeNonDaemonThread() + if __debug__: + self._note("%s: exiting", self) + self._Thread__delete() + +def _pickSomeNonDaemonThread(): + for t in enumerate(): + if not t.isDaemon() and t.isAlive(): + return t + return None + + +# Dummy thread class to represent threads not started here. +# These aren't garbage collected when they die, +# nor can they be waited for. +# Their purpose is to return *something* from currentThread(). +# They are marked as daemon threads so we won't wait for them +# when we exit (conform previous semantics). + +class _DummyThread(Thread): + + def __init__(self): + Thread.__init__(self, name=_newname("Dummy-%d")) + self._Thread__started = True + _active_limbo_lock.acquire() + _active[_get_ident()] = self + _active_limbo_lock.release() + + def _set_daemon(self): + return True + + def join(self, timeout=None): + assert False, "cannot join a dummy thread" + + +# Global API functions + +def currentThread(): + try: + return _active[_get_ident()] + except KeyError: + ##print "currentThread(): no current thread for", _get_ident() + return _DummyThread() + +def activeCount(): + _active_limbo_lock.acquire() + count = len(_active) + len(_limbo) + _active_limbo_lock.release() + return count + +def enumerate(): + _active_limbo_lock.acquire() + active = _active.values() + _limbo.values() + _active_limbo_lock.release() + return active + +# Create the main thread object + +_MainThread() + +# get thread-local implementation, either from the thread +# module, or from the python fallback + +try: + from thread import _local as local +except ImportError: + from _threading_local import local + + +# Self-test code + +def _test(): + + class BoundedQueue(_Verbose): + + def __init__(self, limit): + _Verbose.__init__(self) + self.mon = RLock() + self.rc = Condition(self.mon) + self.wc = Condition(self.mon) + self.limit = limit + self.queue = deque() + + def put(self, item): + self.mon.acquire() + while len(self.queue) >= self.limit: + self._note("put(%s): queue full", item) + self.wc.wait() + self.queue.append(item) + self._note("put(%s): appended, length now %d", + item, len(self.queue)) + self.rc.notify() + self.mon.release() + + def get(self): + self.mon.acquire() + while not self.queue: + self._note("get(): queue empty") + self.rc.wait() + item = self.queue.popleft() + self._note("get(): got %s, %d left", item, len(self.queue)) + self.wc.notify() + self.mon.release() + return item + + class ProducerThread(Thread): + + def __init__(self, queue, quota): + Thread.__init__(self, name="Producer") + self.queue = queue + self.quota = quota + + def run(self): + from random import random + counter = 0 + while counter < self.quota: + counter = counter + 1 + self.queue.put("%s.%d" % (self.getName(), counter)) + _sleep(random() * 0.00001) + + + class ConsumerThread(Thread): + + def __init__(self, queue, count): + Thread.__init__(self, name="Consumer") + self.queue = queue + self.count = count + + def run(self): + while self.count > 0: + item = self.queue.get() + print item + self.count = self.count - 1 + + NP = 3 + QL = 4 + NI = 5 + + Q = BoundedQueue(QL) + P = [] + for i in range(NP): + t = ProducerThread(Q, NI) + t.setName("Producer-%d" % (i+1)) + P.append(t) + C = ConsumerThread(Q, NI*NP) + for t in P: + t.start() + _sleep(0.000001) + C.start() + for t in P: + t.join() + C.join() + +if __name__ == '__main__': + _test() diff --git a/gnuradio-core/src/python/gnuradio/gr/hier_block.py b/gnuradio-core/src/python/gnuradio/gr/hier_block.py new file mode 100644 index 0000000000..d669d71786 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/hier_block.py @@ -0,0 +1,132 @@ +# +# Copyright 2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio_swig_python import io_signature +import weakref + +class hier_block_base(object): + """ + Abstract base class for hierarchical signal processing blocks. + """ + def __init__(self, fg): + """ + @param fg: The flow graph that contains this hierarchical block. + @type fg: gr.flow_graph + """ + self.fg = weakref.proxy(fg) + + def input_signature(self): + """ + @return input signature of hierarchical block. + @rtype gr.io_signature + """ + raise NotImplemented + + def output_signature(self): + """ + @return output signature of hierarchical block. + @rtype gr.io_signature + """ + raise NotImplemented + + def resolve_input_port(self, port_number): + """ + @param port_number: which input port number to resolve to an endpoint. + @type port_number: int + @return: sequence of endpoints + @rtype: sequence of endpoint + + Note that an input port can resolve to more than one endpoint. + """ + raise NotImplemented + + def resolve_output_port(self, port_number): + """ + @param port_number: which output port number to resolve to an endpoint. + @type port_number: int + @return: endpoint + @rtype: endpoint + + Output ports resolve to a single endpoint. + """ + raise NotImplemented + + +class hier_block(hier_block_base): + """ + Simple concrete class for building hierarchical blocks. + + This class assumes that there is at most a single block at the + head of the chain and a single block at the end of the chain. + Either head or tail may be None indicating a sink or source respectively. + + If you needs something more elaborate than this, derive a new class from + hier_block_base. + """ + def __init__(self, fg, head_block, tail_block): + """ + @param fg: The flow graph that contains this hierarchical block. + @type fg: flow_graph + @param head_block: the first block in the signal processing chain. + @type head_block: None or subclass of gr.block or gr.hier_block_base + @param tail_block: the last block in the signal processing chain. + @type tail_block: None or subclass of gr.block or gr.hier_block_base + """ + hier_block_base.__init__(self, fg) + # FIXME add type checks here for easier debugging of misuse + self.head = head_block + self.tail = tail_block + + def input_signature(self): + if self.head: + return self.head.input_signature() + else: + return io_signature(0,0,0) + + def output_signature(self): + if self.tail: + return self.tail.output_signature() + else: + return io_signature(0,0,0) + + def resolve_input_port(self, port_number): + return ((self.head, port_number),) + + def resolve_output_port(self, port_number): + return (self.tail, port_number) + + +class compose(hier_block): + """ + Compose one or more blocks (primitive or hierarchical) into a new hierarchical block. + """ + def __init__(self, fg, *blocks): + """ + @param fg: flow graph + @type fg: gr.flow_graph + @param *blocks: list of blocks + @type *blocks: list of blocks + """ + if len(blocks) < 1: + raise ValueError, ("compose requires at least one block; none provided.") + if len(blocks) > 1: + fg.connect(*blocks) + hier_block.__init__(self, fg, blocks[0], blocks[-1]) diff --git a/gnuradio-core/src/python/gnuradio/gr/prefs.py b/gnuradio-core/src/python/gnuradio/gr/prefs.py new file mode 100644 index 0000000000..fa12912717 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/prefs.py @@ -0,0 +1,129 @@ +# +# Copyright 2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +import gnuradio_swig_python as gsp +_prefs_base = gsp.gr_prefs + + +import ConfigParser +import os +import os.path +import sys + + +def _user_prefs_filename(): + return os.path.expanduser('~/.gnuradio/config.conf') + +def _sys_prefs_dirname(): + return os.path.join(gsp.prefix(), 'etc/gnuradio/conf.d') + +def _bool(x): + """ + Try to coerce obj to a True or False + """ + if isinstance(x, bool): + return x + if isinstance(x, (float, int)): + return bool(x) + raise TypeError, x + + +class _prefs(_prefs_base): + """ + Derive our 'real class' from the stubbed out base class that has support + for SWIG directors. This allows C++ code to magically and transparently + invoke the methods in this python class. + """ + def __init__(self): + _prefs_base.__init__(self) + self.cp = ConfigParser.RawConfigParser() + + def _sys_prefs_filenames(self): + dir = _sys_prefs_dirname() + try: + fnames = os.listdir(dir) + except (IOError, OSError): + return [] + fnames.sort() + return [os.path.join(dir, f) for f in fnames] + + + def _read_files(self): + filenames = self._sys_prefs_filenames() + filenames.append(_user_prefs_filename()) + #print "filenames: ", filenames + self.cp.read(filenames) + + def __getattr__(self, name): + return getattr(self.cp, name) + + # ---------------------------------------------------------------- + # These methods override the C++ virtual methods of the same name + # ---------------------------------------------------------------- + def has_section(self, section): + return self.cp.has_section(section) + + def has_option(self, section, option): + return self.cp.has_option(section, option) + + def get_string(self, section, option, default_val): + try: + return self.cp.get(section, option) + except: + return default_val + + def get_bool(self, section, option, default_val): + try: + return self.cp.getboolean(section, option) + except: + return default_val + + def get_long(self, section, option, default_val): + try: + return self.cp.getint(section, option) + except: + return default_val + + def get_double(self, section, option, default_val): + try: + return self.cp.getfloat(section, option) + except: + return default_val + # ---------------------------------------------------------------- + # End override of C++ virtual methods + # ---------------------------------------------------------------- + + +_prefs_db = _prefs() + +# if GR_DONT_LOAD_PREFS is set, don't load them. +# (make check uses this to avoid interactions.) +if os.getenv("GR_DONT_LOAD_PREFS", None) is None: + _prefs_db._read_files() + + +_prefs_base.set_singleton(_prefs_db) # tell C++ what instance to use + +def prefs(): + """ + Return the global preference data base + """ + return _prefs_db diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_add_and_friends.py b/gnuradio-core/src/python/gnuradio/gr/qa_add_and_friends.py new file mode 100755 index 0000000000..6cb74e9f52 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_add_and_friends.py @@ -0,0 +1,129 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +class test_head (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def help_ii (self, src_data, exp_data, op): + for s in zip (range (len (src_data)), src_data): + src = gr.vector_source_i (s[1]) + self.fg.connect (src, (op, s[0])) + dst = gr.vector_sink_i () + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertEqual (exp_data, result_data) + + def help_ff (self, src_data, exp_data, op): + for s in zip (range (len (src_data)), src_data): + src = gr.vector_source_f (s[1]) + self.fg.connect (src, (op, s[0])) + dst = gr.vector_sink_f () + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertEqual (exp_data, result_data) + + def help_cc (self, src_data, exp_data, op): + for s in zip (range (len (src_data)), src_data): + src = gr.vector_source_c (s[1]) + self.fg.connect (src, (op, s[0])) + dst = gr.vector_sink_c () + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertEqual (exp_data, result_data) + + def test_add_const_ii (self): + src_data = (1, 2, 3, 4, 5) + expected_result = (6, 7, 8, 9, 10) + op = gr.add_const_ii (5) + self.help_ii ((src_data,), expected_result, op) + + def test_add_const_cc (self): + src_data = (1, 2, 3, 4, 5) + expected_result = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j) + op = gr.add_const_cc (5j) + self.help_cc ((src_data,), expected_result, op) + + def test_mult_const_ii (self): + src_data = (-1, 0, 1, 2, 3) + expected_result = (-5, 0, 5, 10, 15) + op = gr.multiply_const_ii (5) + self.help_ii ((src_data,), expected_result, op) + + def test_add_ii (self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (9, -1, 7, 12, 7) + op = gr.add_ii () + self.help_ii ((src1_data, src2_data), + expected_result, op) + + def test_mult_ii (self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (8, -6, 12, 32, 10) + op = gr.multiply_ii () + self.help_ii ((src1_data, src2_data), + expected_result, op) + + def test_sub_ii_1 (self): + src1_data = (1, 2, 3, 4, 5) + expected_result = (-1, -2, -3, -4, -5) + op = gr.sub_ii () + self.help_ii ((src1_data,), + expected_result, op) + + def test_sub_ii_2 (self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (8, -3, 4, 8, 2) + expected_result = (-7, 5, -1, -4, 3) + op = gr.sub_ii () + self.help_ii ((src1_data, src2_data), + expected_result, op) + + def test_div_ff_1 (self): + src1_data = (1, 2, 4, -8) + expected_result = (1, 0.5, 0.25, -.125) + op = gr.divide_ff () + self.help_ff ((src1_data,), + expected_result, op) + + def test_div_ff_2 (self): + src1_data = ( 5, 9, -15, 1024) + src2_data = (10, 3, -5, 64) + expected_result = (0.5, 3, 3, 16) + op = gr.divide_ff () + self.help_ff ((src1_data, src2_data), + expected_result, op) + + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_add_v_and_friends.py b/gnuradio-core/src/python/gnuradio/gr/qa_add_v_and_friends.py new file mode 100755 index 0000000000..11e69e3733 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_add_v_and_friends.py @@ -0,0 +1,353 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +class test_add_v_and_friends(gr_unittest.TestCase): + + def setUp(self): + self.fg = gr.flow_graph() + + def tearDown(self): + self.fg = None + + def help_ss(self, size, src_data, exp_data, op): + for s in zip(range (len (src_data)), src_data): + src = gr.vector_source_s(s[1]) + srcv = gr.stream_to_vector(gr.sizeof_short, size) + self.fg.connect(src, srcv) + self.fg.connect(srcv, (op, s[0])) + rhs = gr.vector_to_stream(gr.sizeof_short, size) + dst = gr.vector_sink_s() + self.fg.connect(op, rhs, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_ii(self, size, src_data, exp_data, op): + for s in zip(range (len (src_data)), src_data): + src = gr.vector_source_i(s[1]) + srcv = gr.stream_to_vector(gr.sizeof_int, size) + self.fg.connect(src, srcv) + self.fg.connect(srcv, (op, s[0])) + rhs = gr.vector_to_stream(gr.sizeof_int, size) + dst = gr.vector_sink_i() + self.fg.connect(op, rhs, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_ff(self, size, src_data, exp_data, op): + for s in zip(range (len (src_data)), src_data): + src = gr.vector_source_f(s[1]) + srcv = gr.stream_to_vector(gr.sizeof_float, size) + self.fg.connect(src, srcv) + self.fg.connect(srcv, (op, s[0])) + rhs = gr.vector_to_stream(gr.sizeof_float, size) + dst = gr.vector_sink_f() + self.fg.connect(op, rhs, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_cc(self, size, src_data, exp_data, op): + for s in zip(range (len (src_data)), src_data): + src = gr.vector_source_c(s[1]) + srcv = gr.stream_to_vector(gr.sizeof_gr_complex, size) + self.fg.connect(src, srcv) + self.fg.connect(srcv, (op, s[0])) + rhs = gr.vector_to_stream(gr.sizeof_gr_complex, size) + dst = gr.vector_sink_c() + self.fg.connect(op, rhs, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_const_ss(self, src_data, exp_data, op): + src = gr.vector_source_s(src_data) + srcv = gr.stream_to_vector(gr.sizeof_short, len(src_data)) + rhs = gr.vector_to_stream(gr.sizeof_short, len(src_data)) + dst = gr.vector_sink_s() + self.fg.connect(src, srcv, op, rhs, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_const_ii(self, src_data, exp_data, op): + src = gr.vector_source_i(src_data) + srcv = gr.stream_to_vector(gr.sizeof_int, len(src_data)) + rhs = gr.vector_to_stream(gr.sizeof_int, len(src_data)) + dst = gr.vector_sink_i() + self.fg.connect(src, srcv, op, rhs, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_const_ff(self, src_data, exp_data, op): + src = gr.vector_source_f(src_data) + srcv = gr.stream_to_vector(gr.sizeof_float, len(src_data)) + rhs = gr.vector_to_stream(gr.sizeof_float, len(src_data)) + dst = gr.vector_sink_f() + self.fg.connect(src, srcv, op, rhs, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + def help_const_cc(self, src_data, exp_data, op): + src = gr.vector_source_c(src_data) + srcv = gr.stream_to_vector(gr.sizeof_gr_complex, len(src_data)) + rhs = gr.vector_to_stream(gr.sizeof_gr_complex, len(src_data)) + dst = gr.vector_sink_c() + self.fg.connect(src, srcv, op, rhs, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(exp_data, result_data) + + + def test_add_vss_one(self): + src1_data = (1,) + src2_data = (2,) + src3_data = (3,) + expected_result = (6,) + op = gr.add_vss(1) + self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vss_five(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (6, 7, 8, 9, 10) + src3_data = (11, 12, 13, 14, 15) + expected_result = (18, 21, 24, 27, 30) + op = gr.add_vss(5) + self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vii_one(self): + src1_data = (1,) + src2_data = (2,) + src3_data = (3,) + expected_result = (6,) + op = gr.add_vii(1) + self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vii_five(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (6, 7, 8, 9, 10) + src3_data = (11, 12, 13, 14, 15) + expected_result = (18, 21, 24, 27, 30) + op = gr.add_vii(5) + self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vff_one(self): + src1_data = (1.0,) + src2_data = (2.0,) + src3_data = (3.0,) + expected_result = (6.0,) + op = gr.add_vff(1) + self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vff_five(self): + src1_data = (1.0, 2.0, 3.0, 4.0, 5.0) + src2_data = (6.0, 7.0, 8.0, 9.0, 10.0) + src3_data = (11.0, 12.0, 13.0, 14.0, 15.0) + expected_result = (18.0, 21.0, 24.0, 27.0, 30.0) + op = gr.add_vff(5) + self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vcc_one(self): + src1_data = (1.0+2.0j,) + src2_data = (3.0+4.0j,) + src3_data = (5.0+6.0j,) + expected_result = (9.0+12j,) + op = gr.add_vcc(1) + self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_vcc_five(self): + src1_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j) + src2_data = (11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j) + src3_data = (21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j) + expected_result = (33.0+36.0j, 39.0+42.0j, 45.0+48.0j, 51.0+54.0j, 57.0+60.0j) + op = gr.add_vcc(5) + self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_add_const_vss_one(self): + src_data = (1,) + op = gr.add_const_vss((2,)) + exp_data = (3,) + self.help_const_ss(src_data, exp_data, op) + + def test_add_const_vss_five(self): + src_data = (1, 2, 3, 4, 5) + op = gr.add_const_vss((6, 7, 8, 9, 10)) + exp_data = (7, 9, 11, 13, 15) + self.help_const_ss(src_data, exp_data, op) + + def test_add_const_vii_one(self): + src_data = (1,) + op = gr.add_const_vii((2,)) + exp_data = (3,) + self.help_const_ii(src_data, exp_data, op) + + def test_add_const_vii_five(self): + src_data = (1, 2, 3, 4, 5) + op = gr.add_const_vii((6, 7, 8, 9, 10)) + exp_data = (7, 9, 11, 13, 15) + self.help_const_ii(src_data, exp_data, op) + + def test_add_const_vff_one(self): + src_data = (1.0,) + op = gr.add_const_vff((2.0,)) + exp_data = (3.0,) + self.help_const_ff(src_data, exp_data, op) + + def test_add_const_vff_five(self): + src_data = (1.0, 2.0, 3.0, 4.0, 5.0) + op = gr.add_const_vff((6.0, 7.0, 8.0, 9.0, 10.0)) + exp_data = (7.0, 9.0, 11.0, 13.0, 15.0) + self.help_const_ff(src_data, exp_data, op) + + def test_add_const_vcc_one(self): + src_data = (1.0+2.0j,) + op = gr.add_const_vcc((2.0+3.0j,)) + exp_data = (3.0+5.0j,) + self.help_const_cc(src_data, exp_data, op) + + def test_add_const_vcc_five(self): + src_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j) + op = gr.add_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)) + exp_data = (12.0+14.0j, 16.0+18.0j, 20.0+22.0j, 24.0+26.0j, 28.0+30.0j) + self.help_const_cc(src_data, exp_data, op) + + + def test_multiply_vss_one(self): + src1_data = (1,) + src2_data = (2,) + src3_data = (3,) + expected_result = (6,) + op = gr.multiply_vss(1) + self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vss_five(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (6, 7, 8, 9, 10) + src3_data = (11, 12, 13, 14, 15) + expected_result = (66, 168, 312, 504, 750) + op = gr.multiply_vss(5) + self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vii_one(self): + src1_data = (1,) + src2_data = (2,) + src3_data = (3,) + expected_result = (6,) + op = gr.multiply_vii(1) + self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vii_five(self): + src1_data = (1, 2, 3, 4, 5) + src2_data = (6, 7, 8, 9, 10) + src3_data = (11, 12, 13, 14, 15) + expected_result = (66, 168, 312, 504, 750) + op = gr.multiply_vii(5) + self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vff_one(self): + src1_data = (1.0,) + src2_data = (2.0,) + src3_data = (3.0,) + expected_result = (6.0,) + op = gr.multiply_vff(1) + self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vff_five(self): + src1_data = (1.0, 2.0, 3.0, 4.0, 5.0) + src2_data = (6.0, 7.0, 8.0, 9.0, 10.0) + src3_data = (11.0, 12.0, 13.0, 14.0, 15.0) + expected_result = (66.0, 168.0, 312.0, 504.0, 750.0) + op = gr.multiply_vff(5) + self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vcc_one(self): + src1_data = (1.0+2.0j,) + src2_data = (3.0+4.0j,) + src3_data = (5.0+6.0j,) + expected_result = (-85+20j,) + op = gr.multiply_vcc(1) + self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_vcc_five(self): + src1_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j) + src2_data = (11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j) + src3_data = (21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j) + expected_result = (-1021.0+428.0j, -2647.0+1754.0j, -4945.0+3704.0j, -8011.0+6374.0j, -11941.0+9860.0j) + op = gr.multiply_vcc(5) + self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op) + + def test_multiply_const_vss_one(self): + src_data = (2,) + op = gr.multiply_const_vss((3,)) + exp_data = (6,) + self.help_const_ss(src_data, exp_data, op) + + def test_multiply_const_vss_five(self): + src_data = (1, 2, 3, 4, 5) + op = gr.multiply_const_vss((6, 7, 8, 9, 10)) + exp_data = (6, 14, 24, 36, 50) + self.help_const_ss(src_data, exp_data, op) + + def test_multiply_const_vii_one(self): + src_data = (2,) + op = gr.multiply_const_vii((3,)) + exp_data = (6,) + self.help_const_ii(src_data, exp_data, op) + + def test_multiply_const_vii_five(self): + src_data = (1, 2, 3, 4, 5) + op = gr.multiply_const_vii((6, 7, 8, 9, 10)) + exp_data = (6, 14, 24, 36, 50) + self.help_const_ii(src_data, exp_data, op) + + def test_multiply_const_vff_one(self): + src_data = (2.0,) + op = gr.multiply_const_vff((3.0,)) + exp_data = (6.0,) + self.help_const_ff(src_data, exp_data, op) + + def test_multiply_const_vff_five(self): + src_data = (1.0, 2.0, 3.0, 4.0, 5.0) + op = gr.multiply_const_vff((6.0, 7.0, 8.0, 9.0, 10.0)) + exp_data = (6.0, 14.0, 24.0, 36.0, 50.0) + self.help_const_ff(src_data, exp_data, op) + + def test_multiply_const_vcc_one(self): + src_data = (1.0+2.0j,) + op = gr.multiply_const_vcc((2.0+3.0j,)) + exp_data = (-4.0+7.0j,) + self.help_const_cc(src_data, exp_data, op) + + def test_multiply_const_vcc_five(self): + src_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j) + op = gr.multiply_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)) + exp_data = (-13.0+34.0j, -17.0+94.0j, -21.0+170.0j, -25.0+262.0j, -29.0+370.0j) + self.help_const_cc(src_data, exp_data, op) + + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_basic_flow_graph.py b/gnuradio-core/src/python/gnuradio/gr/qa_basic_flow_graph.py new file mode 100755 index 0000000000..0799c72e48 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_basic_flow_graph.py @@ -0,0 +1,190 @@ +#!/usr/bin/env python + +from gnuradio import gr, gr_unittest + + +# ---------------------------------------------------------------- + + +class test_basic_flow_graph (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.basic_flow_graph () + + def tearDown (self): + self.fg = None + + def test_000_create_delete (self): + pass + + def test_001a_insert_1 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect (gr.endpoint (src1, 0), gr.endpoint (dst1, 0)) + + def test_001b_insert_1 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect (src1, gr.endpoint (dst1, 0)) + + def test_001c_insert_1 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect (gr.endpoint (src1, 0), dst1) + + def test_001d_insert_1 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect (src1, dst1) + + def test_001e_insert_1 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect ((src1, 0), (dst1, 0)) + + def test_002_dst_in_use (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + src2 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect ((src1, 0), (dst1, 0)) + self.assertRaises (ValueError, + lambda : fg.connect ((src2, 0), + (dst1, 0))) + + def test_003_no_such_src_port (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + self.assertRaises (ValueError, + lambda : fg.connect ((src1, 1), + (dst1, 0))) + + def test_004_no_such_dst_port (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + self.assertRaises (ValueError, + lambda : fg.connect ((src1, 0), + (dst1, 1))) + + def test_005_one_src_two_dst (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + dst2 = gr.null_sink (gr.sizeof_int) + fg.connect ((src1, 0), (dst1, 0)) + fg.connect ((src1, 0), (dst2, 0)) + + def test_006_check_item_sizes (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_char) + self.assertRaises (ValueError, + lambda : fg.connect ((src1, 0), + (dst1, 0))) + + def test_007_validate (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + dst2 = gr.null_sink (gr.sizeof_int) + fg.connect ((src1, 0), (dst1, 0)) + fg.connect ((src1, 0), (dst2, 0)) + fg.validate () + + def test_008_validate (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + dst2 = gr.null_sink (gr.sizeof_int) + fg.connect ((src1, 0), (nop1, 0)) + fg.connect ((src1, 0), (nop1, 1)) + fg.connect ((nop1, 0), (dst1, 0)) + fg.connect ((nop1, 1), (dst2, 0)) + fg.validate () + + def test_009_validate (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + dst2 = gr.null_sink (gr.sizeof_int) + fg.connect ((src1, 0), (nop1, 0)) + fg.connect ((src1, 0), (nop1, 2)) + fg.connect ((nop1, 0), (dst1, 0)) + fg.connect ((nop1, 1), (dst2, 0)) + self.assertRaises (ValueError, + lambda : fg.validate ()) + + + def test_010_validate (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + dst2 = gr.null_sink (gr.sizeof_int) + fg.connect ((src1, 0), (nop1, 0)) + fg.connect ((src1, 0), (nop1, 1)) + fg.connect ((nop1, 0), (dst1, 0)) + fg.connect ((nop1, 2), (dst2, 0)) + self.assertRaises (ValueError, + lambda : fg.validate ()) + + + def test_011_disconnect (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + dst2 = gr.null_sink (gr.sizeof_int) + fg.connect ((src1, 0), (nop1, 0)) + fg.connect ((src1, 0), (nop1, 1)) + fg.connect ((nop1, 0), (dst1, 0)) + fg.connect ((nop1, 1), (dst2, 0)) + fg.validate () + fg.disconnect ((src1, 0), (nop1, 1)) + fg.validate () + self.assertRaises (ValueError, + lambda : fg.disconnect ((src1, 0), + (nop1, 1))) + + def test_012_connect (self): + fg = self.fg + src_data = (0, 1, 2, 3) + expected_result = (2, 3, 4, 5) + src1 = gr.vector_source_i (src_data) + op = gr.add_const_ii (2) + dst1 = gr.vector_sink_i () + fg.connect (src1, op) + fg.connect (op, dst1) + # print "edge_list:", fg.edge_list + fg.validate () + + def test_013_connect_varargs (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + self.assertRaises (ValueError, + lambda : fg.connect ()) + self.assertRaises (ValueError, + lambda : fg.connect (src1)) + + def test_014_connect_varargs (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect (src1, nop1, dst1) + fg.validate () + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_cma_equalizer.py b/gnuradio-core/src/python/gnuradio/gr/qa_cma_equalizer.py new file mode 100755 index 0000000000..6b3e9aa9e3 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_cma_equalizer.py @@ -0,0 +1,29 @@ +#!/usr/bin/env python + +from gnuradio import gr, gr_unittest + +class test_cma_equalizer_fir(gr_unittest.TestCase): + + def setUp(self): + self.fg = gr.flow_graph() + + def tearDown(self): + self.fg = None + + def transform(self, src_data): + SRC = gr.vector_source_c(src_data, False) + EQU = gr.cma_equalizer_cc(4, 1.0, .001) + DST = gr.vector_sink_c() + self.fg.connect(SRC, EQU, DST) + self.fg.run() + return DST.data() + + def test_001_identity(self): + # Constant modulus signal so no adjustments + src_data = (1+0j, 0+1j, -1+0j, 0-1j)*1000 + expected_data = src_data + result = self.transform(src_data) + self.assertComplexTuplesAlmostEqual(expected_data, result) + +if __name__ == "__main__": + gr_unittest.main()
\ No newline at end of file diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_complex_to_xxx.py b/gnuradio-core/src/python/gnuradio/gr/qa_complex_to_xxx.py new file mode 100755 index 0000000000..4bc1933500 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_complex_to_xxx.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +class test_complex_ops (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_complex_to_float_1 (self): + src_data = (0, 1, -1, 3+4j, -3-4j, -3+4j) + expected_result = (0, 1, -1, 3, -3, -3) + src = gr.vector_source_c (src_data) + op = gr.complex_to_float () + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () # run the graph and wait for it to finish + actual_result = dst.data () # fetch the contents of the sink + self.assertFloatTuplesAlmostEqual (expected_result, actual_result) + + def test_complex_to_float_2 (self): + src_data = (0, 1, -1, 3+4j, -3-4j, -3+4j) + expected_result0 = (0, 1, -1, 3, -3, -3) + expected_result1 = (0, 0, 0, 4, -4, 4) + src = gr.vector_source_c (src_data) + op = gr.complex_to_float () + dst0 = gr.vector_sink_f () + dst1 = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect ((op, 0), dst0) + self.fg.connect ((op, 1), dst1) + self.fg.run () + actual_result = dst0.data () + self.assertFloatTuplesAlmostEqual (expected_result0, actual_result) + actual_result = dst1.data () + self.assertFloatTuplesAlmostEqual (expected_result1, actual_result) + + def test_complex_to_real (self): + src_data = (0, 1, -1, 3+4j, -3-4j, -3+4j) + expected_result = (0, 1, -1, 3, -3, -3) + src = gr.vector_source_c (src_data) + op = gr.complex_to_real () + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + actual_result = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, actual_result) + + def test_complex_to_imag (self): + src_data = (0, 1, -1, 3+4j, -3-4j, -3+4j) + expected_result = (0, 0, 0, 4, -4, 4) + src = gr.vector_source_c (src_data) + op = gr.complex_to_imag () + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + actual_result = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, actual_result,5) + + def test_complex_to_mag (self): + src_data = (0, 1, -1, 3+4j, -3-4j, -3+4j) + expected_result = (0, 1, 1, 5, 5, 5) + src = gr.vector_source_c (src_data) + op = gr.complex_to_mag () + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + actual_result = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, actual_result,5) + + def test_complex_to_arg (self): + pi = math.pi + expected_result = (0, pi/6, pi/4, pi/2, 3*pi/4, 7*pi/8, + -pi/6, -pi/4, -pi/2, -3*pi/4, -7*pi/8) + src_data = tuple ([math.cos (x) + math.sin (x) * 1j for x in expected_result]) + src = gr.vector_source_c (src_data) + op = gr.complex_to_arg () + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + actual_result = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, actual_result, 5) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_constellation_decoder_cb.py b/gnuradio-core/src/python/gnuradio/gr/qa_constellation_decoder_cb.py new file mode 100755 index 0000000000..9564122283 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_constellation_decoder_cb.py @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +class test_head (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_constellation_decoder_cb (self): + symbol_positions = [1 + 0j, 0 + 1j , -1 + 0j, 0 - 1j] + symbol_values_out = [0, 1, 2, 3] + expected_result = ( 0, 3, 2, 1, 0, 0, 3) + src_data = (0.5 + 0j, 0.1 - 1.2j, -0.8 - 0.1j, -0.45 + 0.8j, 0.8 - 0j, 0.5 + 0j, 0.1 - 1.2j) + src = gr.vector_source_c (src_data) + op = gr.constellation_decoder_cb (symbol_positions, symbol_values_out) + dst = gr.vector_sink_b () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () # run the graph and wait for it to finish + actual_result = dst.data () # fetch the contents of the sink + #print "actual result", actual_result + #print "expected result", expected_result + self.assertFloatTuplesAlmostEqual (expected_result, actual_result) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_correlate_access_code.py b/gnuradio-core/src/python/gnuradio/gr/qa_correlate_access_code.py new file mode 100755 index 0000000000..89b4909ebd --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_correlate_access_code.py @@ -0,0 +1,83 @@ +#!/usr/bin/env python +# +# Copyright 2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +default_access_code = '\xAC\xDD\xA4\xE2\xF2\x8C\x20\xFC' + +def string_to_1_0_list(s): + r = [] + for ch in s: + x = ord(ch) + for i in range(8): + t = (x >> i) & 0x1 + r.append(t) + + return r + +def to_1_0_string(L): + return ''.join(map(lambda x: chr(x + ord('0')), L)) + +class test_correlate_access_code(gr_unittest.TestCase): + + def setUp(self): + self.fg = gr.flow_graph() + + def tearDown(self): + self.fg = None + + def test_001(self): + pad = (0,) * 64 + # 0 0 0 1 0 0 0 1 + src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7 + expected_result = pad + (1, 0, 1, 1, 3, 1, 0, 1, 1, 2) + (0,) * 6 + src = gr.vector_source_b (src_data) + op = gr.correlate_access_code_bb("1011", 0) + dst = gr.vector_sink_b () + self.fg.connect (src, op, dst) + self.fg.run () + result_data = dst.data () + self.assertEqual (expected_result, result_data) + + + def test_002(self): + code = tuple(string_to_1_0_list(default_access_code)) + access_code = to_1_0_string(code) + pad = (0,) * 64 + #print code + #print access_code + src_data = code + (1, 0, 1, 1) + pad + expected_result = pad + code + (3, 0, 1, 1) + src = gr.vector_source_b (src_data) + op = gr.correlate_access_code_bb(access_code, 0) + dst = gr.vector_sink_b () + self.fg.connect (src, op, dst) + self.fg.run () + result_data = dst.data () + self.assertEqual (expected_result, result_data) + + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_diff_encoder.py b/gnuradio-core/src/python/gnuradio/gr/qa_diff_encoder.py new file mode 100755 index 0000000000..44840709be --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_diff_encoder.py @@ -0,0 +1,86 @@ +#!/usr/bin/env python +# +# Copyright 2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math +import random + +def make_random_int_tuple(L, min, max): + result = [] + for x in range(L): + result.append(random.randint(min, max)) + return tuple(result) + + +class test_encoder (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_diff_encdec_000(self): + random.seed(0) + modulus = 2 + src_data = make_random_int_tuple(1000, 0, modulus-1) + expected_result = src_data + src = gr.vector_source_b(src_data) + enc = gr.diff_encoder_bb(modulus) + dec = gr.diff_decoder_bb(modulus) + dst = gr.vector_sink_b() + self.fg.connect(src, enc, dec, dst) + self.fg.run() # run the graph and wait for it to finish + actual_result = dst.data() # fetch the contents of the sink + self.assertEqual(expected_result, actual_result) + + def test_diff_encdec_001(self): + random.seed(0) + modulus = 4 + src_data = make_random_int_tuple(1000, 0, modulus-1) + expected_result = src_data + src = gr.vector_source_b(src_data) + enc = gr.diff_encoder_bb(modulus) + dec = gr.diff_decoder_bb(modulus) + dst = gr.vector_sink_b() + self.fg.connect(src, enc, dec, dst) + self.fg.run() # run the graph and wait for it to finish + actual_result = dst.data() # fetch the contents of the sink + self.assertEqual(expected_result, actual_result) + + def test_diff_encdec_002(self): + random.seed(0) + modulus = 8 + src_data = make_random_int_tuple(40000, 0, modulus-1) + expected_result = src_data + src = gr.vector_source_b(src_data) + enc = gr.diff_encoder_bb(modulus) + dec = gr.diff_decoder_bb(modulus) + dst = gr.vector_sink_b() + self.fg.connect(src, enc, dec, dst) + self.fg.run() # run the graph and wait for it to finish + actual_result = dst.data() # fetch the contents of the sink + self.assertEqual(expected_result, actual_result) + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_diff_phasor_cc.py b/gnuradio-core/src/python/gnuradio/gr/qa_diff_phasor_cc.py new file mode 100755 index 0000000000..0b71602021 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_diff_phasor_cc.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +class test_complex_ops (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_diff_phasor_cc (self): + src_data = (0+0j, 1+0j, -1+0j, 3+4j, -3-4j, -3+4j) + expected_result = (0+0j, 0+0j, -1+0j, -3-4j, -25+0j, -7-24j) + src = gr.vector_source_c (src_data) + op = gr.diff_phasor_cc () + dst = gr.vector_sink_c () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () # run the graph and wait for it to finish + actual_result = dst.data () # fetch the contents of the sink + self.assertComplexTuplesAlmostEqual (expected_result, actual_result) + + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_feval.py b/gnuradio-core/src/python/gnuradio/gr/qa_feval.py new file mode 100755 index 0000000000..1de49369a3 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_feval.py @@ -0,0 +1,92 @@ +#!/usr/bin/env python +# +# Copyright 2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +class my_add2_dd(gr.feval_dd): + def eval(self, x): + return x + 2 + +class my_add2_ll(gr.feval_ll): + def eval(self, x): + return x + 2 + +class my_add2_cc(gr.feval_cc): + def eval(self, x): + return x + (2 - 2j) + + +class test_feval(gr_unittest.TestCase): + + def test_dd_1(self): + f = my_add2_dd() + src_data = (0.0, 1.0, 2.0, 3.0, 4.0) + expected_result = (2.0, 3.0, 4.0, 5.0, 6.0) + # this is all in python... + actual_result = tuple([f.eval(x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + def test_dd_2(self): + f = my_add2_dd() + src_data = (0.0, 1.0, 2.0, 3.0, 4.0) + expected_result = (2.0, 3.0, 4.0, 5.0, 6.0) + # this is python -> C++ -> python and back again... + actual_result = tuple([gr.feval_dd_example(f, x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + + def test_ll_1(self): + f = my_add2_ll() + src_data = (0, 1, 2, 3, 4) + expected_result = (2, 3, 4, 5, 6) + # this is all in python... + actual_result = tuple([f.eval(x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + def test_ll_2(self): + f = my_add2_ll() + src_data = (0, 1, 2, 3, 4) + expected_result = (2, 3, 4, 5, 6) + # this is python -> C++ -> python and back again... + actual_result = tuple([gr.feval_ll_example(f, x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + + def test_cc_1(self): + f = my_add2_cc() + src_data = (0+1j, 2+3j, 4+5j, 6+7j) + expected_result = (2-1j, 4+1j, 6+3j, 8+5j) + # this is all in python... + actual_result = tuple([f.eval(x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + def test_cc_2(self): + f = my_add2_cc() + src_data = (0+1j, 2+3j, 4+5j, 6+7j) + expected_result = (2-1j, 4+1j, 6+3j, 8+5j) + # this is python -> C++ -> python and back again... + actual_result = tuple([gr.feval_cc_example(f, x) for x in src_data]) + self.assertEqual(expected_result, actual_result) + + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_fft_filter.py b/gnuradio-core/src/python/gnuradio/gr/qa_fft_filter.py new file mode 100755 index 0000000000..cb2e76b181 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_fft_filter.py @@ -0,0 +1,268 @@ +#!/usr/bin/env python +# +# Copyright 2004,2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import sys +import random + +def make_random_complex_tuple(L): + result = [] + for x in range(L): + result.append(complex(random.uniform(-1000,1000), + random.uniform(-1000,1000))) + return tuple(result) + +def make_random_float_tuple(L): + result = [] + for x in range(L): + result.append(float(int(random.uniform(-1000,1000)))) + return tuple(result) + + +def reference_filter_ccc(dec, taps, input): + """ + compute result using conventional fir filter + """ + fg = gr.flow_graph() + #src = gr.vector_source_c(((0,) * (len(taps) - 1)) + input) + src = gr.vector_source_c(input) + op = gr.fir_filter_ccc(dec, taps) + dst = gr.vector_sink_c() + fg.connect(src, op, dst) + fg.run() + return dst.data() + +def reference_filter_fff(dec, taps, input): + """ + compute result using conventional fir filter + """ + fg = gr.flow_graph() + #src = gr.vector_source_f(((0,) * (len(taps) - 1)) + input) + src = gr.vector_source_f(input) + op = gr.fir_filter_fff(dec, taps) + dst = gr.vector_sink_f() + fg.connect(src, op, dst) + fg.run() + return dst.data() + + +def print_complex(x): + for i in x: + i = complex(i) + sys.stdout.write("(%6.3f,%6.3fj), " % (i.real, i.imag)) + sys.stdout.write('\n') + + +class test_fft_filter(gr_unittest.TestCase): + + def setUp(self): + self.fg = gr.flow_graph () + + def tearDown(self): + self.fg = None + + def assert_fft_ok2(self, expected_result, result_data): + expected_result = expected_result[:len(result_data)] + self.assertComplexTuplesAlmostEqual2 (expected_result, result_data, + abs_eps=1e-9, rel_eps=4e-4) + + def assert_fft_float_ok2(self, expected_result, result_data, abs_eps=1e-9, rel_eps=4e-4): + expected_result = expected_result[:len(result_data)] + self.assertFloatTuplesAlmostEqual2 (expected_result, result_data, + abs_eps, rel_eps) + + #def test_ccc_000(self): + # self.assertRaises (RuntimeError, gr.fft_filter_ccc, 2, (1,)) + + def test_ccc_001(self): + src_data = (0,1,2,3,4,5,6,7) + taps = (1,) + expected_result = tuple([complex(x) for x in (0,1,2,3,4,5,6,7)]) + src = gr.vector_source_c(src_data) + op = gr.fft_filter_ccc(1, taps) + dst = gr.vector_sink_c() + self.fg.connect(src, op, dst) + self.fg.run() + result_data = dst.data() + #print 'expected:', expected_result + #print 'results: ', result_data + self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5) + + + def test_ccc_002(self): + src_data = (0,1,2,3,4,5,6,7) + taps = (2,) + expected_result = tuple([2 * complex(x) for x in (0,1,2,3,4,5,6,7)]) + src = gr.vector_source_c(src_data) + op = gr.fft_filter_ccc(1, taps) + dst = gr.vector_sink_c() + self.fg.connect(src, op, dst) + self.fg.run() + result_data = dst.data() + #print 'expected:', expected_result + #print 'results: ', result_data + self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5) + + def test_ccc_004(self): + random.seed(0) + for i in xrange(25): + # sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + src_len = 4*1024 + src_data = make_random_complex_tuple(src_len) + ntaps = int(random.uniform(2, 1000)) + taps = make_random_complex_tuple(ntaps) + expected_result = reference_filter_ccc(1, taps, src_data) + + src = gr.vector_source_c(src_data) + op = gr.fft_filter_ccc(1, taps) + dst = gr.vector_sink_c() + self.fg.connect(src, op, dst) + self.fg.run() + result_data = dst.data() + + self.assert_fft_ok2(expected_result, result_data) + + def test_ccc_005(self): + random.seed(0) + for i in xrange(25): + # sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + dec = i + 1 + src_len = 4*1024 + src_data = make_random_complex_tuple(src_len) + ntaps = int(random.uniform(2, 100)) + taps = make_random_complex_tuple(ntaps) + expected_result = reference_filter_ccc(dec, taps, src_data) + + src = gr.vector_source_c(src_data) + op = gr.fft_filter_ccc(dec, taps) + dst = gr.vector_sink_c() + self.fg.connect(src, op, dst) + self.fg.run() + result_data = dst.data() + + self.assert_fft_ok2(expected_result, result_data) + + # ---------------------------------------------------------------- + # test _fff version + # ---------------------------------------------------------------- + + def test_fff_001(self): + src_data = (0,1,2,3,4,5,6,7) + taps = (1,) + expected_result = tuple([float(x) for x in (0,1,2,3,4,5,6,7)]) + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(1, taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op, dst) + self.fg.run() + result_data = dst.data() + #print 'expected:', expected_result + #print 'results: ', result_data + self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5) + + + def test_fff_002(self): + src_data = (0,1,2,3,4,5,6,7) + taps = (2,) + expected_result = tuple([2 * float(x) for x in (0,1,2,3,4,5,6,7)]) + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(1, taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op, dst) + self.fg.run() + result_data = dst.data() + #print 'expected:', expected_result + #print 'results: ', result_data + self.assertFloatTuplesAlmostEqual (expected_result, result_data, 5) + + def xtest_fff_003(self): + random.seed(0) + for i in xrange(25): + sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + src_len = 4096 + src_data = make_random_float_tuple(src_len) + ntaps = int(random.uniform(2, 1000)) + taps = make_random_float_tuple(ntaps) + expected_result = reference_filter_fff(1, taps, src_data) + + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(1, taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op, dst) + self.fg.run() + result_data = dst.data() + + #print "src_len =", src_len, " ntaps =", ntaps + try: + self.assert_fft_float_ok2(expected_result, result_data, abs_eps=1.0) + except: + expected = open('expected', 'w') + for x in expected_result: + expected.write(`x` + '\n') + actual = open('actual', 'w') + for x in result_data: + actual.write(`x` + '\n') + raise + + def xtest_fff_004(self): + random.seed(0) + for i in xrange(25): + sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + src_len = 4*1024 + src_data = make_random_float_tuple(src_len) + ntaps = int(random.uniform(2, 1000)) + taps = make_random_float_tuple(ntaps) + expected_result = reference_filter_fff(1, taps, src_data) + + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(1, taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op, dst) + self.fg.run() + result_data = dst.data() + + self.assert_fft_float_ok2(expected_result, result_data, abs_eps=2.0) + + def xtest_fff_005(self): + random.seed(0) + for i in xrange(25): + sys.stderr.write("\n>>> Loop = %d\n" % (i,)) + dec = i + 1 + src_len = 4*1024 + src_data = make_random_float_tuple(src_len) + ntaps = int(random.uniform(2, 100)) + taps = make_random_float_tuple(ntaps) + expected_result = reference_filter_fff(dec, taps, src_data) + + src = gr.vector_source_f(src_data) + op = gr.fft_filter_fff(dec, taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op, dst) + self.fg.run() + result_data = dst.data() + + self.assert_fft_float_ok2(expected_result, result_data) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_filter_delay_fc.py b/gnuradio-core/src/python/gnuradio/gr/qa_filter_delay_fc.py new file mode 100755 index 0000000000..191552ca27 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_filter_delay_fc.py @@ -0,0 +1,317 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +class qa_filter_delay_fc (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_001_filter_delay_one_input (self): + + # expected result + expected_result = ( -1.4678005338941702e-11j, + -0.0011950774351134896j, + -0.0019336787518113852j, + -0.0034673355985432863j, + -0.0036765895783901215j, + -0.004916108213365078j, + -0.0042778430506587029j, + -0.006028641015291214j, + -0.005476709920912981j, + -0.0092810001224279404j, + -0.0095402700826525688j, + -0.016060983762145042j, + -0.016446959227323532j, + -0.02523401565849781j, + -0.024382550269365311j, + -0.035477779805660248j, + -0.033021725714206696j, + -0.048487484455108643j, + -0.04543270543217659j, + -0.069477587938308716j, + -0.066984444856643677j, + -0.10703597217798233j, + -0.10620346665382385j, + -0.1852707713842392j, + -0.19357112050056458j, + (7.2191945754696007e-09 -0.50004088878631592j), + (0.58778399229049683 -0.6155126690864563j), + (0.95105588436126709 -0.12377222627401352j), + (0.95105588436126709 +0.41524654626846313j), + (0.5877838134765625 +0.91611981391906738j), + (5.8516356205018383e-09 +1.0670661926269531j), + (-0.5877840518951416 +0.87856143712997437j), + (-0.95105588436126709 +0.35447561740875244j), + (-0.95105588436126709 -0.26055556535720825j), + (-0.5877838134765625 -0.77606213092803955j), + (-8.7774534307527574e-09 -0.96460390090942383j), + (0.58778399229049683 -0.78470128774642944j), + (0.95105588436126709 -0.28380891680717468j), + (0.95105588436126709 +0.32548999786376953j), + (0.5877838134765625 +0.82514488697052002j), + (1.4629089051254596e-08 +1.0096219778060913j), + (-0.5877840518951416 +0.81836479902267456j), + (-0.95105588436126709 +0.31451958417892456j), + (-0.95105588436126709 -0.3030143678188324j), + (-0.5877838134765625 -0.80480599403381348j), + (-1.7554906861505515e-08 -0.99516552686691284j), + (0.58778399229049683 -0.80540722608566284j), + (0.95105582475662231 -0.30557557940483093j), + (0.95105588436126709 +0.31097668409347534j), + (0.5877838134765625 +0.81027895212173462j), + (2.3406542482007353e-08 +1.0000816583633423j), + (-0.5877840518951416 +0.80908381938934326j), + (-0.95105588436126709 +0.30904293060302734j), + (-0.95105588436126709 -0.30904296040534973j), + (-0.5877838134765625 -0.80908387899398804j), + (-2.6332360292258272e-08 -1.0000815391540527j), + (0.58778399229049683 -0.80908381938934326j), + (0.95105582475662231 -0.30904299020767212j), + (0.95105588436126709 +0.30904293060302734j), + (0.5877838134765625 +0.80908381938934326j), + (3.218399768911695e-08 +1.0000815391540527j)) + + fg = self.fg + + sampling_freq = 100 + + ntaps = 51 + src1 = gr.sig_source_f (sampling_freq, gr.GR_SIN_WAVE, + sampling_freq * 0.10, 1.0) + head = gr.head (gr.sizeof_float, int (ntaps + sampling_freq * 0.10)) + dst2 = gr.vector_sink_c () + + # calculate taps + taps = gr.firdes_hilbert (ntaps) + hd = gr.filter_delay_fc (taps) + + fg.connect (src1, head) + fg.connect (head, hd) + fg.connect (hd,dst2) + + fg.run () + + # get output + result_data = dst2.data () + self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5) + + def test_002_filter_delay_two_inputs (self): + + # giving the same signal to both the inputs should fetch the same results + # as above + + # expected result + expected_result = ( -1.4678005338941702e-11j, + -0.0011950774351134896j, + -0.0019336787518113852j, + -0.0034673355985432863j, + -0.0036765895783901215j, + -0.004916108213365078j, + -0.0042778430506587029j, + -0.006028641015291214j, + -0.005476709920912981j, + -0.0092810001224279404j, + -0.0095402700826525688j, + -0.016060983762145042j, + -0.016446959227323532j, + -0.02523401565849781j, + -0.024382550269365311j, + -0.035477779805660248j, + -0.033021725714206696j, + -0.048487484455108643j, + -0.04543270543217659j, + -0.069477587938308716j, + -0.066984444856643677j, + -0.10703597217798233j, + -0.10620346665382385j, + -0.1852707713842392j, + -0.19357112050056458j, + (7.2191945754696007e-09 -0.50004088878631592j), + (0.58778399229049683 -0.6155126690864563j), + (0.95105588436126709 -0.12377222627401352j), + (0.95105588436126709 +0.41524654626846313j), + (0.5877838134765625 +0.91611981391906738j), + (5.8516356205018383e-09 +1.0670661926269531j), + (-0.5877840518951416 +0.87856143712997437j), + (-0.95105588436126709 +0.35447561740875244j), + (-0.95105588436126709 -0.26055556535720825j), + (-0.5877838134765625 -0.77606213092803955j), + (-8.7774534307527574e-09 -0.96460390090942383j), + (0.58778399229049683 -0.78470128774642944j), + (0.95105588436126709 -0.28380891680717468j), + (0.95105588436126709 +0.32548999786376953j), + (0.5877838134765625 +0.82514488697052002j), + (1.4629089051254596e-08 +1.0096219778060913j), + (-0.5877840518951416 +0.81836479902267456j), + (-0.95105588436126709 +0.31451958417892456j), + (-0.95105588436126709 -0.3030143678188324j), + (-0.5877838134765625 -0.80480599403381348j), + (-1.7554906861505515e-08 -0.99516552686691284j), + (0.58778399229049683 -0.80540722608566284j), + (0.95105582475662231 -0.30557557940483093j), + (0.95105588436126709 +0.31097668409347534j), + (0.5877838134765625 +0.81027895212173462j), + (2.3406542482007353e-08 +1.0000816583633423j), + (-0.5877840518951416 +0.80908381938934326j), + (-0.95105588436126709 +0.30904293060302734j), + (-0.95105588436126709 -0.30904296040534973j), + (-0.5877838134765625 -0.80908387899398804j), + (-2.6332360292258272e-08 -1.0000815391540527j), + (0.58778399229049683 -0.80908381938934326j), + (0.95105582475662231 -0.30904299020767212j), + (0.95105588436126709 +0.30904293060302734j), + (0.5877838134765625 +0.80908381938934326j), + (3.218399768911695e-08 +1.0000815391540527j)) + + + fg = self.fg + + sampling_freq = 100 + ntaps = 51 + src1 = gr.sig_source_f (sampling_freq, gr.GR_SIN_WAVE, + sampling_freq * 0.10, 1.0) + head = gr.head (gr.sizeof_float, int (ntaps + sampling_freq * 0.10)) + dst2 = gr.vector_sink_c () + + + # calculate taps + taps = gr.firdes_hilbert (ntaps) + hd = gr.filter_delay_fc (taps) + + fg.connect (src1, head) + fg.connect (head, (hd,0)) + fg.connect (head, (hd,1)) + fg.connect (hd,dst2) + fg.run () + + # get output + result_data = dst2.data () + + self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5) + + + def test_003_filter_delay_two_inputs (self): + + # give two different inputs + + # expected result + expected_result = ( -0.0020331963896751404j, + -0.0016448829555884004j, + -0.0032375147566199303j, + -0.0014826074475422502j, + -0.0033034090884029865j, + -0.00051144487224519253j, + -0.0043686260469257832j, + -0.0010198024101555347j, + -0.0082517862319946289j, + -0.003456643782556057j, + -0.014193611219525337j, + -0.005875137634575367j, + -0.020293503999710083j, + -0.0067503536120057106j, + -0.026798896491527557j, + -0.0073488112539052963j, + -0.037041611969470978j, + -0.010557252913713455j, + -0.055669989436864853j, + -0.018332764506340027j, + -0.089904911816120148j, + -0.033361352980136871j, + -0.16902604699134827j, + -0.074318811297416687j, + -0.58429563045501709j, + (7.2191945754696007e-09 -0.35892376303672791j), + (0.58778399229049683 +0.63660913705825806j), + (0.95105588436126709 +0.87681591510772705j), + (0.95105588436126709 +0.98705857992172241j), + (0.5877838134765625 +0.55447429418563843j), + (5.8516356205018383e-09 +0.026006083935499191j), + (-0.5877840518951416 -0.60616838932037354j), + (-0.95105588436126709 -0.9311758279800415j), + (-0.95105588436126709 -0.96169203519821167j), + (-0.5877838134765625 -0.57292771339416504j), + (-8.7774534307527574e-09 -0.0073488391935825348j), + (0.58778399229049683 +0.59720659255981445j), + (0.95105588436126709 +0.94438445568084717j), + (0.95105588436126709 +0.95582199096679688j), + (0.5877838134765625 +0.58196049928665161j), + (1.4629089051254596e-08 +0.0026587247848510742j), + (-0.5877840518951416 -0.59129220247268677j), + (-0.95105588436126709 -0.94841635227203369j), + (-0.95105588436126709 -0.95215457677841187j), + (-0.5877838134765625 -0.58535969257354736j), + (-1.7554906861505515e-08 -0.00051158666610717773j), + (0.58778399229049683 +0.58867418766021729j), + (0.95105582475662231 +0.94965213537216187j), + (0.95105588436126709 +0.95050644874572754j), + (0.5877838134765625 +0.58619076013565063j), + (2.3406542482007353e-08 +1.1920928955078125e-07j), + (-0.5877840518951416 -0.58783555030822754j), + (-0.95105588436126709 -0.95113480091094971j), + (-0.95105588436126709 -0.95113474130630493j), + (-0.5877838134765625 -0.58783555030822754j), + (-2.6332360292258272e-08 -8.1956386566162109e-08j), + (0.58778399229049683 +0.58783555030822754j), + (0.95105582475662231 +0.95113474130630493j), + (0.95105588436126709 +0.95113474130630493j), + (0.5877838134765625 +0.58783560991287231j), + (3.218399768911695e-08 +1.1920928955078125e-07j)) + + fg = self.fg + + sampling_freq = 100 + ntaps = 51 + + src1 = gr.sig_source_f (sampling_freq, gr.GR_SIN_WAVE,sampling_freq * 0.10, 1.0) + src2 = gr.sig_source_f (sampling_freq, gr.GR_COS_WAVE,sampling_freq * 0.10, 1.0) + + head1 = gr.head (gr.sizeof_float, int (ntaps + sampling_freq * 0.10)) + head2 = gr.head (gr.sizeof_float, int (ntaps + sampling_freq * 0.10)) + + taps = gr.firdes_hilbert (ntaps) + hd = gr.filter_delay_fc (taps) + + dst2 = gr.vector_sink_c () + + fg.connect (src1, head1) + fg.connect (src2, head2) + + fg.connect (head1, (hd,0)) + fg.connect (head2, (hd,1)) + fg.connect (hd, dst2) + + fg.run () + + # get output + result_data = dst2.data () + + self.assertComplexTuplesAlmostEqual (expected_result, result_data, 5) + + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_flow_graph.py b/gnuradio-core/src/python/gnuradio/gr/qa_flow_graph.py new file mode 100755 index 0000000000..455cc13590 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_flow_graph.py @@ -0,0 +1,356 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import qa_basic_flow_graph + + +def all_counts (): + return (gr.block_ncurrently_allocated (), + gr.block_detail_ncurrently_allocated (), + gr.buffer_ncurrently_allocated (), + gr.buffer_reader_ncurrently_allocated ()) + + +class wrap_add(gr.hier_block): + def __init__(self, fg, a): + add = gr.add_const_ii (a) + gr.hier_block.__init__ (self, fg, add, add) + +class mult_add(gr.hier_block): + def __init__(self, fg, m, a): + mult = gr.multiply_const_ii (m) + add = gr.add_const_ii (a) + fg.connect (mult, add) + gr.hier_block.__init__ (self, fg, mult, add) + + +class test_flow_graph (qa_basic_flow_graph.test_basic_flow_graph): + + def setUp (self): + ''' override qa_basic_flow_graph setUp in order to use our class''' + self.fg = gr.flow_graph () + + def tearDown (self): + qa_basic_flow_graph.test_basic_flow_graph.tearDown (self) + + + # we inherit all their tests, so we can be sure we don't break + # any of the underlying code + + + def leak_check (self, fct): + begin = all_counts () + fct () + # tear down early so we can check for leaks + self.tearDown () + end = all_counts () + self.assertEqual (begin, end) + + def test_100_tsort_null (self): + self.assertEqual ([], self.fg.topological_sort (self.fg.all_blocks ())) + + def test_101_tsort_two (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect ((src1, 0), (dst1, 0)) + fg.validate () + self.assertEqual ([src1, dst1], fg.topological_sort (fg.all_blocks ())) + + def test_102_tsort_three_a (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect (src1, nop1) + fg.connect (nop1, dst1) + fg.validate () + self.assertEqual ([src1, nop1, dst1], fg.topological_sort (fg.all_blocks ())) + + def test_103_tsort_three_b (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect (nop1, dst1) + fg.connect (src1, nop1) + fg.validate () + self.assertEqual ([src1, nop1, dst1], fg.topological_sort (fg.all_blocks ())) + + def test_104_trivial_dag_check (self): + fg = self.fg + nop1 = gr.nop (gr.sizeof_int) + fg.connect (nop1, nop1) + fg.validate () + self.assertRaises (gr.NotDAG, + lambda : fg.topological_sort (fg.all_blocks ())) + + def test_105 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + src2 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + nop2 = gr.nop (gr.sizeof_int) + nop3 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + + fg.connect (src1, nop1) + fg.connect (src2, nop2) + fg.connect (nop1, (nop3, 0)) + fg.connect (nop2, (nop3, 1)) + fg.connect (nop3, dst1) + fg.validate () + ts = fg.topological_sort (fg.all_blocks ()) + self.assertEqual ([src2, nop2, src1, nop1, nop3, dst1], ts) + + + def test_106 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + src2 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + nop2 = gr.nop (gr.sizeof_int) + nop3 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + + fg.connect (nop3, dst1) + fg.connect (nop2, (nop3, 1)) + fg.connect (nop1, (nop3, 0)) + fg.connect (src2, nop2) + fg.connect (src1, nop1) + fg.validate () + ts = fg.topological_sort (fg.all_blocks ()) + self.assertEqual ([src2, nop2, src1, nop1, nop3, dst1], ts) + + def test_107 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + src2 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + nop2 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + dst2 = gr.null_sink (gr.sizeof_int) + + fg.connect (src1, nop1) + fg.connect (nop1, dst1) + fg.connect (src2, nop2) + fg.connect (nop2, dst2) + fg.validate () + ts = fg.topological_sort (fg.all_blocks ()) + self.assertEqual ([src2, nop2, dst2, src1, nop1, dst1], ts) + + def test_108 (self): + self.leak_check (self.body_108) + + def body_108 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + src2 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + nop2 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + dst2 = gr.null_sink (gr.sizeof_int) + + fg.connect (nop2, dst2) + fg.connect (src1, nop1) + fg.connect (src2, nop2) + fg.connect (nop1, dst1) + fg.validate () + ts = fg.topological_sort (fg.all_blocks ()) + self.assertEqual ([src2, nop2, dst2, src1, nop1, dst1], ts) + self.assertEqual ((6,0,0,0), all_counts ()) + + def test_109__setup_connections (self): + self.leak_check (self.body_109) + + def body_109 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect (src1, dst1) + fg._setup_connections () + self.assertEqual ((2,2,1,1), all_counts ()) + + def test_110_scheduler (self): + self.leak_check (self.body_110) + + def body_110 (self): + fg = self.fg + src_data = (0, 1, 2, 3) + src1 = gr.vector_source_i (src_data) + dst1 = gr.vector_sink_i () + fg.connect ((src1, 0), (dst1, 0)) + fg.run () + dst_data = dst1.data () + self.assertEqual (src_data, dst_data) + + def test_111_scheduler (self): + self.leak_check (self.body_111) + + def body_111 (self): + fg = self.fg + src_data = (0, 1, 2, 3) + expected_result = (2, 3, 4, 5) + src1 = gr.vector_source_i (src_data) + op = gr.add_const_ii (2) + dst1 = gr.vector_sink_i () + fg.connect (src1, op) + fg.connect (op, dst1) + fg.run () + dst_data = dst1.data () + self.assertEqual (expected_result, dst_data) + + def test_111v_scheduler (self): + self.leak_check (self.body_111v) + + def body_111v (self): + fg = self.fg + src_data = (0, 1, 2, 3) + expected_result = (2, 3, 4, 5) + src1 = gr.vector_source_i (src_data) + op = gr.add_const_ii (2) + dst1 = gr.vector_sink_i () + fg.connect (src1, op, dst1) + fg.run () + dst_data = dst1.data () + self.assertEqual (expected_result, dst_data) + + def test_112 (self): + fg = self.fg + nop1 = gr.nop (gr.sizeof_int) + nop2 = gr.nop (gr.sizeof_int) + nop3 = gr.nop (gr.sizeof_int) + fg.connect ((nop1, 0), (nop2, 0)) + fg.connect ((nop1, 1), (nop3, 0)) + fg._setup_connections () + self.assertEqual (2, nop1.detail().noutputs()) + + def test_113 (self): + fg = self.fg + nop1 = gr.nop (gr.sizeof_int) + nop2 = gr.nop (gr.sizeof_int) + nop3 = gr.nop (gr.sizeof_int) + fg.connect ((nop1, 0), (nop2, 0)) + fg.connect ((nop1, 0), (nop3, 0)) + fg._setup_connections () + self.assertEqual (1, nop1.detail().noutputs()) + + def test_200_partition (self): + self.leak_check (self.body_200) + + def body_200 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + fg.connect (nop1, dst1) + fg.connect (src1, nop1) + fg.validate () + p = fg.partition_graph (fg.all_blocks ()) + self.assertEqual ([[src1, nop1, dst1]], p) + self.assertEqual ((3,0,0,0), all_counts ()) + + def test_201_partition (self): + self.leak_check (self.body_201) + + def body_201 (self): + fg = self.fg + src1 = gr.null_source (gr.sizeof_int) + src2 = gr.null_source (gr.sizeof_int) + nop1 = gr.nop (gr.sizeof_int) + nop2 = gr.nop (gr.sizeof_int) + dst1 = gr.null_sink (gr.sizeof_int) + dst2 = gr.null_sink (gr.sizeof_int) + + fg.connect (nop2, dst2) + fg.connect (src1, nop1) + fg.connect (src2, nop2) + fg.connect (nop1, dst1) + fg.validate () + p = fg.partition_graph (fg.all_blocks ()) + self.assertEqual ([[src2, nop2, dst2], [src1, nop1, dst1]], p) + self.assertEqual ((6,0,0,0), all_counts ()) + + def test_300_hier (self): + self.leak_check (self.body_300_hier) + + def body_300_hier (self): + fg = self.fg + src_data = (0, 1, 2, 3) + expected_result = (10, 11, 12, 13) + src1 = gr.vector_source_i (src_data) + op = wrap_add (fg, 10) + dst1 = gr.vector_sink_i () + fg.connect (src1, op, dst1) + fg.run () + dst_data = dst1.data () + self.assertEqual (expected_result, dst_data) + + def test_301_hier (self): + self.leak_check (self.body_301_hier) + + def body_301_hier (self): + fg = self.fg + src_data = (0, 1, 2, 3) + expected_result = (5, 8, 11, 14) + src1 = gr.vector_source_i (src_data) + op = mult_add (fg, 3, 5) + dst1 = gr.vector_sink_i () + fg.connect (src1, op, dst1) + fg.run () + dst_data = dst1.data () + self.assertEqual (expected_result, dst_data) + + def test_302_hier (self): + self.leak_check (self.body_302_hier) + + def body_302_hier (self): + fg = self.fg + src_data = (0, 1, 2, 3) + expected_result = (10, 11, 12, 13) + src1 = gr.vector_source_i (src_data) + op = gr.compose (fg, gr.add_const_ii (10)) + dst1 = gr.vector_sink_i () + fg.connect (src1, op, dst1) + fg.run () + dst_data = dst1.data () + self.assertEqual (expected_result, dst_data) + + def test_303_hier (self): + self.leak_check (self.body_303_hier) + + def body_303_hier (self): + fg = self.fg + src_data = (0, 1, 2, 3) + expected_result = (35, 38, 41, 44) + src1 = gr.vector_source_i (src_data) + op = gr.compose (fg, gr.add_const_ii (10), mult_add (fg, 3, 5)) + dst1 = gr.vector_sink_i () + fg.connect (src1, op, dst1) + fg.run () + dst_data = dst1.data () + self.assertEqual (expected_result, dst_data) + + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_frequency_modulator.py b/gnuradio-core/src/python/gnuradio/gr/qa_frequency_modulator.py new file mode 100755 index 0000000000..cfa34830d6 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_frequency_modulator.py @@ -0,0 +1,56 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +def sincos(x): + return math.cos(x) + math.sin(x) * 1j + + +class test_frequency_modulator (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_fm_001 (self): + pi = math.pi + sensitivity = pi/4 + src_data = (1.0/4, 1.0/2, 1.0/4, -1.0/4, -1.0/2, -1/4.0) + running_sum = (pi/16, 3*pi/16, pi/4, 3*pi/16, pi/16, 0) + expected_result = tuple ([sincos (x) for x in running_sum]) + src = gr.vector_source_f (src_data) + op = gr.frequency_modulator_fc (sensitivity) + dst = gr.vector_sink_c () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertComplexTuplesAlmostEqual (expected_result, result_data) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_fsk_stuff.py b/gnuradio-core/src/python/gnuradio/gr/qa_fsk_stuff.py new file mode 100755 index 0000000000..d61ddb1a05 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_fsk_stuff.py @@ -0,0 +1,75 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +def sincos(x): + return math.cos(x) + math.sin(x) * 1j + +class test_bytes_to_syms (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_bytes_to_syms_001 (self): + src_data = (0x01, 0x80, 0x03) + expected_result = (-1, -1, -1, -1, -1, -1, -1, +1, + +1, -1, -1, -1, -1, -1, -1, -1, + -1, -1, -1, -1, -1, -1, +1, +1) + src = gr.vector_source_b (src_data) + op = gr.bytes_to_syms () + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertEqual (expected_result, result_data) + + def test_simple_framer (self): + src_data = (0x00, 0x11, 0x22, 0x33, + 0x44, 0x55, 0x66, 0x77, + 0x88, 0x99, 0xaa, 0xbb, + 0xcc, 0xdd, 0xee, 0xff) + + expected_result = ( + 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x00, 0x00, 0x11, 0x22, 0x33, 0x55, + 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x01, 0x44, 0x55, 0x66, 0x77, 0x55, + 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x02, 0x88, 0x99, 0xaa, 0xbb, 0x55, + 0xac, 0xdd, 0xa4, 0xe2, 0xf2, 0x8c, 0x20, 0xfc, 0x03, 0xcc, 0xdd, 0xee, 0xff, 0x55) + + src = gr.vector_source_b (src_data) + op = gr.simple_framer (4) + dst = gr.vector_sink_b () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertEqual (expected_result, result_data) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_goertzel.py b/gnuradio-core/src/python/gnuradio/gr/qa_goertzel.py new file mode 100755 index 0000000000..da9d65f625 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_goertzel.py @@ -0,0 +1,64 @@ +#!/usr/bin/env python +# +# Copyright 2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +from math import pi, cos + +class test_goertzel(gr_unittest.TestCase): + + def setUp(self): + self.fg = gr.flow_graph() + + def tearDown(self): + self.fg = None + + def make_tone_data(self, rate, freq): + return [cos(2*pi*x*freq/rate) for x in range(rate)] + + def transform(self, src_data, rate, freq): + src = gr.vector_source_f(src_data, False) + dft = gr.goertzel_fc(rate, rate, freq) + dst = gr.vector_sink_c() + self.fg.connect(src, dft, dst) + self.fg.run() + return dst.data() + + def test_001(self): # Measure single tone magnitude + rate = 8000 + freq = 100 + bin = freq + src_data = self.make_tone_data(rate, freq) + expected_result = 0.5 + actual_result = abs(self.transform(src_data, rate, bin)[0]) + self.assertAlmostEqual(expected_result, actual_result, places=5) + + def test_002(self): # Measure off frequency magnitude + rate = 8000 + freq = 100 + bin = freq/2 + src_data = self.make_tone_data(rate, freq) + expected_result = 0.0 + actual_result = abs(self.transform(src_data, rate, bin)[0]) + self.assertAlmostEqual(expected_result, actual_result, places=5) + +if __name__ == '__main__': + gr_unittest.main() diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_head.py b/gnuradio-core/src/python/gnuradio/gr/qa_head.py new file mode 100755 index 0000000000..a6fcd7f983 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_head.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +class test_head (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_head (self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + expected_result = (1, 2, 3, 4) + src1 = gr.vector_source_i (src_data) + op = gr.head (gr.sizeof_int, 4) + dst1 = gr.vector_sink_i () + self.fg.connect (src1, op) + self.fg.connect (op, dst1) + self.fg.run () + dst_data = dst1.data () + self.assertEqual (expected_result, dst_data) + + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_hilbert.py b/gnuradio-core/src/python/gnuradio/gr/qa_hilbert.py new file mode 100755 index 0000000000..a8f39dd6d9 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_hilbert.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +class test_sig_source (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_hilbert (self): + fg = self.fg + ntaps = 51 + sampling_freq = 100 + + expected_result = ( -1.4678005338941702e-11j, + -0.0011950774351134896j, + -0.0019336787518113852j, + -0.0034673355985432863j, + -0.0036765895783901215j, + -0.004916108213365078j, + -0.0042778430506587029j, + -0.006028641015291214j, + -0.005476709920912981j, + -0.0092810001224279404j, + -0.0095402700826525688j, + -0.016060983762145042j, + -0.016446959227323532j, + -0.02523401565849781j, + -0.024382550269365311j, + -0.035477779805660248j, + -0.033021725714206696j, + -0.048487484455108643j, + -0.04543270543217659j, + -0.069477587938308716j, + -0.066984444856643677j, + -0.10703597217798233j, + -0.10620346665382385j, + -0.1852707713842392j, + -0.19357112050056458j, + (7.2191945754696007e-09 -0.50004088878631592j), + (0.58778399229049683 -0.6155126690864563j), + (0.95105588436126709 -0.12377222627401352j), + (0.95105588436126709 +0.41524654626846313j), + (0.5877838134765625 +0.91611981391906738j), + (5.8516356205018383e-09 +1.0670661926269531j), + (-0.5877840518951416 +0.87856143712997437j), + (-0.95105588436126709 +0.35447561740875244j), + (-0.95105588436126709 -0.26055556535720825j), + (-0.5877838134765625 -0.77606213092803955j), + (-8.7774534307527574e-09 -0.96460390090942383j), + (0.58778399229049683 -0.78470128774642944j), + (0.95105588436126709 -0.28380891680717468j), + (0.95105588436126709 +0.32548999786376953j), + (0.5877838134765625 +0.82514488697052002j), + (1.4629089051254596e-08 +1.0096219778060913j), + (-0.5877840518951416 +0.81836479902267456j), + (-0.95105588436126709 +0.31451958417892456j), + (-0.95105588436126709 -0.3030143678188324j), + (-0.5877838134765625 -0.80480599403381348j), + (-1.7554906861505515e-08 -0.99516552686691284j), + (0.58778399229049683 -0.80540722608566284j), + (0.95105582475662231 -0.30557557940483093j), + (0.95105588436126709 +0.31097668409347534j), + (0.5877838134765625 +0.81027895212173462j), + (2.3406542482007353e-08 +1.0000816583633423j), + (-0.5877840518951416 +0.80908381938934326j), + (-0.95105588436126709 +0.30904293060302734j), + (-0.95105588436126709 -0.30904296040534973j), + (-0.5877838134765625 -0.80908387899398804j), + (-2.6332360292258272e-08 -1.0000815391540527j), + (0.58778399229049683 -0.80908381938934326j), + (0.95105582475662231 -0.30904299020767212j), + (0.95105588436126709 +0.30904293060302734j), + (0.5877838134765625 +0.80908381938934326j), + (3.218399768911695e-08 +1.0000815391540527j)) + + + src1 = gr.sig_source_f (sampling_freq, gr.GR_SIN_WAVE, + sampling_freq * 0.10, 1.0) + + head = gr.head (gr.sizeof_float, int (ntaps + sampling_freq * 0.10)) + hilb = gr.hilbert_fc (ntaps) + dst1 = gr.vector_sink_c () + fg.connect (src1, head) + fg.connect (head, hilb) + fg.connect (hilb, dst1) + fg.run () + dst_data = dst1.data () + self.assertComplexTuplesAlmostEqual (expected_result, dst_data, 5) + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_iir.py b/gnuradio-core/src/python/gnuradio/gr/qa_iir.py new file mode 100755 index 0000000000..a1f2aa0772 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_iir.py @@ -0,0 +1,135 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +class test_iir (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_iir_direct_001 (self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8) + fftaps = () + fbtaps = () + expected_result = (0, 0, 0, 0, 0, 0, 0, 0) + src = gr.vector_source_f (src_data) + op = gr.iir_filter_ffd (fftaps, fbtaps) + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + def test_iir_direct_002 (self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8) + fftaps = (2,) + fbtaps = (0,) + expected_result = (2, 4, 6, 8, 10, 12, 14, 16) + src = gr.vector_source_f (src_data) + op = gr.iir_filter_ffd (fftaps, fbtaps) + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + def test_iir_direct_003 (self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8) + fftaps = (2, 11) + fbtaps = (0, 0) + expected_result = (2, 15, 28, 41, 54, 67, 80, 93) + src = gr.vector_source_f (src_data) + op = gr.iir_filter_ffd (fftaps, fbtaps) + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + def test_iir_direct_004 (self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8) + fftaps = (2, 11) + fbtaps = (0, -1) + expected_result = (2, 13, 15, 26, 28, 39, 41, 52) + src = gr.vector_source_f (src_data) + op = gr.iir_filter_ffd (fftaps, fbtaps) + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + def test_iir_direct_005 (self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8) + fftaps = (2, 11, 0) + fbtaps = (0, -1, 3) + expected_result = (2, 13, 21, 59, 58, 186, 68, 583) + src = gr.vector_source_f (src_data) + op = gr.iir_filter_ffd (fftaps, fbtaps) + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + def test_iir_direct_006 (self): + fftaps = (2, 11, 0) + fbtaps = (0, -1) + self.assertRaises ((RuntimeError, ValueError), gr.iir_filter_ffd, fftaps, fbtaps) + + def test_iir_direct_007 (self): + src_data = (1, 2, 3, 4, 5, 6, 7, 8) + expected_result = (2, 13, 21, 59, 58, 186, 68, 583) + fftaps = (2, 1) + fbtaps = (0, -1) + src = gr.vector_source_f (src_data) + op = gr.iir_filter_ffd (fftaps, fbtaps) + fftaps = (2, 11, 0) + fbtaps = (0, -1, 3) + op.set_taps (fftaps, fbtaps) + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + def test_iir_direct_008 (self): + fftaps = (2, 1) + fbtaps = (0, -1) + op = gr.iir_filter_ffd (fftaps, fbtaps) + fftaps = (2, 11) + fbtaps = (0, -1, 3) + self.assertRaises ((RuntimeError, ValueError), op.set_taps, fftaps, fbtaps) + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_interleave.py b/gnuradio-core/src/python/gnuradio/gr/qa_interleave.py new file mode 100755 index 0000000000..003d12e645 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_interleave.py @@ -0,0 +1,81 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +class test_interleave (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_int_001 (self): + lenx = 64 + src0 = gr.vector_source_f (range (0, lenx, 4)) + src1 = gr.vector_source_f (range (1, lenx, 4)) + src2 = gr.vector_source_f (range (2, lenx, 4)) + src3 = gr.vector_source_f (range (3, lenx, 4)) + op = gr.interleave (gr.sizeof_float) + dst = gr.vector_sink_f () + + self.fg.connect (src0, (op, 0)) + self.fg.connect (src1, (op, 1)) + self.fg.connect (src2, (op, 2)) + self.fg.connect (src3, (op, 3)) + self.fg.connect (op, dst) + self.fg.run () + expected_result = tuple (range (lenx)) + result_data = dst.data () + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + def test_deint_001 (self): + lenx = 64 + src = gr.vector_source_f (range (lenx)) + op = gr.deinterleave (gr.sizeof_float) + dst0 = gr.vector_sink_f () + dst1 = gr.vector_sink_f () + dst2 = gr.vector_sink_f () + dst3 = gr.vector_sink_f () + + self.fg.connect (src, op) + self.fg.connect ((op, 0), dst0) + self.fg.connect ((op, 1), dst1) + self.fg.connect ((op, 2), dst2) + self.fg.connect ((op, 3), dst3) + self.fg.run () + + expected_result0 = tuple (range (0, lenx, 4)) + expected_result1 = tuple (range (1, lenx, 4)) + expected_result2 = tuple (range (2, lenx, 4)) + expected_result3 = tuple (range (3, lenx, 4)) + + self.assertFloatTuplesAlmostEqual (expected_result0, dst0.data ()) + self.assertFloatTuplesAlmostEqual (expected_result1, dst1.data ()) + self.assertFloatTuplesAlmostEqual (expected_result2, dst2.data ()) + self.assertFloatTuplesAlmostEqual (expected_result3, dst3.data ()) + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_interp_fir_filter.py b/gnuradio-core/src/python/gnuradio/gr/qa_interp_fir_filter.py new file mode 100755 index 0000000000..4119e3e2d6 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_interp_fir_filter.py @@ -0,0 +1,54 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +class test_interp_fir_filter (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_fff (self): + taps = [1, 10, 100, 1000, 10000] + src_data = (0, 2, 3, 5, 7, 11, 13, 17) + interpolation = 3 + xr = (0,0,0,0,2,20,200,2003,20030,300,3005,30050,500,5007,50070,700,7011,70110,1100,11013,110130,1300,13017,130170) + expected_result = tuple ([float (x) for x in xr]) + + src = gr.vector_source_f (src_data) + op = gr.interp_fir_filter_fff (interpolation, taps) + dst = gr.vector_sink_f () + self.fg.connect (src, op) + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + L = min(len(result_data), len(expected_result)) + self.assertEqual (expected_result[0:L], result_data[0:L]) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_kludge_copy.py b/gnuradio-core/src/python/gnuradio/gr/qa_kludge_copy.py new file mode 100755 index 0000000000..f5dee15286 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_kludge_copy.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +# +# Copyright 2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math +import random + + +class test_kludge_copy(gr_unittest.TestCase): + + def setUp(self): + self.fg = gr.flow_graph() + self.rng = random.Random() + self.rng.seed(0) + + def tearDown(self): + self.fg = None + self.rng = None + + def make_random_int_tuple(self, L): + result = [] + for x in range(L): + result.append(self.rng.randint(int(-1e9), int(+1e9))) + return tuple(result) + + + def test_001(self): + # 1 input stream; 1 output stream + src0_data = self.make_random_int_tuple(16000) + src0 = gr.vector_source_i(src0_data) + op = gr.kludge_copy(gr.sizeof_int) + dst0 = gr.vector_sink_i() + self.fg.connect(src0, op, dst0) + self.fg.run() + dst0_data = dst0.data() + self.assertEqual(src0_data, dst0_data) + + def test_002(self): + # 2 input streams; 2 output streams + src0_data = self.make_random_int_tuple(16000) + src1_data = self.make_random_int_tuple(16000) + src0 = gr.vector_source_i(src0_data) + src1 = gr.vector_source_i(src1_data) + op = gr.kludge_copy(gr.sizeof_int) + dst0 = gr.vector_sink_i() + dst1 = gr.vector_sink_i() + self.fg.connect(src0, (op, 0), dst0) + self.fg.connect(src1, (op, 1), dst1) + self.fg.run() + dst0_data = dst0.data() + dst1_data = dst1.data() + self.assertEqual(src0_data, dst0_data) + self.assertEqual(src1_data, dst1_data) + + def test_003(self): + # number of input streams != number of output streams + src0_data = self.make_random_int_tuple(16000) + src1_data = self.make_random_int_tuple(16000) + src0 = gr.vector_source_i(src0_data) + src1 = gr.vector_source_i(src1_data) + op = gr.kludge_copy(gr.sizeof_int) + dst0 = gr.vector_sink_i() + dst1 = gr.vector_sink_i() + self.fg.connect(src0, (op, 0), dst0) + self.fg.connect(src1, (op, 1)) + self.assertRaises(ValueError, self.fg.run) + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_kludged_imports.py b/gnuradio-core/src/python/gnuradio/gr/qa_kludged_imports.py new file mode 100755 index 0000000000..b208677226 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_kludged_imports.py @@ -0,0 +1,43 @@ +#!/usr/bin/env python +# +# Copyright 2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +class test_head (gr_unittest.TestCase): + + def setUp(self): + pass + + def tearDown(self): + pass + + def test_blks_import(self): + # make sure that this somewhat magic import works + from gnuradio import blks + + def test_gru_import(self): + # make sure that this somewhat magic import works + from gnuradio import gru + + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_message.py b/gnuradio-core/src/python/gnuradio/gr/qa_message.py new file mode 100755 index 0000000000..d5d5111495 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_message.py @@ -0,0 +1,117 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import qa_basic_flow_graph + + +def all_counts (): + return (gr.block_ncurrently_allocated (), + gr.block_detail_ncurrently_allocated (), + gr.buffer_ncurrently_allocated (), + gr.buffer_reader_ncurrently_allocated (), + gr.message_ncurrently_allocated ()) + + +class test_message (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph() + self.msgq = gr.msg_queue () + + def tearDown (self): + self.fg = None + self.msgq = None + + def leak_check (self, fct): + begin = all_counts () + fct () + # tear down early so we can check for leaks + self.tearDown () + end = all_counts () + self.assertEqual (begin, end) + + def test_100 (self): + msg = gr.message (0, 1.5, 2.3) + self.assertEquals (0, msg.type()) + self.assertAlmostEqual (1.5, msg.arg1()) + self.assertAlmostEqual (2.3, msg.arg2()) + self.assertEquals (0, msg.length()) + + def test_101 (self): + s = 'This is a test' + msg = gr.message_from_string(s) + self.assertEquals(s, msg.to_string()) + + def test_200 (self): + self.leak_check (self.body_200) + + def body_200 (self): + self.msgq.insert_tail (gr.message (0)) + self.assertEquals (1, self.msgq.count()) + self.msgq.insert_tail (gr.message (1)) + self.assertEquals (2, self.msgq.count()) + msg0 = self.msgq.delete_head () + self.assertEquals (0, msg0.type()) + msg1 = self.msgq.delete_head () + self.assertEquals (1, msg1.type()) + self.assertEquals (0, self.msgq.count()) + + def test_201 (self): + self.leak_check (self.body_201) + + def body_201 (self): + self.msgq.insert_tail (gr.message (0)) + self.assertEquals (1, self.msgq.count()) + self.msgq.insert_tail (gr.message (1)) + self.assertEquals (2, self.msgq.count()) + + def test_202 (self): + self.leak_check (self.body_202) + + def body_202 (self): + # global msg + msg = gr.message (666) + + def test_300(self): + input_data = (0,1,2,3,4,5,6,7,8,9) + src = gr.vector_source_b(input_data) + dst = gr.vector_sink_b() + self.fg.connect(src, dst) + self.fg.run() + self.assertEquals(input_data, dst.data()) + + def test_301(self): + src = gr.message_source(gr.sizeof_char) + dst = gr.vector_sink_b() + self.fg.connect(src, dst) + src.msgq().insert_tail(gr.message_from_string('01234')) + src.msgq().insert_tail(gr.message_from_string('5')) + src.msgq().insert_tail(gr.message_from_string('')) + src.msgq().insert_tail(gr.message_from_string('6789')) + src.msgq().insert_tail(gr.message(1)) # send EOF + self.fg.run() + self.assertEquals(tuple(map(ord, '0123456789')), dst.data()) + + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_mute.py b/gnuradio-core/src/python/gnuradio/gr/qa_mute.py new file mode 100755 index 0000000000..863c308a4a --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_mute.py @@ -0,0 +1,89 @@ +#!/usr/bin/env python +# +# Copyright 2004,2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +class test_head (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def help_ii (self, src_data, exp_data, op): + for s in zip (range (len (src_data)), src_data): + src = gr.vector_source_i (s[1]) + self.fg.connect (src, (op, s[0])) + dst = gr.vector_sink_i () + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertEqual (exp_data, result_data) + + def help_ff (self, src_data, exp_data, op): + for s in zip (range (len (src_data)), src_data): + src = gr.vector_source_f (s[1]) + self.fg.connect (src, (op, s[0])) + dst = gr.vector_sink_f () + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertEqual (exp_data, result_data) + + def help_cc (self, src_data, exp_data, op): + for s in zip (range (len (src_data)), src_data): + src = gr.vector_source_c (s[1]) + self.fg.connect (src, (op, s[0])) + dst = gr.vector_sink_c () + self.fg.connect (op, dst) + self.fg.run () + result_data = dst.data () + self.assertEqual (exp_data, result_data) + + def test_unmute_ii(self): + src_data = (1, 2, 3, 4, 5) + expected_result = (1, 2, 3, 4, 5) + op = gr.mute_ii (False) + self.help_ii ((src_data,), expected_result, op) + + def test_mute_ii(self): + src_data = (1, 2, 3, 4, 5) + expected_result = (0, 0, 0, 0, 0) + op = gr.mute_ii (True) + self.help_ii ((src_data,), expected_result, op) + + def test_unmute_cc (self): + src_data = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j) + expected_result = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j) + op = gr.mute_cc (False) + self.help_cc ((src_data,), expected_result, op) + + def test_unmute_cc (self): + src_data = (1+5j, 2+5j, 3+5j, 4+5j, 5+5j) + expected_result = (0+0j, 0+0j, 0+0j, 0+0j, 0+0j) + op = gr.mute_cc (True) + self.help_cc ((src_data,), expected_result, op) + + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_nlog10.py b/gnuradio-core/src/python/gnuradio/gr/qa_nlog10.py new file mode 100755 index 0000000000..f056d1100b --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_nlog10.py @@ -0,0 +1,47 @@ +#!/usr/bin/env python +# +# Copyright 2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +class test_single_pole_iir(gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_001(self): + src_data = (-10, 0, 10, 100, 1000, 10000, 100000) + expected_result = (-180, -180, 10, 20, 30, 40, 50) + src = gr.vector_source_f(src_data) + op = gr.nlog10_ff(10) + dst = gr.vector_sink_f() + self.fg.connect (src, op, dst) + self.fg.run() + result_data = dst.data() + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_packed_to_unpacked.py b/gnuradio-core/src/python/gnuradio/gr/qa_packed_to_unpacked.py new file mode 100755 index 0000000000..d5472c8d72 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_packed_to_unpacked.py @@ -0,0 +1,405 @@ +#!/usr/bin/env python +# +# Copyright 2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import random + +class test_packing(gr_unittest.TestCase): + + def setUp(self): + self.fg = gr.flow_graph () + + def tearDown(self): + self.fg = None + + def test_001(self): + """ + Test stream_to_streams. + """ + src_data = (0x80,) + expected_results = (1,0,0,0,0,0,0,0) + src = gr.vector_source_b(src_data,False) + op = gr.packed_to_unpacked_bb(1, gr.GR_MSB_FIRST) + self.fg.connect(src, op) + + dst = gr.vector_sink_b() + self.fg.connect(op, dst) + + self.fg.run() + + self.assertEqual(expected_results, dst.data()) + + def test_002(self): + """ + Test stream_to_streams. + """ + src_data = (0x80,) + expected_results = (0,0,0,0,0,0,0, 1) + src = gr.vector_source_b(src_data,False) + op = gr.packed_to_unpacked_bb(1, gr.GR_LSB_FIRST) + self.fg.connect(src, op) + + dst = gr.vector_sink_b() + self.fg.connect(op, dst) + + self.fg.run() + + self.assertEqual(expected_results, dst.data()) + + def test_003(self): + """ + Test stream_to_streams. + """ + src_data = (0x11,) + expected_results = (4, 2) + src = gr.vector_source_b(src_data,False) + op = gr.packed_to_unpacked_bb(3, gr.GR_LSB_FIRST) + self.fg.connect(src, op) + + dst = gr.vector_sink_b() + self.fg.connect(op, dst) + + self.fg.run() + + self.assertEqual(expected_results, dst.data()) + + def test_004(self): + """ + Test stream_to_streams. + """ + src_data = (0x11,) + expected_results = (0, 4) + src = gr.vector_source_b(src_data,False) + op = gr.packed_to_unpacked_bb(3, gr.GR_MSB_FIRST) + self.fg.connect(src, op) + + dst = gr.vector_sink_b() + self.fg.connect(op, dst) + + self.fg.run() + + self.assertEqual(expected_results, dst.data()) + + def test_005(self): + """ + Test stream_to_streams. + """ + src_data = (1,0,0,0,0,0,1,0,0,1,0,1,1,0,1,0) + expected_results = (0x82,0x5a) + src = gr.vector_source_b(src_data,False) + op = gr.unpacked_to_packed_bb(1, gr.GR_MSB_FIRST) + self.fg.connect(src, op) + + dst = gr.vector_sink_b() + self.fg.connect(op, dst) + + self.fg.run() + + self.assertEqual(expected_results, dst.data()) + + def test_006(self): + """ + Test stream_to_streams. + """ + src_data = (0,1,0,0,0,0,0,1,0,1,0,1,1,0,1,0) + expected_results = (0x82,0x5a) + src = gr.vector_source_b(src_data,False) + op = gr.unpacked_to_packed_bb(1, gr.GR_LSB_FIRST) + self.fg.connect(src, op) + + dst = gr.vector_sink_b() + self.fg.connect(op, dst) + + self.fg.run() + + self.assertEqual(expected_results, dst.data()) + + + def test_007(self): + """ + Test stream_to_streams. + """ + src_data = (4, 2, 0,0,0) + expected_results = (0x11,) + src = gr.vector_source_b(src_data,False) + op = gr.unpacked_to_packed_bb(3, gr.GR_LSB_FIRST) + self.fg.connect(src, op) + + dst = gr.vector_sink_b() + self.fg.connect(op, dst) + + self.fg.run() + + self.assertEqual(expected_results, dst.data()) + + def test_008(self): + """ + Test stream_to_streams. + """ + src_data = (0, 4, 2,0,0) + expected_results = (0x11,) + src = gr.vector_source_b(src_data,False) + op = gr.unpacked_to_packed_bb(3, gr.GR_MSB_FIRST) + self.fg.connect(src, op) + + dst = gr.vector_sink_b() + self.fg.connect(op, dst) + + self.fg.run() + + self.assertEqual(expected_results, dst.data()) + + def test_009(self): + """ + Test stream_to_streams. + """ + + random.seed(0) + src_data = [] + for i in xrange(202): + src_data.append((random.randint(0,255))) + src_data = tuple(src_data) + expected_results = src_data + + src = gr.vector_source_b(tuple(src_data),False) + op1 = gr.packed_to_unpacked_bb(3, gr.GR_MSB_FIRST) + op2 = gr.unpacked_to_packed_bb(3, gr.GR_MSB_FIRST) + self.fg.connect(src, op1, op2) + + dst = gr.vector_sink_b() + self.fg.connect(op2, dst) + + self.fg.run() + + self.assertEqual(expected_results[0:201], dst.data()) + + def test_010(self): + """ + Test stream_to_streams. + """ + + random.seed(0) + src_data = [] + for i in xrange(56): + src_data.append((random.randint(0,255))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_b(tuple(src_data),False) + op1 = gr.packed_to_unpacked_bb(7, gr.GR_MSB_FIRST) + op2 = gr.unpacked_to_packed_bb(7, gr.GR_MSB_FIRST) + self.fg.connect(src, op1, op2) + dst = gr.vector_sink_b() + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results[0:201], dst.data()) + + def test_011(self): + """ + Test stream_to_streams. + """ + + random.seed(0) + src_data = [] + for i in xrange(56): + src_data.append((random.randint(0,255))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_b(tuple(src_data),False) + op1 = gr.packed_to_unpacked_bb(7, gr.GR_LSB_FIRST) + op2 = gr.unpacked_to_packed_bb(7, gr.GR_LSB_FIRST) + self.fg.connect(src, op1, op2) + dst = gr.vector_sink_b() + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results[0:201], dst.data()) + + + # tests on shorts + + def test_100a(self): + """ + test short version + """ + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**15,2**15-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_s(tuple(src_data),False) + op1 = gr.packed_to_unpacked_ss(1, gr.GR_MSB_FIRST) + op2 = gr.unpacked_to_packed_ss(1, gr.GR_MSB_FIRST) + self.fg.connect(src, op1, op2) + dst = gr.vector_sink_s() + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + def test_100b(self): + """ + test short version + """ + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**15,2**15-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_s(tuple(src_data),False) + op1 = gr.packed_to_unpacked_ss(1, gr.GR_LSB_FIRST) + op2 = gr.unpacked_to_packed_ss(1, gr.GR_LSB_FIRST) + self.fg.connect(src, op1, op2) + dst = gr.vector_sink_s() + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + def test_101a(self): + """ + test short version + """ + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**15,2**15-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_s(tuple(src_data),False) + op1 = gr.packed_to_unpacked_ss(8, gr.GR_MSB_FIRST) + op2 = gr.unpacked_to_packed_ss(8, gr.GR_MSB_FIRST) + self.fg.connect(src, op1, op2) + dst = gr.vector_sink_s() + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + def test_101b(self): + """ + test short version + """ + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**15,2**15-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_s(tuple(src_data),False) + op1 = gr.packed_to_unpacked_ss(8, gr.GR_LSB_FIRST) + op2 = gr.unpacked_to_packed_ss(8, gr.GR_LSB_FIRST) + self.fg.connect(src, op1, op2) + dst = gr.vector_sink_s() + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + # tests on ints + + def test_200a(self): + """ + test int version + """ + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**31,2**31-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_i(tuple(src_data),False) + op1 = gr.packed_to_unpacked_ii(1, gr.GR_MSB_FIRST) + op2 = gr.unpacked_to_packed_ii(1, gr.GR_MSB_FIRST) + self.fg.connect(src, op1, op2) + dst = gr.vector_sink_i() + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + def test_200b(self): + """ + test int version + """ + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**31,2**31-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_i(tuple(src_data),False) + op1 = gr.packed_to_unpacked_ii(1, gr.GR_LSB_FIRST) + op2 = gr.unpacked_to_packed_ii(1, gr.GR_LSB_FIRST) + self.fg.connect(src, op1, op2) + dst = gr.vector_sink_i() + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + def test_201a(self): + """ + test int version + """ + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**31,2**31-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_i(tuple(src_data),False) + op1 = gr.packed_to_unpacked_ii(8, gr.GR_MSB_FIRST) + op2 = gr.unpacked_to_packed_ii(8, gr.GR_MSB_FIRST) + self.fg.connect(src, op1, op2) + dst = gr.vector_sink_i() + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + def test_201b(self): + """ + test int version + """ + random.seed(0) + src_data = [] + for i in xrange(100): + src_data.append((random.randint(-2**31,2**31-1))) + src_data = tuple(src_data) + expected_results = src_data + src = gr.vector_source_i(tuple(src_data),False) + op1 = gr.packed_to_unpacked_ii(8, gr.GR_LSB_FIRST) + op2 = gr.unpacked_to_packed_ii(8, gr.GR_LSB_FIRST) + self.fg.connect(src, op1, op2) + dst = gr.vector_sink_i() + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_pipe_fittings.py b/gnuradio-core/src/python/gnuradio/gr/qa_pipe_fittings.py new file mode 100755 index 0000000000..dca18c8ecc --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_pipe_fittings.py @@ -0,0 +1,143 @@ +#!/usr/bin/env python +# +# Copyright 2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +if 0: + import os + print "pid =", os.getpid() + raw_input("Attach, then press Enter to continue") + + +def calc_expected_result(src_data, n): + assert (len(src_data) % n) == 0 + result = [list() for x in range(n)] + #print "len(result) =", len(result) + for i in xrange(len(src_data)): + (result[i % n]).append(src_data[i]) + return [tuple(x) for x in result] + + +class test_pipe_fittings(gr_unittest.TestCase): + + def setUp(self): + self.fg = gr.flow_graph () + + def tearDown(self): + self.fg = None + + def test_001(self): + """ + Test stream_to_streams. + """ + n = 8 + src_len = n * 8 + src_data = range(src_len) + + expected_results = calc_expected_result(src_data, n) + #print "expected results: ", expected_results + src = gr.vector_source_i(src_data) + op = gr.stream_to_streams(gr.sizeof_int, n) + self.fg.connect(src, op) + + dsts = [] + for i in range(n): + dst = gr.vector_sink_i() + self.fg.connect((op, i), (dst, 0)) + dsts.append(dst) + + self.fg.run() + + for d in range(n): + self.assertEqual(expected_results[d], dsts[d].data()) + + def test_002(self): + """ + Test streams_to_stream (using stream_to_streams). + """ + n = 8 + src_len = n * 8 + src_data = tuple(range(src_len)) + expected_results = src_data + + src = gr.vector_source_i(src_data) + op1 = gr.stream_to_streams(gr.sizeof_int, n) + op2 = gr.streams_to_stream(gr.sizeof_int, n) + dst = gr.vector_sink_i() + + self.fg.connect(src, op1) + for i in range(n): + self.fg.connect((op1, i), (op2, i)) + self.fg.connect(op2, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + def test_003(self): + """ + Test streams_to_vector (using stream_to_streams & vector_to_stream). + """ + n = 8 + src_len = n * 8 + src_data = tuple(range(src_len)) + expected_results = src_data + + src = gr.vector_source_i(src_data) + op1 = gr.stream_to_streams(gr.sizeof_int, n) + op2 = gr.streams_to_vector(gr.sizeof_int, n) + op3 = gr.vector_to_stream(gr.sizeof_int, n) + dst = gr.vector_sink_i() + + self.fg.connect(src, op1) + for i in range(n): + self.fg.connect((op1, i), (op2, i)) + self.fg.connect(op2, op3, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + def test_004(self): + """ + Test vector_to_streams. + """ + n = 8 + src_len = n * 8 + src_data = tuple(range(src_len)) + expected_results = src_data + + src = gr.vector_source_i(src_data) + op1 = gr.stream_to_vector(gr.sizeof_int, n) + op2 = gr.vector_to_streams(gr.sizeof_int, n) + op3 = gr.streams_to_stream(gr.sizeof_int, n) + dst = gr.vector_sink_i() + + self.fg.connect(src, op1, op2) + for i in range(n): + self.fg.connect((op2, i), (op3, i)) + self.fg.connect(op3, dst) + + self.fg.run() + self.assertEqual(expected_results, dst.data()) + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_rational_resampler.py b/gnuradio-core/src/python/gnuradio/gr/qa_rational_resampler.py new file mode 100755 index 0000000000..2505482d51 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_rational_resampler.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python +# +# Copyright 2005,2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +from gnuradio import blks +import math +import random +import sys + +#import os +#print os.getpid() +#raw_input('Attach with gdb, then press Enter: ') + + +def random_floats(n): + r = [] + for x in xrange(n): + r.append(float(random.randint(-32768, 32768))) + return tuple(r) + + +def reference_dec_filter(src_data, decim, taps): + fg = gr.flow_graph() + src = gr.vector_source_f(src_data) + op = gr.fir_filter_fff(decim, taps) + dst = gr.vector_sink_f() + fg.connect(src, op, dst) + fg.run() + result_data = dst.data() + fg = None + return result_data + +def reference_interp_filter(src_data, interp, taps): + fg = gr.flow_graph() + src = gr.vector_source_f(src_data) + op = gr.interp_fir_filter_fff(interp, taps) + dst = gr.vector_sink_f() + fg.connect(src, op, dst) + fg.run() + result_data = dst.data() + fg = None + return result_data + +def reference_interp_dec_filter(src_data, interp, decim, taps): + fg = gr.flow_graph() + src = gr.vector_source_f(src_data) + up = gr.interp_fir_filter_fff(interp, (1,)) + dn = gr.fir_filter_fff(decim, taps) + dst = gr.vector_sink_f() + fg.connect(src, up, dn, dst) + fg.run() + result_data = dst.data() + fg = None + return result_data + + +class test_rational_resampler (gr_unittest.TestCase): + + def setUp(self): + self.fg = gr.flow_graph() + + def tearDown(self): + self.fg = None + + # + # test the gr.rational_resampler_base primitives... + # + + def test_000_1_to_1(self): + taps = (-4, 5) + src_data = (234, -4, 23, -56, 45, 98, -23, -7) + xr = (-936, 1186, -112, 339, -460, -167, 582) + expected_result = tuple([float(x) for x in xr]) + + src = gr.vector_source_f(src_data) + op = gr.rational_resampler_base_fff(1, 1, taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op) + self.fg.connect(op, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def test_001_interp(self): + taps = [1, 10, 100, 1000, 10000] + src_data = (0, 2, 3, 5, 7, 11, 13, 17) + interpolation = 3 + xr = (0,0,0,0,2,20,200,2003,20030,300,3005,30050,500,5007,50070,700,7011,70110,1100,11013,110130,1300,13017,130170,1700.0,17000.0,170000.0) + expected_result = tuple([float(x) for x in xr]) + + src = gr.vector_source_f(src_data) + op = gr.rational_resampler_base_fff(interpolation, 1, taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op) + self.fg.connect(op, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + def test_002_interp(self): + taps = random_floats(31) + #src_data = random_floats(10000) # FIXME the 10k case fails! + src_data = random_floats(1000) + interpolation = 3 + + expected_result = reference_interp_filter(src_data, interpolation, taps) + + src = gr.vector_source_f(src_data) + op = gr.rational_resampler_base_fff(interpolation, 1, taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op) + self.fg.connect(op, dst) + self.fg.run() + result_data = dst.data() + + L1 = len(result_data) + L2 = len(expected_result) + L = min(L1, L2) + if False: + sys.stderr.write('delta = %2d: ntaps = %d interp = %d ilen = %d\n' % + (L2 - L1, len(taps), interpolation, len(src_data))) + sys.stderr.write(' len(result_data) = %d len(expected_result) = %d\n' % + (len(result_data), len(expected_result))) + #self.assertEqual(expected_result[0:L], result_data[0:L]) + # FIXME check first 3 answers + self.assertEqual(expected_result[3:L], result_data[3:L]) + + def test_003_interp(self): + taps = random_floats(31) + src_data = random_floats(10000) + decimation = 3 + + expected_result = reference_dec_filter(src_data, decimation, taps) + + src = gr.vector_source_f(src_data) + op = gr.rational_resampler_base_fff(1, decimation, taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op) + self.fg.connect(op, dst) + self.fg.run() + result_data = dst.data() + + L1 = len(result_data) + L2 = len(expected_result) + L = min(L1, L2) + if False: + sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' % + (L2 - L1, len(taps), decimation, len(src_data))) + sys.stderr.write(' len(result_data) = %d len(expected_result) = %d\n' % + (len(result_data), len(expected_result))) + self.assertEqual(expected_result[0:L], result_data[0:L]) + + # FIXME disabled. Triggers hang on SuSE 10.0 + def xtest_004_decim_random_vals(self): + MAX_TAPS = 9 + MAX_DECIM = 7 + OUTPUT_LEN = 9 + + random.seed(0) # we want reproducibility + + for ntaps in xrange(1, MAX_TAPS + 1): + for decim in xrange(1, MAX_DECIM+1): + for ilen in xrange(ntaps + decim, ntaps + OUTPUT_LEN*decim): + src_data = random_floats(ilen) + taps = random_floats(ntaps) + expected_result = reference_dec_filter(src_data, decim, taps) + + fg = gr.flow_graph() + src = gr.vector_source_f(src_data) + op = gr.rational_resampler_base_fff(1, decim, taps) + dst = gr.vector_sink_f() + fg.connect(src, op, dst) + fg.run() + fg = None + result_data = dst.data() + L1 = len(result_data) + L2 = len(expected_result) + L = min(L1, L2) + if False: + sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' % (L2 - L1, ntaps, decim, ilen)) + sys.stderr.write(' len(result_data) = %d len(expected_result) = %d\n' % + (len(result_data), len(expected_result))) + self.assertEqual(expected_result[0:L], result_data[0:L]) + + + # FIXME disabled. Triggers hang on SuSE 10.0 + def xtest_005_interp_random_vals(self): + MAX_TAPS = 9 + MAX_INTERP = 7 + INPUT_LEN = 9 + + random.seed(0) # we want reproducibility + + for ntaps in xrange(1, MAX_TAPS + 1): + for interp in xrange(1, MAX_INTERP+1): + for ilen in xrange(ntaps, ntaps + INPUT_LEN): + src_data = random_floats(ilen) + taps = random_floats(ntaps) + expected_result = reference_interp_filter(src_data, interp, taps) + + fg = gr.flow_graph() + src = gr.vector_source_f(src_data) + op = gr.rational_resampler_base_fff(interp, 1, taps) + dst = gr.vector_sink_f() + fg.connect(src, op, dst) + fg.run() + fg = None + result_data = dst.data() + L1 = len(result_data) + L2 = len(expected_result) + L = min(L1, L2) + #if True or abs(L1-L2) > 1: + if False: + sys.stderr.write('delta = %2d: ntaps = %d interp = %d ilen = %d\n' % (L2 - L1, ntaps, interp, ilen)) + #sys.stderr.write(' len(result_data) = %d len(expected_result) = %d\n' % + # (len(result_data), len(expected_result))) + #self.assertEqual(expected_result[0:L], result_data[0:L]) + # FIXME check first ntaps+1 answers + self.assertEqual(expected_result[ntaps+1:L], result_data[ntaps+1:L]) + + + def test_006_interp_decim(self): + taps = (0,1,0,0) + src_data = range(10000) + interp = 3 + decimation = 2 + + expected_result = reference_interp_dec_filter(src_data, interp, decimation, taps) + + src = gr.vector_source_f(src_data) + op = gr.rational_resampler_base_fff(interp, decimation, taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op) + self.fg.connect(op, dst) + self.fg.run() + result_data = dst.data() + + L1 = len(result_data) + L2 = len(expected_result) + L = min(L1, L2) + if False: + sys.stderr.write('delta = %2d: ntaps = %d decim = %d ilen = %d\n' % + (L2 - L1, len(taps), decimation, len(src_data))) + sys.stderr.write(' len(result_data) = %d len(expected_result) = %d\n' % + (len(result_data), len(expected_result))) + self.assertEqual(expected_result[1:L], result_data[1:L]) + + # + # test the blks.rational_resampler_??? primitives... + # + + def test_101_interp(self): + taps = [1, 10, 100, 1000, 10000] + src_data = (0, 2, 3, 5, 7, 11, 13, 17) + interpolation = 3 + xr = (0,0,0,0,2,20,200,2003,20030,300,3005,30050,500,5007,50070,700,7011,70110,1100,11013,110130,1300,13017,130170,1700.0,17000.0,170000.0) + expected_result = tuple([float(x) for x in xr]) + + src = gr.vector_source_f(src_data) + op = blks.rational_resampler_fff(self.fg, interpolation, 1, taps=taps) + dst = gr.vector_sink_f() + self.fg.connect(src, op) + self.fg.connect(op, dst) + self.fg.run() + result_data = dst.data() + self.assertEqual(expected_result, result_data) + + +if __name__ == '__main__': + gr_unittest.main() + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_sig_source.py b/gnuradio-core/src/python/gnuradio/gr/qa_sig_source.py new file mode 100755 index 0000000000..39866ac43b --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_sig_source.py @@ -0,0 +1,85 @@ +#!/usr/bin/env python +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import math + +class test_sig_source (gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_const_f (self): + fg = self.fg + expected_result = (1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5) + src1 = gr.sig_source_f (1e6, gr.GR_CONST_WAVE, 0, 1.5) + op = gr.head (gr.sizeof_float, 10) + dst1 = gr.vector_sink_f () + fg.connect (src1, op) + fg.connect (op, dst1) + fg.run () + dst_data = dst1.data () + self.assertEqual (expected_result, dst_data) + + def test_const_i (self): + fg = self.fg + expected_result = (1, 1, 1, 1) + src1 = gr.sig_source_i (1e6, gr.GR_CONST_WAVE, 0, 1) + op = gr.head (gr.sizeof_int, 4) + dst1 = gr.vector_sink_i () + fg.connect (src1, op) + fg.connect (op, dst1) + fg.run () + dst_data = dst1.data () + self.assertEqual (expected_result, dst_data) + + def test_sine_f (self): + fg = self.fg + sqrt2 = math.sqrt(2) / 2 + expected_result = (0, sqrt2, 1, sqrt2, 0, -sqrt2, -1, -sqrt2, 0) + src1 = gr.sig_source_f (8, gr.GR_SIN_WAVE, 1.0, 1.0) + op = gr.head (gr.sizeof_float, 9) + dst1 = gr.vector_sink_f () + fg.connect (src1, op) + fg.connect (op, dst1) + fg.run () + dst_data = dst1.data () + self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 5) + + def test_cosine_f (self): + fg = self.fg + sqrt2 = math.sqrt(2) / 2 + expected_result = (1, sqrt2, 0, -sqrt2, -1, -sqrt2, 0, sqrt2, 1) + src1 = gr.sig_source_f (8, gr.GR_COS_WAVE, 1.0, 1.0) + op = gr.head (gr.sizeof_float, 9) + dst1 = gr.vector_sink_f () + fg.connect (src1, op) + fg.connect (op, dst1) + fg.run () + dst_data = dst1.data () + self.assertFloatTuplesAlmostEqual (expected_result, dst_data, 5) + +if __name__ == '__main__': + gr_unittest.main () diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir.py b/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir.py new file mode 100755 index 0000000000..5898188f82 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +# +# Copyright 2005 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +class test_single_pole_iir(gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_001(self): + src_data = (0, 1000, 2000, 3000, 4000, 5000) + expected_result = src_data + src = gr.vector_source_f(src_data) + op = gr.single_pole_iir_filter_ff (1.0) + dst = gr.vector_sink_f() + self.fg.connect (src, op, dst) + self.fg.run() + result_data = dst.data() + self.assertFloatTuplesAlmostEqual (expected_result, result_data) + + def test_002(self): + src_data = (0, 1000, 2000, 3000, 4000, 5000) + expected_result = (0, 125, 359.375, 689.453125, 1103.271484, 1590.36255) + src = gr.vector_source_f(src_data) + op = gr.single_pole_iir_filter_ff (0.125) + dst = gr.vector_sink_f() + self.fg.connect (src, op, dst) + self.fg.run() + result_data = dst.data() + self.assertFloatTuplesAlmostEqual (expected_result, result_data, 3) + + def test_003(self): + block_size = 2 + src_data = (0, 1000, 2000, 3000, 4000, 5000) + expected_result = (0, 125, 250, 484.375, 718.75, 1048.828125) + src = gr.vector_source_f(src_data) + s2p = gr.serial_to_parallel(gr.sizeof_float, block_size) + op = gr.single_pole_iir_filter_ff (0.125, block_size) + p2s = gr.parallel_to_serial(gr.sizeof_float, block_size) + dst = gr.vector_sink_f() + self.fg.connect (src, s2p, op, p2s, dst) + self.fg.run() + result_data = dst.data() + self.assertFloatTuplesAlmostEqual (expected_result, result_data, 3) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir_cc.py b/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir_cc.py new file mode 100755 index 0000000000..a7889d177d --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_single_pole_iir_cc.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python +# +# Copyright 2005,2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest + +class test_single_pole_iir_cc(gr_unittest.TestCase): + + def setUp (self): + self.fg = gr.flow_graph () + + def tearDown (self): + self.fg = None + + def test_001(self): + src_data = (0+0j, 1000+1000j, 2000+2000j, 3000+3000j, 4000+4000j, 5000+5000j) + expected_result = src_data + src = gr.vector_source_c(src_data) + op = gr.single_pole_iir_filter_cc (1.0) + dst = gr.vector_sink_c() + self.fg.connect (src, op, dst) + self.fg.run() + result_data = dst.data() + self.assertComplexTuplesAlmostEqual (expected_result, result_data) + + def test_002(self): + src_data = (complex(0,0), complex(1000,-1000), complex(2000,-2000), complex(3000,-3000), complex(4000,-4000), complex(5000,-5000)) + expected_result = (complex(0,0), complex(125,-125), complex(359.375,-359.375), complex(689.453125,-689.453125), complex(1103.271484,-1103.271484), complex(1590.36255,-1590.36255)) + src = gr.vector_source_c(src_data) + op = gr.single_pole_iir_filter_cc (0.125) + dst = gr.vector_sink_c() + self.fg.connect (src, op, dst) + self.fg.run() + result_data = dst.data() + self.assertComplexTuplesAlmostEqual (expected_result, result_data, 3) + + def test_003(self): + block_size = 2 + src_data = (complex(0,0), complex(1000,-1000), complex(2000,-2000), complex(3000,-3000), complex(4000,-4000), complex(5000,-5000)) + expected_result = (complex(0,0), complex(125,-125), complex(250,-250), complex(484.375,-484.375), complex(718.75,-718.75), complex(1048.828125,-1048.828125)) + src = gr.vector_source_c(src_data) + s2p = gr.serial_to_parallel(gr.sizeof_gr_complex, block_size) + op = gr.single_pole_iir_filter_cc (0.125, block_size) + p2s = gr.parallel_to_serial(gr.sizeof_gr_complex, block_size) + dst = gr.vector_sink_c() + self.fg.connect (src, s2p, op, p2s, dst) + self.fg.run() + result_data = dst.data() + self.assertComplexTuplesAlmostEqual (expected_result, result_data, 3) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_unpack_k_bits.py b/gnuradio-core/src/python/gnuradio/gr/qa_unpack_k_bits.py new file mode 100755 index 0000000000..59f838a4df --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_unpack_k_bits.py @@ -0,0 +1,57 @@ +#!/usr/bin/env python +# +# Copyright 2006 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio import gr, gr_unittest +import random + +class test_unpack(gr_unittest.TestCase): + + def setUp(self): + self.fg = gr.flow_graph () + + def tearDown(self): + self.fg = None + + def test_001(self): + src_data = (1,0,1,1,0,1,1,0) + expected_results = (1,0,1,1,0,1,1,0) + src = gr.vector_source_b(src_data,False) + op = gr.unpack_k_bits_bb(1) + dst = gr.vector_sink_b() + self.fg.connect(src, op, dst) + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + def test_002(self): + src_data = ( 2, 3, 0, 1) + expected_results = (1,0,1,1,0,0,0,1) + src = gr.vector_source_b(src_data,False) + op = gr.unpack_k_bits_bb(2) + dst = gr.vector_sink_b() + self.fg.connect(src, op, dst) + self.fg.run() + self.assertEqual(expected_results, dst.data()) + + +if __name__ == '__main__': + gr_unittest.main () + diff --git a/gnuradio-core/src/python/gnuradio/gr/run_tests.in b/gnuradio-core/src/python/gnuradio/gr/run_tests.in new file mode 100755 index 0000000000..87d0afdafd --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/run_tests.in @@ -0,0 +1,33 @@ +#!/bin/sh + +swigbld=@abs_top_builddir@/gnuradio-core/src/lib/swig +swigsrc=@abs_top_srcdir@/gnuradio-core/src/lib/swig +py=@abs_top_srcdir@/gnuradio-core/src/python + +PYTHONPATH="$swigbld:$swigbld/.libs:$swigsrc:$py" +export PYTHONPATH + +# for OS/X +DYLD_LIBRARY_PATH="@abs_top_builddir@/gnuradio-core/src/lib/.libs" +export DYLD_LIBRARY_PATH + +# Don't load user or system prefs +GR_DONT_LOAD_PREFS=1 +export GR_DONT_LOAD_PREFS + +ok=yes +for file in @srcdir@/qa_*.py +do + echo $file + if ! $file + then + ok=no + fi +done + +if [ $ok = yes ] +then + exit 0 +else + exit 1 +fi diff --git a/gnuradio-core/src/python/gnuradio/gr/scheduler.py b/gnuradio-core/src/python/gnuradio/gr/scheduler.py new file mode 100644 index 0000000000..919d07f0a4 --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/scheduler.py @@ -0,0 +1,70 @@ +# +# Copyright 2004 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 59 Temple Place - Suite 330, +# Boston, MA 02111-1307, USA. +# + +from gnuradio.gr.exceptions import * +from gnuradio_swig_python import single_threaded_scheduler, sts_pyrun +import gr_threading as _threading +#import threading as _threading + +class scheduler_thread(_threading.Thread): + def __init__(self, sts): + _threading.Thread.__init__(self) + self.sts = sts + def run(self): + # Invoke the single threaded scheduler's run method + # + # Note that we're in a new thread, and that sts_pyrun + # releases the global interpreter lock. This has the + # effect of evaluating the graph in parallel to the + # main line control code. + sts_pyrun(self.sts) + self.sts = None + +class scheduler(object): + def __init__(self, fg): + graphs = fg.partition_graph(fg.blocks) + # print "@@@ # graphs = %d" % (len(graphs)) + + self.state = [] + + for g in graphs: + list_of_blocks = [x.block() for x in g] + sts = single_threaded_scheduler(list_of_blocks) + thread = scheduler_thread(sts) + thread.setDaemon(1) + self.state.append((sts, thread)) + + def start(self): + for (sts, thread) in self.state: + thread.start() + + def stop(self): + for (sts, thread) in self.state: + sts.stop() + self.wait() + + def wait(self): + for (sts, thread) in self.state: + timeout = 0.100 + while True: + thread.join(timeout) + if not thread.isAlive(): + break |