summaryrefslogtreecommitdiff
path: root/gr-blocks/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks/python')
-rw-r--r--gr-blocks/python/CMakeLists.txt1
-rw-r--r--gr-blocks/python/__init__.py3
-rw-r--r--gr-blocks/python/parse_file_metadata.py71
-rwxr-xr-xgr-blocks/python/qa_add_mult_v.py98
-rw-r--r--gr-blocks/python/qa_argmax.py78
-rwxr-xr-xgr-blocks/python/qa_bin_statistics.py231
-rw-r--r--gr-blocks/python/qa_file_metadata.py49
-rwxr-xr-xgr-blocks/python/qa_max.py71
-rwxr-xr-xgr-blocks/python/qa_pdu.py14
-rwxr-xr-xgr-blocks/python/qa_pipe_fittings.py20
-rw-r--r--gr-blocks/python/qa_probe_signal.py68
-rwxr-xr-xgr-blocks/python/qa_stream_mux.py2
-rwxr-xr-xgr-blocks/python/qa_stretch.py4
-rw-r--r--gr-blocks/python/stream_to_vector_decimator.py106
14 files changed, 687 insertions, 129 deletions
diff --git a/gr-blocks/python/CMakeLists.txt b/gr-blocks/python/CMakeLists.txt
index cab0b956f7..841588799b 100644
--- a/gr-blocks/python/CMakeLists.txt
+++ b/gr-blocks/python/CMakeLists.txt
@@ -24,6 +24,7 @@ GR_PYTHON_INSTALL(
FILES
__init__.py
parse_file_metadata.py
+ stream_to_vector_decimator.py
DESTINATION ${GR_PYTHON_DIR}/gnuradio/blocks
COMPONENT "blocks_python"
)
diff --git a/gr-blocks/python/__init__.py b/gr-blocks/python/__init__.py
index 6577d933e0..56d274918c 100644
--- a/gr-blocks/python/__init__.py
+++ b/gr-blocks/python/__init__.py
@@ -25,8 +25,9 @@ processing blocks common to many flowgraphs.
'''
from blocks_swig import *
+from stream_to_vector_decimator import *
-#alias old gr_add_vXX and gr_multiply_vXX
+#alias old add_vXX and multiply_vXX
add_vcc = add_cc
add_vff = add_ff
add_vii = add_ii
diff --git a/gr-blocks/python/parse_file_metadata.py b/gr-blocks/python/parse_file_metadata.py
index c8ac2def94..eaa8025bbf 100644
--- a/gr-blocks/python/parse_file_metadata.py
+++ b/gr-blocks/python/parse_file_metadata.py
@@ -66,14 +66,14 @@ def parse_header(p, VERBOSE=False):
info = dict()
- if(pmt.pmt_is_dict(p) is False):
+ if(pmt.is_dict(p) is False):
sys.stderr.write("Header is not a PMT dictionary: invalid or corrupt data file.\n")
sys.exit(1)
# GET FILE FORMAT VERSION NUMBER
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("version"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("version"), dump)
- version = pmt.pmt_to_long(r)
+ if(pmt.dict_has_key(p, pmt.string_to_symbol("version"))):
+ r = pmt.dict_ref(p, pmt.string_to_symbol("version"), dump)
+ version = pmt.to_long(r)
if(VERBOSE):
print "Version Number: {0}".format(version)
else:
@@ -81,9 +81,9 @@ def parse_header(p, VERBOSE=False):
sys.exit(1)
# EXTRACT SAMPLE RATE
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_rate"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_rate"), dump)
- samp_rate = pmt.pmt_to_double(r)
+ if(pmt.dict_has_key(p, pmt.string_to_symbol("rx_rate"))):
+ r = pmt.dict_ref(p, pmt.string_to_symbol("rx_rate"), dump)
+ samp_rate = pmt.to_double(r)
info["rx_rate"] = samp_rate
if(VERBOSE):
print "Sample Rate: {0:.2f} sps".format(samp_rate)
@@ -92,12 +92,12 @@ def parse_header(p, VERBOSE=False):
sys.exit(1)
# EXTRACT TIME STAMP
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_time"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_time"), dump)
- pmt_secs = pmt.pmt_tuple_ref(r, 0)
- pmt_fracs = pmt.pmt_tuple_ref(r, 1)
- secs = float(pmt.pmt_to_uint64(pmt_secs))
- fracs = pmt.pmt_to_double(pmt_fracs)
+ if(pmt.dict_has_key(p, pmt.string_to_symbol("rx_time"))):
+ r = pmt.dict_ref(p, pmt.string_to_symbol("rx_time"), dump)
+ secs = pmt.tuple_ref(r, 0)
+ fracs = pmt.tuple_ref(r, 1)
+ secs = float(pmt.to_uint64(secs))
+ fracs = pmt.to_double(fracs)
t = secs + fracs
info["rx_time"] = t
if(VERBOSE):
@@ -107,9 +107,9 @@ def parse_header(p, VERBOSE=False):
sys.exit(1)
# EXTRACT ITEM SIZE
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("size"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("size"), dump)
- dsize = pmt.pmt_to_long(r)
+ if(pmt.dict_has_key(p, pmt.string_to_symbol("size"))):
+ r = pmt.dict_ref(p, pmt.string_to_symbol("size"), dump)
+ dsize = pmt.to_long(r)
info["size"] = dsize
if(VERBOSE):
print "Item size: {0}".format(dsize)
@@ -118,9 +118,9 @@ def parse_header(p, VERBOSE=False):
sys.exit(1)
# EXTRACT DATA TYPE
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("type"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("type"), dump)
- dtype = pmt.pmt_to_long(r)
+ if(pmt.dict_has_key(p, pmt.string_to_symbol("type"))):
+ r = pmt.dict_ref(p, pmt.string_to_symbol("type"), dump)
+ dtype = pmt.to_long(r)
stype = ftype_to_string[dtype]
info["type"] = stype
if(VERBOSE):
@@ -130,9 +130,9 @@ def parse_header(p, VERBOSE=False):
sys.exit(1)
# EXTRACT COMPLEX
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("cplx"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("cplx"), dump)
- cplx = pmt.pmt_to_bool(r)
+ if(pmt.dict_has_key(p, pmt.string_to_symbol("cplx"))):
+ r = pmt.dict_ref(p, pmt.string_to_symbol("cplx"), dump)
+ cplx = pmt.to_bool(r)
info["cplx"] = cplx
if(VERBOSE):
print "Complex? {0}".format(cplx)
@@ -141,9 +141,9 @@ def parse_header(p, VERBOSE=False):
sys.exit(1)
# EXTRACT WHERE CURRENT SEGMENT STARTS
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("strt"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("strt"), dump)
- seg_start = pmt.pmt_to_uint64(r)
+ if(pmt.dict_has_key(p, pmt.string_to_symbol("strt"))):
+ r = pmt.dict_ref(p, pmt.string_to_symbol("strt"), dump)
+ seg_start = pmt.to_uint64(r)
info["hdr_len"] = seg_start
info["extra_len"] = seg_start - HEADER_LENGTH
info["has_extra"] = info["extra_len"] > 0
@@ -156,9 +156,9 @@ def parse_header(p, VERBOSE=False):
sys.exit(1)
# EXTRACT SIZE OF DATA
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("bytes"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("bytes"), dump)
- nbytes = pmt.pmt_to_uint64(r)
+ if(pmt.dict_has_key(p, pmt.string_to_symbol("bytes"))):
+ r = pmt.dict_ref(p, pmt.string_to_symbol("bytes"), dump)
+ nbytes = pmt.to_uint64(r)
nitems = nbytes/dsize
info["nitems"] = nitems
@@ -175,19 +175,18 @@ def parse_header(p, VERBOSE=False):
# IF THERE IS EXTRA DATA, PULL OUT THE DICTIONARY AND PARSE IT
def parse_extra_dict(p, info, VERBOSE=False):
- if(pmt.pmt_is_dict(p) is False):
+ if(pmt.is_dict(p) is False):
sys.stderr.write("Extra header is not a PMT dictionary: invalid or corrupt data file.\n")
sys.exit(1)
- items = pmt.pmt_dict_items(p)
- nitems = pmt.pmt_length(items)
+ items = pmt.dict_items(p)
+ nitems = pmt.length(items)
for i in xrange(nitems):
- item = pmt.pmt_nth(i, items)
- key = pmt.pmt_symbol_to_string(pmt.pmt_car(item))
- val = pmt.pmt_cdr(item)
+ item = pmt.nth(i, items)
+ key = pmt.symbol_to_string(pmt.car(item))
+ val = pmt.cdr(item)
info[key] = val
if(VERBOSE):
- print "{0}: ".format(key)
- pmt.pmt_print(val)
+ print "{0}: {1}".format(key, val)
return info
diff --git a/gr-blocks/python/qa_add_mult_v.py b/gr-blocks/python/qa_add_mult_v.py
index d362cb8859..13cb71df2c 100755
--- a/gr-blocks/python/qa_add_mult_v.py
+++ b/gr-blocks/python/qa_add_mult_v.py
@@ -21,7 +21,7 @@
#
from gnuradio import gr, gr_unittest
-import blocks_swig
+import blocks_swig as blocks
class test_add_mult_v(gr_unittest.TestCase):
@@ -34,10 +34,10 @@ class test_add_mult_v(gr_unittest.TestCase):
def help_ss(self, size, src_data, exp_data, op):
for s in zip(range (len (src_data)), src_data):
src = gr.vector_source_s(s[1])
- srcv = gr.stream_to_vector(gr.sizeof_short, size)
+ srcv = blocks.stream_to_vector(gr.sizeof_short, size)
self.tb.connect(src, srcv)
self.tb.connect(srcv, (op, s[0]))
- rhs = gr.vector_to_stream(gr.sizeof_short, size)
+ rhs = blocks.vector_to_stream(gr.sizeof_short, size)
dst = gr.vector_sink_s()
self.tb.connect(op, rhs, dst)
self.tb.run()
@@ -47,10 +47,10 @@ class test_add_mult_v(gr_unittest.TestCase):
def help_ii(self, size, src_data, exp_data, op):
for s in zip(range (len (src_data)), src_data):
src = gr.vector_source_i(s[1])
- srcv = gr.stream_to_vector(gr.sizeof_int, size)
+ srcv = blocks.stream_to_vector(gr.sizeof_int, size)
self.tb.connect(src, srcv)
self.tb.connect(srcv, (op, s[0]))
- rhs = gr.vector_to_stream(gr.sizeof_int, size)
+ rhs = blocks.vector_to_stream(gr.sizeof_int, size)
dst = gr.vector_sink_i()
self.tb.connect(op, rhs, dst)
self.tb.run()
@@ -60,10 +60,10 @@ class test_add_mult_v(gr_unittest.TestCase):
def help_ff(self, size, src_data, exp_data, op):
for s in zip(range (len (src_data)), src_data):
src = gr.vector_source_f(s[1])
- srcv = gr.stream_to_vector(gr.sizeof_float, size)
+ srcv = blocks.stream_to_vector(gr.sizeof_float, size)
self.tb.connect(src, srcv)
self.tb.connect(srcv, (op, s[0]))
- rhs = gr.vector_to_stream(gr.sizeof_float, size)
+ rhs = blocks.vector_to_stream(gr.sizeof_float, size)
dst = gr.vector_sink_f()
self.tb.connect(op, rhs, dst)
self.tb.run()
@@ -73,10 +73,10 @@ class test_add_mult_v(gr_unittest.TestCase):
def help_cc(self, size, src_data, exp_data, op):
for s in zip(range (len (src_data)), src_data):
src = gr.vector_source_c(s[1])
- srcv = gr.stream_to_vector(gr.sizeof_gr_complex, size)
+ srcv = blocks.stream_to_vector(gr.sizeof_gr_complex, size)
self.tb.connect(src, srcv)
self.tb.connect(srcv, (op, s[0]))
- rhs = gr.vector_to_stream(gr.sizeof_gr_complex, size)
+ rhs = blocks.vector_to_stream(gr.sizeof_gr_complex, size)
dst = gr.vector_sink_c()
self.tb.connect(op, rhs, dst)
self.tb.run()
@@ -85,8 +85,8 @@ class test_add_mult_v(gr_unittest.TestCase):
def help_const_ss(self, src_data, exp_data, op):
src = gr.vector_source_s(src_data)
- srcv = gr.stream_to_vector(gr.sizeof_short, len(src_data))
- rhs = gr.vector_to_stream(gr.sizeof_short, len(src_data))
+ srcv = blocks.stream_to_vector(gr.sizeof_short, len(src_data))
+ rhs = blocks.vector_to_stream(gr.sizeof_short, len(src_data))
dst = gr.vector_sink_s()
self.tb.connect(src, srcv, op, rhs, dst)
self.tb.run()
@@ -95,8 +95,8 @@ class test_add_mult_v(gr_unittest.TestCase):
def help_const_ii(self, src_data, exp_data, op):
src = gr.vector_source_i(src_data)
- srcv = gr.stream_to_vector(gr.sizeof_int, len(src_data))
- rhs = gr.vector_to_stream(gr.sizeof_int, len(src_data))
+ srcv = blocks.stream_to_vector(gr.sizeof_int, len(src_data))
+ rhs = blocks.vector_to_stream(gr.sizeof_int, len(src_data))
dst = gr.vector_sink_i()
self.tb.connect(src, srcv, op, rhs, dst)
self.tb.run()
@@ -105,8 +105,8 @@ class test_add_mult_v(gr_unittest.TestCase):
def help_const_ff(self, src_data, exp_data, op):
src = gr.vector_source_f(src_data)
- srcv = gr.stream_to_vector(gr.sizeof_float, len(src_data))
- rhs = gr.vector_to_stream(gr.sizeof_float, len(src_data))
+ srcv = blocks.stream_to_vector(gr.sizeof_float, len(src_data))
+ rhs = blocks.vector_to_stream(gr.sizeof_float, len(src_data))
dst = gr.vector_sink_f()
self.tb.connect(src, srcv, op, rhs, dst)
self.tb.run()
@@ -115,8 +115,8 @@ class test_add_mult_v(gr_unittest.TestCase):
def help_const_cc(self, src_data, exp_data, op):
src = gr.vector_source_c(src_data)
- srcv = gr.stream_to_vector(gr.sizeof_gr_complex, len(src_data))
- rhs = gr.vector_to_stream(gr.sizeof_gr_complex, len(src_data))
+ srcv = blocks.stream_to_vector(gr.sizeof_gr_complex, len(src_data))
+ rhs = blocks.vector_to_stream(gr.sizeof_gr_complex, len(src_data))
dst = gr.vector_sink_c()
self.tb.connect(src, srcv, op, rhs, dst)
self.tb.run()
@@ -130,7 +130,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (2,)
src3_data = (3,)
expected_result = (6,)
- op = blocks_swig.add_ss(1)
+ op = blocks.add_ss(1)
self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op)
def test_add_vss_five(self):
@@ -138,7 +138,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (6, 7, 8, 9, 10)
src3_data = (11, 12, 13, 14, 15)
expected_result = (18, 21, 24, 27, 30)
- op = blocks_swig.add_ss(5)
+ op = blocks.add_ss(5)
self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op)
def test_add_vii_one(self):
@@ -146,7 +146,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (2,)
src3_data = (3,)
expected_result = (6,)
- op = blocks_swig.add_ii(1)
+ op = blocks.add_ii(1)
self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op)
def test_add_vii_five(self):
@@ -154,7 +154,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (6, 7, 8, 9, 10)
src3_data = (11, 12, 13, 14, 15)
expected_result = (18, 21, 24, 27, 30)
- op = blocks_swig.add_ii(5)
+ op = blocks.add_ii(5)
self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op)
def test_add_vff_one(self):
@@ -162,7 +162,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (2.0,)
src3_data = (3.0,)
expected_result = (6.0,)
- op = blocks_swig.add_ff(1)
+ op = blocks.add_ff(1)
self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op)
def test_add_vff_five(self):
@@ -170,7 +170,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (6.0, 7.0, 8.0, 9.0, 10.0)
src3_data = (11.0, 12.0, 13.0, 14.0, 15.0)
expected_result = (18.0, 21.0, 24.0, 27.0, 30.0)
- op = blocks_swig.add_ff(5)
+ op = blocks.add_ff(5)
self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op)
def test_add_vcc_one(self):
@@ -178,7 +178,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (3.0+4.0j,)
src3_data = (5.0+6.0j,)
expected_result = (9.0+12j,)
- op = blocks_swig.add_cc(1)
+ op = blocks.add_cc(1)
self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op)
def test_add_vcc_five(self):
@@ -186,56 +186,56 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)
src3_data = (21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j)
expected_result = (33.0+36.0j, 39.0+42.0j, 45.0+48.0j, 51.0+54.0j, 57.0+60.0j)
- op = blocks_swig.add_cc(5)
+ op = blocks.add_cc(5)
self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op)
# add_const_vXX
def test_add_const_vss_one(self):
src_data = (1,)
- op = blocks_swig.add_const_vss((2,))
+ op = blocks.add_const_vss((2,))
exp_data = (3,)
self.help_const_ss(src_data, exp_data, op)
def test_add_const_vss_five(self):
src_data = (1, 2, 3, 4, 5)
- op = blocks_swig.add_const_vss((6, 7, 8, 9, 10))
+ op = blocks.add_const_vss((6, 7, 8, 9, 10))
exp_data = (7, 9, 11, 13, 15)
self.help_const_ss(src_data, exp_data, op)
def test_add_const_vii_one(self):
src_data = (1,)
- op = blocks_swig.add_const_vii((2,))
+ op = blocks.add_const_vii((2,))
exp_data = (3,)
self.help_const_ii(src_data, exp_data, op)
def test_add_const_vii_five(self):
src_data = (1, 2, 3, 4, 5)
- op = blocks_swig.add_const_vii((6, 7, 8, 9, 10))
+ op = blocks.add_const_vii((6, 7, 8, 9, 10))
exp_data = (7, 9, 11, 13, 15)
self.help_const_ii(src_data, exp_data, op)
def test_add_const_vff_one(self):
src_data = (1.0,)
- op = blocks_swig.add_const_vff((2.0,))
+ op = blocks.add_const_vff((2.0,))
exp_data = (3.0,)
self.help_const_ff(src_data, exp_data, op)
def test_add_const_vff_five(self):
src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
- op = blocks_swig.add_const_vff((6.0, 7.0, 8.0, 9.0, 10.0))
+ op = blocks.add_const_vff((6.0, 7.0, 8.0, 9.0, 10.0))
exp_data = (7.0, 9.0, 11.0, 13.0, 15.0)
self.help_const_ff(src_data, exp_data, op)
def test_add_const_vcc_one(self):
src_data = (1.0+2.0j,)
- op = blocks_swig.add_const_vcc((2.0+3.0j,))
+ op = blocks.add_const_vcc((2.0+3.0j,))
exp_data = (3.0+5.0j,)
self.help_const_cc(src_data, exp_data, op)
def test_add_const_vcc_five(self):
src_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j)
- op = blocks_swig.add_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j))
+ op = blocks.add_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j))
exp_data = (12.0+14.0j, 16.0+18.0j, 20.0+22.0j, 24.0+26.0j, 28.0+30.0j)
self.help_const_cc(src_data, exp_data, op)
@@ -246,7 +246,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (2,)
src3_data = (3,)
expected_result = (6,)
- op = gr.multiply_vss(1)
+ op = blocks.multiply_ss(1)
self.help_ss(1, (src1_data, src2_data, src3_data), expected_result, op)
def test_multiply_vss_five(self):
@@ -254,7 +254,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (6, 7, 8, 9, 10)
src3_data = (11, 12, 13, 14, 15)
expected_result = (66, 168, 312, 504, 750)
- op = gr.multiply_vss(5)
+ op = blocks.multiply_ss(5)
self.help_ss(5, (src1_data, src2_data, src3_data), expected_result, op)
def test_multiply_vii_one(self):
@@ -262,7 +262,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (2,)
src3_data = (3,)
expected_result = (6,)
- op = gr.multiply_vii(1)
+ op = blocks.multiply_ii(1)
self.help_ii(1, (src1_data, src2_data, src3_data), expected_result, op)
def test_multiply_vii_five(self):
@@ -270,7 +270,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (6, 7, 8, 9, 10)
src3_data = (11, 12, 13, 14, 15)
expected_result = (66, 168, 312, 504, 750)
- op = gr.multiply_vii(5)
+ op = blocks.multiply_ii(5)
self.help_ii(5, (src1_data, src2_data, src3_data), expected_result, op)
def test_multiply_vff_one(self):
@@ -278,7 +278,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (2.0,)
src3_data = (3.0,)
expected_result = (6.0,)
- op = gr.multiply_vff(1)
+ op = blocks.multiply_ff(1)
self.help_ff(1, (src1_data, src2_data, src3_data), expected_result, op)
def test_multiply_vff_five(self):
@@ -286,7 +286,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (6.0, 7.0, 8.0, 9.0, 10.0)
src3_data = (11.0, 12.0, 13.0, 14.0, 15.0)
expected_result = (66.0, 168.0, 312.0, 504.0, 750.0)
- op = gr.multiply_vff(5)
+ op = blocks.multiply_ff(5)
self.help_ff(5, (src1_data, src2_data, src3_data), expected_result, op)
def test_multiply_vcc_one(self):
@@ -294,7 +294,7 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (3.0+4.0j,)
src3_data = (5.0+6.0j,)
expected_result = (-85+20j,)
- op = gr.multiply_vcc(1)
+ op = blocks.multiply_cc(1)
self.help_cc(1, (src1_data, src2_data, src3_data), expected_result, op)
def test_multiply_vcc_five(self):
@@ -302,56 +302,56 @@ class test_add_mult_v(gr_unittest.TestCase):
src2_data = (11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j)
src3_data = (21.0+22.0j, 23.0+24.0j, 25.0+26.0j, 27.0+28.0j, 29.0+30.0j)
expected_result = (-1021.0+428.0j, -2647.0+1754.0j, -4945.0+3704.0j, -8011.0+6374.0j, -11941.0+9860.0j)
- op = gr.multiply_vcc(5)
+ op = blocks.multiply_cc(5)
self.help_cc(5, (src1_data, src2_data, src3_data), expected_result, op)
# multiply_const_vXX
def test_multiply_const_vss_one(self):
src_data = (2,)
- op = gr.multiply_const_vss((3,))
+ op = blocks.multiply_const_vss((3,))
exp_data = (6,)
self.help_const_ss(src_data, exp_data, op)
def test_multiply_const_vss_five(self):
src_data = (1, 2, 3, 4, 5)
- op = gr.multiply_const_vss((6, 7, 8, 9, 10))
+ op = blocks.multiply_const_vss((6, 7, 8, 9, 10))
exp_data = (6, 14, 24, 36, 50)
self.help_const_ss(src_data, exp_data, op)
def test_multiply_const_vii_one(self):
src_data = (2,)
- op = gr.multiply_const_vii((3,))
+ op = blocks.multiply_const_vii((3,))
exp_data = (6,)
self.help_const_ii(src_data, exp_data, op)
def test_multiply_const_vii_five(self):
src_data = (1, 2, 3, 4, 5)
- op = gr.multiply_const_vii((6, 7, 8, 9, 10))
+ op = blocks.multiply_const_vii((6, 7, 8, 9, 10))
exp_data = (6, 14, 24, 36, 50)
self.help_const_ii(src_data, exp_data, op)
def test_multiply_const_vff_one(self):
src_data = (2.0,)
- op = gr.multiply_const_vff((3.0,))
+ op = blocks.multiply_const_vff((3.0,))
exp_data = (6.0,)
self.help_const_ff(src_data, exp_data, op)
def test_multiply_const_vff_five(self):
src_data = (1.0, 2.0, 3.0, 4.0, 5.0)
- op = gr.multiply_const_vff((6.0, 7.0, 8.0, 9.0, 10.0))
+ op = blocks.multiply_const_vff((6.0, 7.0, 8.0, 9.0, 10.0))
exp_data = (6.0, 14.0, 24.0, 36.0, 50.0)
self.help_const_ff(src_data, exp_data, op)
def test_multiply_const_vcc_one(self):
src_data = (1.0+2.0j,)
- op = gr.multiply_const_vcc((2.0+3.0j,))
+ op = blocks.multiply_const_vcc((2.0+3.0j,))
exp_data = (-4.0+7.0j,)
self.help_const_cc(src_data, exp_data, op)
def test_multiply_const_vcc_five(self):
src_data = (1.0+2.0j, 3.0+4.0j, 5.0+6.0j, 7.0+8.0j, 9.0+10.0j)
- op = gr.multiply_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j))
+ op = blocks.multiply_const_vcc((11.0+12.0j, 13.0+14.0j, 15.0+16.0j, 17.0+18.0j, 19.0+20.0j))
exp_data = (-13.0+34.0j, -17.0+94.0j, -21.0+170.0j, -25.0+262.0j, -29.0+370.0j)
self.help_const_cc(src_data, exp_data, op)
diff --git a/gr-blocks/python/qa_argmax.py b/gr-blocks/python/qa_argmax.py
new file mode 100644
index 0000000000..1fbda7a1f9
--- /dev/null
+++ b/gr-blocks/python/qa_argmax.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python
+#
+# Copyright 2007,2010 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import math
+
+
+class test_arg_max (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+
+ def tearDown (self):
+ self.tb = None
+
+
+ def test_001(self):
+ tb = self.tb
+
+ src1_data = (0,0.2,-0.3,0,12,0)
+ src2_data = (0,0.0,3.0,0,10,0)
+ src3_data = (0,0.0,3.0,0,1,0)
+
+ src1 = gr.vector_source_f (src1_data)
+ s2v1 = blocks.stream_to_vector(gr.sizeof_float, len(src1_data))
+ tb.connect( src1, s2v1 )
+
+ src2 = gr.vector_source_f (src2_data)
+ s2v2 = blocks.stream_to_vector(gr.sizeof_float, len(src1_data))
+ tb.connect( src2, s2v2 )
+
+ src3 = gr.vector_source_f (src3_data)
+ s2v3 = blocks.stream_to_vector(gr.sizeof_float, len(src1_data))
+ tb.connect( src3, s2v3 )
+
+ dst1 = gr.vector_sink_s ()
+ dst2 = gr.vector_sink_s ()
+ argmax = gr.argmax_fs (len(src1_data))
+
+ tb.connect (s2v1, (argmax, 0))
+ tb.connect (s2v2, (argmax, 1))
+ tb.connect (s2v3, (argmax, 2))
+
+ tb.connect ((argmax,0), dst1)
+ tb.connect ((argmax,1), dst2)
+
+ tb.run ()
+ index = dst1.data ()
+ source = dst2.data ()
+ self.assertEqual ( index, (4,))
+ self.assertEqual ( source, (0,))
+
+
+
+if __name__ == '__main__':
+ gr_unittest.run(test_arg_max, "test_arg_max.xml")
+
diff --git a/gr-blocks/python/qa_bin_statistics.py b/gr-blocks/python/qa_bin_statistics.py
new file mode 100755
index 0000000000..00fd58b600
--- /dev/null
+++ b/gr-blocks/python/qa_bin_statistics.py
@@ -0,0 +1,231 @@
+#!/usr/bin/env python
+#
+# Copyright 2006,2007,2010 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import random
+import struct
+
+#import os
+#print "pid =", os.getpid()
+#raw_input("Attach gdb and press return...")
+
+"""
+Note: The QA tests below have been disabled by renaming them from test_*
+to xtest_*. See ticket:199 on http://gnuradio.org/trac/ticket/199
+"""
+
+class counter(gr.feval_dd):
+ def __init__(self, step_size=1):
+ gr.feval_dd.__init__(self)
+ self.step_size = step_size
+ self.count = 0
+
+ def eval(self, input):
+ #print "eval: self.count =", self.count
+ t = self.count
+ self.count = self.count + self.step_size
+ return t
+
+
+class counter3(gr.feval_dd):
+ def __init__(self, f, step_size):
+ gr.feval_dd.__init__(self)
+ self.f = f
+ self.step_size = step_size
+ self.count = 0
+
+ def eval(self, input):
+ try:
+ #print "eval: self.count =", self.count
+ t = self.count
+ self.count = self.count + self.step_size
+ self.f(self.count)
+ except Exception, e:
+ print "Exception: ", e
+ return t
+
+def foobar3(new_t):
+ #print "foobar3: new_t =", new_t
+ pass
+
+
+class counter4(gr.feval_dd):
+ def __init__(self, obj_instance, step_size):
+ gr.feval_dd.__init__(self)
+ self.obj_instance = obj_instance
+ self.step_size = step_size
+ self.count = 0
+
+ def eval(self, input):
+ try:
+ #print "eval: self.count =", self.count
+ t = self.count
+ self.count = self.count + self.step_size
+ self.obj_instance.foobar4(self.count)
+ except Exception, e:
+ print "Exception: ", e
+ return t
+
+
+class parse_msg(object):
+ def __init__(self, msg):
+ self.center_freq = msg.arg1()
+ self.vlen = int(msg.arg2())
+ assert(msg.length() == self.vlen * gr.sizeof_float)
+ self.data = struct.unpack('%df' % (self.vlen,), msg.to_string())
+
+# FIXME: see ticket:199
+class xtest_bin_statistics(gr_unittest.TestCase):
+
+ def setUp(self):
+ self.tb = gr.top_block ()
+
+ def tearDown(self):
+ self.tb = None
+
+ def xtest_001(self):
+ vlen = 4
+ tune = counter(1)
+ tune_delay = 0
+ dwell_delay = 1
+ msgq = gr.msg_queue()
+
+ src_data = tuple([float(x) for x in
+ ( 1, 2, 3, 4,
+ 5, 6, 7, 8,
+ 9, 10, 11, 12,
+ 13, 14, 15, 16
+ )])
+
+ expected_results = tuple([float(x) for x in
+ ( 1, 2, 3, 4,
+ 5, 6, 7, 8,
+ 9, 10, 11, 12,
+ 13, 14, 15, 16
+ )])
+
+ src = gr.vector_source_f(src_data, False)
+ s2v = blocks.stream_to_vector(gr.sizeof_float, vlen)
+ stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
+ self.tb.connect(src, s2v, stats)
+ self.tb.run()
+ self.assertEqual(4, msgq.count())
+ for i in range(4):
+ m = parse_msg(msgq.delete_head())
+ #print "m =", m.center_freq, m.data
+ self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
+
+ def xtest_002(self):
+ vlen = 4
+ tune = counter(1)
+ tune_delay = 1
+ dwell_delay = 2
+ msgq = gr.msg_queue()
+
+ src_data = tuple([float(x) for x in
+ ( 1, 2, 3, 4,
+ 9, 6, 11, 8,
+ 5, 10, 7, 12,
+ 13, 14, 15, 16
+ )])
+
+ expected_results = tuple([float(x) for x in
+ ( 9, 10, 11, 12)])
+
+ src = gr.vector_source_f(src_data, False)
+ s2v = blocks.stream_to_vector(gr.sizeof_float, vlen)
+ stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
+ self.tb.connect(src, s2v, stats)
+ self.tb.run()
+ self.assertEqual(1, msgq.count())
+ for i in range(1):
+ m = parse_msg(msgq.delete_head())
+ #print "m =", m.center_freq, m.data
+ self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
+
+
+
+ def xtest_003(self):
+ vlen = 4
+ tune = counter3(foobar3, 1)
+ tune_delay = 1
+ dwell_delay = 2
+ msgq = gr.msg_queue()
+
+ src_data = tuple([float(x) for x in
+ ( 1, 2, 3, 4,
+ 9, 6, 11, 8,
+ 5, 10, 7, 12,
+ 13, 14, 15, 16
+ )])
+
+ expected_results = tuple([float(x) for x in
+ ( 9, 10, 11, 12)])
+
+ src = gr.vector_source_f(src_data, False)
+ s2v = blocks.stream_to_vector(gr.sizeof_float, vlen)
+ stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
+ self.tb.connect(src, s2v, stats)
+ self.tb.run()
+ self.assertEqual(1, msgq.count())
+ for i in range(1):
+ m = parse_msg(msgq.delete_head())
+ #print "m =", m.center_freq, m.data
+ self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
+
+
+ def foobar4(self, new_t):
+ #print "foobar4: new_t =", new_t
+ pass
+
+ def xtest_004(self):
+ vlen = 4
+ tune = counter4(self, 1)
+ tune_delay = 1
+ dwell_delay = 2
+ msgq = gr.msg_queue()
+
+ src_data = tuple([float(x) for x in
+ ( 1, 2, 3, 4,
+ 9, 6, 11, 8,
+ 5, 10, 7, 12,
+ 13, 14, 15, 16
+ )])
+
+ expected_results = tuple([float(x) for x in
+ ( 9, 10, 11, 12)])
+
+ src = gr.vector_source_f(src_data, False)
+ s2v = blocks.stream_to_vector(gr.sizeof_float, vlen)
+ stats = gr.bin_statistics_f(vlen, msgq, tune, tune_delay, dwell_delay)
+ self.tb.connect(src, s2v, stats)
+ self.tb.run()
+ self.assertEqual(1, msgq.count())
+ for i in range(1):
+ m = parse_msg(msgq.delete_head())
+ #print "m =", m.center_freq, m.data
+ self.assertEqual(expected_results[vlen*i:vlen*i + vlen], m.data)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(xtest_bin_statistics, "test_bin_statistics.xml")
diff --git a/gr-blocks/python/qa_file_metadata.py b/gr-blocks/python/qa_file_metadata.py
index c7826b1d3e..c2319a800e 100644
--- a/gr-blocks/python/qa_file_metadata.py
+++ b/gr-blocks/python/qa_file_metadata.py
@@ -35,6 +35,7 @@ def sig_source_c(samp_rate, freq, amp, N):
class test_file_metadata(gr_unittest.TestCase):
def setUp(self):
+ os.environ['GR_CONF_CONTROLPORT_ON'] = 'False'
self.tb = gr.top_block()
def tearDown(self):
@@ -46,11 +47,11 @@ class test_file_metadata(gr_unittest.TestCase):
detached = False
samp_rate = 200000
- key = pmt.pmt_intern("samp_rate")
- val = pmt.pmt_from_double(samp_rate)
- extras = pmt.pmt_make_dict()
- extras = pmt.pmt_dict_add(extras, key, val)
- extras_str = pmt.pmt_serialize_str(extras)
+ key = pmt.intern("samp_rate")
+ val = pmt.from_double(samp_rate)
+ extras = pmt.make_dict()
+ extras = pmt.dict_add(extras, key, val)
+ extras_str = pmt.serialize_str(extras)
data = sig_source_c(samp_rate, 1000, 1, N)
src = gr.vector_source_c(data)
@@ -70,7 +71,7 @@ class test_file_metadata(gr_unittest.TestCase):
self.assertFalse()
try:
- header = pmt.pmt_deserialize_str(header_str)
+ header = pmt.deserialize_str(header_str)
except RuntimeError:
self.assertFalse()
@@ -82,14 +83,14 @@ class test_file_metadata(gr_unittest.TestCase):
handle.close()
try:
- extra = pmt.pmt_deserialize_str(extra_str)
+ extra = pmt.deserialize_str(extra_str)
except RuntimeError:
self.assertFalse()
extra_info = parse_file_metadata.parse_extra_dict(extra, info, False)
self.assertEqual(info['rx_rate'], samp_rate)
- self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate)
+ self.assertEqual(pmt.to_double(extra_info['samp_rate']), samp_rate)
# Test file metadata source
@@ -108,10 +109,10 @@ class test_file_metadata(gr_unittest.TestCase):
# were generated and received correctly.
tags = tsnk.current_tags()
for t in tags:
- if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))):
- self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
- elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))):
- self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
+ if(pmt.eq(t.key, pmt.intern("samp_rate"))):
+ self.assertEqual(pmt.to_double(t.value), samp_rate)
+ elif(pmt.eq(t.key, pmt.intern("rx_rate"))):
+ self.assertEqual(pmt.to_double(t.value), samp_rate)
# Test that the data portion was extracted and received correctly.
self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5)
@@ -125,11 +126,11 @@ class test_file_metadata(gr_unittest.TestCase):
detached = True
samp_rate = 200000
- key = pmt.pmt_intern("samp_rate")
- val = pmt.pmt_from_double(samp_rate)
- extras = pmt.pmt_make_dict()
- extras = pmt.pmt_dict_add(extras, key, val)
- extras_str = pmt.pmt_serialize_str(extras)
+ key = pmt.intern("samp_rate")
+ val = pmt.from_double(samp_rate)
+ extras = pmt.make_dict()
+ extras = pmt.dict_add(extras, key, val)
+ extras_str = pmt.serialize_str(extras)
data = sig_source_c(samp_rate, 1000, 1, N)
src = gr.vector_source_c(data)
@@ -150,7 +151,7 @@ class test_file_metadata(gr_unittest.TestCase):
self.assertFalse()
try:
- header = pmt.pmt_deserialize_str(header_str)
+ header = pmt.deserialize_str(header_str)
except RuntimeError:
self.assertFalse()
@@ -161,14 +162,14 @@ class test_file_metadata(gr_unittest.TestCase):
handle.close()
try:
- extra = pmt.pmt_deserialize_str(extra_str)
+ extra = pmt.deserialize_str(extra_str)
except RuntimeError:
self.assertFalse()
extra_info = parse_file_metadata.parse_extra_dict(extra, info, False)
self.assertEqual(info['rx_rate'], samp_rate)
- self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate)
+ self.assertEqual(pmt.to_double(extra_info['samp_rate']), samp_rate)
# Test file metadata source
@@ -187,10 +188,10 @@ class test_file_metadata(gr_unittest.TestCase):
# were generated and received correctly.
tags = tsnk.current_tags()
for t in tags:
- if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))):
- self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
- elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))):
- self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
+ if(pmt.eq(t.key, pmt.intern("samp_rate"))):
+ self.assertEqual(pmt.to_double(t.value), samp_rate)
+ elif(pmt.eq(t.key, pmt.intern("rx_rate"))):
+ self.assertEqual(pmt.to_double(t.value), samp_rate)
# Test that the data portion was extracted and received correctly.
self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5)
diff --git a/gr-blocks/python/qa_max.py b/gr-blocks/python/qa_max.py
new file mode 100755
index 0000000000..00c7e60110
--- /dev/null
+++ b/gr-blocks/python/qa_max.py
@@ -0,0 +1,71 @@
+#!/usr/bin/env python
+#
+# Copyright 2007,2010 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+import math
+
+
+class test_max (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+
+
+ def tearDown (self):
+ self.tb = None
+
+
+ def test_001(self):
+
+ src_data = (0,0.2,-0.3,0,12,0)
+ expected_result = (float(max(src_data)), )
+
+ src = gr.vector_source_f(src_data)
+ s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data))
+ op = gr.max_ff( len(src_data) )
+ dst = gr.vector_sink_f()
+
+
+ self.tb.connect(src, s2v, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+ def test_002(self):
+
+ src_data=(-100,-99,-98,-97,-96,-1)
+ expected_result = (float(max(src_data)), )
+
+ src = gr.vector_source_f(src_data)
+ s2v = blocks.stream_to_vector(gr.sizeof_float, len(src_data))
+ op = gr.max_ff( len(src_data) )
+ dst = gr.vector_sink_f()
+
+ self.tb.connect(src, s2v, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertEqual(expected_result, result_data)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_max, "test_max.xml")
+
diff --git a/gr-blocks/python/qa_pdu.py b/gr-blocks/python/qa_pdu.py
index dc50b7c330..b421f9ce63 100755
--- a/gr-blocks/python/qa_pdu.py
+++ b/gr-blocks/python/qa_pdu.py
@@ -48,8 +48,8 @@ class test_pdu(gr_unittest.TestCase):
# Test that the right number of ports exist.
pi = snk3.message_ports_in()
po = snk3.message_ports_out()
- self.assertEqual(pmt.pmt_length(pi), 0)
- self.assertEqual(pmt.pmt_length(po), 1)
+ self.assertEqual(pmt.length(pi), 0)
+ self.assertEqual(pmt.length(po), 1)
time.sleep(0.1)
self.tb.connect(src, snk)
@@ -59,8 +59,8 @@ class test_pdu(gr_unittest.TestCase):
self.tb.start()
# make our reference and message pmts
- port = pmt.pmt_intern("pdus")
- msg = pmt.pmt_cons( pmt.PMT_NIL, pmt.pmt_make_u8vector(16, 0xFF))
+ port = pmt.intern("pdus")
+ msg = pmt.cons( pmt.PMT_NIL, pmt.make_u8vector(16, 0xFF))
# post the message
src.to_basic_block()._post(port, msg) # eww, what's that smell?
@@ -76,13 +76,13 @@ class test_pdu(gr_unittest.TestCase):
# Get the vector of data from the message sink
# Convert the message PMT as a pair into its vector
result_msg = dbg.get_message(0)
- msg_vec = pmt.pmt_cdr(result_msg)
- #pmt.pmt_print(msg_vec)
+ msg_vec = pmt.cdr(result_msg)
+ #pmt.print(msg_vec)
# Convert the PMT vector into a Python list
msg_data = []
for i in xrange(16):
- msg_data.append(pmt.pmt_u8vector_ref(msg_vec, i))
+ msg_data.append(pmt.u8vector_ref(msg_vec, i))
actual_data = 16*[0xFF,]
self.assertEqual(actual_data, list(result_data))
diff --git a/gr-blocks/python/qa_pipe_fittings.py b/gr-blocks/python/qa_pipe_fittings.py
index 321660d5ee..9894a5c211 100755
--- a/gr-blocks/python/qa_pipe_fittings.py
+++ b/gr-blocks/python/qa_pipe_fittings.py
@@ -21,7 +21,7 @@
#
from gnuradio import gr, gr_unittest
-import blocks_swig
+import blocks_swig as blocks
def calc_expected_result(src_data, n):
assert (len(src_data) % n) == 0
@@ -51,7 +51,7 @@ class test_pipe_fittings(gr_unittest.TestCase):
expected_results = calc_expected_result(src_data, n)
#print "expected results: ", expected_results
src = gr.vector_source_i(src_data)
- op = gr.stream_to_streams(gr.sizeof_int, n)
+ op = blocks.stream_to_streams(gr.sizeof_int, n)
self.tb.connect(src, op)
dsts = []
@@ -75,8 +75,8 @@ class test_pipe_fittings(gr_unittest.TestCase):
expected_results = src_data
src = gr.vector_source_i(src_data)
- op1 = gr.stream_to_streams(gr.sizeof_int, n)
- op2 = gr.streams_to_stream(gr.sizeof_int, n)
+ op1 = blocks.stream_to_streams(gr.sizeof_int, n)
+ op2 = blocks.streams_to_stream(gr.sizeof_int, n)
dst = gr.vector_sink_i()
self.tb.connect(src, op1)
@@ -97,9 +97,9 @@ class test_pipe_fittings(gr_unittest.TestCase):
expected_results = src_data
src = gr.vector_source_i(src_data)
- op1 = gr.stream_to_streams(gr.sizeof_int, n)
- op2 = gr.streams_to_vector(gr.sizeof_int, n)
- op3 = gr.vector_to_stream(gr.sizeof_int, n)
+ op1 = blocks.stream_to_streams(gr.sizeof_int, n)
+ op2 = blocks.streams_to_vector(gr.sizeof_int, n)
+ op3 = blocks.vector_to_stream(gr.sizeof_int, n)
dst = gr.vector_sink_i()
self.tb.connect(src, op1)
@@ -120,9 +120,9 @@ class test_pipe_fittings(gr_unittest.TestCase):
expected_results = src_data
src = gr.vector_source_i(src_data)
- op1 = gr.stream_to_vector(gr.sizeof_int, n)
- op2 = gr.vector_to_streams(gr.sizeof_int, n)
- op3 = gr.streams_to_stream(gr.sizeof_int, n)
+ op1 = blocks.stream_to_vector(gr.sizeof_int, n)
+ op2 = blocks.vector_to_streams(gr.sizeof_int, n)
+ op3 = blocks.streams_to_stream(gr.sizeof_int, n)
dst = gr.vector_sink_i()
self.tb.connect(src, op1, op2)
diff --git a/gr-blocks/python/qa_probe_signal.py b/gr-blocks/python/qa_probe_signal.py
new file mode 100644
index 0000000000..ce526e8a53
--- /dev/null
+++ b/gr-blocks/python/qa_probe_signal.py
@@ -0,0 +1,68 @@
+#!/usr/bin/env python
+#
+# Copyright 2012 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import time
+
+from gnuradio import gr, gr_unittest
+import blocks_swig as blocks
+
+class test_probe_signal (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block()
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001(self):
+
+ value = 12.3
+ repeats = 100
+ src_data = [value] * repeats
+
+ src = gr.vector_source_f(src_data)
+ dst = gr.probe_signal_f()
+
+ self.tb.connect(src, dst)
+ self.tb.run()
+ output = dst.level()
+ self.assertAlmostEqual(value, output, places=6)
+
+ def test_002(self):
+
+ vector_length = 10
+ repeats = 10
+ value = [0.5+i for i in range(0, vector_length)]
+ src_data = value * repeats
+
+ src = gr.vector_source_f(src_data)
+ s2v = blocks.stream_to_vector(gr.sizeof_float, vector_length)
+ dst = gr.probe_signal_vf(vector_length)
+
+ self.tb.connect(src, s2v, dst)
+ self.tb.run()
+ output = dst.level()
+ self.assertEqual(len(output), vector_length)
+ self.assertAlmostEqual(value[3], output[3], places=6)
+
+if __name__ == '__main__':
+ gr_unittest.run(test_probe_signal, "test_probe_signal.xml")
diff --git a/gr-blocks/python/qa_stream_mux.py b/gr-blocks/python/qa_stream_mux.py
index f21a9bbbc9..657bd3d63f 100755
--- a/gr-blocks/python/qa_stream_mux.py
+++ b/gr-blocks/python/qa_stream_mux.py
@@ -22,10 +22,12 @@
from gnuradio import gr, gr_unittest
import blocks_swig
+import os
class test_stream_mux (gr_unittest.TestCase):
def setUp (self):
+ os.environ['GR_CONF_CONTROLPORT_ON'] = 'False'
self.tb = gr.top_block ()
def tearDown (self):
diff --git a/gr-blocks/python/qa_stretch.py b/gr-blocks/python/qa_stretch.py
index 013d878a8f..078b404dbf 100755
--- a/gr-blocks/python/qa_stretch.py
+++ b/gr-blocks/python/qa_stretch.py
@@ -43,9 +43,9 @@ class test_stretch(gr_unittest.TestCase):
src0 = gr.vector_source_f(data0, False)
src1 = gr.vector_source_f(data1, False)
- inter = gr.streams_to_vector(gr.sizeof_float, 2)
+ inter = blocks.streams_to_vector(gr.sizeof_float, 2)
op = blocks.stretch_ff(0.1, 2)
- deinter = gr.vector_to_streams(gr.sizeof_float, 2)
+ deinter = blocks.vector_to_streams(gr.sizeof_float, 2)
dst0 = gr.vector_sink_f()
dst1 = gr.vector_sink_f()
diff --git a/gr-blocks/python/stream_to_vector_decimator.py b/gr-blocks/python/stream_to_vector_decimator.py
new file mode 100644
index 0000000000..c32ae6fce2
--- /dev/null
+++ b/gr-blocks/python/stream_to_vector_decimator.py
@@ -0,0 +1,106 @@
+#
+# Copyright 2008 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+from gnuradio import gr
+
+try:
+ from gnuradio import blocks
+except ImportError:
+ import blocks_swig as blocks
+
+class stream_to_vector_decimator(gr.hier_block2):
+ """
+ Convert the stream to a vector, decimate the vector stream to achieve the vector rate.
+ """
+
+ def __init__(self, item_size, sample_rate, vec_rate, vec_len):
+ """
+ Create the block chain.
+
+ Args:
+ item_size: the number of bytes per sample
+ sample_rate: the rate of incoming samples
+ vec_rate: the rate of outgoing vectors (same units as sample_rate)
+ vec_len: the length of the outgoing vectors in items
+ """
+ self._vec_rate = vec_rate
+ self._vec_len = vec_len
+ self._sample_rate = sample_rate
+
+ gr.hier_block2.__init__(self, "stream_to_vector_decimator",
+ gr.io_signature(1, 1, item_size), # Input signature
+ gr.io_signature(1, 1, item_size*vec_len)) # Output signature
+
+ s2v = blocks.stream_to_vector(item_size, vec_len)
+ self.one_in_n = blocks.keep_one_in_n(item_size*vec_len, 1)
+ self._update_decimator()
+ self.connect(self, s2v, self.one_in_n, self)
+
+ def set_sample_rate(self, sample_rate):
+ """
+ Set the new sampling rate and update the decimator.
+
+ Args:
+ sample_rate: the new rate
+ """
+ self._sample_rate = sample_rate
+ self._update_decimator()
+
+ def set_vec_rate(self, vec_rate):
+ """
+ Set the new vector rate and update the decimator.
+
+ Args:
+ vec_rate: the new rate
+ """
+ self._vec_rate = vec_rate
+ self._update_decimator()
+
+ def set_decimation(self, decim):
+ """
+ Set the decimation parameter directly.
+
+ Args:
+ decim: the new decimation
+ """
+ self._decim = max(1, int(round(decim)))
+ self.one_in_n.set_n(self._decim)
+
+ def _update_decimator(self):
+ self.set_decimation(self._sample_rate/self._vec_len/self._vec_rate)
+
+ def decimation(self):
+ """
+ Returns the actual decimation.
+ """
+ return self._decim
+
+ def sample_rate(self):
+ """
+ Returns configured sample rate.
+ """
+ return self._sample_rate
+
+ def frame_rate(self):
+ """
+ Returns actual frame rate
+ """
+ return self._sample_rate/self._vec_len/self._decim