diff options
author | David Pi <david.pinho@gmail.com> | 2020-09-23 11:53:32 +0100 |
---|---|---|
committer | mormj <34754695+mormj@users.noreply.github.com> | 2020-09-25 06:40:13 -0400 |
commit | c196dacea56ef938902646a2143907858415b240 (patch) | |
tree | afb260d0803e14c71971e0d5eec28f70be1939bc /gr-blocks | |
parent | 8f3d61984ecd9ecc97789a174a54c7f0505bddb4 (diff) |
blocks: New block 'Stream Demux'
Stream demuxing block to demultiplex one stream into N output streams.
Demuxes a stream producing N outputs streams that contains n_0 items in
the first stream, n_1 items in the second, etc. and repeats. Number of
items of each output stream is specified using the 'lengths' parameter
like so [n_0, n_1, ..., n_N-1].
Example:
lengths = [2, 3, 4]
input stream: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...]
output_streams: [0, 1, 9, 10, ...]
[2, 3, 4, 11, ...]
[5, 6, 7, 8, ...]
Diffstat (limited to 'gr-blocks')
-rw-r--r-- | gr-blocks/grc/blocks.tree.yml | 1 | ||||
-rw-r--r-- | gr-blocks/grc/blocks_stream_demux.block.yml | 54 | ||||
-rw-r--r-- | gr-blocks/include/gnuradio/blocks/CMakeLists.txt | 1 | ||||
-rw-r--r-- | gr-blocks/include/gnuradio/blocks/stream_demux.h | 70 | ||||
-rw-r--r-- | gr-blocks/lib/CMakeLists.txt | 1 | ||||
-rw-r--r-- | gr-blocks/lib/stream_demux_impl.cc | 114 | ||||
-rw-r--r-- | gr-blocks/lib/stream_demux_impl.h | 53 | ||||
-rw-r--r-- | gr-blocks/python/blocks/bindings/CMakeLists.txt | 5 | ||||
-rw-r--r-- | gr-blocks/python/blocks/bindings/docstrings/stream_demux_pydoc_template.h | 24 | ||||
-rw-r--r-- | gr-blocks/python/blocks/bindings/python_bindings.cc | 2 | ||||
-rw-r--r-- | gr-blocks/python/blocks/bindings/stream_demux_python.cc | 46 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_stream_demux.py | 292 |
12 files changed, 661 insertions, 2 deletions
diff --git a/gr-blocks/grc/blocks.tree.yml b/gr-blocks/grc/blocks.tree.yml index d01650f3fc..c6a67cf39f 100644 --- a/gr-blocks/grc/blocks.tree.yml +++ b/gr-blocks/grc/blocks.tree.yml @@ -127,6 +127,7 @@ - blocks_interleave - blocks_keep_m_in_n - blocks_keep_one_in_n + - blocks_stream_demux - blocks_stream_mux - blocks_stream_to_streams - blocks_stream_to_vector diff --git a/gr-blocks/grc/blocks_stream_demux.block.yml b/gr-blocks/grc/blocks_stream_demux.block.yml new file mode 100644 index 0000000000..c0c3eacf1b --- /dev/null +++ b/gr-blocks/grc/blocks_stream_demux.block.yml @@ -0,0 +1,54 @@ +id: blocks_stream_demux +label: Stream Demux +flags: [ python, cpp ] + +parameters: +- id: type + label: Type + dtype: enum + options: [complex, float, int, short, byte] + option_attributes: + size: [gr.sizeof_gr_complex, gr.sizeof_float, gr.sizeof_int, gr.sizeof_short, + gr.sizeof_char] + hide: part +- id: lengths + label: Lengths + dtype: int_vector + default: 1, 1 +- id: num_outputs + label: Num Outputs + dtype: int + default: '2' + hide: part +- id: vlen + label: Vec Length + dtype: int + default: '1' + hide: ${ 'part' if vlen == 1 else 'none' } + +inputs: +- domain: stream + dtype: ${ type } + vlen: ${ vlen } + +outputs: +- domain: stream + dtype: ${ type } + vlen: ${ vlen } + multiplicity: ${ num_outputs } + +asserts: +- ${ num_outputs > 0 } +- ${ num_outputs == len(lengths) } +- ${ vlen > 0 } + +templates: + imports: from gnuradio import blocks + make: blocks.stream_demux(${type.size}*${vlen}, ${lengths}) + +cpp_templates: + includes: ['#include <gnuradio/blocks/stream_demux.h>'] + declarations: 'blocks::stream_demux::sptr ${id};' + make: 'this->${id} = blocks::stream_demux::make(${type.size}*${vlen}, ${lengths});' + +file_format: 1 diff --git a/gr-blocks/include/gnuradio/blocks/CMakeLists.txt b/gr-blocks/include/gnuradio/blocks/CMakeLists.txt index 6c980da423..9f88792221 100644 --- a/gr-blocks/include/gnuradio/blocks/CMakeLists.txt +++ b/gr-blocks/include/gnuradio/blocks/CMakeLists.txt @@ -131,6 +131,7 @@ install(FILES short_to_float.h skiphead.h socket_pdu.h + stream_demux.h stream_mux.h stream_to_streams.h stream_to_tagged_stream.h diff --git a/gr-blocks/include/gnuradio/blocks/stream_demux.h b/gr-blocks/include/gnuradio/blocks/stream_demux.h new file mode 100644 index 0000000000..ca229c4556 --- /dev/null +++ b/gr-blocks/include/gnuradio/blocks/stream_demux.h @@ -0,0 +1,70 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_BLOCKS_STREAM_DEMUX_H +#define INCLUDED_BLOCKS_STREAM_DEMUX_H + +#include <gnuradio/block.h> +#include <gnuradio/blocks/api.h> +#include <vector> + +namespace gr { +namespace blocks { + +/*! + * \brief Stream demuxing block to demultiplex one stream into N output streams + * \ingroup stream_operators_blk + * + * \details + * Demuxes a stream producing N outputs streams that contains n_0 items in + * the first stream, n_1 items in the second, etc. and repeats. Number of + * items of each output stream is specified using the 'lengths' parameter + * like so [n_0, n_1, ..., n_N-1]. + * + * Example: + * lengths = [2, 3, 4] + * input stream: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, ...] + * output_streams: [0, 1, 9, 10, ...] + * [2, 3, 4, 11, ...] + * [5, 6, 7, 8, ...] + */ +class BLOCKS_API stream_demux : virtual public gr::block +{ +public: + // gr::blocks::stream_demux::sptr + typedef std::shared_ptr<stream_demux> sptr; + + /*! + * \brief Stream demuxing block to demultiplex one stream into N output streams + * + * \param itemsize the item size of the stream + * \param lengths a vector (list/tuple) specifying the number of + * items to copy to each output stream. + * + */ + static sptr make(size_t itemsize, const std::vector<int>& lengths); +}; + +} // namespace blocks +} // namespace gr + +#endif /* INCLUDED_BLOCKS_STREAM_DEMUX_H */ diff --git a/gr-blocks/lib/CMakeLists.txt b/gr-blocks/lib/CMakeLists.txt index e952088e60..bfe3eaa91e 100644 --- a/gr-blocks/lib/CMakeLists.txt +++ b/gr-blocks/lib/CMakeLists.txt @@ -135,6 +135,7 @@ set(BLOCKS_SOURCES short_to_float_impl.cc skiphead_impl.cc socket_pdu_impl.cc + stream_demux_impl.cc stream_mux_impl.cc stream_pdu_base.cc stream_to_streams_impl.cc diff --git a/gr-blocks/lib/stream_demux_impl.cc b/gr-blocks/lib/stream_demux_impl.cc new file mode 100644 index 0000000000..a7298a78df --- /dev/null +++ b/gr-blocks/lib/stream_demux_impl.cc @@ -0,0 +1,114 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "stream_demux_impl.h" +#include <gnuradio/io_signature.h> +#include <cstring> + +namespace gr { +namespace blocks { + +stream_demux::sptr stream_demux::make(size_t itemsize, const std::vector<int>& lengths) +{ + return gnuradio::make_block_sptr<stream_demux_impl>(itemsize, lengths); +} + +stream_demux_impl::stream_demux_impl(size_t itemsize, const std::vector<int>& lengths) + : gr::block("stream_demux", + gr::io_signature::make(1, 1, itemsize), + gr::io_signature::make(1, -1, itemsize)), + d_itemsize(itemsize), + d_lengths(lengths) +{ + while (d_lengths[d_stream] == 0) { + d_stream++; + if (d_stream == d_lengths.size()) { + throw std::invalid_argument("At least one size must be non-zero."); + } + } + d_residual = d_lengths[d_stream]; + set_tag_propagation_policy(TPP_DONT); +} + +void stream_demux_impl::forecast(int noutput_items, gr_vector_int& ninput_items_required) +{ + std::fill(ninput_items_required.begin(), ninput_items_required.end(), 1); +} + +int stream_demux_impl::general_work(int noutput_items, + gr_vector_int& ninput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ + int input_index = 0; // Items read + gr_vector_int output_index(d_lengths.size()); // Items written + + while (input_index < ninput_items[0]) { + if (noutput_items <= output_index[d_stream]) { + break; + } + const int space_left_in_buffers = + std::min(ninput_items[0] - input_index, // Space left in input buffer + noutput_items - output_index[d_stream] // Space left in output buffer + ); + const int items_to_copy = std::min(space_left_in_buffers, d_residual); + const char* in = + reinterpret_cast<const char*>(input_items[0]) + input_index * d_itemsize; + char* out = reinterpret_cast<char*>(output_items[d_stream]); + memcpy(&out[output_index[d_stream] * d_itemsize], in, items_to_copy * d_itemsize); + + std::vector<gr::tag_t> stream_t; + get_tags_in_window(stream_t, 0, input_index, input_index + items_to_copy); + for (auto& t : stream_t) { + t.offset = t.offset - nitems_read(0) - input_index + + nitems_written(d_stream) + output_index[d_stream]; + add_item_tag(d_stream, t); + } + + output_index[d_stream] += items_to_copy; + input_index += items_to_copy; + d_residual -= items_to_copy; + if (d_residual == 0) { + do { // Skip all those outputs with zero length + d_stream = (d_stream + 1) % d_lengths.size(); + } while (d_lengths[d_stream] == 0); + d_residual = d_lengths[d_stream]; + } else { + break; + } + } // while + + consume(0, input_index); + + for (size_t i = 0; i < output_index.size(); i++) { + produce((int)i, output_index[i]); + } + + return WORK_CALLED_PRODUCE; +} + +} /* namespace blocks */ +} /* namespace gr */ diff --git a/gr-blocks/lib/stream_demux_impl.h b/gr-blocks/lib/stream_demux_impl.h new file mode 100644 index 0000000000..8fabf5e613 --- /dev/null +++ b/gr-blocks/lib/stream_demux_impl.h @@ -0,0 +1,53 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_BLOCKS_STREAM_DEMUX_IMPL_H +#define INCLUDED_BLOCKS_STREAM_DEMUX_IMPL_H + +#include <gnuradio/blocks/stream_demux.h> + +namespace gr { +namespace blocks { + +class BLOCKS_API stream_demux_impl : public stream_demux +{ +private: + const size_t d_itemsize; + unsigned int d_stream{ 0 }; // index of currently selected stream + int d_residual{ 0 }; // number of items left to put into current stream + const gr_vector_int d_lengths; // number of items to pack per stream + +public: + stream_demux_impl(size_t itemsize, const std::vector<int>& lengths); + + void forecast(int noutput_items, gr_vector_int& ninput_items_required); + + int general_work(int noutput_items, + gr_vector_int& ninput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items); +}; + +} // namespace blocks +} // namespace gr + +#endif /* INCLUDED_BLOCKS_STREAM_DEMUX_IMPL_H */ diff --git a/gr-blocks/python/blocks/bindings/CMakeLists.txt b/gr-blocks/python/blocks/bindings/CMakeLists.txt index 5fefaec511..257afe0119 100644 --- a/gr-blocks/python/blocks/bindings/CMakeLists.txt +++ b/gr-blocks/python/blocks/bindings/CMakeLists.txt @@ -130,6 +130,7 @@ list(APPEND blocks_python_files short_to_float_python.cc skiphead_python.cc socket_pdu_python.cc + stream_demux_python.cc stream_mux_python.cc stream_to_streams_python.cc stream_to_tagged_stream_python.cc @@ -178,8 +179,8 @@ if (SNDFILE_FOUND) wavfile_source_python.cc) endif() -GR_PYBIND_MAKE_CHECK_HASH(blocks - ../../.. +GR_PYBIND_MAKE_CHECK_HASH(blocks + ../../.. gr::blocks "${blocks_python_files}") diff --git a/gr-blocks/python/blocks/bindings/docstrings/stream_demux_pydoc_template.h b/gr-blocks/python/blocks/bindings/docstrings/stream_demux_pydoc_template.h new file mode 100644 index 0000000000..39470983cb --- /dev/null +++ b/gr-blocks/python/blocks/bindings/docstrings/stream_demux_pydoc_template.h @@ -0,0 +1,24 @@ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ +#include "pydoc_macros.h" +#define D(...) DOC(gr, blocks, __VA_ARGS__) +/* + This file contains placeholders for docstrings for the Python bindings. + Do not edit! These were automatically extracted during the binding process + and will be overwritten during the build process + */ + + +static const char* __doc_gr_blocks_stream_demux = R"doc()doc"; + + +static const char* __doc_gr_blocks_stream_demux_stream_demux = R"doc()doc"; + + +static const char* __doc_gr_blocks_stream_demux_make = R"doc()doc"; diff --git a/gr-blocks/python/blocks/bindings/python_bindings.cc b/gr-blocks/python/blocks/bindings/python_bindings.cc index b815fae14f..155ce38b19 100644 --- a/gr-blocks/python/blocks/bindings/python_bindings.cc +++ b/gr-blocks/python/blocks/bindings/python_bindings.cc @@ -134,6 +134,7 @@ void bind_short_to_char(py::module&); void bind_short_to_float(py::module&); void bind_skiphead(py::module&); void bind_socket_pdu(py::module&); +void bind_stream_demux(py::module&); void bind_stream_mux(py::module&); void bind_stream_to_streams(py::module&); void bind_stream_to_tagged_stream(py::module&); @@ -316,6 +317,7 @@ PYBIND11_MODULE(blocks_python, m) bind_short_to_float(m); bind_skiphead(m); bind_socket_pdu(m); + bind_stream_demux(m); bind_stream_mux(m); bind_stream_to_streams(m); bind_stream_to_tagged_stream(m); diff --git a/gr-blocks/python/blocks/bindings/stream_demux_python.cc b/gr-blocks/python/blocks/bindings/stream_demux_python.cc new file mode 100644 index 0000000000..e7cccfea07 --- /dev/null +++ b/gr-blocks/python/blocks/bindings/stream_demux_python.cc @@ -0,0 +1,46 @@ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +/***********************************************************************************/ +/* This file is automatically generated using bindtool and can be manually edited */ +/* The following lines can be configured to regenerate this file during cmake */ +/* If manual edits are made, the following tags should be modified accordingly. */ +/* BINDTOOL_GEN_AUTOMATIC(0) */ +/* BINDTOOL_USE_PYGCCXML(0) */ +/* BINDTOOL_HEADER_FILE(stream_demux.h) */ +/* BINDTOOL_HEADER_FILE_HASH(76f72d6993d70c9da3a497eacd72280a) */ +/***********************************************************************************/ + +#include <pybind11/complex.h> +#include <pybind11/pybind11.h> +#include <pybind11/stl.h> + +namespace py = pybind11; + +#include <gnuradio/blocks/stream_demux.h> +// pydoc.h is automatically generated in the build directory +#include <stream_demux_pydoc.h> + +void bind_stream_demux(py::module& m) +{ + + using stream_demux = ::gr::blocks::stream_demux; + + + py::class_<stream_demux, gr::block, gr::basic_block, std::shared_ptr<stream_demux>>( + m, "stream_demux", D(stream_demux)) + + .def(py::init(&stream_demux::make), + py::arg("itemsize"), + py::arg("lengths"), + D(stream_demux, make)) + + + ; +} diff --git a/gr-blocks/python/blocks/qa_stream_demux.py b/gr-blocks/python/blocks/qa_stream_demux.py new file mode 100755 index 0000000000..da9aa46be0 --- /dev/null +++ b/gr-blocks/python/blocks/qa_stream_demux.py @@ -0,0 +1,292 @@ +#!/usr/bin/env python +# +# Copyright 2020 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# GNU Radio is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# GNU Radio is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with GNU Radio; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# +# + + +from gnuradio import gr, gr_unittest, blocks +import pmt +import os + +class qa_stream_demux(gr_unittest.TestCase): + + def setUp (self): + os.environ['GR_CONF_CONTROLPORT_ON'] = 'False' + self.tb = gr.top_block () + + def tearDown (self): + self.tb = None + + def help_stream_2ff(self, N, stream_sizes): + v = blocks.vector_source_f(N*[1,] + N*[2,], False) + + demux = blocks.stream_demux(gr.sizeof_float, stream_sizes) + + dst0 = blocks.vector_sink_f() + dst1 = blocks.vector_sink_f() + + self.tb.connect(v, demux) + self.tb.connect((demux,0), dst0) + self.tb.connect((demux,1), dst1) + self.tb.run() + + return (dst0.data(), dst1.data()) + + def help_stream_ramp_2ff(self, N, stream_sizes): + r = list(range(N)) + list(reversed(range(N))) + + v = blocks.vector_source_f(r, False) + + demux = blocks.stream_demux(gr.sizeof_float, stream_sizes) + + dst0 = blocks.vector_sink_f() + dst1 = blocks.vector_sink_f() + + self.tb.connect(v, demux) + self.tb.connect((demux,0), dst0) + self.tb.connect((demux,1), dst1) + self.tb.run() + + return (dst0.data(), dst1.data()) + + def help_stream_tag_propagation(self, N, stream_sizes): + src_data = (stream_sizes[0]*[1,] + stream_sizes[1]*[2,] + stream_sizes[2]*[3,]) * N + + src = blocks.vector_source_f(src_data, False) + + tag_stream1 = blocks.stream_to_tagged_stream(gr.sizeof_float, 1, + stream_sizes[0], 'src1') + tag_stream2 = blocks.stream_to_tagged_stream(gr.sizeof_float, 1, + stream_sizes[1], 'src2') + tag_stream3 = blocks.stream_to_tagged_stream(gr.sizeof_float, 1, + stream_sizes[2], 'src3') + + demux = blocks.stream_demux(gr.sizeof_float, stream_sizes) + dst0 = blocks.vector_sink_f() + dst1 = blocks.vector_sink_f() + dst2 = blocks.vector_sink_f() + + self.tb.connect(src, tag_stream1) + self.tb.connect(tag_stream1, tag_stream2) + self.tb.connect(tag_stream2, tag_stream3) + self.tb.connect(tag_stream3, demux) + self.tb.connect((demux,0), dst0) + self.tb.connect((demux,1), dst1) + self.tb.connect((demux,2), dst2) + self.tb.run() + + return (dst0, dst1, dst2) + + def test_stream_2NN_ff(self): + N = 40 + stream_sizes = [10, 10] + result_data = self.help_stream_2ff(N, stream_sizes) + + exp_data0 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] + exp_data1 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] + + self.assertEqual (exp_data0, result_data[0]) + self.assertEqual (exp_data1, result_data[1]) + + def test_stream_ramp_2NN_ff(self): + N = 40 + stream_sizes = [10, 10] + result_data = self.help_stream_ramp_2ff(N, stream_sizes) + + exp_data0 = [ 0.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, + 20.0, 21.0, 22.0, 23.0, 24.0, 25.0, 26.0, 27.0, 28.0, 29.0, + 39.0, 38.0, 37.0, 36.0, 35.0, 34.0, 33.0, 32.0, 31.0, 30.0, + 19.0, 18.0, 17.0, 16.0, 15.0, 14.0, 13.0, 12.0, 11.0, 10.0] + exp_data1 = [10.0, 11.0, 12.0, 13.0, 14.0, 15.0, 16.0, 17.0, 18.0, 19.0, + 30.0, 31.0, 32.0, 33.0, 34.0, 35.0, 36.0, 37.0, 38.0, 39.0, + 29.0, 28.0, 27.0, 26.0, 25.0, 24.0, 23.0, 22.0, 21.0, 20.0, + 9.0, 8.0, 7.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0] + + self.assertEqual (exp_data0, result_data[0]) + self.assertEqual (exp_data1, result_data[1]) + + def test_stream_2NM_ff(self): + N = 40 + stream_sizes = [7, 9] + self.help_stream_2ff(N, stream_sizes) + + result_data = self.help_stream_2ff(N, stream_sizes) + + exp_data0 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] + + exp_data1 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] + + self.assertEqual (exp_data0, result_data[0]) + self.assertEqual (exp_data1, result_data[1]) + + + def test_stream_2MN_ff(self): + N = 37 + stream_sizes = [7, 9] + self.help_stream_2ff(N, stream_sizes) + + result_data = self.help_stream_2ff(N, stream_sizes) + + exp_data0 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] + + exp_data1 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0] + + self.assertEqual (exp_data0, result_data[0]) + self.assertEqual (exp_data1, result_data[1]) + + def test_stream_2N0_ff(self): + N = 30 + stream_sizes = [7, 0] + self.help_stream_2ff(N, stream_sizes) + + result_data = self.help_stream_2ff(N, stream_sizes) + + exp_data0 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0] + exp_data1 = [] + + self.assertEqual (exp_data0, result_data[0]) + self.assertEqual (exp_data1, result_data[1]) + + def test_stream_20N_ff(self): + N = 30 + stream_sizes = [0, 9] + self.help_stream_2ff(N, stream_sizes) + + result_data = self.help_stream_2ff(N, stream_sizes) + + exp_data0 = [] + exp_data1 = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, + 1.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, + 2.0, 2.0, 2.0, 2.0, 2.0, 2.0] + + self.assertEqual (exp_data0, result_data[0]) + self.assertEqual (exp_data1, result_data[1]) + + def test_largeN_ff(self): + stream_sizes = [3, 8191] + r0 = [1.0,] * stream_sizes[0] + r1 = [2.0,] * stream_sizes[1] + v = blocks.vector_source_f(r0 + r1, repeat=False) + demux = blocks.stream_demux(gr.sizeof_float, stream_sizes) + dst0 = blocks.vector_sink_f() + dst1 = blocks.vector_sink_f() + self.tb.connect (v, demux) + self.tb.connect ((demux,0), dst0) + self.tb.connect ((demux,1), dst1) + self.tb.run () + self.assertEqual (r0, dst0.data()) + self.assertEqual (r1, dst1.data()) + + def test_tag_propagation(self): + N = 10 # Block length + stream_sizes = [1,2,3] + + expected_result0 = N*(stream_sizes[0]*[1,]) + expected_result1 = N*(stream_sizes[1]*[2,]) + expected_result2 = N*(stream_sizes[2]*[3,]) + + # check the data + (result0, result1, result2) = self.help_stream_tag_propagation(N, stream_sizes) + self.assertFloatTuplesAlmostEqual(expected_result0, result0.data(), places=6) + self.assertFloatTuplesAlmostEqual(expected_result1, result1.data(), places=6) + self.assertFloatTuplesAlmostEqual(expected_result2, result2.data(), places=6) + + # check the tags - result0 + tags = result0.tags() + expected_tag_offsets_src1 = list(range(0,stream_sizes[0]*N,stream_sizes[0])) + expected_tag_offsets_src2 = list(range(0,stream_sizes[0]*N,stream_sizes[0])) + expected_tag_offsets_src3 = list(range(0,stream_sizes[0]*N,stream_sizes[0])) + tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] + tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] + tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + for i in range(len(expected_tag_offsets_src1)): + self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + for i in range(len(expected_tag_offsets_src2)): + self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + for i in range(len(expected_tag_offsets_src3)): + self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + + # check the tags - result1 + tags = result1.tags() + expected_tag_offsets_src1 = list(range(0,stream_sizes[1]*N,stream_sizes[0])) + expected_tag_offsets_src2 = list(range(1,stream_sizes[1]*N,stream_sizes[1])) + expected_tag_offsets_src3 = list() + tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] + tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] + tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + for i in range(len(expected_tag_offsets_src1)): + self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + for i in range(len(expected_tag_offsets_src2)): + self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + for i in range(len(expected_tag_offsets_src3)): + self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + + # check the tags - result2 + tags = result2.tags() + expected_tag_offsets_src1 = list(range(0,stream_sizes[2]*N,stream_sizes[0])) + expected_tag_offsets_src2 = list(range(1,stream_sizes[2]*N,stream_sizes[2])) + expected_tag_offsets_src3 = list(range(0,stream_sizes[2]*N,stream_sizes[2])) + tags_src1 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src1'))] + tags_src2 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src2'))] + tags_src3 = [tag for tag in tags if pmt.eq(tag.key, pmt.intern('src3'))] + for i in range(len(expected_tag_offsets_src1)): + self.assertTrue(expected_tag_offsets_src1[i] == tags_src1[i].offset) + for i in range(len(expected_tag_offsets_src2)): + self.assertTrue(expected_tag_offsets_src2[i] == tags_src2[i].offset) + for i in range(len(expected_tag_offsets_src3)): + self.assertTrue(expected_tag_offsets_src3[i] == tags_src3[i].offset) + +if __name__ == '__main__': + gr_unittest.run(qa_stream_demux) |