diff options
author | Martin Braun <martin.braun@ettus.com> | 2014-03-25 21:32:13 +0100 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2014-03-25 21:32:13 +0100 |
commit | 1da534610a1e1ed0e7d613d39ff47485bb8b7bb6 (patch) | |
tree | 9cf8b35e99a9360bb1f7eb73e075223b1edd5391 | |
parent | 1092e685defd10692d3fa47435c716a88dfd8712 (diff) |
blocks: refactored stream_mux to be more flexible with buffer sizes
-rw-r--r-- | gr-blocks/lib/stream_mux_impl.cc | 115 | ||||
-rw-r--r-- | gr-blocks/lib/stream_mux_impl.h | 6 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_stream_mux.py | 16 |
3 files changed, 69 insertions, 68 deletions
diff --git a/gr-blocks/lib/stream_mux_impl.cc b/gr-blocks/lib/stream_mux_impl.cc index 1e42c2504f..698cf89d09 100644 --- a/gr-blocks/lib/stream_mux_impl.cc +++ b/gr-blocks/lib/stream_mux_impl.cc @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2012 Free Software Foundation, Inc. + * Copyright 2012,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -26,10 +26,8 @@ #include "stream_mux_impl.h" #include <gnuradio/io_signature.h> -#include <string.h> -#include <cstdio> - -#define VERBOSE 0 +#include <boost/foreach.hpp> +#include <cstring> namespace gr { namespace blocks { @@ -48,8 +46,11 @@ namespace gr { d_residual(0), d_lengths(lengths) { - if(d_lengths[d_stream] == 0) { - increment_stream(); + while (d_lengths[d_stream] == 0) { + d_stream++; + if (d_stream == d_lengths.size()) { + throw std::invalid_argument("At least one size must be non-zero."); + } } d_residual = d_lengths[d_stream]; } @@ -58,69 +59,57 @@ namespace gr { stream_mux_impl::forecast(int noutput_items, gr_vector_int &ninput_items_required) { unsigned ninputs = ninput_items_required.size (); - for (unsigned i = 0; i < ninputs; i++) - ninput_items_required[i] = (d_lengths[i] == 0 ? 0 : 1); - } - - void - stream_mux_impl::increment_stream() - { - do { - d_stream = (d_stream+1) % d_lengths.size(); - } while(d_lengths[d_stream] == 0); - - d_residual = d_lengths[d_stream]; + for (unsigned i = 0; i < ninputs; i++) { + // Only active inputs *need* items, for the rest, it would just be nice + ninput_items_required[i] = (d_stream == i ? 1 : 0); + } } int stream_mux_impl::general_work(int noutput_items, - gr_vector_int &ninput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items + ){ char *out = (char *) output_items[0]; const char *in; - int out_index = 0; - std::vector<int> input_index(d_lengths.size(), 0); - - if(VERBOSE) { - printf("mux: nouput_items: %d d_stream: %d\n", noutput_items, d_stream); - for(size_t i = 0; i < d_lengths.size(); i++) - printf("\tninput_items[%zu]: %d\n", i, ninput_items[i]); - } - - while (1) { - int r = std::min(noutput_items - out_index, - std::min(d_residual, - ninput_items[d_stream] - input_index[d_stream])); - if(VERBOSE) { - printf("mux: r=%d\n", r); - printf("\tnoutput_items - out_index: %d\n", - noutput_items - out_index); - printf("\td_residual: %d\n", - d_residual); - printf("\tninput_items[d_stream] - input_index[d_stream]: %d\n", - ninput_items[d_stream] - input_index[d_stream]); - } - - if(r <= 0) { - return out_index; - } - - in = (const char *) input_items[d_stream] + input_index[d_stream]*d_itemsize; - - memcpy(&out[out_index*d_itemsize], in, r*d_itemsize); - out_index += r; - input_index[d_stream] += r; - d_residual -= r; - - consume(d_stream, r); - - if(d_residual == 0) { - increment_stream(); - } + int out_index = 0; // Items written + gr_vector_int input_index(d_lengths.size(), 0); // Items read + + while (out_index < noutput_items) { + if (ninput_items[d_stream] <= input_index[d_stream]) { + break; + } + int space_left_in_buffers = std::min( + noutput_items - out_index, // Space left in output buffer + ninput_items[d_stream] - input_index[d_stream] // Space left in input buffer + ); + int items_to_copy = std::min( + space_left_in_buffers, + d_residual + ); + in = (const char *) input_items[d_stream] + input_index[d_stream]*d_itemsize; + memcpy(&out[out_index*d_itemsize], in, items_to_copy*d_itemsize); + out_index += items_to_copy; + input_index[d_stream] += items_to_copy; + d_residual -= items_to_copy; + if (d_residual == 0) { + do { // Skip all those inputs with zero length + d_stream = (d_stream+1) % d_lengths.size(); + } while (d_lengths[d_stream] == 0); + d_residual = d_lengths[d_stream]; + } else { + break; + } + } // while + + for (size_t i = 0; i < input_index.size(); i++) { + consume((int) i, input_index[i]); } - } + + return out_index; + } /* work */ + } /* namespace blocks */ } /* namespace gr */ diff --git a/gr-blocks/lib/stream_mux_impl.h b/gr-blocks/lib/stream_mux_impl.h index 328eb0710e..67be9381af 100644 --- a/gr-blocks/lib/stream_mux_impl.h +++ b/gr-blocks/lib/stream_mux_impl.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2012 Free Software Foundation, Inc. + * Copyright 2012,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -36,10 +36,8 @@ namespace gr { int d_residual; // number if items left to put into current stream gr_vector_int d_lengths; // number if items to pack per stream - void increment_stream(); - void forecast(int noutput_items, gr_vector_int &ninput_items_required); - + public: stream_mux_impl(size_t itemsize, const std::vector<int> &lengths); diff --git a/gr-blocks/python/blocks/qa_stream_mux.py b/gr-blocks/python/blocks/qa_stream_mux.py index 7abbced54c..00e32e955e 100755 --- a/gr-blocks/python/blocks/qa_stream_mux.py +++ b/gr-blocks/python/blocks/qa_stream_mux.py @@ -1,6 +1,6 @@ #!/usr/bin/env python # -# Copyright 2004,2005,2007,2010,2012,2013 Free Software Foundation, Inc. +# Copyright 2004,2005,2007,2010,2012,2013,2014 Free Software Foundation, Inc. # # This file is part of GNU Radio # @@ -167,5 +167,19 @@ class test_stream_mux (gr_unittest.TestCase): self.assertEqual (exp_data, result_data) + def test_largeN_ff(self): + stream_sizes = [3, 8191] + r1 = (1,) * stream_sizes[0] + r2 = (2,) * stream_sizes[1] + v0 = blocks.vector_source_f(r1, repeat=False) + v1 = blocks.vector_source_f(r2, repeat=False) + mux = blocks.stream_mux(gr.sizeof_float, stream_sizes) + dst = blocks.vector_sink_f () + self.tb.connect (v0, (mux,0)) + self.tb.connect (v1, (mux,1)) + self.tb.connect (mux, dst) + self.tb.run () + self.assertEqual (r1 + r2, dst.data()) + if __name__ == '__main__': gr_unittest.run(test_stream_mux, "test_stream_mux.xml") |