summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_crc32_bb.py
diff options
context:
space:
mode:
authorFelix Wunsch <felix.wunsch@kit.edu>2015-07-17 17:04:52 +0200
committerFelix Wunsch <felix.wunsch@kit.edu>2015-07-20 13:20:49 +0200
commitb4ba432720c4ac011d63361fd9273d0a4b486017 (patch)
treea87d0641e0683f89a48c61a903d098c94f86c4fc /gr-digital/python/digital/qa_crc32_bb.py
parenta6d1ca5b73041af374b3dbc080f964efb00681fb (diff)
digital: added an option to the crc32_bb block that adds the unpacked CRC to the bit stream
Diffstat (limited to 'gr-digital/python/digital/qa_crc32_bb.py')
-rwxr-xr-xgr-digital/python/digital/qa_crc32_bb.py130
1 files changed, 130 insertions, 0 deletions
diff --git a/gr-digital/python/digital/qa_crc32_bb.py b/gr-digital/python/digital/qa_crc32_bb.py
index bca19cd418..5e45bfb1b0 100755
--- a/gr-digital/python/digital/qa_crc32_bb.py
+++ b/gr-digital/python/digital/qa_crc32_bb.py
@@ -156,5 +156,135 @@ class qa_crc32_bb (gr_unittest.TestCase):
self.tb.run()
self.assertEqual([len(data)-5,], [tag.offset for tag in sink.tags() if pmt.symbol_to_string(tag.key) == 'tag1'])
+ # NOTE: What follows are the same tests as before but with the packed flag set to False
+
+ def test_006_crc_len (self):
+ """ Make sure the output of a CRC set is 32 (unpacked) bytes longer than the input. """
+ data = range(16)
+ src = blocks.vector_source_b(data)
+ crc = digital.crc32_bb(False, self.tsb_key, False)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key),
+ crc,
+ sink
+ )
+ self.tb.run()
+ # Check that the packets before crc_check are 4 bytes longer that the input.
+ self.assertEqual(len(data)+32, len(sink.data()[0]))
+
+ def test_007_crc_equal (self):
+ """ Go through CRC set / CRC check and make sure the output
+ is the same as the input. """
+ data = (0, 1, 2, 3, 4, 5, 6, 7, 8)
+ src = blocks.vector_source_b(data)
+ crc = digital.crc32_bb(False, self.tsb_key, False)
+ crc_check = digital.crc32_bb(True, self.tsb_key, False)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key),
+ crc,
+ crc_check,
+ sink
+ )
+ self.tb.run()
+ # Check that the packets after crc_check are the same as input.
+ self.assertEqual(data, sink.data()[0])
+
+ def test_008_crc_correct_lentag (self):
+ tag_name = "length"
+ pack_len = 8
+ packets = range(pack_len*2)
+ tag1 = gr.tag_t()
+ tag1.offset = 0
+ tag1.key = pmt.string_to_symbol(tag_name)
+ tag1.value = pmt.from_long(pack_len)
+ tag2 = gr.tag_t()
+ tag2.offset = pack_len
+ tag2.key = pmt.string_to_symbol(tag_name)
+ tag2.value = pmt.from_long(pack_len)
+ testtag1 = gr.tag_t()
+ testtag1.offset = 1
+ testtag1.key = pmt.string_to_symbol("tag1")
+ testtag1.value = pmt.from_long(0)
+ testtag2 = gr.tag_t()
+ testtag2.offset = pack_len
+ testtag2.key = pmt.string_to_symbol("tag2")
+ testtag2.value = pmt.from_long(0)
+ testtag3 = gr.tag_t()
+ testtag3.offset = len(packets)-1
+ testtag3.key = pmt.string_to_symbol("tag3")
+ testtag3.value = pmt.from_long(0)
+ src = blocks.vector_source_b(packets, False, 1, (testtag1, testtag2, testtag3))
+ crc = digital.crc32_bb(False, self.tsb_key, False)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, pack_len, self.tsb_key),
+ crc,
+ sink
+ )
+ self.tb.run()
+ self.assertEqual(len(sink.data()), 2)
+ self.assertEqual(len(sink.data()[0]), (pack_len+32))
+ self.assertEqual(len(sink.data()[1]), (pack_len+32))
+ correct_offsets = {'tag1': 1, 'tag2': 8+32, 'tag3': 15+32}
+ tags_found = {'tag1': False, 'tag2': False, 'tag3': False}
+ for tag in sink.tags():
+ key = pmt.symbol_to_string(tag.key)
+ if key in correct_offsets.keys():
+ tags_found[key] = True
+ self.assertEqual(correct_offsets[key], tag.offset)
+ self.assertTrue(all(tags_found.values()))
+
+
+ def test_009_fail (self):
+ """ Corrupt the data and make sure it fails CRC test. """
+ data = (0, 1, 2, 3, 4, 5, 6, 7)
+ src = blocks.vector_source_b(data)
+ crc = digital.crc32_bb(False, self.tsb_key, False)
+ crc_check = digital.crc32_bb(True, self.tsb_key, False)
+ corruptor = blocks.add_const_bb(1)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key),
+ crc,
+ corruptor,
+ crc_check,
+ sink
+ )
+ self.tb.run()
+ # crc_check will drop invalid packets
+ self.assertEqual(len(sink.data()), 0)
+
+ def test_0010_tag_propagation (self):
+ """ Make sure tags on the CRC aren't lost. """
+ # Data with precalculated CRC
+ data = (
+ 0, 1, 2, 3, 4, 5, 6, 7, 8,
+ 0, 1, 0, 0, 0, 0, 0, 0,
+ 1, 1, 0, 0, 0, 0, 1, 0,
+ 1, 0, 0, 0, 0, 1, 1, 1,
+ 0, 0, 1, 1, 1, 1, 0, 1
+ ) # 2, 67, 225, 188
+ testtag = gr.tag_t()
+ testtag.offset = len(data)-1
+ testtag.key = pmt.string_to_symbol('tag1')
+ testtag.value = pmt.from_long(0)
+ src = blocks.vector_source_b(data, False, 1, (testtag,))
+ crc_check = digital.crc32_bb(True, self.tsb_key, False)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key),
+ crc_check,
+ sink
+ )
+ self.tb.run()
+ self.assertEqual([len(data)-33,], [tag.offset for tag in sink.tags() if pmt.symbol_to_string(tag.key) == 'tag1'])
+
if __name__ == '__main__':
gr_unittest.run(qa_crc32_bb, "qa_crc32_bb.xml")