diff options
Diffstat (limited to 'gr-digital/python/digital/qa_packet_format.py')
-rw-r--r-- | gr-digital/python/digital/qa_packet_format.py | 47 |
1 files changed, 46 insertions, 1 deletions
diff --git a/gr-digital/python/digital/qa_packet_format.py b/gr-digital/python/digital/qa_packet_format.py index 6440b80a5e..ae1a79f1f4 100644 --- a/gr-digital/python/digital/qa_packet_format.py +++ b/gr-digital/python/digital/qa_packet_format.py @@ -20,7 +20,7 @@ # Boston, MA 02110-1301, USA. # -import random, time, struct +import time, struct import pmt from gnuradio import gr, gr_unittest, digital, blocks from gnuradio.digital import packet_utils @@ -80,6 +80,51 @@ class test_packet_format_fb(gr_unittest.TestCase): self.assertEqual(send_str, payload[0:length]) + def test_packet_parse_default(self): + ac = packet_utils.default_access_code + length = '0000000000000001' + + hdr_format_1bps = digital.header_format_default(ac, 0) + hdr_format_4bps = digital.header_format_default(ac, 0, 4) + + ac_bits = [int(x) & 1 for x in ac] + length_bits = [int(x) & 1 for x in length] + header_bits = ac_bits + length_bits + length_bits + + src_hdr = blocks.vector_source_b(header_bits) + + parser_1bps = digital.protocol_parser_b(hdr_format_1bps) + parser_4bps = digital.protocol_parser_b(hdr_format_4bps) + + snk_hdr_1bps = blocks.message_debug() + snk_hdr_4bps = blocks.message_debug() + + self.tb.connect(src_hdr, parser_1bps) + self.tb.connect(src_hdr, parser_4bps) + + self.tb.msg_connect(parser_1bps, 'info', snk_hdr_1bps, 'store') + self.tb.msg_connect(parser_4bps, 'info', snk_hdr_4bps, 'store') + + self.tb.start() + while (snk_hdr_1bps.num_messages() < 1) and (snk_hdr_4bps.num_messages() < 1): + time.sleep(0.1) + self.tb.stop() + self.tb.wait() + + result_1bps = snk_hdr_1bps.get_message(0) + result_4bps = snk_hdr_4bps.get_message(0) + + self.assertTrue(pmt.dict_has_key( + result_1bps, pmt.intern('payload symbols'))) + self.assertEqual(pmt.to_long(pmt.dict_ref( + result_1bps, pmt.intern('payload symbols'), pmt.PMT_F)), 8) + + self.assertTrue(pmt.dict_has_key( + result_4bps, pmt.intern('payload symbols'))) + self.assertEqual(pmt.to_long(pmt.dict_ref( + result_4bps, pmt.intern('payload symbols'), pmt.PMT_F)), 2) + + def test_packet_format_async_counter(self): bps = 2 ac = packet_utils.default_access_code |