diff options
Diffstat (limited to 'gr-digital/python/digital/qa_packet_headerparser_b.py')
-rwxr-xr-x | gr-digital/python/digital/qa_packet_headerparser_b.py | 136 |
1 files changed, 136 insertions, 0 deletions
diff --git a/gr-digital/python/digital/qa_packet_headerparser_b.py b/gr-digital/python/digital/qa_packet_headerparser_b.py new file mode 100755 index 0000000000..2dca3637e3 --- /dev/null +++ b/gr-digital/python/digital/qa_packet_headerparser_b.py @@ -0,0 +1,136 @@ +#!/usr/bin/env python +# Copyright 2012 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import time +import random + +from gnuradio import gr, gr_unittest, blocks, digital +from gnuradio.digital import tagged_streams +import pmt + +class qa_packet_headerparser_b (gr_unittest.TestCase): + + def setUp (self): + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def test_001_t (self): + """ + First header: Packet length 4, packet num 0 + Second header: Packet 2, packet num 1 + Third header: Invalid (parity bit does not check) (would be len 4, num 2) + """ + encoded_headers = ( + # | Number of bytes | Packet number | Parity + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, + 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0 + ) + packet_len_tagname = "packet_len" + random_tag = gr.tag_t() + random_tag.offset = 5 + random_tag.key = pmt.string_to_symbol("foo") + random_tag.value = pmt.from_long(42) + src = blocks.vector_source_b(encoded_headers, tags=(random_tag,)) + parser = digital.packet_headerparser_b(32, packet_len_tagname) + sink = blocks.message_debug() + self.tb.connect(src, parser) + self.tb.msg_connect(parser, "header_data", sink, "store") + self.tb.start() + time.sleep(1) + self.tb.stop() + self.tb.wait() + self.assertEqual(sink.num_messages(), 3) + msg1 = pmt.to_python(sink.get_message(0)) + msg2 = pmt.to_python(sink.get_message(1)) + msg3 = pmt.to_python(sink.get_message(2)) + self.assertEqual(msg1, {'packet_len': 4, 'packet_num': 0, 'foo': 42}) + self.assertEqual(msg2, {'packet_len': 2, 'packet_num': 1}) + self.assertEqual(msg3, False) + + def test_002_pipe(self): + """ + Create N packets of random length, pipe them through header generator, + back to header parser, make sure output is the same. + """ + N = 20 + header_len = 32 + packet_len_tagname = "packet_len" + packet_lengths = [random.randint(1, 100) for x in range(N)] + data, tags = tagged_streams.packets_to_vectors([range(packet_lengths[i]) for i in range(N)], packet_len_tagname) + src = blocks.vector_source_b(data, False, 1, tags) + header_gen = digital.packet_headergenerator_bb(header_len, packet_len_tagname) + header_parser = digital.packet_headerparser_b(header_len, packet_len_tagname) + sink = blocks.message_debug() + self.tb.connect(src, header_gen, header_parser) + self.tb.msg_connect(header_parser, "header_data", sink, "store") + self.tb.start() + time.sleep(1) + self.tb.stop() + self.tb.wait() + self.assertEqual(sink.num_messages(), N) + for i in xrange(N): + msg = pmt.to_python(sink.get_message(i)) + self.assertEqual(msg, {'packet_len': packet_lengths[i], 'packet_num': i}) + + def test_003_ofdm (self): + """ Header 1: 193 bytes + Header 2: 8 bytes + 2 bits per complex symbol, 32 carriers => 64 bits = 8 bytes per OFDM symbol + """ + encoded_headers = ( + # | Number of bytes | Packet number | Parity + 1, 0, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, + 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + ) + packet_len_tagname = "packet_len" + frame_len_tagname = "frame_len" + src = blocks.vector_source_b(encoded_headers) + header_formatter = digital.packet_header_ofdm( + (range(32),), # 32 carriers are occupied (which doesn't matter here) + 1, # 1 OFDM symbol per header (= 32 bits) + packet_len_tagname, + frame_len_tagname, + "packet_num", + 1, # 1 bit per header symbols (BPSK) + 2 # 2 bits per payload symbol (QPSK) + ) + parser = digital.packet_headerparser_b(header_formatter.base()) + sink = blocks.message_debug() + self.tb.connect(src, parser) + self.tb.msg_connect(parser, "header_data", sink, "store") + self.tb.start() + time.sleep(1) + self.tb.stop() + self.tb.wait() + self.assertEqual(sink.num_messages(), 2) + msg1 = pmt.to_python(sink.get_message(0)) + msg2 = pmt.to_python(sink.get_message(1)) + # Multiply with 4 because unpacked bytes have only two bits + self.assertEqual(msg1, {'packet_len': 193*4, 'frame_len': 25, 'packet_num': 0}) + self.assertEqual(msg2, {'packet_len': 8*4, 'frame_len': 1, 'packet_num': 1}) + +if __name__ == '__main__': + gr_unittest.run(qa_packet_headerparser_b, "qa_packet_headerparser_b.xml") + + |