diff options
Diffstat (limited to 'gr-digital/python')
-rw-r--r-- | gr-digital/python/digital/modulation_utils.py | 4 | ||||
-rw-r--r-- | gr-digital/python/digital/qa_correlate_access_code_XX_ts.py | 81 | ||||
-rw-r--r-- | gr-digital/python/digital/qa_correlate_access_code_tag.py | 57 |
3 files changed, 136 insertions, 6 deletions
diff --git a/gr-digital/python/digital/modulation_utils.py b/gr-digital/python/digital/modulation_utils.py index c290bf5a8d..59809ae1d8 100644 --- a/gr-digital/python/digital/modulation_utils.py +++ b/gr-digital/python/digital/modulation_utils.py @@ -69,9 +69,9 @@ def extract_kwargs_from_options(function, excluded_args, options): """ # Try this in C++ ;) - args, varargs, varkw, defaults = inspect.getargspec(function) + spec = inspect.getfullargspec(function) d = {} - for kw in [a for a in args if a not in excluded_args]: + for kw in [a for a in spec.args if a not in excluded_args]: if hasattr(options, kw): if getattr(options, kw) is not None: d[kw] = getattr(options, kw) diff --git a/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py b/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py index 4d9693b6c1..3e3cc7a4fa 100644 --- a/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py +++ b/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py @@ -53,6 +53,45 @@ class test_correlate_access_code_XX_ts(gr_unittest.TestCase): self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) self.assertEqual(result_data, expected) + def test_bb_prefix(self): + payload = "test packet" # payload length is 11 bytes + header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped) + packet = header + payload + pad = (0,) * 64 + src_data = (0, 1, 1, 1, 0, 0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad + expected = list(map(int, src_data[9+32:-len(pad)])) + src = blocks.vector_source_b(src_data) + op = digital.correlate_access_code_bb_ts("0011", 0, "sync") + dst = blocks.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + result_tags = dst.tags() + self.assertEqual(len(result_data), len(payload)*8) + self.assertEqual(result_tags[0].offset, 0) + self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) + self.assertEqual(result_data, expected) + + def test_bb_immediate(self): + """Test that packets at start of stream match""" + payload = "test packet" # payload length is 11 bytes + header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped) + packet = header + payload + pad = (0,) * 64 + src_data = (0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad + expected = list(map(int, src_data[4+32:-len(pad)])) + src = blocks.vector_source_b(src_data) + op = digital.correlate_access_code_bb_ts("0011", 0, "sync") + dst = blocks.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + result_tags = dst.tags() + #self.assertEqual(len(result_data), len(packet)*8) + self.assertEqual(result_tags[0].offset, 0) + #self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) + self.assertEqual(result_data, expected) + def test_002(self): payload = "test packet" # payload length is 11 bytes header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped) @@ -73,7 +112,47 @@ class test_correlate_access_code_XX_ts(gr_unittest.TestCase): self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) self.assertFloatTuplesAlmostEqual(result_data, expected, 5) + def test_ff_prefix(self): + payload = "test packet" # payload length is 11 bytes + header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped) + packet = header + payload + pad = (0,) * 64 + src_data = (0, 1, 1, 1, 1, 0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad + src_floats = tuple(2*b-1 for b in src_data) # convert to binary antipodal symbols (-1,1) + expected = src_floats[9+32:-len(pad)] + src = blocks.vector_source_f(src_floats) + op = digital.correlate_access_code_ff_ts("0011", 0, "sync") + dst = blocks.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + result_tags = dst.tags() + self.assertEqual(len(result_data), len(payload)*8) + self.assertEqual(result_tags[0].offset, 0) + self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) + self.assertFloatTuplesAlmostEqual(result_data, expected, 5) + + def test_ff_immediate(self): + """Test that packets at start of stream match""" + payload = "test packet" # payload length is 11 bytes + header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped) + packet = header + payload + pad = (0,) * 64 + src_data = (0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad + src_floats = tuple(2*b-1 for b in src_data) # convert to binary antipodal symbols (-1,1) + expected = src_floats[4+32:-len(pad)] + src = blocks.vector_source_f(src_floats) + op = digital.correlate_access_code_ff_ts("0011", 0, "sync") + dst = blocks.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + result_tags = dst.tags() + self.assertEqual(len(result_data), len(payload)*8) + self.assertEqual(result_tags[0].offset, 0) + self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) + self.assertFloatTuplesAlmostEqual(result_data, expected, 5) + if __name__ == '__main__': gr_unittest.run(test_correlate_access_code_XX_ts, "test_correlate_access_code_XX_ts.xml") - diff --git a/gr-digital/python/digital/qa_correlate_access_code_tag.py b/gr-digital/python/digital/qa_correlate_access_code_tag.py index d277ac6116..9b79529aba 100644 --- a/gr-digital/python/digital/qa_correlate_access_code_tag.py +++ b/gr-digital/python/digital/qa_correlate_access_code_tag.py @@ -34,7 +34,7 @@ class test_correlate_access_code(gr_unittest.TestCase): def tearDown(self): self.tb = None - def test_001(self): + def test_bb(self): pad = (0,) * 64 src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7 src = blocks.vector_source_b(src_data) @@ -47,6 +47,31 @@ class test_correlate_access_code(gr_unittest.TestCase): self.assertEqual(result_data[0].offset, 4) self.assertEqual(result_data[1].offset, 9) + def test_bb_skip_prefix(self): + pad = (0,) * 64 + src_data = (0, 1, 1, 1, 1, 0, 0, 1, 1) + pad + (0,) * 7 + src = blocks.vector_source_b(src_data) + op = digital.correlate_access_code_tag_bb("0011", 0, "sync") + dst = blocks.tag_debug(gr.sizeof_char, "sync") + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.current_tags() + self.assertEqual(len(result_data), 1) + self.assertEqual(result_data[0].offset, 9) + + def test_bb_immediate(self): + """Test that packets at start of stream match""" + pad = (0,) * 64 + src_data = (0, 0, 1, 1) + pad + (0,) * 7 + src = blocks.vector_source_b(src_data) + op = digital.correlate_access_code_tag_bb("0011", 0, "sync") + dst = blocks.tag_debug(gr.sizeof_char, "sync") + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.current_tags() + self.assertEqual(len(result_data), 1) + self.assertEqual(result_data[0].offset, 4) + def test_002(self): code = tuple(string_to_1_0_list(default_access_code)) access_code = to_1_0_string(code) @@ -63,7 +88,7 @@ class test_correlate_access_code(gr_unittest.TestCase): self.assertEqual(len(result_data), 1) self.assertEqual(result_data[0].offset, len(code)) - def test_003(self): + def test_ff(self): pad = (0,) * 64 src_bits = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7 src_data = [2.0*x - 1.0 for x in src_bits] @@ -77,6 +102,33 @@ class test_correlate_access_code(gr_unittest.TestCase): self.assertEqual(result_data[0].offset, 4) self.assertEqual(result_data[1].offset, 9) + def test_ff_skip_prefix(self): + pad = (0,) * 64 + src_bits = (0, 1, 1, 1, 1, 0, 0, 1, 1) + pad + (0,) * 7 + src_data = [2.0*x - 1.0 for x in src_bits] + src = blocks.vector_source_f(src_data) + op = digital.correlate_access_code_tag_ff("0011", 0, "sync") + dst = blocks.tag_debug(gr.sizeof_float, "sync") + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.current_tags() + self.assertEqual(len(result_data), 1) + self.assertEqual(result_data[0].offset, 9) + + def test_ff_immediate(self): + """Test that packets at start of stream match""" + pad = (0,) * 64 + src_bits = (0, 0, 1, 1) + pad + (0,) * 7 + src_data = [2.0*x - 1.0 for x in src_bits] + src = blocks.vector_source_f(src_data) + op = digital.correlate_access_code_tag_ff("0011", 0, "sync") + dst = blocks.tag_debug(gr.sizeof_float, "sync") + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.current_tags() + self.assertEqual(len(result_data), 1) + self.assertEqual(result_data[0].offset, 4) + def test_004(self): code = tuple(string_to_1_0_list(default_access_code)) access_code = to_1_0_string(code) @@ -96,4 +148,3 @@ class test_correlate_access_code(gr_unittest.TestCase): if __name__ == '__main__': gr_unittest.run(test_correlate_access_code, "test_correlate_access_code_tag.xml") - |