summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_burst_shaper.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital/qa_burst_shaper.py')
-rwxr-xr-xgr-digital/python/digital/qa_burst_shaper.py71
1 files changed, 65 insertions, 6 deletions
diff --git a/gr-digital/python/digital/qa_burst_shaper.py b/gr-digital/python/digital/qa_burst_shaper.py
index b5aecd4614..f85b79ceec 100755
--- a/gr-digital/python/digital/qa_burst_shaper.py
+++ b/gr-digital/python/digital/qa_burst_shaper.py
@@ -25,6 +25,7 @@ from gnuradio import gr, gr_unittest
from gnuradio import blocks, digital
import pmt
import numpy as np
+import sys
def make_length_tag(offset, length):
return gr.python_to_tag({'offset' : offset,
@@ -32,13 +33,15 @@ def make_length_tag(offset, length):
'value' : pmt.from_long(length),
'srcid' : pmt.intern('qa_burst_shaper')})
+def make_tag(offset, key, value):
+ return gr.python_to_tag({'offset' : offset,
+ 'key' : pmt.intern(key),
+ 'value' : value,
+ 'srcid' : pmt.intern('qa_burst_shaper')})
+
def compare_tags(a, b):
- a = gr.tag_to_python(a)
- b = gr.tag_to_python(b)
- return a.key == b.key and a.offset == b.offset and \
- a.value == b.value
- #return a.key == b.key and a.offset == b.offset and \
- # a.srcid == b.srcid and a.value == b.value
+ return a.offset == b.offset and pmt.equal(a.key, b.key) and \
+ pmt.equal(a.value, b.value)
class qa_burst_shaper (gr_unittest.TestCase):
@@ -276,6 +279,62 @@ class qa_burst_shaper (gr_unittest.TestCase):
for i in xrange(len(etags)):
self.assertTrue(compare_tags(sink.tags()[i], etags[i]))
+ def test_tag_propagation (self):
+ prepad = 10
+ postpad = 10
+ length1 = 15
+ length2 = 25
+ gap_len = 5
+ lentag1_offset = 0
+ lentag2_offset = length1 + gap_len
+ tag1_offset = 0 # accompanies first length tag
+ tag2_offset = length1 + gap_len # accompanies second length tag
+ tag3_offset = 2 # in ramp-up state
+ tag4_offset = length1 + 2 # in gap; tag will be dropped
+ tag5_offset = length1 + gap_len + 7 # in copy state
+
+ data = np.concatenate((np.ones(length1), np.zeros(gap_len),
+ -1.0*np.ones(length2), np.zeros(10)))
+ window = np.concatenate((-2.0*np.ones(5), -4.0*np.ones(5)))
+ tags = (make_length_tag(lentag1_offset, length1),
+ make_length_tag(lentag2_offset, length2),
+ make_tag(tag1_offset, 'head', pmt.intern('tag1')),
+ make_tag(tag2_offset, 'head', pmt.intern('tag2')),
+ make_tag(tag3_offset, 'body', pmt.intern('tag3')),
+ make_tag(tag4_offset, 'body', pmt.intern('tag4')),
+ make_tag(tag5_offset, 'body', pmt.intern('tag5')))
+ expected = np.concatenate((np.zeros(prepad), window[0:5],
+ np.ones(length1 - len(window)), window[5:10],
+ np.zeros(postpad + prepad), -1.0*window[0:5],
+ -1.0*np.ones(length2 - len(window)),
+ -1.0*window[5:10], np.zeros(postpad)))
+ elentag1_offset = 0
+ elentag2_offset = length1 + prepad + postpad
+ etag1_offset = 0
+ etag2_offset = elentag2_offset
+ etag3_offset = prepad + tag3_offset
+ etag5_offset = 2*prepad + postpad + tag5_offset - gap_len
+ etags = (make_length_tag(elentag1_offset, length1 + prepad + postpad),
+ make_length_tag(elentag2_offset, length2 + prepad + postpad),
+ make_tag(etag1_offset, 'head', pmt.intern('tag1')),
+ make_tag(etag2_offset, 'head', pmt.intern('tag2')),
+ make_tag(etag3_offset, 'body', pmt.intern('tag3')),
+ make_tag(etag5_offset, 'body', pmt.intern('tag5')))
+
+ # flowgraph
+ source = blocks.vector_source_f(data, tags=tags)
+ shaper = digital.burst_shaper_ff(window, pre_padding=prepad,
+ post_padding=postpad)
+ sink = blocks.vector_sink_f()
+ self.tb.connect(source, shaper, sink)
+ self.tb.run ()
+
+ # checks
+ self.assertFloatTuplesAlmostEqual(sink.data(), expected, 6)
+ for x, y in zip(sorted(sink.tags(), key=gr.tag_t_offset_compare_key()),
+ sorted(etags, key=gr.tag_t_offset_compare_key())):
+ self.assertTrue(compare_tags(x, y))
+
if __name__ == '__main__':
gr_unittest.run(qa_burst_shaper, "qa_burst_shaper.xml")