summaryrefslogtreecommitdiff
path: root/gr-blocks/python
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks/python')
-rwxr-xr-xgr-blocks/python/blocks/qa_interleave.py71
-rwxr-xr-xgr-blocks/python/blocks/qa_keep_one_in_n.py1
-rwxr-xr-xgr-blocks/python/blocks/qa_repack_bits_bb.py57
-rwxr-xr-xgr-blocks/python/blocks/qa_tagged_stream_mux.py123
-rwxr-xr-xgr-blocks/python/blocks/qa_tsb_vector_sink_X.py58
5 files changed, 199 insertions, 111 deletions
diff --git a/gr-blocks/python/blocks/qa_interleave.py b/gr-blocks/python/blocks/qa_interleave.py
index 9eaf87c83c..526e4a4e6f 100755
--- a/gr-blocks/python/blocks/qa_interleave.py
+++ b/gr-blocks/python/blocks/qa_interleave.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2004,2007,2010,2012,2013 Free Software Foundation, Inc.
+# Copyright 2004,2007,2010,2012-2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -49,6 +49,37 @@ class test_interleave (gr_unittest.TestCase):
result_data = dst.data ()
self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+ def test_int_002 (self):
+ blksize = 4
+ lenx = 64
+ plusup_big = lambda a: a + (blksize * 4)
+ plusup_little = lambda a: a + blksize
+ a_vec = range(0,blksize)
+ for i in range(0,(lenx/(4 * blksize)) - 1):
+ a_vec += map(plusup_big, a_vec[len(a_vec) - blksize:])
+
+ b_vec = map(plusup_little, a_vec)
+ c_vec = map(plusup_little, b_vec)
+ d_vec = map(plusup_little, c_vec)
+
+ src0 = blocks.vector_source_f (a_vec)
+ src1 = blocks.vector_source_f (b_vec)
+ src2 = blocks.vector_source_f (c_vec)
+ src3 = blocks.vector_source_f (d_vec)
+ op = blocks.interleave (gr.sizeof_float, blksize)
+ dst = blocks.vector_sink_f ()
+
+ self.tb.connect (src0, (op, 0))
+ self.tb.connect (src1, (op, 1))
+ self.tb.connect (src2, (op, 2))
+ self.tb.connect (src3, (op, 3))
+ self.tb.connect (op, dst)
+ self.tb.run ()
+ expected_result = tuple (range (lenx))
+ result_data = dst.data ()
+
+ self.assertFloatTuplesAlmostEqual (expected_result, result_data)
+
def test_deint_001 (self):
lenx = 64
src = blocks.vector_source_f (range (lenx))
@@ -75,6 +106,42 @@ class test_interleave (gr_unittest.TestCase):
self.assertFloatTuplesAlmostEqual (expected_result2, dst2.data ())
self.assertFloatTuplesAlmostEqual (expected_result3, dst3.data ())
+ def test_deint_002 (self):
+ blksize = 4
+ lenx = 64
+ src = blocks.vector_source_f (range (lenx))
+ op = blocks.deinterleave (gr.sizeof_float, blksize)
+ dst0 = blocks.vector_sink_f ()
+ dst1 = blocks.vector_sink_f ()
+ dst2 = blocks.vector_sink_f ()
+ dst3 = blocks.vector_sink_f ()
+
+ self.tb.connect (src, op)
+ self.tb.connect ((op, 0), dst0)
+ self.tb.connect ((op, 1), dst1)
+ self.tb.connect ((op, 2), dst2)
+ self.tb.connect ((op, 3), dst3)
+ self.tb.run ()
+
+ plusup_big = lambda a: a + (blksize * 4)
+ plusup_little = lambda a: a + blksize
+ a_vec = range(0,blksize)
+ for i in range(0,(lenx/(4 * blksize)) - 1):
+ a_vec += map(plusup_big, a_vec[len(a_vec) - blksize:])
+
+ b_vec = map(plusup_little, a_vec)
+ c_vec = map(plusup_little, b_vec)
+ d_vec = map(plusup_little, c_vec)
+
+ expected_result0 = tuple (a_vec)
+ expected_result1 = tuple (b_vec)
+ expected_result2 = tuple (c_vec)
+ expected_result3 = tuple (d_vec)
+
+ self.assertFloatTuplesAlmostEqual (expected_result0, dst0.data ())
+ self.assertFloatTuplesAlmostEqual (expected_result1, dst1.data ())
+ self.assertFloatTuplesAlmostEqual (expected_result2, dst2.data ())
+ self.assertFloatTuplesAlmostEqual (expected_result3, dst3.data ())
+
if __name__ == '__main__':
gr_unittest.run(test_interleave, "test_interleave.xml")
-
diff --git a/gr-blocks/python/blocks/qa_keep_one_in_n.py b/gr-blocks/python/blocks/qa_keep_one_in_n.py
index 2a5d936cce..d8251fe611 100755
--- a/gr-blocks/python/blocks/qa_keep_one_in_n.py
+++ b/gr-blocks/python/blocks/qa_keep_one_in_n.py
@@ -36,7 +36,6 @@ class test_keep_one_in_n(gr_unittest.TestCase):
src = blocks.vector_source_b(src_data);
op = blocks.keep_one_in_n(gr.sizeof_char, 5)
dst = blocks.vector_sink_b()
- print "HERE"
self.tb.connect(src, op, dst)
self.tb.run()
self.assertEqual(dst.data(), expected_data)
diff --git a/gr-blocks/python/blocks/qa_repack_bits_bb.py b/gr-blocks/python/blocks/qa_repack_bits_bb.py
index d9bbfe4fac..10880b196a 100755
--- a/gr-blocks/python/blocks/qa_repack_bits_bb.py
+++ b/gr-blocks/python/blocks/qa_repack_bits_bb.py
@@ -28,6 +28,7 @@ class qa_repack_bits_bb (gr_unittest.TestCase):
def setUp (self):
self.tb = gr.top_block ()
+ self.tsb_key = "length"
def tearDown (self):
self.tb = None
@@ -77,24 +78,18 @@ class qa_repack_bits_bb (gr_unittest.TestCase):
expected_data = (0b101,) + (0b111,) * 4 + (0b001,)
k = 8
l = 3
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(src_data))
- src = blocks.vector_source_b(src_data, False, 1, (tag,))
- repack = blocks.repack_bits_bb(k, l, tag_name)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, repack, sink)
+ src = blocks.vector_source_b(src_data, False, 1)
+ repack = blocks.repack_bits_bb(k, l, self.tsb_key)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(src_data), self.tsb_key),
+ repack,
+ sink
+ )
self.tb.run ()
- self.assertEqual(sink.data(), expected_data)
- try:
- out_tag = sink.tags()[0]
- except:
- self.assertFail()
- self.assertEqual(out_tag.offset, 0)
- self.assertEqual(pmt.symbol_to_string(out_tag.key), tag_name)
- self.assertEqual(pmt.to_long(out_tag.value), len(expected_data))
+ self.assertEqual(len(sink.data()), 1)
+ self.assertEqual(sink.data()[0], expected_data)
def test_005_three_with_tags_trailing (self):
""" 3 -> 8, trailing bits """
@@ -102,24 +97,18 @@ class qa_repack_bits_bb (gr_unittest.TestCase):
expected_data = (0b11111101, 0b11111111)
k = 3
l = 8
- tag_name = "len"
- tag = gr.tag_t()
- tag.offset = 0
- tag.key = pmt.string_to_symbol(tag_name)
- tag.value = pmt.from_long(len(src_data))
- src = blocks.vector_source_b(src_data, False, 1, (tag,))
- repack = blocks.repack_bits_bb(k, l, tag_name, True)
- sink = blocks.vector_sink_b()
- self.tb.connect(src, repack, sink)
+ src = blocks.vector_source_b(src_data, False, 1)
+ repack = blocks.repack_bits_bb(k, l, self.tsb_key, True)
+ sink = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_char, 1, len(src_data), self.tsb_key),
+ repack,
+ sink
+ )
self.tb.run ()
- self.assertEqual(sink.data(), expected_data)
- try:
- out_tag = sink.tags()[0]
- except:
- self.assertFail()
- self.assertEqual(out_tag.offset, 0)
- self.assertEqual(pmt.symbol_to_string(out_tag.key), tag_name)
- self.assertEqual(pmt.to_long(out_tag.value), len(expected_data))
+ self.assertEqual(len(sink.data()), 1)
+ self.assertEqual(sink.data()[0], expected_data)
if __name__ == '__main__':
gr_unittest.run(qa_repack_bits_bb, "qa_repack_bits_bb.xml")
diff --git a/gr-blocks/python/blocks/qa_tagged_stream_mux.py b/gr-blocks/python/blocks/qa_tagged_stream_mux.py
index 749fda3c32..6f1c1c538a 100755
--- a/gr-blocks/python/blocks/qa_tagged_stream_mux.py
+++ b/gr-blocks/python/blocks/qa_tagged_stream_mux.py
@@ -1,6 +1,6 @@
#!/usr/bin/env python
#
-# Copyright 2013 Free Software Foundation, Inc.
+# Copyright 2013-2014 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
@@ -20,87 +20,62 @@
# Boston, MA 02110-1301, USA.
#
-from gnuradio import gr, gr_unittest, blocks
-import pmt
import numpy
+import pmt
+from gnuradio import gr, gr_unittest, blocks
+from gnuradio.gr import packet_utils
-def make_len_tags(tupl, key):
- tags = []
- tag = gr.tag_t()
- tag.key = pmt.string_to_symbol(key)
- n_read = 0
- for element in tupl:
- tag.offset = n_read
- n_read += len(element)
- tag.value = pmt.to_pmt(len(element))
- tags.append(tag)
- return tags
-
-def make_len_tag(offset, key, value):
+def make_tag(key, value, offset, srcid=None):
tag = gr.tag_t()
- tag.offset = offset
tag.key = pmt.string_to_symbol(key)
tag.value = pmt.to_pmt(value)
+ tag.offset = offset
+ if srcid is not None:
+ tag.srcid = pmt.to_pmt(srcid)
return tag
-
class qa_tagged_stream_mux (gr_unittest.TestCase):
def setUp(self):
self.tb = gr.top_block()
+ self.tsb_key = "tsb_key"
def tearDown(self):
self.tb = None
- def test_1(self):
- datas = (
- 0, 1, 2, 5, 6, 10, 14, 15, 16,
- 3, 4, 7, 8, 9, 11, 12, 13, 17
+ def setup_data_tags(self, data):
+ return packet_utils.packets_to_vectors(
+ data,
+ self.tsb_key
)
- expected = tuple(range(18))
- tagname = "packet_length"
- len_tags_0 = (
- make_len_tag(0, tagname, 3),
- make_len_tag(3, tagname, 2),
- make_len_tag(5, tagname, 1),
- make_len_tag(6, tagname, 3)
+ def test_1(self):
+ packets0 = (
+ (0, 1, 2), (5, 6), (10,), (14, 15, 16,)
)
- len_tags_1 = (
- make_len_tag(0, tagname, 2),
- make_len_tag(2, tagname, 3),
- make_len_tag(5, tagname, 3),
- make_len_tag(8, tagname, 1)
+ packets1 = (
+ (3, 4), (7, 8, 9), (11, 12, 13), (17,)
)
- test_tag_0 = gr.tag_t()
- test_tag_0.key = pmt.string_to_symbol('spam')
- test_tag_0.offset = 4 # On the second '1'
- test_tag_0.value = pmt.to_pmt(42)
- test_tag_1 = gr.tag_t()
- test_tag_1.key = pmt.string_to_symbol('eggs')
- test_tag_1.offset = 3 # On the first '3' of the 2nd stream
- test_tag_1.value = pmt.to_pmt(23)
-
- src0 = blocks.vector_source_b(datas[0:9], False, 1, len_tags_0 + (test_tag_0,))
- src1 = blocks.vector_source_b(datas[9:], False, 1, len_tags_1 + (test_tag_1,))
- tagged_stream_mux = blocks.tagged_stream_mux(gr.sizeof_char, tagname)
- snk = blocks.vector_sink_b()
+ expected = ((0, 1, 2, 3, 4), (5, 6, 7, 8, 9), (10, 11, 12, 13), (14, 15, 16, 17))
+ data0, tags0 = self.setup_data_tags(packets0)
+ data1, tags1 = self.setup_data_tags(packets1)
+ tags0.append(make_tag('spam', 42, 4))
+ tags1.append(make_tag('eggs', 23, 3))
+ src0 = blocks.vector_source_b(data0, tags=tags0)
+ src1 = blocks.vector_source_b(data1, tags=tags1)
+ tagged_stream_mux = blocks.tagged_stream_mux(gr.sizeof_char, self.tsb_key)
+ snk = blocks.tsb_vector_sink_b(tsb_key=self.tsb_key)
self.tb.connect(src0, (tagged_stream_mux, 0))
self.tb.connect(src1, (tagged_stream_mux, 1))
self.tb.connect(tagged_stream_mux, snk)
self.tb.run()
-
+ # Check
self.assertEqual(expected, snk.data())
-
tags = [gr.tag_to_python(x) for x in snk.tags()]
tags = sorted([(x.offset, x.key, x.value) for x in tags])
tags_expected = [
- (0, 'packet_length', 5),
- (5, 'packet_length', 5),
(6, 'spam', 42),
(8, 'eggs', 23),
- (10, 'packet_length', 4),
- (14, 'packet_length', 4)
]
self.assertEqual(tags, tags_expected)
@@ -108,35 +83,35 @@ class qa_tagged_stream_mux (gr_unittest.TestCase):
""" Test the 'preserve head position' function.
This will add a 'special' tag to item 0 on stream 1.
It should be on item 0 of the output stream. """
- special_tag = gr.tag_t()
- special_tag.key = pmt.string_to_symbol('spam')
- special_tag.offset = 0
- special_tag.value = pmt.to_pmt('eggs')
- len_tag_key = "length"
- packet_len_1 = 5
- packet_len_2 = 3
- mux = blocks.tagged_stream_mux(gr.sizeof_float, len_tag_key, 1)
- sink = blocks.vector_sink_f()
+ packet_len_0 = 5
+ data0 = range(packet_len_0)
+ packet_len_1 = 3
+ data1 = range(packet_len_1)
+ mux = blocks.tagged_stream_mux(
+ gr.sizeof_float,
+ self.tsb_key,
+ 1 # Mark port 1 as carrying special tags on the head position
+ )
+ sink = blocks.tsb_vector_sink_f(tsb_key=self.tsb_key)
self.tb.connect(
- blocks.vector_source_f(range(packet_len_1)),
- blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_1, len_tag_key),
+ blocks.vector_source_f(data0),
+ blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_0, self.tsb_key),
(mux, 0)
)
self.tb.connect(
- blocks.vector_source_f(range(packet_len_2), False, 1, (special_tag,)),
- blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_2, len_tag_key),
+ blocks.vector_source_f(range(packet_len_1), tags=(make_tag('spam', 'eggs', 0),)),
+ blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len_1, self.tsb_key),
(mux, 1)
)
self.tb.connect(mux, sink)
self.tb.run()
- self.assertEqual(sink.data(), tuple(range(packet_len_1) + range(packet_len_2)))
- tags = [gr.tag_to_python(x) for x in sink.tags()]
- tags = sorted([(x.offset, x.key, x.value) for x in tags])
- tags_expected = [
- (0, 'length', packet_len_1 + packet_len_2),
- (0, 'spam', 'eggs'),
- ]
- self.assertEqual(tags, tags_expected)
+ self.assertEqual(len(sink.data()), 1)
+ self.assertEqual(sink.data()[0], tuple(data0 + data1))
+ self.assertEqual(len(sink.tags()), 1)
+ tag = gr.tag_to_python(sink.tags()[0])
+ tag = (tag.offset, tag.key, tag.value)
+ tag_expected = (0, 'spam', 'eggs')
+ self.assertEqual(tag, tag_expected)
if __name__ == '__main__':
diff --git a/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py b/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py
new file mode 100755
index 0000000000..aa7154e6a5
--- /dev/null
+++ b/gr-blocks/python/blocks/qa_tsb_vector_sink_X.py
@@ -0,0 +1,58 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Copyright 2014 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import pmt
+from gnuradio import gr, gr_unittest
+from gnuradio import blocks
+
+class qa_tsb_vector_sink (gr_unittest.TestCase):
+
+ def setUp (self):
+ self.tb = gr.top_block ()
+ self.tsb_key = "tsb"
+
+ def tearDown (self):
+ self.tb = None
+
+ def test_001_t (self):
+ packet_len = 4
+ data = range(2 * packet_len)
+ tag = gr.tag_t()
+ tag.key = pmt.intern("foo")
+ tag.offset = 5
+ tag.value = pmt.intern("bar")
+ src = blocks.vector_source_f(data, tags=(tag,))
+ sink = blocks.tsb_vector_sink_f(tsb_key=self.tsb_key)
+ self.tb.connect(
+ src,
+ blocks.stream_to_tagged_stream(gr.sizeof_float, 1, packet_len, self.tsb_key),
+ sink
+ )
+ self.tb.run()
+ self.assertEqual((tuple(data[0:packet_len]), tuple(data[packet_len:])), sink.data())
+ self.assertEqual(len(sink.tags()), 1)
+ self.assertEqual(sink.tags()[0].offset, tag.offset)
+
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_tsb_vector_sink, "qa_tsb_vector_sink.xml")