diff options
Diffstat (limited to 'gr-digital/python/digital/qa_packet_headerparser_b.py')
-rw-r--r-- | gr-digital/python/digital/qa_packet_headerparser_b.py | 93 |
1 files changed, 54 insertions, 39 deletions
diff --git a/gr-digital/python/digital/qa_packet_headerparser_b.py b/gr-digital/python/digital/qa_packet_headerparser_b.py index 13840c7cad..897220a59d 100644 --- a/gr-digital/python/digital/qa_packet_headerparser_b.py +++ b/gr-digital/python/digital/qa_packet_headerparser_b.py @@ -1,11 +1,11 @@ #!/usr/bin/env python # Copyright 2012 Free Software Foundation, Inc. -# +# # This file is part of GNU Radio -# +# # SPDX-License-Identifier: GPL-3.0-or-later # -# +# import time @@ -15,16 +15,17 @@ from gnuradio import gr, gr_unittest, blocks, digital from gnuradio.digital.utils import tagged_streams import pmt + class qa_packet_headerparser_b (gr_unittest.TestCase): - def setUp (self): + def setUp(self): random.seed(0) - self.tb = gr.top_block () + self.tb = gr.top_block() - def tearDown (self): + def tearDown(self): self.tb = None - def test_001_t (self): + def test_001_t(self): """ First header: Packet length 4, packet num 0 Second header: Packet 2, packet num 1 @@ -32,9 +33,9 @@ class qa_packet_headerparser_b (gr_unittest.TestCase): """ encoded_headers = ( # | Number of bytes | Packet number | CRC - 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, - 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, - 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1 + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1, + 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1 ) packet_len_tagname = "packet_len" random_tag = gr.tag_t() @@ -67,10 +68,13 @@ class qa_packet_headerparser_b (gr_unittest.TestCase): header_len = 32 packet_len_tagname = "packet_len" packet_lengths = [random.randint(1, 100) for x in range(N)] - data, tags = tagged_streams.packets_to_vectors([list(range(packet_lengths[i])) for i in range(N)], packet_len_tagname) + data, tags = tagged_streams.packets_to_vectors( + [list(range(packet_lengths[i])) for i in range(N)], packet_len_tagname) src = blocks.vector_source_b(data, False, 1, tags) - header_gen = digital.packet_headergenerator_bb(header_len, packet_len_tagname) - header_parser = digital.packet_headerparser_b(header_len, packet_len_tagname) + header_gen = digital.packet_headergenerator_bb( + header_len, packet_len_tagname) + header_parser = digital.packet_headerparser_b( + header_len, packet_len_tagname) sink = blocks.message_debug() self.tb.connect(src, header_gen, header_parser) self.tb.msg_connect(header_parser, "header_data", sink, "store") @@ -81,9 +85,11 @@ class qa_packet_headerparser_b (gr_unittest.TestCase): self.assertEqual(sink.num_messages(), N) for i in range(N): msg = pmt.to_python(sink.get_message(i)) - self.assertEqual(msg, {'packet_len': packet_lengths[i], 'packet_num': i}) + self.assertEqual( + msg, { + 'packet_len': packet_lengths[i], 'packet_num': i}) - def test_003_ofdm (self): + def test_003_ofdm(self): """ Header 1: 193 bytes Header 2: 8 bytes 2 bits per complex symbol, 32 carriers => 64 bits = 8 bytes per OFDM symbol @@ -93,20 +99,21 @@ class qa_packet_headerparser_b (gr_unittest.TestCase): """ encoded_headers = ( # | Number of bytes | Packet number | CRC - 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 1, 0, - 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, + 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 1, 0, + 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0, ) packet_len_tagname = "packet_len" frame_len_tagname = "frame_len" src = blocks.vector_source_b(encoded_headers) header_formatter = digital.packet_header_ofdm( - (list(range(32)),list(range(4)),list(range(8))), # 32/4/8 carriers are occupied (which doesn't matter here) - 1, # 1 OFDM symbol per header (= 32 bits) - packet_len_tagname, - frame_len_tagname, - "packet_num", - 1, # 1 bit per header symbols (BPSK) - 2 # 2 bits per payload symbol (QPSK) + # 32/4/8 carriers are occupied (which doesn't matter here) + (list(range(32)), list(range(4)), list(range(8))), + 1, # 1 OFDM symbol per header (= 32 bits) + packet_len_tagname, + frame_len_tagname, + "packet_num", + 1, # 1 bit per header symbols (BPSK) + 2 # 2 bits per payload symbol (QPSK) ) parser = digital.packet_headerparser_b(header_formatter.base()) sink = blocks.message_debug() @@ -120,8 +127,11 @@ class qa_packet_headerparser_b (gr_unittest.TestCase): msg1 = pmt.to_python(sink.get_message(0)) msg2 = pmt.to_python(sink.get_message(1)) # Multiply with 4 because unpacked bytes have only two bits - self.assertEqual(msg1, {'packet_len': 193*4, 'frame_len': 52, 'packet_num': 0}) - self.assertEqual(msg2, {'packet_len': 8*4, 'frame_len': 1, 'packet_num': 1}) + self.assertEqual(msg1, {'packet_len': 193 * 4, + 'frame_len': 52, 'packet_num': 0}) + self.assertEqual( + msg2, { + 'packet_len': 8 * 4, 'frame_len': 1, 'packet_num': 1}) def test_004_ofdm_scramble(self): """ @@ -131,17 +141,19 @@ class qa_packet_headerparser_b (gr_unittest.TestCase): packet_length = 23 packet_len_tagname = "packet_len" frame_len_tagname = "frame_len" - data, tags = tagged_streams.packets_to_vectors([list(range(packet_length)),list(range(packet_length)),], packet_len_tagname) + data, tags = tagged_streams.packets_to_vectors( + [list(range(packet_length)), list(range(packet_length)), ], packet_len_tagname) src = blocks.vector_source_b(data, False, 1, tags) header_formatter = digital.packet_header_ofdm( - (list(range(32)),), # 32 carriers are occupied (which doesn't matter here) - 1, # 1 OFDM symbol per header (= 32 bits) - packet_len_tagname, - frame_len_tagname, - "packet_num", - 1, # 1 bit per header symbols (BPSK) - 2, # 2 bits per payload symbol (QPSK) - scramble_header=True + # 32 carriers are occupied (which doesn't matter here) + (list(range(32)),), + 1, # 1 OFDM symbol per header (= 32 bits) + packet_len_tagname, + frame_len_tagname, + "packet_num", + 1, # 1 bit per header symbols (BPSK) + 2, # 2 bits per payload symbol (QPSK) + scramble_header=True ) header_gen = digital.packet_headergenerator_bb(header_formatter.base()) header_parser = digital.packet_headerparser_b(header_formatter.base()) @@ -153,11 +165,14 @@ class qa_packet_headerparser_b (gr_unittest.TestCase): self.tb.stop() self.tb.wait() msg = pmt.to_python(sink.get_message(0)) - self.assertEqual(msg, {'packet_len': packet_length, 'packet_num': 0, 'frame_len': 4}) + self.assertEqual( + msg, { + 'packet_len': packet_length, 'packet_num': 0, 'frame_len': 4}) msg = pmt.to_python(sink.get_message(1)) - self.assertEqual(msg, {'packet_len': packet_length, 'packet_num': 1, 'frame_len': 4}) + self.assertEqual( + msg, { + 'packet_len': packet_length, 'packet_num': 1, 'frame_len': 4}) + if __name__ == '__main__': gr_unittest.run(qa_packet_headerparser_b) - - |