diff options
Diffstat (limited to 'gr-digital/python')
-rwxr-xr-x | gr-digital/python/digital/qa_correlate_access_code.py | 20 | ||||
-rwxr-xr-x | gr-digital/python/digital/qa_correlate_access_code_tag.py | 38 | ||||
-rw-r--r-- | gr-digital/python/digital/qa_packet_format.py | 47 |
3 files changed, 99 insertions, 6 deletions
diff --git a/gr-digital/python/digital/qa_correlate_access_code.py b/gr-digital/python/digital/qa_correlate_access_code.py index d89b457117..0302e81313 100755 --- a/gr-digital/python/digital/qa_correlate_access_code.py +++ b/gr-digital/python/digital/qa_correlate_access_code.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007,2010,2011,2013 Free Software Foundation, Inc. +# Copyright 2006,2007,2010,2011,2013,2017 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -91,6 +91,24 @@ class test_correlate_access_code(gr_unittest.TestCase): result_data = dst.data() self.assertEqual(expected_result, result_data) + def test_004(self): + code = tuple(string_to_1_0_list(default_access_code)) + access_code = to_1_0_string(code) + pad = (0,) * 64 + #print code + #print access_code + src_bits = code + (1, 0, 1, 1) + pad + src_data = [2.0*x - 1.0 for x in src_bits] + expected_result_bits = code + (1, 0, 1, 1) + pad + expected_result = [2.0*x - 1.0 for x in expected_result_bits] + src = blocks.vector_source_f(src_data) + op = digital.correlate_access_code_tag_ff(access_code, 0, "test") + dst = blocks.vector_sink_f() + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.data() + self.assertFloatTuplesAlmostEqual(expected_result, result_data, 5) + if __name__ == '__main__': gr_unittest.run(test_correlate_access_code, "test_correlate_access_code.xml") diff --git a/gr-digital/python/digital/qa_correlate_access_code_tag.py b/gr-digital/python/digital/qa_correlate_access_code_tag.py index f2663e4ecc..378333c286 100755 --- a/gr-digital/python/digital/qa_correlate_access_code_tag.py +++ b/gr-digital/python/digital/qa_correlate_access_code_tag.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2006,2007,2010,2011,2013 Free Software Foundation, Inc. +# Copyright 2006,2007,2010,2011,2013,2017 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -50,7 +50,7 @@ class test_correlate_access_code(gr_unittest.TestCase): src_data = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7 src = blocks.vector_source_b(src_data) op = digital.correlate_access_code_tag_bb("1011", 0, "sync") - dst = blocks.tag_debug(1, "sync") + dst = blocks.tag_debug(gr.sizeof_char, "sync") self.tb.connect(src, op, dst) self.tb.run() result_data = dst.current_tags() @@ -65,10 +65,40 @@ class test_correlate_access_code(gr_unittest.TestCase): #print code #print access_code src_data = code + (1, 0, 1, 1) + pad - expected_result = pad + code + (3, 0, 1, 1) src = blocks.vector_source_b(src_data) op = digital.correlate_access_code_tag_bb(access_code, 0, "sync") - dst = blocks.tag_debug(1, "sync") + dst = blocks.tag_debug(gr.sizeof_char, "sync") + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.current_tags() + self.assertEqual(len(result_data), 1) + self.assertEqual(result_data[0].offset, len(code)) + + def test_003(self): + pad = (0,) * 64 + src_bits = (1, 0, 1, 1, 1, 1, 0, 1, 1) + pad + (0,) * 7 + src_data = [2.0*x - 1.0 for x in src_bits] + src = blocks.vector_source_f(src_data) + op = digital.correlate_access_code_tag_ff("1011", 0, "sync") + dst = blocks.tag_debug(gr.sizeof_float, "sync") + self.tb.connect(src, op, dst) + self.tb.run() + result_data = dst.current_tags() + self.assertEqual(len(result_data), 2) + self.assertEqual(result_data[0].offset, 4) + self.assertEqual(result_data[1].offset, 9) + + def test_004(self): + code = tuple(string_to_1_0_list(default_access_code)) + access_code = to_1_0_string(code) + pad = (0,) * 64 + #print code + #print access_code + src_bits = code + (1, 0, 1, 1) + pad + src_data = [2.0*x - 1.0 for x in src_bits] + src = blocks.vector_source_f(src_data) + op = digital.correlate_access_code_tag_ff(access_code, 0, "sync") + dst = blocks.tag_debug(gr.sizeof_float, "sync") self.tb.connect(src, op, dst) self.tb.run() result_data = dst.current_tags() diff --git a/gr-digital/python/digital/qa_packet_format.py b/gr-digital/python/digital/qa_packet_format.py index 6440b80a5e..ae1a79f1f4 100644 --- a/gr-digital/python/digital/qa_packet_format.py +++ b/gr-digital/python/digital/qa_packet_format.py @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -import random, time, struct +import time, struct import pmt from gnuradio import gr, gr_unittest, digital, blocks from gnuradio.digital import packet_utils @@ -80,6 +80,51 @@ class test_packet_format_fb(gr_unittest.TestCase): self.assertEqual(send_str, payload[0:length]) + def test_packet_parse_default(self): + ac = packet_utils.default_access_code + length = '0000000000000001' + + hdr_format_1bps = digital.header_format_default(ac, 0) + hdr_format_4bps = digital.header_format_default(ac, 0, 4) + + ac_bits = [int(x) & 1 for x in ac] + length_bits = [int(x) & 1 for x in length] + header_bits = ac_bits + length_bits + length_bits + + src_hdr = blocks.vector_source_b(header_bits) + + parser_1bps = digital.protocol_parser_b(hdr_format_1bps) + parser_4bps = digital.protocol_parser_b(hdr_format_4bps) + + snk_hdr_1bps = blocks.message_debug() + snk_hdr_4bps = blocks.message_debug() + + self.tb.connect(src_hdr, parser_1bps) + self.tb.connect(src_hdr, parser_4bps) + + self.tb.msg_connect(parser_1bps, 'info', snk_hdr_1bps, 'store') + self.tb.msg_connect(parser_4bps, 'info', snk_hdr_4bps, 'store') + + self.tb.start() + while (snk_hdr_1bps.num_messages() < 1) and (snk_hdr_4bps.num_messages() < 1): + time.sleep(0.1) + self.tb.stop() + self.tb.wait() + + result_1bps = snk_hdr_1bps.get_message(0) + result_4bps = snk_hdr_4bps.get_message(0) + + self.assertTrue(pmt.dict_has_key( + result_1bps, pmt.intern('payload symbols'))) + self.assertEqual(pmt.to_long(pmt.dict_ref( + result_1bps, pmt.intern('payload symbols'), pmt.PMT_F)), 8) + + self.assertTrue(pmt.dict_has_key( + result_4bps, pmt.intern('payload symbols'))) + self.assertEqual(pmt.to_long(pmt.dict_ref( + result_4bps, pmt.intern('payload symbols'), pmt.PMT_F)), 2) + + def test_packet_format_async_counter(self): bps = 2 ac = packet_utils.default_access_code |