summaryrefslogtreecommitdiff
path: root/gr-digital/python/qa_packet_headerparser_b.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/qa_packet_headerparser_b.py')
-rwxr-xr-xgr-digital/python/qa_packet_headerparser_b.py96
1 files changed, 78 insertions, 18 deletions
diff --git a/gr-digital/python/qa_packet_headerparser_b.py b/gr-digital/python/qa_packet_headerparser_b.py
index 3c8cd67335..ff74da5657 100755
--- a/gr-digital/python/qa_packet_headerparser_b.py
+++ b/gr-digital/python/qa_packet_headerparser_b.py
@@ -20,10 +20,12 @@
#
import time
+import random
from gnuradio import gr, gr_unittest
import pmt
import blocks_swig as blocks
import digital_swig as digital
+from utils import tagged_streams
class qa_packet_headerparser_b (gr_unittest.TestCase):
@@ -34,38 +36,96 @@ class qa_packet_headerparser_b (gr_unittest.TestCase):
self.tb = None
def test_001_t (self):
- expected_data = (
- # | Number of symbols | Packet number | Parity
+ """
+ First header: Packet length 4, packet num 0
+ Second header: Packet 2, packet num 1
+ Third header: Invalid (parity bit does not check) (would be len 4, num 2)
+ """
+ encoded_headers = (
+ # | Number of bytes | Packet number | Parity
0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0
)
- tagname = "packet_len"
+ packet_len_tagname = "packet_len"
- src = blocks.vector_source_b(expected_data)
- parser = digital.packet_headerparser_b(32, tagname)
+ src = blocks.vector_source_b(encoded_headers)
+ parser = digital.packet_headerparser_b(32, packet_len_tagname)
sink = blocks.message_debug()
-
self.tb.connect(src, parser)
self.tb.msg_connect(parser, "header_data", sink, "store")
- self.tb.start ()
+ self.tb.start()
time.sleep(1)
self.tb.stop()
self.tb.wait()
-
self.assertEqual(sink.num_messages(), 3)
- msg = sink.get_message(0)
- #try:
- #self.assertEqual(4, pmt.pmt_to_long(pmt.pmt_dict_ref(msg, pmt.pmt_string_to_symbol(tagname), pmt.PMT_F)))
- #self.assertEqual(0, pmt.pmt_to_long(pmt.pmt_dict_ref(msg, pmt.pmt_string_to_symbol("packet_num"), pmt.PMT_F)))
-
- #except:
- #self.fail()
- # msg1: length 4, number 0
- # msg2: length 2, number 1
- # msg3: PMT_F because parity fail
+ msg1 = pmt.to_python(sink.get_message(0))
+ msg2 = pmt.to_python(sink.get_message(1))
+ msg3 = pmt.to_python(sink.get_message(2))
+ self.assertEqual(msg1, {'packet_len': 4, 'packet_num': 0})
+ self.assertEqual(msg2, {'packet_len': 2, 'packet_num': 1})
+ self.assertEqual(msg3, False)
+ def test_002_pipe(self):
+ """
+ Create N packets of random length, pipe them through header generator,
+ back to header parser, make sure output is the same.
+ """
+ N = 20
+ header_len = 32
+ packet_len_tagname = "packet_len"
+ packet_lengths = [random.randint(1, 100) for x in range(N)]
+ data, tags = tagged_streams.packets_to_vectors([range(packet_lengths[i]) for i in range(N)], packet_len_tagname)
+ src = blocks.vector_source_b(data, False, 1, tags)
+ header_gen = digital.packet_headergenerator_bb(header_len, packet_len_tagname)
+ header_parser = digital.packet_headerparser_b(header_len, packet_len_tagname)
+ sink = blocks.message_debug()
+ self.tb.connect(src, header_gen, header_parser)
+ self.tb.msg_connect(header_parser, "header_data", sink, "store")
+ self.tb.start()
+ time.sleep(1)
+ self.tb.stop()
+ self.tb.wait()
+ self.assertEqual(sink.num_messages(), N)
+ for i in xrange(N):
+ msg = pmt.to_python(sink.get_message(i))
+ self.assertEqual(msg, {'packet_len': packet_lengths[i], 'packet_num': i})
+ def test_003_ofdm (self):
+ """ Header 1: 193 bytes
+ Header 2: 8 bytes
+ 2 bits per complex symbol, 32 carriers => 64 bits = 8 bytes per OFDM symbol
+ """
+ encoded_headers = (
+ # | Number of bytes | Packet number | Parity
+ 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
+ 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ )
+ packet_len_tagname = "packet_len"
+ frame_len_tagname = "frame_len"
+ src = blocks.vector_source_b(encoded_headers)
+ header_formatter = digital.packet_header_ofdm(
+ (range(32),), # 32 carriers are occupied (which doesn't matter here)
+ 1, # 1 OFDM symbol per header (= 32 bits)
+ packet_len_tagname,
+ frame_len_tagname,
+ "packet_num",
+ 1, # 1 bit per header symbols (BPSK)
+ 2 # 2 bits per payload symbol (QPSK)
+ )
+ parser = digital.packet_headerparser_b(header_formatter.base())
+ sink = blocks.message_debug()
+ self.tb.connect(src, parser)
+ self.tb.msg_connect(parser, "header_data", sink, "store")
+ self.tb.start()
+ time.sleep(1)
+ self.tb.stop()
+ self.tb.wait()
+ self.assertEqual(sink.num_messages(), 2)
+ msg1 = pmt.to_python(sink.get_message(0))
+ msg2 = pmt.to_python(sink.get_message(1))
+ self.assertEqual(msg1, {'packet_len': 193, 'frame_len': 25, 'packet_num': 0})
+ self.assertEqual(msg2, {'packet_len': 8, 'frame_len': 1, 'packet_num': 1})
if __name__ == '__main__':
gr_unittest.run(qa_packet_headerparser_b, "qa_packet_headerparser_b.xml")