summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_packet_headerparser_b.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital/qa_packet_headerparser_b.py')
-rw-r--r--gr-digital/python/digital/qa_packet_headerparser_b.py93
1 files changed, 54 insertions, 39 deletions
diff --git a/gr-digital/python/digital/qa_packet_headerparser_b.py b/gr-digital/python/digital/qa_packet_headerparser_b.py
index 13840c7cad..897220a59d 100644
--- a/gr-digital/python/digital/qa_packet_headerparser_b.py
+++ b/gr-digital/python/digital/qa_packet_headerparser_b.py
@@ -1,11 +1,11 @@
#!/usr/bin/env python
# Copyright 2012 Free Software Foundation, Inc.
-#
+#
# This file is part of GNU Radio
-#
+#
# SPDX-License-Identifier: GPL-3.0-or-later
#
-#
+#
import time
@@ -15,16 +15,17 @@ from gnuradio import gr, gr_unittest, blocks, digital
from gnuradio.digital.utils import tagged_streams
import pmt
+
class qa_packet_headerparser_b (gr_unittest.TestCase):
- def setUp (self):
+ def setUp(self):
random.seed(0)
- self.tb = gr.top_block ()
+ self.tb = gr.top_block()
- def tearDown (self):
+ def tearDown(self):
self.tb = None
- def test_001_t (self):
+ def test_001_t(self):
"""
First header: Packet length 4, packet num 0
Second header: Packet 2, packet num 1
@@ -32,9 +33,9 @@ class qa_packet_headerparser_b (gr_unittest.TestCase):
"""
encoded_headers = (
# | Number of bytes | Packet number | CRC
- 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1,
- 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1,
- 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 1, 0, 0, 0, 1,
+ 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1,
+ 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1
)
packet_len_tagname = "packet_len"
random_tag = gr.tag_t()
@@ -67,10 +68,13 @@ class qa_packet_headerparser_b (gr_unittest.TestCase):
header_len = 32
packet_len_tagname = "packet_len"
packet_lengths = [random.randint(1, 100) for x in range(N)]
- data, tags = tagged_streams.packets_to_vectors([list(range(packet_lengths[i])) for i in range(N)], packet_len_tagname)
+ data, tags = tagged_streams.packets_to_vectors(
+ [list(range(packet_lengths[i])) for i in range(N)], packet_len_tagname)
src = blocks.vector_source_b(data, False, 1, tags)
- header_gen = digital.packet_headergenerator_bb(header_len, packet_len_tagname)
- header_parser = digital.packet_headerparser_b(header_len, packet_len_tagname)
+ header_gen = digital.packet_headergenerator_bb(
+ header_len, packet_len_tagname)
+ header_parser = digital.packet_headerparser_b(
+ header_len, packet_len_tagname)
sink = blocks.message_debug()
self.tb.connect(src, header_gen, header_parser)
self.tb.msg_connect(header_parser, "header_data", sink, "store")
@@ -81,9 +85,11 @@ class qa_packet_headerparser_b (gr_unittest.TestCase):
self.assertEqual(sink.num_messages(), N)
for i in range(N):
msg = pmt.to_python(sink.get_message(i))
- self.assertEqual(msg, {'packet_len': packet_lengths[i], 'packet_num': i})
+ self.assertEqual(
+ msg, {
+ 'packet_len': packet_lengths[i], 'packet_num': i})
- def test_003_ofdm (self):
+ def test_003_ofdm(self):
""" Header 1: 193 bytes
Header 2: 8 bytes
2 bits per complex symbol, 32 carriers => 64 bits = 8 bytes per OFDM symbol
@@ -93,20 +99,21 @@ class qa_packet_headerparser_b (gr_unittest.TestCase):
"""
encoded_headers = (
# | Number of bytes | Packet number | CRC
- 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 1, 0,
- 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0,
+ 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 0, 1, 1, 0,
+ 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 1, 1, 1, 0,
)
packet_len_tagname = "packet_len"
frame_len_tagname = "frame_len"
src = blocks.vector_source_b(encoded_headers)
header_formatter = digital.packet_header_ofdm(
- (list(range(32)),list(range(4)),list(range(8))), # 32/4/8 carriers are occupied (which doesn't matter here)
- 1, # 1 OFDM symbol per header (= 32 bits)
- packet_len_tagname,
- frame_len_tagname,
- "packet_num",
- 1, # 1 bit per header symbols (BPSK)
- 2 # 2 bits per payload symbol (QPSK)
+ # 32/4/8 carriers are occupied (which doesn't matter here)
+ (list(range(32)), list(range(4)), list(range(8))),
+ 1, # 1 OFDM symbol per header (= 32 bits)
+ packet_len_tagname,
+ frame_len_tagname,
+ "packet_num",
+ 1, # 1 bit per header symbols (BPSK)
+ 2 # 2 bits per payload symbol (QPSK)
)
parser = digital.packet_headerparser_b(header_formatter.base())
sink = blocks.message_debug()
@@ -120,8 +127,11 @@ class qa_packet_headerparser_b (gr_unittest.TestCase):
msg1 = pmt.to_python(sink.get_message(0))
msg2 = pmt.to_python(sink.get_message(1))
# Multiply with 4 because unpacked bytes have only two bits
- self.assertEqual(msg1, {'packet_len': 193*4, 'frame_len': 52, 'packet_num': 0})
- self.assertEqual(msg2, {'packet_len': 8*4, 'frame_len': 1, 'packet_num': 1})
+ self.assertEqual(msg1, {'packet_len': 193 * 4,
+ 'frame_len': 52, 'packet_num': 0})
+ self.assertEqual(
+ msg2, {
+ 'packet_len': 8 * 4, 'frame_len': 1, 'packet_num': 1})
def test_004_ofdm_scramble(self):
"""
@@ -131,17 +141,19 @@ class qa_packet_headerparser_b (gr_unittest.TestCase):
packet_length = 23
packet_len_tagname = "packet_len"
frame_len_tagname = "frame_len"
- data, tags = tagged_streams.packets_to_vectors([list(range(packet_length)),list(range(packet_length)),], packet_len_tagname)
+ data, tags = tagged_streams.packets_to_vectors(
+ [list(range(packet_length)), list(range(packet_length)), ], packet_len_tagname)
src = blocks.vector_source_b(data, False, 1, tags)
header_formatter = digital.packet_header_ofdm(
- (list(range(32)),), # 32 carriers are occupied (which doesn't matter here)
- 1, # 1 OFDM symbol per header (= 32 bits)
- packet_len_tagname,
- frame_len_tagname,
- "packet_num",
- 1, # 1 bit per header symbols (BPSK)
- 2, # 2 bits per payload symbol (QPSK)
- scramble_header=True
+ # 32 carriers are occupied (which doesn't matter here)
+ (list(range(32)),),
+ 1, # 1 OFDM symbol per header (= 32 bits)
+ packet_len_tagname,
+ frame_len_tagname,
+ "packet_num",
+ 1, # 1 bit per header symbols (BPSK)
+ 2, # 2 bits per payload symbol (QPSK)
+ scramble_header=True
)
header_gen = digital.packet_headergenerator_bb(header_formatter.base())
header_parser = digital.packet_headerparser_b(header_formatter.base())
@@ -153,11 +165,14 @@ class qa_packet_headerparser_b (gr_unittest.TestCase):
self.tb.stop()
self.tb.wait()
msg = pmt.to_python(sink.get_message(0))
- self.assertEqual(msg, {'packet_len': packet_length, 'packet_num': 0, 'frame_len': 4})
+ self.assertEqual(
+ msg, {
+ 'packet_len': packet_length, 'packet_num': 0, 'frame_len': 4})
msg = pmt.to_python(sink.get_message(1))
- self.assertEqual(msg, {'packet_len': packet_length, 'packet_num': 1, 'frame_len': 4})
+ self.assertEqual(
+ msg, {
+ 'packet_len': packet_length, 'packet_num': 1, 'frame_len': 4})
+
if __name__ == '__main__':
gr_unittest.run(qa_packet_headerparser_b)
-
-