diff options
author | Felix Wunsch <felix.wunsch@kit.edu> | 2015-07-17 17:04:52 +0200 |
---|---|---|
committer | Felix Wunsch <felix.wunsch@kit.edu> | 2015-07-20 13:20:49 +0200 |
commit | b4ba432720c4ac011d63361fd9273d0a4b486017 (patch) | |
tree | a87d0641e0683f89a48c61a903d098c94f86c4fc /gr-digital/python/digital/qa_crc32_bb.py | |
parent | a6d1ca5b73041af374b3dbc080f964efb00681fb (diff) |
digital: added an option to the crc32_bb block that adds the unpacked CRC to the bit stream
Diffstat (limited to 'gr-digital/python/digital/qa_crc32_bb.py')
-rwxr-xr-x | gr-digital/python/digital/qa_crc32_bb.py | 130 |
1 files changed, 130 insertions, 0 deletions
diff --git a/gr-digital/python/digital/qa_crc32_bb.py b/gr-digital/python/digital/qa_crc32_bb.py index bca19cd418..5e45bfb1b0 100755 --- a/gr-digital/python/digital/qa_crc32_bb.py +++ b/gr-digital/python/digital/qa_crc32_bb.py @@ -156,5 +156,135 @@ class qa_crc32_bb (gr_unittest.TestCase): self.tb.run() self.assertEqual([len(data)-5,], [tag.offset for tag in sink.tags() if pmt.symbol_to_string(tag.key) == 'tag1']) + # NOTE: What follows are the same tests as before but with the packed flag set to False + + def test_006_crc_len (self): + """ Make sure the output of a CRC set is 32 (unpacked) bytes longer than the input. """ + data = range(16) + src = blocks.vector_source_b(data) + crc = digital.crc32_bb(False, self.tsb_key, False) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc, + sink + ) + self.tb.run() + # Check that the packets before crc_check are 4 bytes longer that the input. + self.assertEqual(len(data)+32, len(sink.data()[0])) + + def test_007_crc_equal (self): + """ Go through CRC set / CRC check and make sure the output + is the same as the input. """ + data = (0, 1, 2, 3, 4, 5, 6, 7, 8) + src = blocks.vector_source_b(data) + crc = digital.crc32_bb(False, self.tsb_key, False) + crc_check = digital.crc32_bb(True, self.tsb_key, False) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc, + crc_check, + sink + ) + self.tb.run() + # Check that the packets after crc_check are the same as input. + self.assertEqual(data, sink.data()[0]) + + def test_008_crc_correct_lentag (self): + tag_name = "length" + pack_len = 8 + packets = range(pack_len*2) + tag1 = gr.tag_t() + tag1.offset = 0 + tag1.key = pmt.string_to_symbol(tag_name) + tag1.value = pmt.from_long(pack_len) + tag2 = gr.tag_t() + tag2.offset = pack_len + tag2.key = pmt.string_to_symbol(tag_name) + tag2.value = pmt.from_long(pack_len) + testtag1 = gr.tag_t() + testtag1.offset = 1 + testtag1.key = pmt.string_to_symbol("tag1") + testtag1.value = pmt.from_long(0) + testtag2 = gr.tag_t() + testtag2.offset = pack_len + testtag2.key = pmt.string_to_symbol("tag2") + testtag2.value = pmt.from_long(0) + testtag3 = gr.tag_t() + testtag3.offset = len(packets)-1 + testtag3.key = pmt.string_to_symbol("tag3") + testtag3.value = pmt.from_long(0) + src = blocks.vector_source_b(packets, False, 1, (testtag1, testtag2, testtag3)) + crc = digital.crc32_bb(False, self.tsb_key, False) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, pack_len, self.tsb_key), + crc, + sink + ) + self.tb.run() + self.assertEqual(len(sink.data()), 2) + self.assertEqual(len(sink.data()[0]), (pack_len+32)) + self.assertEqual(len(sink.data()[1]), (pack_len+32)) + correct_offsets = {'tag1': 1, 'tag2': 8+32, 'tag3': 15+32} + tags_found = {'tag1': False, 'tag2': False, 'tag3': False} + for tag in sink.tags(): + key = pmt.symbol_to_string(tag.key) + if key in correct_offsets.keys(): + tags_found[key] = True + self.assertEqual(correct_offsets[key], tag.offset) + self.assertTrue(all(tags_found.values())) + + + def test_009_fail (self): + """ Corrupt the data and make sure it fails CRC test. """ + data = (0, 1, 2, 3, 4, 5, 6, 7) + src = blocks.vector_source_b(data) + crc = digital.crc32_bb(False, self.tsb_key, False) + crc_check = digital.crc32_bb(True, self.tsb_key, False) + corruptor = blocks.add_const_bb(1) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc, + corruptor, + crc_check, + sink + ) + self.tb.run() + # crc_check will drop invalid packets + self.assertEqual(len(sink.data()), 0) + + def test_0010_tag_propagation (self): + """ Make sure tags on the CRC aren't lost. """ + # Data with precalculated CRC + data = ( + 0, 1, 2, 3, 4, 5, 6, 7, 8, + 0, 1, 0, 0, 0, 0, 0, 0, + 1, 1, 0, 0, 0, 0, 1, 0, + 1, 0, 0, 0, 0, 1, 1, 1, + 0, 0, 1, 1, 1, 1, 0, 1 + ) # 2, 67, 225, 188 + testtag = gr.tag_t() + testtag.offset = len(data)-1 + testtag.key = pmt.string_to_symbol('tag1') + testtag.value = pmt.from_long(0) + src = blocks.vector_source_b(data, False, 1, (testtag,)) + crc_check = digital.crc32_bb(True, self.tsb_key, False) + sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key) + self.tb.connect( + src, + blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key), + crc_check, + sink + ) + self.tb.run() + self.assertEqual([len(data)-33,], [tag.offset for tag in sink.tags() if pmt.symbol_to_string(tag.key) == 'tag1']) + if __name__ == '__main__': gr_unittest.run(qa_crc32_bb, "qa_crc32_bb.xml") |