diff options
Diffstat (limited to 'gr-digital/python/digital/qa_burst_shaper.py')
-rwxr-xr-x | gr-digital/python/digital/qa_burst_shaper.py | 71 |
1 files changed, 65 insertions, 6 deletions
diff --git a/gr-digital/python/digital/qa_burst_shaper.py b/gr-digital/python/digital/qa_burst_shaper.py index b5aecd4614..f85b79ceec 100755 --- a/gr-digital/python/digital/qa_burst_shaper.py +++ b/gr-digital/python/digital/qa_burst_shaper.py @@ -25,6 +25,7 @@ from gnuradio import gr, gr_unittest from gnuradio import blocks, digital import pmt import numpy as np +import sys def make_length_tag(offset, length): return gr.python_to_tag({'offset' : offset, @@ -32,13 +33,15 @@ def make_length_tag(offset, length): 'value' : pmt.from_long(length), 'srcid' : pmt.intern('qa_burst_shaper')}) +def make_tag(offset, key, value): + return gr.python_to_tag({'offset' : offset, + 'key' : pmt.intern(key), + 'value' : value, + 'srcid' : pmt.intern('qa_burst_shaper')}) + def compare_tags(a, b): - a = gr.tag_to_python(a) - b = gr.tag_to_python(b) - return a.key == b.key and a.offset == b.offset and \ - a.value == b.value - #return a.key == b.key and a.offset == b.offset and \ - # a.srcid == b.srcid and a.value == b.value + return a.offset == b.offset and pmt.equal(a.key, b.key) and \ + pmt.equal(a.value, b.value) class qa_burst_shaper (gr_unittest.TestCase): @@ -276,6 +279,62 @@ class qa_burst_shaper (gr_unittest.TestCase): for i in xrange(len(etags)): self.assertTrue(compare_tags(sink.tags()[i], etags[i])) + def test_tag_propagation (self): + prepad = 10 + postpad = 10 + length1 = 15 + length2 = 25 + gap_len = 5 + lentag1_offset = 0 + lentag2_offset = length1 + gap_len + tag1_offset = 0 # accompanies first length tag + tag2_offset = length1 + gap_len # accompanies second length tag + tag3_offset = 2 # in ramp-up state + tag4_offset = length1 + 2 # in gap; tag will be dropped + tag5_offset = length1 + gap_len + 7 # in copy state + + data = np.concatenate((np.ones(length1), np.zeros(gap_len), + -1.0*np.ones(length2), np.zeros(10))) + window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5))) + tags = (make_length_tag(lentag1_offset, length1), + make_length_tag(lentag2_offset, length2), + make_tag(tag1_offset, 'head', pmt.intern('tag1')), + make_tag(tag2_offset, 'head', pmt.intern('tag2')), + make_tag(tag3_offset, 'body', pmt.intern('tag3')), + make_tag(tag4_offset, 'body', pmt.intern('tag4')), + make_tag(tag5_offset, 'body', pmt.intern('tag5'))) + expected = np.concatenate((np.zeros(prepad), window[0:5], + np.ones(length1 - len(window)), window[5:10], + np.zeros(postpad + prepad), -1.0*window[0:5], + -1.0*np.ones(length2 - len(window)), + -1.0*window[5:10], np.zeros(postpad))) + elentag1_offset = 0 + elentag2_offset = length1 + prepad + postpad + etag1_offset = 0 + etag2_offset = elentag2_offset + etag3_offset = prepad + tag3_offset + etag5_offset = 2*prepad + postpad + tag5_offset - gap_len + etags = (make_length_tag(elentag1_offset, length1 + prepad + postpad), + make_length_tag(elentag2_offset, length2 + prepad + postpad), + make_tag(etag1_offset, 'head', pmt.intern('tag1')), + make_tag(etag2_offset, 'head', pmt.intern('tag2')), + make_tag(etag3_offset, 'body', pmt.intern('tag3')), + make_tag(etag5_offset, 'body', pmt.intern('tag5'))) + + # flowgraph + source = blocks.vector_source_f(data, tags=tags) + shaper = digital.burst_shaper_ff(window, pre_padding=prepad, + post_padding=postpad) + sink = blocks.vector_sink_f() + self.tb.connect(source, shaper, sink) + self.tb.run () + + # checks + self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6) + for x, y in zip(sorted(sink.tags(), key=gr.tag_t_offset_compare_key()), + sorted(etags, key=gr.tag_t_offset_compare_key())): + self.assertTrue(compare_tags(x, y)) + if __name__ == '__main__': gr_unittest.run(qa_burst_shaper, "qa_burst_shaper.xml") |