summaryrefslogtreecommitdiff
path: root/gr-digital/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python')
-rwxr-xr-xgr-digital/python/digital/qa_correlate_access_code.py20
-rwxr-xr-xgr-digital/python/digital/qa_correlate_access_code_tag.py38
-rw-r--r--gr-digital/python/digital/qa_packet_format.py47
3 files changed, 99 insertions, 6 deletions
diff --git a/gr-digital/python/digital/qa_correlate_access_code.py b/gr-digital/python/digital/qa_correlate_access_code.py
index d89b457117..0302e81313 100755
--- a/gr-digital/python/digital/qa_correlate_access_code.py
+++ b/gr-digital/python/digital/qa_correlate_access_code.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2006,2007,2010,2011,2013 Free Software Foundation, Inc.
+# Copyright 2006,2007,2010,2011,2013,2017 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -91,6 +91,24 @@ class test_correlate_access_code(gr_unittest.TestCase):
result_data = dst.data()
self.assertEqual(expected_result, result_data)
+ def test_004(self):
+ code = tuple(string_to_1_0_list(default_access_code))
+ access_code = to_1_0_string(code)
+ pad = (0,) * 64
+ #print code
+ #print access_code
+ src_bits = code + (1, 0, 1, 1) + pad
+ src_data = [2.0*x - 1.0 for x in src_bits]
+ expected_result_bits = code + (1, 0, 1, 1) + pad
+ expected_result = [2.0*x - 1.0 for x in expected_result_bits]
+ src = blocks.vector_source_f(src_data)
+ op = digital.correlate_access_code_tag_ff(access_code, 0, "test")
+ dst = blocks.vector_sink_f()
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.data()
+ self.assertFloatTuplesAlmostEqual(expected_result, result_data, 5)
+
if __name__ == '__main__':
gr_unittest.run(test_correlate_access_code, "test_correlate_access_code.xml")
diff --git a/gr-digital/python/digital/qa_correlate_access_code_tag.py b/gr-digital/python/digital/qa_correlate_access_code_tag.py
index f2663e4ecc..378333c286 100755
--- a/gr-digital/python/digital/qa_correlate_access_code_tag.py
+++ b/gr-digital/python/digital/qa_correlate_access_code_tag.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2006,2007,2010,2011,2013 Free Software Foundation, Inc.
+# Copyright 2006,2007,2010,2011,2013,2017 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -50,7 +50,7 @@ class test_correlate_access_code(gr_unittest.TestCase):
src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7
src = blocks.vector_source_b(src_data)
op = digital.correlate_access_code_tag_bb("1011", 0, "sync")
- dst = blocks.tag_debug(1, "sync")
+ dst = blocks.tag_debug(gr.sizeof_char, "sync")
self.tb.connect(src, op, dst)
self.tb.run()
result_data = dst.current_tags()
@@ -65,10 +65,40 @@ class test_correlate_access_code(gr_unittest.TestCase):
#print code
#print access_code
src_data = code + (1, 0, 1, 1) + pad
- expected_result = pad + code + (3, 0, 1, 1)
src = blocks.vector_source_b(src_data)
op = digital.correlate_access_code_tag_bb(access_code, 0, "sync")
- dst = blocks.tag_debug(1, "sync")
+ dst = blocks.tag_debug(gr.sizeof_char, "sync")
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.current_tags()
+ self.assertEqual(len(result_data), 1)
+ self.assertEqual(result_data[0].offset, len(code))
+
+ def test_003(self):
+ pad = (0,) * 64
+ src_bits = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7
+ src_data = [2.0*x - 1.0 for x in src_bits]
+ src = blocks.vector_source_f(src_data)
+ op = digital.correlate_access_code_tag_ff("1011", 0, "sync")
+ dst = blocks.tag_debug(gr.sizeof_float, "sync")
+ self.tb.connect(src, op, dst)
+ self.tb.run()
+ result_data = dst.current_tags()
+ self.assertEqual(len(result_data), 2)
+ self.assertEqual(result_data[0].offset, 4)
+ self.assertEqual(result_data[1].offset, 9)
+
+ def test_004(self):
+ code = tuple(string_to_1_0_list(default_access_code))
+ access_code = to_1_0_string(code)
+ pad = (0,) * 64
+ #print code
+ #print access_code
+ src_bits = code + (1, 0, 1, 1) + pad
+ src_data = [2.0*x - 1.0 for x in src_bits]
+ src = blocks.vector_source_f(src_data)
+ op = digital.correlate_access_code_tag_ff(access_code, 0, "sync")
+ dst = blocks.tag_debug(gr.sizeof_float, "sync")
self.tb.connect(src, op, dst)
self.tb.run()
result_data = dst.current_tags()
diff --git a/gr-digital/python/digital/qa_packet_format.py b/gr-digital/python/digital/qa_packet_format.py
index 6440b80a5e..ae1a79f1f4 100644
--- a/gr-digital/python/digital/qa_packet_format.py
+++ b/gr-digital/python/digital/qa_packet_format.py
@@ -20,7 +20,7 @@
# Boston, MA 02110-1301, USA.
#
-import random, time, struct
+import time, struct
import pmt
from gnuradio import gr, gr_unittest, digital, blocks
from gnuradio.digital import packet_utils
@@ -80,6 +80,51 @@ class test_packet_format_fb(gr_unittest.TestCase):
self.assertEqual(send_str, payload[0:length])
+ def test_packet_parse_default(self):
+ ac = packet_utils.default_access_code
+ length = '0000000000000001'
+
+ hdr_format_1bps = digital.header_format_default(ac, 0)
+ hdr_format_4bps = digital.header_format_default(ac, 0, 4)
+
+ ac_bits = [int(x) & 1 for x in ac]
+ length_bits = [int(x) & 1 for x in length]
+ header_bits = ac_bits + length_bits + length_bits
+
+ src_hdr = blocks.vector_source_b(header_bits)
+
+ parser_1bps = digital.protocol_parser_b(hdr_format_1bps)
+ parser_4bps = digital.protocol_parser_b(hdr_format_4bps)
+
+ snk_hdr_1bps = blocks.message_debug()
+ snk_hdr_4bps = blocks.message_debug()
+
+ self.tb.connect(src_hdr, parser_1bps)
+ self.tb.connect(src_hdr, parser_4bps)
+
+ self.tb.msg_connect(parser_1bps, 'info', snk_hdr_1bps, 'store')
+ self.tb.msg_connect(parser_4bps, 'info', snk_hdr_4bps, 'store')
+
+ self.tb.start()
+ while (snk_hdr_1bps.num_messages() < 1) and (snk_hdr_4bps.num_messages() < 1):
+ time.sleep(0.1)
+ self.tb.stop()
+ self.tb.wait()
+
+ result_1bps = snk_hdr_1bps.get_message(0)
+ result_4bps = snk_hdr_4bps.get_message(0)
+
+ self.assertTrue(pmt.dict_has_key(
+ result_1bps, pmt.intern('payload symbols')))
+ self.assertEqual(pmt.to_long(pmt.dict_ref(
+ result_1bps, pmt.intern('payload symbols'), pmt.PMT_F)), 8)
+
+ self.assertTrue(pmt.dict_has_key(
+ result_4bps, pmt.intern('payload symbols')))
+ self.assertEqual(pmt.to_long(pmt.dict_ref(
+ result_4bps, pmt.intern('payload symbols'), pmt.PMT_F)), 2)
+
+
def test_packet_format_async_counter(self):
bps = 2
ac = packet_utils.default_access_code