summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_crc32_bb.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital/qa_crc32_bb.py')
-rwxr-xr-xgr-digital/python/digital/qa_crc32_bb.py111
1 files changed, 62 insertions, 49 deletions
diff --git a/gr-digital/python/digital/qa_crc32_bb.py b/gr-digital/python/digital/qa_crc32_bb.py
index 71ba81ded3..bca19cd418 100755
--- a/gr-digital/python/digital/qa_crc32_bb.py
+++ b/gr-digital/python/digital/qa_crc32_bb.py
@@ -1,5 +1,5 @@
#!/usr/bin/env python
-# Copyright 2012,2013 Free Software Foundation, Inc.
+# Copyright 2012-2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -26,6 +26,7 @@ class qa_crc32_bb (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
+ self.tsb_key = "length"
def tearDown (self):
self.tb = None
@@ -33,36 +34,37 @@ class qa_crc32_bb (gr_unittest.TestCase):
def test_001_crc_len (self):
""" Make sure the output of a CRC set is 4 bytes longer than the input. """
data = range(16)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(data))
- src = blocks.vector_source_b(data, False, 1, (tag,))
- crc = digital.crc32_bb(False, tag_name)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, crc, sink)
+ src = blocks.vector_source_b(data)
+ crc = digital.crc32_bb(False, self.tsb_key)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key),
+ crc,
+ sink
+ )
self.tb.run()
# Check that the packets before crc_check are 4 bytes longer that the input.
- self.assertEqual(len(data)+4, len(sink.data()))
+ self.assertEqual(len(data)+4, len(sink.data()[0]))
def test_002_crc_equal (self):
""" Go through CRC set / CRC check and make sure the output
is the same as the input. """
data = (0, 1, 2, 3, 4, 5, 6, 7, 8)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(data))
- src = blocks.vector_source_b(data, False, 1, (tag,))
- crc = digital.crc32_bb(False, tag_name)
- crc_check = digital.crc32_bb(True, tag_name)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, crc, crc_check, sink)
+ src = blocks.vector_source_b(data)
+ crc = digital.crc32_bb(False, self.tsb_key)
+ crc_check = digital.crc32_bb(True, self.tsb_key)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key),
+ crc,
+ crc_check,
+ sink
+ )
self.tb.run()
# Check that the packets after crc_check are the same as input.
- self.assertEqual(data, sink.data())
+ self.assertEqual(data, sink.data()[0])
def test_003_crc_correct_lentag (self):
tag_name = "length"
@@ -88,12 +90,19 @@ class qa_crc32_bb (gr_unittest.TestCase):
testtag3.offset = len(packets)-1
testtag3.key = pmt.string_to_symbol("tag3")
testtag3.value = pmt.from_long(0)
- src = blocks.vector_source_b(packets, False, 1, (tag1, tag2, testtag1, testtag2, testtag3))
- crc = digital.crc32_bb(False, tag_name)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, crc, sink)
+ src = blocks.vector_source_b(packets, False, 1, (testtag1, testtag2, testtag3))
+ crc = digital.crc32_bb(False, self.tsb_key)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, pack_len, self.tsb_key),
+ crc,
+ sink
+ )
self.tb.run()
- self.assertEqual(len(sink.data()), 2*(pack_len+4))
+ self.assertEqual(len(sink.data()), 2)
+ self.assertEqual(len(sink.data()[0]), (pack_len+4))
+ self.assertEqual(len(sink.data()[1]), (pack_len+4))
correct_offsets = {'tag1': 1, 'tag2': 12, 'tag3': 19}
tags_found = {'tag1': False, 'tag2': False, 'tag3': False}
for tag in sink.tags():
@@ -101,45 +110,49 @@ class qa_crc32_bb (gr_unittest.TestCase):
if key in correct_offsets.keys():
tags_found[key] = True
self.assertEqual(correct_offsets[key], tag.offset)
- if key == tag_name:
- self.assertTrue(tag.offset == 0 or tag.offset == pack_len+4)
self.assertTrue(all(tags_found.values()))
def test_004_fail (self):
""" Corrupt the data and make sure it fails CRC test. """
data = (0, 1, 2, 3, 4, 5, 6, 7)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(data))
- src = blocks.vector_source_b(data, False, 1, (tag,))
- crc = digital.crc32_bb(False, tag_name)
- crc_check = digital.crc32_bb(True, tag_name)
+ src = blocks.vector_source_b(data)
+ crc = digital.crc32_bb(False, self.tsb_key)
+ crc_check = digital.crc32_bb(True, self.tsb_key)
corruptor = blocks.add_const_bb(1)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, crc, corruptor, crc_check, sink)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key),
+ crc,
+ corruptor,
+ crc_check,
+ sink
+ )
self.tb.run()
# crc_check will drop invalid packets
self.assertEqual(len(sink.data()), 0)
def test_005_tag_propagation (self):
""" Make sure tags on the CRC aren't lost. """
- data = (0, 1, 2, 3, 4, 5, 6, 7, 8, 2, 67, 225, 188)
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(data))
+ # Data with precalculated CRC
+ data = (
+ 0, 1, 2, 3, 4, 5, 6, 7, 8,
+ 2, 67, 225, 188
+ )
testtag = gr.tag_t()
testtag.offset = len(data)-1
testtag.key = pmt.string_to_symbol('tag1')
testtag.value = pmt.from_long(0)
- src = blocks.vector_source_b(data, False, 1, (tag, testtag))
- crc_check = digital.crc32_bb(True, tag_name)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, crc_check, sink)
+ src = blocks.vector_source_b(data, False, 1, (testtag,))
+ crc_check = digital.crc32_bb(True, self.tsb_key)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(data), self.tsb_key),
+ crc_check,
+ sink
+ )
self.tb.run()
self.assertEqual([len(data)-5,], [tag.offset for tag in sink.tags() if pmt.symbol_to_string(tag.key) == 'tag1'])