summaryrefslogtreecommitdiff
path: root/gr-digital/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python')
-rw-r--r--gr-digital/python/digital/modulation_utils.py4
-rw-r--r--gr-digital/python/digital/qa_correlate_access_code_XX_ts.py81
-rw-r--r--gr-digital/python/digital/qa_correlate_access_code_tag.py57
3 files changed, 136 insertions, 6 deletions
diff --git a/gr-digital/python/digital/modulation_utils.py b/gr-digital/python/digital/modulation_utils.py
index c290bf5a8d..59809ae1d8 100644
--- a/gr-digital/python/digital/modulation_utils.py
+++ b/gr-digital/python/digital/modulation_utils.py
@@ -69,9 +69,9 @@ def extract_kwargs_from_options(function, excluded_args, options):
"""
# Try this in C++ ;)
- args, varargs, varkw, defaults = inspect.getargspec(function)
+ spec = inspect.getfullargspec(function)
d = {}
- for kw in [a for a in args if a not in excluded_args]:
+ for kw in [a for a in spec.args if a not in excluded_args]:
if hasattr(options, kw):
if getattr(options, kw) is not None:
d[kw] = getattr(options, kw)
diff --git a/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py b/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py
index 4d9693b6c1..3e3cc7a4fa 100644
--- a/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py
+++ b/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py
@@ -53,6 +53,45 @@ class test_correlate_access_code_XX_ts(gr_unittest.TestCase):
self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
self.assertEqual(result_data, expected)
+ def test_bb_prefix(self):
+ payload = "test packet" # payload length is 11 bytes
+ header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped)
+ packet = header + payload
+ pad = (0,) * 64
+ src_data = (0, 1, 1, 1, 0, 0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad
+ expected = list(map(int, src_data[9+32:-len(pad)]))
+ src = blocks.vector_source_b(src_data)
+ op = digital.correlate_access_code_bb_ts("0011", 0, "sync")
+ dst = blocks.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ result_tags = dst.tags()
+ self.assertEqual(len(result_data), len(payload)*8)
+ self.assertEqual(result_tags[0].offset, 0)
+ self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
+ self.assertEqual(result_data, expected)
+
+ def test_bb_immediate(self):
+ """Test that packets at start of stream match"""
+ payload = "test packet" # payload length is 11 bytes
+ header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped)
+ packet = header + payload
+ pad = (0,) * 64
+ src_data = (0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad
+ expected = list(map(int, src_data[4+32:-len(pad)]))
+ src = blocks.vector_source_b(src_data)
+ op = digital.correlate_access_code_bb_ts("0011", 0, "sync")
+ dst = blocks.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ result_tags = dst.tags()
+ #self.assertEqual(len(result_data), len(packet)*8)
+ self.assertEqual(result_tags[0].offset, 0)
+ #self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
+ self.assertEqual(result_data, expected)
+
def test_002(self):
payload = "test packet" # payload length is 11 bytes
header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped)
@@ -73,7 +112,47 @@ class test_correlate_access_code_XX_ts(gr_unittest.TestCase):
self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
self.assertFloatTuplesAlmostEqual(result_data, expected, 5)
+ def test_ff_prefix(self):
+ payload = "test packet" # payload length is 11 bytes
+ header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped)
+ packet = header + payload
+ pad = (0,) * 64
+ src_data = (0, 1, 1, 1, 1, 0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad
+ src_floats = tuple(2*b-1 for b in src_data) # convert to binary antipodal symbols (-1,1)
+ expected = src_floats[9+32:-len(pad)]
+ src = blocks.vector_source_f(src_floats)
+ op = digital.correlate_access_code_ff_ts("0011", 0, "sync")
+ dst = blocks.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ result_tags = dst.tags()
+ self.assertEqual(len(result_data), len(payload)*8)
+ self.assertEqual(result_tags[0].offset, 0)
+ self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
+ self.assertFloatTuplesAlmostEqual(result_data, expected, 5)
+
+ def test_ff_immediate(self):
+ """Test that packets at start of stream match"""
+ payload = "test packet" # payload length is 11 bytes
+ header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped)
+ packet = header + payload
+ pad = (0,) * 64
+ src_data = (0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad
+ src_floats = tuple(2*b-1 for b in src_data) # convert to binary antipodal symbols (-1,1)
+ expected = src_floats[4+32:-len(pad)]
+ src = blocks.vector_source_f(src_floats)
+ op = digital.correlate_access_code_ff_ts("0011", 0, "sync")
+ dst = blocks.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ result_tags = dst.tags()
+ self.assertEqual(len(result_data), len(payload)*8)
+ self.assertEqual(result_tags[0].offset, 0)
+ self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
+ self.assertFloatTuplesAlmostEqual(result_data, expected, 5)
+
if __name__ == '__main__':
gr_unittest.run(test_correlate_access_code_XX_ts, "test_correlate_access_code_XX_ts.xml")
-
diff --git a/gr-digital/python/digital/qa_correlate_access_code_tag.py b/gr-digital/python/digital/qa_correlate_access_code_tag.py
index d277ac6116..9b79529aba 100644
--- a/gr-digital/python/digital/qa_correlate_access_code_tag.py
+++ b/gr-digital/python/digital/qa_correlate_access_code_tag.py
@@ -34,7 +34,7 @@ class test_correlate_access_code(gr_unittest.TestCase):
def tearDown(self):
self.tb = None
- def test_001(self):
+ def test_bb(self):
pad = (0,) * 64
src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7
src = blocks.vector_source_b(src_data)
@@ -47,6 +47,31 @@ class test_correlate_access_code(gr_unittest.TestCase):
self.assertEqual(result_data[0].offset, 4)
self.assertEqual(result_data[1].offset, 9)
+ def test_bb_skip_prefix(self):
+ pad = (0,) * 64
+ src_data = (0, 1, 1, 1, 1, 0, 0, 1, 1) + pad + (0,) * 7
+ src = blocks.vector_source_b(src_data)
+ op = digital.correlate_access_code_tag_bb("0011", 0, "sync")
+ dst = blocks.tag_debug(gr.sizeof_char, "sync")
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.current_tags()
+ self.assertEqual(len(result_data), 1)
+ self.assertEqual(result_data[0].offset, 9)
+
+ def test_bb_immediate(self):
+ """Test that packets at start of stream match"""
+ pad = (0,) * 64
+ src_data = (0, 0, 1, 1) + pad + (0,) * 7
+ src = blocks.vector_source_b(src_data)
+ op = digital.correlate_access_code_tag_bb("0011", 0, "sync")
+ dst = blocks.tag_debug(gr.sizeof_char, "sync")
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.current_tags()
+ self.assertEqual(len(result_data), 1)
+ self.assertEqual(result_data[0].offset, 4)
+
def test_002(self):
code = tuple(string_to_1_0_list(default_access_code))
access_code = to_1_0_string(code)
@@ -63,7 +88,7 @@ class test_correlate_access_code(gr_unittest.TestCase):
self.assertEqual(len(result_data), 1)
self.assertEqual(result_data[0].offset, len(code))
- def test_003(self):
+ def test_ff(self):
pad = (0,) * 64
src_bits = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7
src_data = [2.0*x - 1.0 for x in src_bits]
@@ -77,6 +102,33 @@ class test_correlate_access_code(gr_unittest.TestCase):
self.assertEqual(result_data[0].offset, 4)
self.assertEqual(result_data[1].offset, 9)
+ def test_ff_skip_prefix(self):
+ pad = (0,) * 64
+ src_bits = (0, 1, 1, 1, 1, 0, 0, 1, 1) + pad + (0,) * 7
+ src_data = [2.0*x - 1.0 for x in src_bits]
+ src = blocks.vector_source_f(src_data)
+ op = digital.correlate_access_code_tag_ff("0011", 0, "sync")
+ dst = blocks.tag_debug(gr.sizeof_float, "sync")
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.current_tags()
+ self.assertEqual(len(result_data), 1)
+ self.assertEqual(result_data[0].offset, 9)
+
+ def test_ff_immediate(self):
+ """Test that packets at start of stream match"""
+ pad = (0,) * 64
+ src_bits = (0, 0, 1, 1) + pad + (0,) * 7
+ src_data = [2.0*x - 1.0 for x in src_bits]
+ src = blocks.vector_source_f(src_data)
+ op = digital.correlate_access_code_tag_ff("0011", 0, "sync")
+ dst = blocks.tag_debug(gr.sizeof_float, "sync")
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.current_tags()
+ self.assertEqual(len(result_data), 1)
+ self.assertEqual(result_data[0].offset, 4)
+
def test_004(self):
code = tuple(string_to_1_0_list(default_access_code))
access_code = to_1_0_string(code)
@@ -96,4 +148,3 @@ class test_correlate_access_code(gr_unittest.TestCase):
if __name__ == '__main__':
gr_unittest.run(test_correlate_access_code, "test_correlate_access_code_tag.xml")
-