diff options
Diffstat (limited to 'gr-digital/python/digital/qa_crc32_bb.py')
-rwxr-xr-x | gr-digital/python/digital/qa_crc32_bb.py | 111 |
1 files changed, 62 insertions, 49 deletions
diff --git a/gr-digital/python/digital/qa_crc32_bb.py b/gr-digital/python/digital/qa_crc32_bb.py index 71ba81ded3..bca19cd418 100755 --- a/gr-digital/python/digital/qa_crc32_bb.py +++ b/gr-digital/python/digital/qa_crc32_bb.py @@ -1,5 +1,5 @@ #!/usr/bin/env python -# Copyright 2012,2013 Free Software Foundation, Inc. +# Copyright 2012-2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -26,6 +26,7 @@ class qa_crc32_bb (gr_unittest.TestCase): def setUp (self): self.tb = gr.top_block () + self.tsb_key = "length" def tearDown (self): self.tb = None @@ -33,36 +34,37 @@ class qa_crc32_bb (gr_unittest.TestCase): def test_001_crc_len (self): """ Make sure the output of a CRC set is 4 bytes longer than the input. """ data = range(16) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(data)) - src = blocks.vector_source_b(data, False, 1, (tag,)) - crc = digital.crc32_bb(False, tag_name) - sink = blocks.vector_sink_b() - self.tb.connect(src, crc, sink) + src = blocks.vector_source_b(data) + crc = digital.crc32_bb(False, self.tsb_key) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc, + sink + ) self.tb.run() # Check that the packets before crc_check are 4 bytes longer that the input. - self.assertEqual(len(data)+4, len(sink.data())) + self.assertEqual(len(data)+4, len(sink.data()[0])) def test_002_crc_equal (self): """ Go through CRC set / CRC check and make sure the output is the same as the input. """ data = (0, 1, 2, 3, 4, 5, 6, 7, 8) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(data)) - src = blocks.vector_source_b(data, False, 1, (tag,)) - crc = digital.crc32_bb(False, tag_name) - crc_check = digital.crc32_bb(True, tag_name) - sink = blocks.vector_sink_b() - self.tb.connect(src, crc, crc_check, sink) + src = blocks.vector_source_b(data) + crc = digital.crc32_bb(False, self.tsb_key) + crc_check = digital.crc32_bb(True, self.tsb_key) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc, + crc_check, + sink + ) self.tb.run() # Check that the packets after crc_check are the same as input. - self.assertEqual(data, sink.data()) + self.assertEqual(data, sink.data()[0]) def test_003_crc_correct_lentag (self): tag_name = "length" @@ -88,12 +90,19 @@ class qa_crc32_bb (gr_unittest.TestCase): testtag3.offset = len(packets)-1 testtag3.key = pmt.string_to_symbol("tag3") testtag3.value = pmt.from_long(0) - src = blocks.vector_source_b(packets, False, 1, (tag1, tag2, testtag1, testtag2, testtag3)) - crc = digital.crc32_bb(False, tag_name) - sink = blocks.vector_sink_b() - self.tb.connect(src, crc, sink) + src = blocks.vector_source_b(packets, False, 1, (testtag1, testtag2, testtag3)) + crc = digital.crc32_bb(False, self.tsb_key) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, pack_len, self.tsb_key), + crc, + sink + ) self.tb.run() - self.assertEqual(len(sink.data()), 2*(pack_len+4)) + self.assertEqual(len(sink.data()), 2) + self.assertEqual(len(sink.data()[0]), (pack_len+4)) + self.assertEqual(len(sink.data()[1]), (pack_len+4)) correct_offsets = {'tag1': 1, 'tag2': 12, 'tag3': 19} tags_found = {'tag1': False, 'tag2': False, 'tag3': False} for tag in sink.tags(): @@ -101,45 +110,49 @@ class qa_crc32_bb (gr_unittest.TestCase): if key in correct_offsets.keys(): tags_found[key] = True self.assertEqual(correct_offsets[key], tag.offset) - if key == tag_name: - self.assertTrue(tag.offset == 0 or tag.offset == pack_len+4) self.assertTrue(all(tags_found.values())) def test_004_fail (self): """ Corrupt the data and make sure it fails CRC test. """ data = (0, 1, 2, 3, 4, 5, 6, 7) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(data)) - src = blocks.vector_source_b(data, False, 1, (tag,)) - crc = digital.crc32_bb(False, tag_name) - crc_check = digital.crc32_bb(True, tag_name) + src = blocks.vector_source_b(data) + crc = digital.crc32_bb(False, self.tsb_key) + crc_check = digital.crc32_bb(True, self.tsb_key) corruptor = blocks.add_const_bb(1) - sink = blocks.vector_sink_b() - self.tb.connect(src, crc, corruptor, crc_check, sink) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc, + corruptor, + crc_check, + sink + ) self.tb.run() # crc_check will drop invalid packets self.assertEqual(len(sink.data()), 0) def test_005_tag_propagation (self): """ Make sure tags on the CRC aren't lost. """ - data = (0, 1, 2, 3, 4, 5, 6, 7, 8, 2, 67, 225, 188) - tag_name = "len" - tag = gr.tag_t() - tag.offset = 0 - tag.key = pmt.string_to_symbol(tag_name) - tag.value = pmt.from_long(len(data)) + # Data with precalculated CRC + data = ( + 0, 1, 2, 3, 4, 5, 6, 7, 8, + 2, 67, 225, 188 + ) testtag = gr.tag_t() testtag.offset = len(data)-1 testtag.key = pmt.string_to_symbol('tag1') testtag.value = pmt.from_long(0) - src = blocks.vector_source_b(data, False, 1, (tag, testtag)) - crc_check = digital.crc32_bb(True, tag_name) - sink = blocks.vector_sink_b() - self.tb.connect(src, crc_check, sink) + src = blocks.vector_source_b(data, False, 1, (testtag,)) + crc_check = digital.crc32_bb(True, self.tsb_key) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc_check, + sink + ) self.tb.run() self.assertEqual([len(data)-5,], [tag.offset for tag in sink.tags() if pmt.symbol_to_string(tag.key) == 'tag1']) |