summaryrefslogtreecommitdiff
path: root/gr-blocks
diff options
context:
space:
mode:
authorDavid Pi <david.pinho@gmail.com>2020-09-23 11:53:32 +0100
committermormj <34754695+mormj@users.noreply.github.com>2020-09-25 06:40:13 -0400
commitc196dacea56ef938902646a2143907858415b240 (patch)
treeafb260d0803e14c71971e0d5eec28f70be1939bc /gr-blocks
parent8f3d61984ecd9ecc97789a174a54c7f0505bddb4 (diff)
blocks: New block 'Stream Demux'
Stream demuxing block to demultiplex one stream into N output streams. Demuxes a stream producing N outputs streams that contains n_0 items in the first stream, n_1 items in the second, etc. and repeats. Number of items of each output stream is specified using the 'lengths' parameter like so [n_0, n_1, ..., n_N-1]. Example: lengths = [2, 3, 4] input stream: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...] output_streams: [0, 1, 9, 10, ...] [2, 3, 4, 11, ...] [5, 6, 7, 8, ...]
Diffstat (limited to 'gr-blocks')
-rw-r--r--gr-blocks/grc/blocks.tree.yml1
-rw-r--r--gr-blocks/grc/blocks_stream_demux.block.yml54
-rw-r--r--gr-blocks/include/gnuradio/blocks/CMakeLists.txt1
-rw-r--r--gr-blocks/include/gnuradio/blocks/stream_demux.h70
-rw-r--r--gr-blocks/lib/CMakeLists.txt1
-rw-r--r--gr-blocks/lib/stream_demux_impl.cc114
-rw-r--r--gr-blocks/lib/stream_demux_impl.h53
-rw-r--r--gr-blocks/python/blocks/bindings/CMakeLists.txt5
-rw-r--r--gr-blocks/python/blocks/bindings/docstrings/stream_demux_pydoc_template.h24
-rw-r--r--gr-blocks/python/blocks/bindings/python_bindings.cc2
-rw-r--r--gr-blocks/python/blocks/bindings/stream_demux_python.cc46
-rwxr-xr-xgr-blocks/python/blocks/qa_stream_demux.py292
12 files changed, 661 insertions, 2 deletions
diff --git a/gr-blocks/grc/blocks.tree.yml b/gr-blocks/grc/blocks.tree.yml
index d01650f3fc..c6a67cf39f 100644
--- a/gr-blocks/grc/blocks.tree.yml
+++ b/gr-blocks/grc/blocks.tree.yml
@@ -127,6 +127,7 @@
- blocks_interleave
- blocks_keep_m_in_n
- blocks_keep_one_in_n
+ - blocks_stream_demux
- blocks_stream_mux
- blocks_stream_to_streams
- blocks_stream_to_vector
diff --git a/gr-blocks/grc/blocks_stream_demux.block.yml b/gr-blocks/grc/blocks_stream_demux.block.yml
new file mode 100644
index 0000000000..c0c3eacf1b
--- /dev/null
+++ b/gr-blocks/grc/blocks_stream_demux.block.yml
@@ -0,0 +1,54 @@
+id: blocks_stream_demux
+label: Stream Demux
+flags: [ python, cpp ]
+
+parameters:
+- id: type
+ label: Type
+ dtype: enum
+ options: [complex, float, int, short, byte]
+ option_attributes:
+ size: [gr.sizeof_gr_complex, gr.sizeof_float, gr.sizeof_int, gr.sizeof_short,
+ gr.sizeof_char]
+ hide: part
+- id: lengths
+ label: Lengths
+ dtype: int_vector
+ default: 1, 1
+- id: num_outputs
+ label: Num Outputs
+ dtype: int
+ default: '2'
+ hide: part
+- id: vlen
+ label: Vec Length
+ dtype: int
+ default: '1'
+ hide: ${ 'part' if vlen == 1 else 'none' }
+
+inputs:
+- domain: stream
+ dtype: ${ type }
+ vlen: ${ vlen }
+
+outputs:
+- domain: stream
+ dtype: ${ type }
+ vlen: ${ vlen }
+ multiplicity: ${ num_outputs }
+
+asserts:
+- ${ num_outputs > 0 }
+- ${ num_outputs == len(lengths) }
+- ${ vlen > 0 }
+
+templates:
+ imports: from gnuradio import blocks
+ make: blocks.stream_demux(${type.size}*${vlen}, ${lengths})
+
+cpp_templates:
+ includes: ['#include <gnuradio/blocks/stream_demux.h>']
+ declarations: 'blocks::stream_demux::sptr ${id};'
+ make: 'this->${id} = blocks::stream_demux::make(${type.size}*${vlen}, ${lengths});'
+
+file_format: 1
diff --git a/gr-blocks/include/gnuradio/blocks/CMakeLists.txt b/gr-blocks/include/gnuradio/blocks/CMakeLists.txt
index 6c980da423..9f88792221 100644
--- a/gr-blocks/include/gnuradio/blocks/CMakeLists.txt
+++ b/gr-blocks/include/gnuradio/blocks/CMakeLists.txt
@@ -131,6 +131,7 @@ install(FILES
short_to_float.h
skiphead.h
socket_pdu.h
+ stream_demux.h
stream_mux.h
stream_to_streams.h
stream_to_tagged_stream.h
diff --git a/gr-blocks/include/gnuradio/blocks/stream_demux.h b/gr-blocks/include/gnuradio/blocks/stream_demux.h
new file mode 100644
index 0000000000..ca229c4556
--- /dev/null
+++ b/gr-blocks/include/gnuradio/blocks/stream_demux.h
@@ -0,0 +1,70 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_BLOCKS_STREAM_DEMUX_H
+#define INCLUDED_BLOCKS_STREAM_DEMUX_H
+
+#include <gnuradio/block.h>
+#include <gnuradio/blocks/api.h>
+#include <vector>
+
+namespace gr {
+namespace blocks {
+
+/*!
+ * \brief Stream demuxing block to demultiplex one stream into N output streams
+ * \ingroup stream_operators_blk
+ *
+ * \details
+ * Demuxes a stream producing N outputs streams that contains n_0 items in
+ * the first stream, n_1 items in the second, etc. and repeats. Number of
+ * items of each output stream is specified using the 'lengths' parameter
+ * like so [n_0, n_1, ..., n_N-1].
+ *
+ * Example:
+ * lengths = [2, 3, 4]
+ * input stream: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...]
+ * output_streams: [0, 1, 9, 10, ...]
+ * [2, 3, 4, 11, ...]
+ * [5, 6, 7, 8, ...]
+ */
+class BLOCKS_API stream_demux : virtual public gr::block
+{
+public:
+ // gr::blocks::stream_demux::sptr
+ typedef std::shared_ptr<stream_demux> sptr;
+
+ /*!
+ * \brief Stream demuxing block to demultiplex one stream into N output streams
+ *
+ * \param itemsize the item size of the stream
+ * \param lengths a vector (list/tuple) specifying the number of
+ * items to copy to each output stream.
+ *
+ */
+ static sptr make(size_t itemsize, const std::vector<int>& lengths);
+};
+
+} // namespace blocks
+} // namespace gr
+
+#endif /* INCLUDED_BLOCKS_STREAM_DEMUX_H */
diff --git a/gr-blocks/lib/CMakeLists.txt b/gr-blocks/lib/CMakeLists.txt
index e952088e60..bfe3eaa91e 100644
--- a/gr-blocks/lib/CMakeLists.txt
+++ b/gr-blocks/lib/CMakeLists.txt
@@ -135,6 +135,7 @@ set(BLOCKS_SOURCES
short_to_float_impl.cc
skiphead_impl.cc
socket_pdu_impl.cc
+ stream_demux_impl.cc
stream_mux_impl.cc
stream_pdu_base.cc
stream_to_streams_impl.cc
diff --git a/gr-blocks/lib/stream_demux_impl.cc b/gr-blocks/lib/stream_demux_impl.cc
new file mode 100644
index 0000000000..a7298a78df
--- /dev/null
+++ b/gr-blocks/lib/stream_demux_impl.cc
@@ -0,0 +1,114 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifdef HAVE_CONFIG_H
+#include "config.h"
+#endif
+
+#include "stream_demux_impl.h"
+#include <gnuradio/io_signature.h>
+#include <cstring>
+
+namespace gr {
+namespace blocks {
+
+stream_demux::sptr stream_demux::make(size_t itemsize, const std::vector<int>& lengths)
+{
+ return gnuradio::make_block_sptr<stream_demux_impl>(itemsize, lengths);
+}
+
+stream_demux_impl::stream_demux_impl(size_t itemsize, const std::vector<int>& lengths)
+ : gr::block("stream_demux",
+ gr::io_signature::make(1, 1, itemsize),
+ gr::io_signature::make(1, -1, itemsize)),
+ d_itemsize(itemsize),
+ d_lengths(lengths)
+{
+ while (d_lengths[d_stream] == 0) {
+ d_stream++;
+ if (d_stream == d_lengths.size()) {
+ throw std::invalid_argument("At least one size must be non-zero.");
+ }
+ }
+ d_residual = d_lengths[d_stream];
+ set_tag_propagation_policy(TPP_DONT);
+}
+
+void stream_demux_impl::forecast(int noutput_items, gr_vector_int& ninput_items_required)
+{
+ std::fill(ninput_items_required.begin(), ninput_items_required.end(), 1);
+}
+
+int stream_demux_impl::general_work(int noutput_items,
+ gr_vector_int& ninput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items)
+{
+ int input_index = 0; // Items read
+ gr_vector_int output_index(d_lengths.size()); // Items written
+
+ while (input_index < ninput_items[0]) {
+ if (noutput_items <= output_index[d_stream]) {
+ break;
+ }
+ const int space_left_in_buffers =
+ std::min(ninput_items[0] - input_index, // Space left in input buffer
+ noutput_items - output_index[d_stream] // Space left in output buffer
+ );
+ const int items_to_copy = std::min(space_left_in_buffers, d_residual);
+ const char* in =
+ reinterpret_cast<const char*>(input_items[0]) + input_index * d_itemsize;
+ char* out = reinterpret_cast<char*>(output_items[d_stream]);
+ memcpy(&out[output_index[d_stream] * d_itemsize], in, items_to_copy * d_itemsize);
+
+ std::vector<gr::tag_t> stream_t;
+ get_tags_in_window(stream_t, 0, input_index, input_index + items_to_copy);
+ for (auto& t : stream_t) {
+ t.offset = t.offset - nitems_read(0) - input_index +
+ nitems_written(d_stream) + output_index[d_stream];
+ add_item_tag(d_stream, t);
+ }
+
+ output_index[d_stream] += items_to_copy;
+ input_index += items_to_copy;
+ d_residual -= items_to_copy;
+ if (d_residual == 0) {
+ do { // Skip all those outputs with zero length
+ d_stream = (d_stream + 1) % d_lengths.size();
+ } while (d_lengths[d_stream] == 0);
+ d_residual = d_lengths[d_stream];
+ } else {
+ break;
+ }
+ } // while
+
+ consume(0, input_index);
+
+ for (size_t i = 0; i < output_index.size(); i++) {
+ produce((int)i, output_index[i]);
+ }
+
+ return WORK_CALLED_PRODUCE;
+}
+
+} /* namespace blocks */
+} /* namespace gr */
diff --git a/gr-blocks/lib/stream_demux_impl.h b/gr-blocks/lib/stream_demux_impl.h
new file mode 100644
index 0000000000..8fabf5e613
--- /dev/null
+++ b/gr-blocks/lib/stream_demux_impl.h
@@ -0,0 +1,53 @@
+/* -*- c++ -*- */
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * GNU Radio is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3, or (at your option)
+ * any later version.
+ *
+ * GNU Radio is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with GNU Radio; see the file COPYING. If not, write to
+ * the Free Software Foundation, Inc., 51 Franklin Street,
+ * Boston, MA 02110-1301, USA.
+ */
+
+#ifndef INCLUDED_BLOCKS_STREAM_DEMUX_IMPL_H
+#define INCLUDED_BLOCKS_STREAM_DEMUX_IMPL_H
+
+#include <gnuradio/blocks/stream_demux.h>
+
+namespace gr {
+namespace blocks {
+
+class BLOCKS_API stream_demux_impl : public stream_demux
+{
+private:
+ const size_t d_itemsize;
+ unsigned int d_stream{ 0 }; // index of currently selected stream
+ int d_residual{ 0 }; // number of items left to put into current stream
+ const gr_vector_int d_lengths; // number of items to pack per stream
+
+public:
+ stream_demux_impl(size_t itemsize, const std::vector<int>& lengths);
+
+ void forecast(int noutput_items, gr_vector_int& ninput_items_required);
+
+ int general_work(int noutput_items,
+ gr_vector_int& ninput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items);
+};
+
+} // namespace blocks
+} // namespace gr
+
+#endif /* INCLUDED_BLOCKS_STREAM_DEMUX_IMPL_H */
diff --git a/gr-blocks/python/blocks/bindings/CMakeLists.txt b/gr-blocks/python/blocks/bindings/CMakeLists.txt
index 5fefaec511..257afe0119 100644
--- a/gr-blocks/python/blocks/bindings/CMakeLists.txt
+++ b/gr-blocks/python/blocks/bindings/CMakeLists.txt
@@ -130,6 +130,7 @@ list(APPEND blocks_python_files
short_to_float_python.cc
skiphead_python.cc
socket_pdu_python.cc
+ stream_demux_python.cc
stream_mux_python.cc
stream_to_streams_python.cc
stream_to_tagged_stream_python.cc
@@ -178,8 +179,8 @@ if (SNDFILE_FOUND)
wavfile_source_python.cc)
endif()
-GR_PYBIND_MAKE_CHECK_HASH(blocks
- ../../..
+GR_PYBIND_MAKE_CHECK_HASH(blocks
+ ../../..
gr::blocks
"${blocks_python_files}")
diff --git a/gr-blocks/python/blocks/bindings/docstrings/stream_demux_pydoc_template.h b/gr-blocks/python/blocks/bindings/docstrings/stream_demux_pydoc_template.h
new file mode 100644
index 0000000000..39470983cb
--- /dev/null
+++ b/gr-blocks/python/blocks/bindings/docstrings/stream_demux_pydoc_template.h
@@ -0,0 +1,24 @@
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+#include "pydoc_macros.h"
+#define D(...) DOC(gr, blocks, __VA_ARGS__)
+/*
+ This file contains placeholders for docstrings for the Python bindings.
+ Do not edit! These were automatically extracted during the binding process
+ and will be overwritten during the build process
+ */
+
+
+static const char* __doc_gr_blocks_stream_demux = R"doc()doc";
+
+
+static const char* __doc_gr_blocks_stream_demux_stream_demux = R"doc()doc";
+
+
+static const char* __doc_gr_blocks_stream_demux_make = R"doc()doc";
diff --git a/gr-blocks/python/blocks/bindings/python_bindings.cc b/gr-blocks/python/blocks/bindings/python_bindings.cc
index b815fae14f..155ce38b19 100644
--- a/gr-blocks/python/blocks/bindings/python_bindings.cc
+++ b/gr-blocks/python/blocks/bindings/python_bindings.cc
@@ -134,6 +134,7 @@ void bind_short_to_char(py::module&);
void bind_short_to_float(py::module&);
void bind_skiphead(py::module&);
void bind_socket_pdu(py::module&);
+void bind_stream_demux(py::module&);
void bind_stream_mux(py::module&);
void bind_stream_to_streams(py::module&);
void bind_stream_to_tagged_stream(py::module&);
@@ -316,6 +317,7 @@ PYBIND11_MODULE(blocks_python, m)
bind_short_to_float(m);
bind_skiphead(m);
bind_socket_pdu(m);
+ bind_stream_demux(m);
bind_stream_mux(m);
bind_stream_to_streams(m);
bind_stream_to_tagged_stream(m);
diff --git a/gr-blocks/python/blocks/bindings/stream_demux_python.cc b/gr-blocks/python/blocks/bindings/stream_demux_python.cc
new file mode 100644
index 0000000000..e7cccfea07
--- /dev/null
+++ b/gr-blocks/python/blocks/bindings/stream_demux_python.cc
@@ -0,0 +1,46 @@
+/*
+ * Copyright 2020 Free Software Foundation, Inc.
+ *
+ * This file is part of GNU Radio
+ *
+ * SPDX-License-Identifier: GPL-3.0-or-later
+ *
+ */
+
+/***********************************************************************************/
+/* This file is automatically generated using bindtool and can be manually edited */
+/* The following lines can be configured to regenerate this file during cmake */
+/* If manual edits are made, the following tags should be modified accordingly. */
+/* BINDTOOL_GEN_AUTOMATIC(0) */
+/* BINDTOOL_USE_PYGCCXML(0) */
+/* BINDTOOL_HEADER_FILE(stream_demux.h) */
+/* BINDTOOL_HEADER_FILE_HASH(76f72d6993d70c9da3a497eacd72280a) */
+/***********************************************************************************/
+
+#include <pybind11/complex.h>
+#include <pybind11/pybind11.h>
+#include <pybind11/stl.h>
+
+namespace py = pybind11;
+
+#include <gnuradio/blocks/stream_demux.h>
+// pydoc.h is automatically generated in the build directory
+#include <stream_demux_pydoc.h>
+
+void bind_stream_demux(py::module& m)
+{
+
+ using stream_demux = ::gr::blocks::stream_demux;
+
+
+ py::class_<stream_demux, gr::block, gr::basic_block, std::shared_ptr<stream_demux>>(
+ m, "stream_demux", D(stream_demux))
+
+ .def(py::init(&stream_demux::make),
+ py::arg("itemsize"),
+ py::arg("lengths"),
+ D(stream_demux, make))
+
+
+ ;
+}
diff --git a/gr-blocks/python/blocks/qa_stream_demux.py b/gr-blocks/python/blocks/qa_stream_demux.py
new file mode 100755
index 0000000000..da9aa46be0
--- /dev/null
+++ b/gr-blocks/python/blocks/qa_stream_demux.py
@@ -0,0 +1,292 @@
+#!/usr/bin/env python
+#
+# Copyright 2020 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+#
+
+
+from gnuradio import gr, gr_unittest, blocks
+import pmt
+import os
+
+class qa_stream_demux(gr_unittest.TestCase):
+
+ def setUp (self):
+ os.environ['GR_CONF_CONTROLPORT_ON'] = 'False'
+ self.tb = gr.top_block ()
+
+ def tearDown (self):
+ self.tb = None
+
+ def help_stream_2ff(self, N, stream_sizes):
+ v = blocks.vector_source_f(N*[1,] + N*[2,], False)
+
+ demux = blocks.stream_demux(gr.sizeof_float, stream_sizes)
+
+ dst0 = blocks.vector_sink_f()
+ dst1 = blocks.vector_sink_f()
+
+ self.tb.connect(v, demux)
+ self.tb.connect((demux,0), dst0)
+ self.tb.connect((demux,1), dst1)
+ self.tb.run()
+
+ return (dst0.data(), dst1.data())
+
+ def help_stream_ramp_2ff(self, N, stream_sizes):
+ r = list(range(N)) + list(reversed(range(N)))
+
+ v = blocks.vector_source_f(r, False)
+
+ demux = blocks.stream_demux(gr.sizeof_float, stream_sizes)
+
+ dst0 = blocks.vector_sink_f()
+ dst1 = blocks.vector_sink_f()
+
+ self.tb.connect(v, demux)
+ self.tb.connect((demux,0), dst0)
+ self.tb.connect((demux,1), dst1)
+ self.tb.run()
+
+ return (dst0.data(), dst1.data())
+
+ def help_stream_tag_propagation(self, N, stream_sizes):
+ src_data = (stream_sizes[0]*[1,] + stream_sizes[1]*[2,] + stream_sizes[2]*[3,]) * N
+
+ src = blocks.vector_source_f(src_data, False)
+
+ tag_stream1 = blocks.stream_to_tagged_stream(gr.sizeof_float, 1,
+ stream_sizes[0], 'src1')
+ tag_stream2 = blocks.stream_to_tagged_stream(gr.sizeof_float, 1,
+ stream_sizes[1], 'src2')
+ tag_stream3 = blocks.stream_to_tagged_stream(gr.sizeof_float, 1,
+ stream_sizes[2], 'src3')
+
+ demux = blocks.stream_demux(gr.sizeof_float, stream_sizes)
+ dst0 = blocks.vector_sink_f()
+ dst1 = blocks.vector_sink_f()
+ dst2 = blocks.vector_sink_f()
+
+ self.tb.connect(src, tag_stream1)
+ self.tb.connect(tag_stream1, tag_stream2)
+ self.tb.connect(tag_stream2, tag_stream3)
+ self.tb.connect(tag_stream3, demux)
+ self.tb.connect((demux,0), dst0)
+ self.tb.connect((demux,1), dst1)
+ self.tb.connect((demux,2), dst2)
+ self.tb.run()
+
+ return (dst0, dst1, dst2)
+
+ def test_stream_2NN_ff(self):
+ N = 40
+ stream_sizes = [10, 10]
+ result_data = self.help_stream_2ff(N, stream_sizes)
+
+ exp_data0 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
+ exp_data1 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
+
+ self.assertEqual (exp_data0, result_data[0])
+ self.assertEqual (exp_data1, result_data[1])
+
+ def test_stream_ramp_2NN_ff(self):
+ N = 40
+ stream_sizes = [10, 10]
+ result_data = self.help_stream_ramp_2ff(N, stream_sizes)
+
+ exp_data0 = [ 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0,
+ 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0,
+ 39.0, 38.0, 37.0, 36.0, 35.0, 34.0, 33.0, 32.0, 31.0, 30.0,
+ 19.0, 18.0, 17.0, 16.0, 15.0, 14.0, 13.0, 12.0, 11.0, 10.0]
+ exp_data1 = [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0,
+ 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0,
+ 29.0, 28.0, 27.0, 26.0, 25.0, 24.0, 23.0, 22.0, 21.0, 20.0,
+ 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0]
+
+ self.assertEqual (exp_data0, result_data[0])
+ self.assertEqual (exp_data1, result_data[1])
+
+ def test_stream_2NM_ff(self):
+ N = 40
+ stream_sizes = [7, 9]
+ self.help_stream_2ff(N, stream_sizes)
+
+ result_data = self.help_stream_2ff(N, stream_sizes)
+
+ exp_data0 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
+
+ exp_data1 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
+
+ self.assertEqual (exp_data0, result_data[0])
+ self.assertEqual (exp_data1, result_data[1])
+
+
+ def test_stream_2MN_ff(self):
+ N = 37
+ stream_sizes = [7, 9]
+ self.help_stream_2ff(N, stream_sizes)
+
+ result_data = self.help_stream_2ff(N, stream_sizes)
+
+ exp_data0 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
+
+ exp_data1 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0]
+
+ self.assertEqual (exp_data0, result_data[0])
+ self.assertEqual (exp_data1, result_data[1])
+
+ def test_stream_2N0_ff(self):
+ N = 30
+ stream_sizes = [7, 0]
+ self.help_stream_2ff(N, stream_sizes)
+
+ result_data = self.help_stream_2ff(N, stream_sizes)
+
+ exp_data0 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0]
+ exp_data1 = []
+
+ self.assertEqual (exp_data0, result_data[0])
+ self.assertEqual (exp_data1, result_data[1])
+
+ def test_stream_20N_ff(self):
+ N = 30
+ stream_sizes = [0, 9]
+ self.help_stream_2ff(N, stream_sizes)
+
+ result_data = self.help_stream_2ff(N, stream_sizes)
+
+ exp_data0 = []
+ exp_data1 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0,
+ 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0,
+ 2.0, 2.0, 2.0, 2.0, 2.0, 2.0]
+
+ self.assertEqual (exp_data0, result_data[0])
+ self.assertEqual (exp_data1, result_data[1])
+
+ def test_largeN_ff(self):
+ stream_sizes = [3, 8191]
+ r0 = [1.0,] * stream_sizes[0]
+ r1 = [2.0,] * stream_sizes[1]
+ v = blocks.vector_source_f(r0 + r1, repeat=False)
+ demux = blocks.stream_demux(gr.sizeof_float, stream_sizes)
+ dst0 = blocks.vector_sink_f()
+ dst1 = blocks.vector_sink_f()
+ self.tb.connect (v, demux)
+ self.tb.connect ((demux,0), dst0)
+ self.tb.connect ((demux,1), dst1)
+ self.tb.run ()
+ self.assertEqual (r0, dst0.data())
+ self.assertEqual (r1, dst1.data())
+
+ def test_tag_propagation(self):
+ N = 10 # Block length
+ stream_sizes = [1,2,3]
+
+ expected_result0 = N*(stream_sizes[0]*[1,])
+ expected_result1 = N*(stream_sizes[1]*[2,])
+ expected_result2 = N*(stream_sizes[2]*[3,])
+
+ # check the data
+ (result0, result1, result2) = self.help_stream_tag_propagation(N, stream_sizes)
+ self.assertFloatTuplesAlmostEqual(expected_result0, result0.data(), places=6)
+ self.assertFloatTuplesAlmostEqual(expected_result1, result1.data(), places=6)
+ self.assertFloatTuplesAlmostEqual(expected_result2, result2.data(), places=6)
+
+ # check the tags - result0
+ tags = result0.tags()
+ expected_tag_offsets_src1 = list(range(0,stream_sizes[0]*N,stream_sizes[0]))
+ expected_tag_offsets_src2 = list(range(0,stream_sizes[0]*N,stream_sizes[0]))
+ expected_tag_offsets_src3 = list(range(0,stream_sizes[0]*N,stream_sizes[0]))
+ tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))]
+ tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))]
+ tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))]
+ for i in range(len(expected_tag_offsets_src1)):
+ self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset)
+ for i in range(len(expected_tag_offsets_src2)):
+ self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset)
+ for i in range(len(expected_tag_offsets_src3)):
+ self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset)
+
+ # check the tags - result1
+ tags = result1.tags()
+ expected_tag_offsets_src1 = list(range(0,stream_sizes[1]*N,stream_sizes[0]))
+ expected_tag_offsets_src2 = list(range(1,stream_sizes[1]*N,stream_sizes[1]))
+ expected_tag_offsets_src3 = list()
+ tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))]
+ tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))]
+ tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))]
+ for i in range(len(expected_tag_offsets_src1)):
+ self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset)
+ for i in range(len(expected_tag_offsets_src2)):
+ self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset)
+ for i in range(len(expected_tag_offsets_src3)):
+ self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset)
+
+ # check the tags - result2
+ tags = result2.tags()
+ expected_tag_offsets_src1 = list(range(0,stream_sizes[2]*N,stream_sizes[0]))
+ expected_tag_offsets_src2 = list(range(1,stream_sizes[2]*N,stream_sizes[2]))
+ expected_tag_offsets_src3 = list(range(0,stream_sizes[2]*N,stream_sizes[2]))
+ tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))]
+ tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))]
+ tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))]
+ for i in range(len(expected_tag_offsets_src1)):
+ self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset)
+ for i in range(len(expected_tag_offsets_src2)):
+ self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset)
+ for i in range(len(expected_tag_offsets_src3)):
+ self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset)
+
+if __name__ == '__main__':
+ gr_unittest.run(qa_stream_demux)