summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_packet_headerparser_b.py
diff options
context:
space:
mode:
authorBen Reynwar <ben@reynwar.net>2013-05-10 05:45:12 -0700
committerBen Reynwar <ben@reynwar.net>2013-05-10 05:45:12 -0700
commite4f0319eced22c112f7e6a4cc45bc2036d285332 (patch)
tree92baa4254f47fd0b62d1d3f69e3b1ddb96dd32d3 /gr-digital/python/digital/qa_packet_headerparser_b.py
parent5fd6b62702369e45ef01a02e12bec1afc5b57200 (diff)
parent3e052d303874a07237c4e0c31eef3f7f7a192835 (diff)
Merged in 'next' branch.
Diffstat (limited to 'gr-digital/python/digital/qa_packet_headerparser_b.py')
-rwxr-xr-xgr-digital/python/digital/qa_packet_headerparser_b.py97
1 files changed, 79 insertions, 18 deletions
diff --git a/gr-digital/python/digital/qa_packet_headerparser_b.py b/gr-digital/python/digital/qa_packet_headerparser_b.py
index b127c86fb..8f63ac04d 100755
--- a/gr-digital/python/digital/qa_packet_headerparser_b.py
+++ b/gr-digital/python/digital/qa_packet_headerparser_b.py
@@ -20,7 +20,10 @@
#
import time
+import random
+
from gnuradio import gr, gr_unittest, blocks, digital
+from gnuradio.digital import tagged_streams
import pmt
class qa_packet_headerparser_b (gr_unittest.TestCase):
@@ -32,38 +35,96 @@ class qa_packet_headerparser_b (gr_unittest.TestCase):
self.tb = None
def test_001_t (self):
- expected_data = (
- # | Number of symbols | Packet number | Parity
+ """
+ First header: Packet length 4, packet num 0
+ Second header: Packet 2, packet num 1
+ Third header: Invalid (parity bit does not check) (would be len 4, num 2)
+ """
+ encoded_headers = (
+ # | Number of bytes | Packet number | Parity
0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0
)
- tagname = "packet_len"
+ packet_len_tagname = "packet_len"
- src = blocks.vector_source_b(expected_data)
- parser = digital.packet_headerparser_b(32, tagname)
+ src = blocks.vector_source_b(encoded_headers)
+ parser = digital.packet_headerparser_b(32, packet_len_tagname)
sink = blocks.message_debug()
-
self.tb.connect(src, parser)
self.tb.msg_connect(parser, "header_data", sink, "store")
- self.tb.start ()
+ self.tb.start()
time.sleep(1)
self.tb.stop()
self.tb.wait()
-
self.assertEqual(sink.num_messages(), 3)
- msg = sink.get_message(0)
- #try:
- #self.assertEqual(4, pmt.pmt_to_long(pmt.pmt_dict_ref(msg, pmt.pmt_string_to_symbol(tagname), pmt.PMT_F)))
- #self.assertEqual(0, pmt.pmt_to_long(pmt.pmt_dict_ref(msg, pmt.pmt_string_to_symbol("packet_num"), pmt.PMT_F)))
-
- #except:
- #self.fail()
- # msg1: length 4, number 0
- # msg2: length 2, number 1
- # msg3: PMT_F because parity fail
+ msg1 = pmt.to_python(sink.get_message(0))
+ msg2 = pmt.to_python(sink.get_message(1))
+ msg3 = pmt.to_python(sink.get_message(2))
+ self.assertEqual(msg1, {'packet_len': 4, 'packet_num': 0})
+ self.assertEqual(msg2, {'packet_len': 2, 'packet_num': 1})
+ self.assertEqual(msg3, False)
+ def test_002_pipe(self):
+ """
+ Create N packets of random length, pipe them through header generator,
+ back to header parser, make sure output is the same.
+ """
+ N = 20
+ header_len = 32
+ packet_len_tagname = "packet_len"
+ packet_lengths = [random.randint(1, 100) for x in range(N)]
+ data, tags = tagged_streams.packets_to_vectors([range(packet_lengths[i]) for i in range(N)], packet_len_tagname)
+ src = blocks.vector_source_b(data, False, 1, tags)
+ header_gen = digital.packet_headergenerator_bb(header_len, packet_len_tagname)
+ header_parser = digital.packet_headerparser_b(header_len, packet_len_tagname)
+ sink = blocks.message_debug()
+ self.tb.connect(src, header_gen, header_parser)
+ self.tb.msg_connect(header_parser, "header_data", sink, "store")
+ self.tb.start()
+ time.sleep(1)
+ self.tb.stop()
+ self.tb.wait()
+ self.assertEqual(sink.num_messages(), N)
+ for i in xrange(N):
+ msg = pmt.to_python(sink.get_message(i))
+ self.assertEqual(msg, {'packet_len': packet_lengths[i], 'packet_num': i})
+ def test_003_ofdm (self):
+ """ Header 1: 193 bytes
+ Header 2: 8 bytes
+ 2 bits per complex symbol, 32 carriers => 64 bits = 8 bytes per OFDM symbol
+ """
+ encoded_headers = (
+ # | Number of bytes | Packet number | Parity
+ 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0,
+ 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
+ )
+ packet_len_tagname = "packet_len"
+ frame_len_tagname = "frame_len"
+ src = blocks.vector_source_b(encoded_headers)
+ header_formatter = digital.packet_header_ofdm(
+ (range(32),), # 32 carriers are occupied (which doesn't matter here)
+ 1, # 1 OFDM symbol per header (= 32 bits)
+ packet_len_tagname,
+ frame_len_tagname,
+ "packet_num",
+ 1, # 1 bit per header symbols (BPSK)
+ 2 # 2 bits per payload symbol (QPSK)
+ )
+ parser = digital.packet_headerparser_b(header_formatter.base())
+ sink = blocks.message_debug()
+ self.tb.connect(src, parser)
+ self.tb.msg_connect(parser, "header_data", sink, "store")
+ self.tb.start()
+ time.sleep(1)
+ self.tb.stop()
+ self.tb.wait()
+ self.assertEqual(sink.num_messages(), 2)
+ msg1 = pmt.to_python(sink.get_message(0))
+ msg2 = pmt.to_python(sink.get_message(1))
+ self.assertEqual(msg1, {'packet_len': 193, 'frame_len': 25, 'packet_num': 0})
+ self.assertEqual(msg2, {'packet_len': 8, 'frame_len': 1, 'packet_num': 1})
if __name__ == '__main__':
gr_unittest.run(qa_packet_headerparser_b, "qa_packet_headerparser_b.xml")