summaryrefslogtreecommitdiff
path: root/gr-digital/python
diff options
context:
space:
mode:
authorThomas Habets <thomas@habets.se>2020-08-12 16:20:26 +0100
committermormj <34754695+mormj@users.noreply.github.com>2020-10-19 13:28:31 -0400
commitcc4cd94e326d6348228bd7d6763d99b8777124e6 (patch)
tree343972b821f7deddf4d0b8cb98d6ec04cbe41e32 /gr-digital/python
parentcf5264adf550ced75dc8262b71ae365707a63cc1 (diff)
digital/correlate_access_code: Prevent false triggers
Leftover data from last time CAC triggered, or just leading zeroes being ignored. This commit prevents these by assuring that we've processed at least as many bits as the code is, before allowing a trigger.
Diffstat (limited to 'gr-digital/python')
-rw-r--r--gr-digital/python/digital/qa_correlate_access_code_XX_ts.py81
-rw-r--r--gr-digital/python/digital/qa_correlate_access_code_tag.py57
2 files changed, 134 insertions, 4 deletions
diff --git a/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py b/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py
index 4d9693b6c1..3e3cc7a4fa 100644
--- a/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py
+++ b/gr-digital/python/digital/qa_correlate_access_code_XX_ts.py
@@ -53,6 +53,45 @@ class test_correlate_access_code_XX_ts(gr_unittest.TestCase):
self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
self.assertEqual(result_data, expected)
+ def test_bb_prefix(self):
+ payload = "test packet" # payload length is 11 bytes
+ header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped)
+ packet = header + payload
+ pad = (0,) * 64
+ src_data = (0, 1, 1, 1, 0, 0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad
+ expected = list(map(int, src_data[9+32:-len(pad)]))
+ src = blocks.vector_source_b(src_data)
+ op = digital.correlate_access_code_bb_ts("0011", 0, "sync")
+ dst = blocks.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ result_tags = dst.tags()
+ self.assertEqual(len(result_data), len(payload)*8)
+ self.assertEqual(result_tags[0].offset, 0)
+ self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
+ self.assertEqual(result_data, expected)
+
+ def test_bb_immediate(self):
+ """Test that packets at start of stream match"""
+ payload = "test packet" # payload length is 11 bytes
+ header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped)
+ packet = header + payload
+ pad = (0,) * 64
+ src_data = (0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad
+ expected = list(map(int, src_data[4+32:-len(pad)]))
+ src = blocks.vector_source_b(src_data)
+ op = digital.correlate_access_code_bb_ts("0011", 0, "sync")
+ dst = blocks.vector_sink_b()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ result_tags = dst.tags()
+ #self.assertEqual(len(result_data), len(packet)*8)
+ self.assertEqual(result_tags[0].offset, 0)
+ #self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
+ self.assertEqual(result_data, expected)
+
def test_002(self):
payload = "test packet" # payload length is 11 bytes
header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped)
@@ -73,7 +112,47 @@ class test_correlate_access_code_XX_ts(gr_unittest.TestCase):
self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
self.assertFloatTuplesAlmostEqual(result_data, expected, 5)
+ def test_ff_prefix(self):
+ payload = "test packet" # payload length is 11 bytes
+ header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped)
+ packet = header + payload
+ pad = (0,) * 64
+ src_data = (0, 1, 1, 1, 1, 0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad
+ src_floats = tuple(2*b-1 for b in src_data) # convert to binary antipodal symbols (-1,1)
+ expected = src_floats[9+32:-len(pad)]
+ src = blocks.vector_source_f(src_floats)
+ op = digital.correlate_access_code_ff_ts("0011", 0, "sync")
+ dst = blocks.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ result_tags = dst.tags()
+ self.assertEqual(len(result_data), len(payload)*8)
+ self.assertEqual(result_tags[0].offset, 0)
+ self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
+ self.assertFloatTuplesAlmostEqual(result_data, expected, 5)
+
+ def test_ff_immediate(self):
+ """Test that packets at start of stream match"""
+ payload = "test packet" # payload length is 11 bytes
+ header = "\x00\xd0\x00\xd0" # header contains packet length, twice (bit-swapped)
+ packet = header + payload
+ pad = (0,) * 64
+ src_data = (0, 0, 1, 1) + tuple(string_to_1_0_list(packet)) + pad
+ src_floats = tuple(2*b-1 for b in src_data) # convert to binary antipodal symbols (-1,1)
+ expected = src_floats[4+32:-len(pad)]
+ src = blocks.vector_source_f(src_floats)
+ op = digital.correlate_access_code_ff_ts("0011", 0, "sync")
+ dst = blocks.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ result_tags = dst.tags()
+ self.assertEqual(len(result_data), len(payload)*8)
+ self.assertEqual(result_tags[0].offset, 0)
+ self.assertEqual(pmt.to_long(result_tags[0].value), len(payload)*8)
+ self.assertFloatTuplesAlmostEqual(result_data, expected, 5)
+
if __name__ == '__main__':
gr_unittest.run(test_correlate_access_code_XX_ts, "test_correlate_access_code_XX_ts.xml")
-
diff --git a/gr-digital/python/digital/qa_correlate_access_code_tag.py b/gr-digital/python/digital/qa_correlate_access_code_tag.py
index d277ac6116..9b79529aba 100644
--- a/gr-digital/python/digital/qa_correlate_access_code_tag.py
+++ b/gr-digital/python/digital/qa_correlate_access_code_tag.py
@@ -34,7 +34,7 @@ class test_correlate_access_code(gr_unittest.TestCase):
def tearDown(self):
self.tb = None
- def test_001(self):
+ def test_bb(self):
pad = (0,) * 64
src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7
src = blocks.vector_source_b(src_data)
@@ -47,6 +47,31 @@ class test_correlate_access_code(gr_unittest.TestCase):
self.assertEqual(result_data[0].offset, 4)
self.assertEqual(result_data[1].offset, 9)
+ def test_bb_skip_prefix(self):
+ pad = (0,) * 64
+ src_data = (0, 1, 1, 1, 1, 0, 0, 1, 1) + pad + (0,) * 7
+ src = blocks.vector_source_b(src_data)
+ op = digital.correlate_access_code_tag_bb("0011", 0, "sync")
+ dst = blocks.tag_debug(gr.sizeof_char, "sync")
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.current_tags()
+ self.assertEqual(len(result_data), 1)
+ self.assertEqual(result_data[0].offset, 9)
+
+ def test_bb_immediate(self):
+ """Test that packets at start of stream match"""
+ pad = (0,) * 64
+ src_data = (0, 0, 1, 1) + pad + (0,) * 7
+ src = blocks.vector_source_b(src_data)
+ op = digital.correlate_access_code_tag_bb("0011", 0, "sync")
+ dst = blocks.tag_debug(gr.sizeof_char, "sync")
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.current_tags()
+ self.assertEqual(len(result_data), 1)
+ self.assertEqual(result_data[0].offset, 4)
+
def test_002(self):
code = tuple(string_to_1_0_list(default_access_code))
access_code = to_1_0_string(code)
@@ -63,7 +88,7 @@ class test_correlate_access_code(gr_unittest.TestCase):
self.assertEqual(len(result_data), 1)
self.assertEqual(result_data[0].offset, len(code))
- def test_003(self):
+ def test_ff(self):
pad = (0,) * 64
src_bits = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7
src_data = [2.0*x - 1.0 for x in src_bits]
@@ -77,6 +102,33 @@ class test_correlate_access_code(gr_unittest.TestCase):
self.assertEqual(result_data[0].offset, 4)
self.assertEqual(result_data[1].offset, 9)
+ def test_ff_skip_prefix(self):
+ pad = (0,) * 64
+ src_bits = (0, 1, 1, 1, 1, 0, 0, 1, 1) + pad + (0,) * 7
+ src_data = [2.0*x - 1.0 for x in src_bits]
+ src = blocks.vector_source_f(src_data)
+ op = digital.correlate_access_code_tag_ff("0011", 0, "sync")
+ dst = blocks.tag_debug(gr.sizeof_float, "sync")
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.current_tags()
+ self.assertEqual(len(result_data), 1)
+ self.assertEqual(result_data[0].offset, 9)
+
+ def test_ff_immediate(self):
+ """Test that packets at start of stream match"""
+ pad = (0,) * 64
+ src_bits = (0, 0, 1, 1) + pad + (0,) * 7
+ src_data = [2.0*x - 1.0 for x in src_bits]
+ src = blocks.vector_source_f(src_data)
+ op = digital.correlate_access_code_tag_ff("0011", 0, "sync")
+ dst = blocks.tag_debug(gr.sizeof_float, "sync")
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.current_tags()
+ self.assertEqual(len(result_data), 1)
+ self.assertEqual(result_data[0].offset, 4)
+
def test_004(self):
code = tuple(string_to_1_0_list(default_access_code))
access_code = to_1_0_string(code)
@@ -96,4 +148,3 @@ class test_correlate_access_code(gr_unittest.TestCase):
if __name__ == '__main__':
gr_unittest.run(test_correlate_access_code, "test_correlate_access_code_tag.xml")
-