diff options
author | Thomas Habets <thomas@habets.se> | 2020-08-12 16:20:26 +0100 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2020-10-19 13:28:31 -0400 |
commit | cc4cd94e326d6348228bd7d6763d99b8777124e6 (patch) | |
tree | 343972b821f7deddf4d0b8cb98d6ec04cbe41e32 /gr-digital/python | |
parent | cf5264adf550ced75dc8262b71ae365707a63cc1 (diff) |
digital/correlate_access_code: Prevent false triggers
Leftover data from last time CAC triggered, or just leading zeroes
being ignored.
This commit prevents these by assuring that we've processed at least
as many bits as the code is, before allowing a trigger.
Diffstat (limited to 'gr-digital/python')
-rw-r--r-- | gr-digital/python/digital/qa_correlate_access_code_XX_ts.py | 81 | ||||
-rw-r--r-- | gr-digital/python/digital/qa_correlate_access_code_tag.py | 57 |
2 files changed, 134 insertions, 4 deletions
diff --git a/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py b/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py index 4d9693b6c1..3e3cc7a4fa 100644 --- a/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py +++ b/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py @@ -53,6 +53,45 @@ class test_correlate_access_code_XX_ts(gr_unittest.TestCase): self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) self.assertEqual(result_data, expected) + def test_bb_prefix(self): + payload = "test packet" # payload length is 11 bytes + header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped) + packet = header + payload + pad = (0,) * 64 + src_data = (0, 1, 1, 1, 0, 0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad + expected = list(map(int, src_data[9+32:-len(pad)])) + src = blocks.vector_source_b(src_data) + op = digital.correlate_access_code_bb_ts("0011", 0, "sync") + dst = blocks.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + result_tags = dst.tags() + self.assertEqual(len(result_data), len(payload)*8) + self.assertEqual(result_tags[0].offset, 0) + self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) + self.assertEqual(result_data, expected) + + def test_bb_immediate(self): + """Test that packets at start of stream match""" + payload = "test packet" # payload length is 11 bytes + header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped) + packet = header + payload + pad = (0,) * 64 + src_data = (0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad + expected = list(map(int, src_data[4+32:-len(pad)])) + src = blocks.vector_source_b(src_data) + op = digital.correlate_access_code_bb_ts("0011", 0, "sync") + dst = blocks.vector_sink_b() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + result_tags = dst.tags() + #self.assertEqual(len(result_data), len(packet)*8) + self.assertEqual(result_tags[0].offset, 0) + #self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) + self.assertEqual(result_data, expected) + def test_002(self): payload = "test packet" # payload length is 11 bytes header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped) @@ -73,7 +112,47 @@ class test_correlate_access_code_XX_ts(gr_unittest.TestCase): self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) self.assertFloatTuplesAlmostEqual(result_data, expected, 5) + def test_ff_prefix(self): + payload = "test packet" # payload length is 11 bytes + header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped) + packet = header + payload + pad = (0,) * 64 + src_data = (0, 1, 1, 1, 1, 0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad + src_floats = tuple(2*b-1 for b in src_data) # convert to binary antipodal symbols (-1,1) + expected = src_floats[9+32:-len(pad)] + src = blocks.vector_source_f(src_floats) + op = digital.correlate_access_code_ff_ts("0011", 0, "sync") + dst = blocks.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + result_tags = dst.tags() + self.assertEqual(len(result_data), len(payload)*8) + self.assertEqual(result_tags[0].offset, 0) + self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) + self.assertFloatTuplesAlmostEqual(result_data, expected, 5) + + def test_ff_immediate(self): + """Test that packets at start of stream match""" + payload = "test packet" # payload length is 11 bytes + header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped) + packet = header + payload + pad = (0,) * 64 + src_data = (0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad + src_floats = tuple(2*b-1 for b in src_data) # convert to binary antipodal symbols (-1,1) + expected = src_floats[4+32:-len(pad)] + src = blocks.vector_source_f(src_floats) + op = digital.correlate_access_code_ff_ts("0011", 0, "sync") + dst = blocks.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + result_tags = dst.tags() + self.assertEqual(len(result_data), len(payload)*8) + self.assertEqual(result_tags[0].offset, 0) + self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8) + self.assertFloatTuplesAlmostEqual(result_data, expected, 5) + if __name__ == '__main__': gr_unittest.run(test_correlate_access_code_XX_ts, "test_correlate_access_code_XX_ts.xml") - diff --git a/gr-digital/python/digital/qa_correlate_access_code_tag.py b/gr-digital/python/digital/qa_correlate_access_code_tag.py index d277ac6116..9b79529aba 100644 --- a/gr-digital/python/digital/qa_correlate_access_code_tag.py +++ b/gr-digital/python/digital/qa_correlate_access_code_tag.py @@ -34,7 +34,7 @@ class test_correlate_access_code(gr_unittest.TestCase): def tearDown(self): self.tb = None - def test_001(self): + def test_bb(self): pad = (0,) * 64 src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7 src = blocks.vector_source_b(src_data) @@ -47,6 +47,31 @@ class test_correlate_access_code(gr_unittest.TestCase): self.assertEqual(result_data[0].offset, 4) self.assertEqual(result_data[1].offset, 9) + def test_bb_skip_prefix(self): + pad = (0,) * 64 + src_data = (0, 1, 1, 1, 1, 0, 0, 1, 1) + pad + (0,) * 7 + src = blocks.vector_source_b(src_data) + op = digital.correlate_access_code_tag_bb("0011", 0, "sync") + dst = blocks.tag_debug(gr.sizeof_char, "sync") + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.current_tags() + self.assertEqual(len(result_data), 1) + self.assertEqual(result_data[0].offset, 9) + + def test_bb_immediate(self): + """Test that packets at start of stream match""" + pad = (0,) * 64 + src_data = (0, 0, 1, 1) + pad + (0,) * 7 + src = blocks.vector_source_b(src_data) + op = digital.correlate_access_code_tag_bb("0011", 0, "sync") + dst = blocks.tag_debug(gr.sizeof_char, "sync") + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.current_tags() + self.assertEqual(len(result_data), 1) + self.assertEqual(result_data[0].offset, 4) + def test_002(self): code = tuple(string_to_1_0_list(default_access_code)) access_code = to_1_0_string(code) @@ -63,7 +88,7 @@ class test_correlate_access_code(gr_unittest.TestCase): self.assertEqual(len(result_data), 1) self.assertEqual(result_data[0].offset, len(code)) - def test_003(self): + def test_ff(self): pad = (0,) * 64 src_bits = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7 src_data = [2.0*x - 1.0 for x in src_bits] @@ -77,6 +102,33 @@ class test_correlate_access_code(gr_unittest.TestCase): self.assertEqual(result_data[0].offset, 4) self.assertEqual(result_data[1].offset, 9) + def test_ff_skip_prefix(self): + pad = (0,) * 64 + src_bits = (0, 1, 1, 1, 1, 0, 0, 1, 1) + pad + (0,) * 7 + src_data = [2.0*x - 1.0 for x in src_bits] + src = blocks.vector_source_f(src_data) + op = digital.correlate_access_code_tag_ff("0011", 0, "sync") + dst = blocks.tag_debug(gr.sizeof_float, "sync") + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.current_tags() + self.assertEqual(len(result_data), 1) + self.assertEqual(result_data[0].offset, 9) + + def test_ff_immediate(self): + """Test that packets at start of stream match""" + pad = (0,) * 64 + src_bits = (0, 0, 1, 1) + pad + (0,) * 7 + src_data = [2.0*x - 1.0 for x in src_bits] + src = blocks.vector_source_f(src_data) + op = digital.correlate_access_code_tag_ff("0011", 0, "sync") + dst = blocks.tag_debug(gr.sizeof_float, "sync") + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.current_tags() + self.assertEqual(len(result_data), 1) + self.assertEqual(result_data[0].offset, 4) + def test_004(self): code = tuple(string_to_1_0_list(default_access_code)) access_code = to_1_0_string(code) @@ -96,4 +148,3 @@ class test_correlate_access_code(gr_unittest.TestCase): if __name__ == '__main__': gr_unittest.run(test_correlate_access_code, "test_correlate_access_code_tag.xml") - |