diff options
author | Tom Rondeau <trondeau@vt.edu> | 2013-04-17 13:43:52 -0400 |
---|---|---|
committer | Tom Rondeau <trondeau@vt.edu> | 2013-04-29 14:52:56 -0400 |
commit | f3e2e07201c50033bf6c9d0c6a6f068557b4f17f (patch) | |
tree | 140b3c2d20a951ffd4abd564c3378ee2e2f9fc7c /gnuradio-runtime/lib/single_threaded_scheduler.cc | |
parent | 35303ae975a5b1bdecc2492bc96e2b8e89b62a3d (diff) |
runtime: converting runtime core to gr namespace, gnuradio include dir.
Diffstat (limited to 'gnuradio-runtime/lib/single_threaded_scheduler.cc')
-rw-r--r-- | gnuradio-runtime/lib/single_threaded_scheduler.cc | 363 |
1 files changed, 363 insertions, 0 deletions
diff --git a/gnuradio-runtime/lib/single_threaded_scheduler.cc b/gnuradio-runtime/lib/single_threaded_scheduler.cc new file mode 100644 index 0000000000..0d7f91670b --- /dev/null +++ b/gnuradio-runtime/lib/single_threaded_scheduler.cc @@ -0,0 +1,363 @@ +/* -*- c++ -*- */ +/* + * Copyright 2004 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gnuradio/single_threaded_scheduler.h> +#include <gnuradio/block.h> +#include <gnuradio/block_detail.h> +#include <gnuradio/buffer.h> +#include <boost/thread.hpp> +#include <boost/format.hpp> +#include <iostream> +#include <limits> +#include <assert.h> +#include <stdio.h> + +namespace gr { + + // must be defined to either 0 or 1 +#define ENABLE_LOGGING 0 + +#if (ENABLE_LOGGING) +#define LOG(x) do { x; } while(0) +#else +#define LOG(x) do {;} while(0) +#endif + + static int which_scheduler = 0; + + single_threaded_scheduler_sptr + make_single_threaded_scheduler(const std::vector<block_sptr> &blocks) + { + return single_threaded_scheduler_sptr + (new single_threaded_scheduler(blocks)); + } + + single_threaded_scheduler::single_threaded_scheduler(const std::vector<block_sptr> &blocks) + : d_blocks(blocks), d_enabled(true), d_log(0) + { + if(ENABLE_LOGGING) { + std::string name = str(boost::format("sst-%d.log") % which_scheduler++); + d_log = new std::ofstream(name.c_str()); + *d_log << "single_threaded_scheduler: " + << d_blocks.size () + << " blocks\n"; + } + } + + single_threaded_scheduler::~single_threaded_scheduler() + { + if(ENABLE_LOGGING) + delete d_log; + } + + void + single_threaded_scheduler::run() + { + // d_enabled = true; // KLUDGE + main_loop (); + } + + void + single_threaded_scheduler::stop() + { + if(0) + std::cout << "gr_singled_threaded_scheduler::stop() " + << this << std::endl; + d_enabled = false; + } + + inline static unsigned int + round_up(unsigned int n, unsigned int multiple) + { + return ((n + multiple - 1) / multiple) * multiple; + } + + inline static unsigned int + round_down(unsigned int n, unsigned int multiple) + { + return (n / multiple) * multiple; + } + + // + // Return minimum available write space in all our downstream + // buffers or -1 if we're output blocked and the output we're + // blocked on is done. + // + static int + min_available_space(block_detail *d, int output_multiple) + { + int min_space = std::numeric_limits<int>::max(); + + for(int i = 0; i < d->noutputs (); i++) { + int n = round_down (d->output(i)->space_available (), output_multiple); + if(n == 0) { // We're blocked on output. + if(d->output(i)->done()) { // Downstream is done, therefore we're done. + return -1; + } + return 0; + } + min_space = std::min (min_space, n); + } + return min_space; + } + + void + single_threaded_scheduler::main_loop() + { + static const int DEFAULT_CAPACITY = 16; + + int noutput_items; + gr_vector_int ninput_items_required(DEFAULT_CAPACITY); + gr_vector_int ninput_items(DEFAULT_CAPACITY); + gr_vector_const_void_star input_items(DEFAULT_CAPACITY); + gr_vector_void_star output_items(DEFAULT_CAPACITY); + unsigned int bi; + unsigned int nalive; + int max_items_avail; + bool made_progress_last_pass; + bool making_progress; + + for(unsigned i = 0; i < d_blocks.size (); i++) + d_blocks[i]->detail()->set_done (false); // reset any done flags + + for(unsigned i = 0; i < d_blocks.size (); i++) // enable any drivers, etc. + d_blocks[i]->start(); + + bi = 0; + made_progress_last_pass = true; + making_progress = false; + + // Loop while there are still blocks alive + + nalive = d_blocks.size (); + while(d_enabled && nalive > 0) { + if(boost::this_thread::interruption_requested()) + break; + + block *m = d_blocks[bi].get (); + block_detail *d = m->detail().get (); + + LOG(*d_log << std::endl << m); + + if(d->done ()) + goto next_block; + + if(d->source_p ()) { + // Invoke sources as a last resort. As long as the previous + // pass made progress, don't call a source. + if(made_progress_last_pass) { + LOG(*d_log << " Skipping source\n"); + goto next_block; + } + + ninput_items_required.resize (0); + ninput_items.resize (0); + input_items.resize (0); + output_items.resize (d->noutputs ()); + + // determine the minimum available output space + noutput_items = min_available_space (d, m->output_multiple ()); + LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl); + if(noutput_items == -1) // we're done + goto were_done; + + if(noutput_items == 0) { // we're output blocked + LOG(*d_log << " BLKD_OUT\n"); + goto next_block; + } + + goto setup_call_to_work; // jump to common code + } + + else if(d->sink_p ()) { + ninput_items_required.resize (d->ninputs ()); + ninput_items.resize (d->ninputs ()); + input_items.resize (d->ninputs ()); + output_items.resize (0); + LOG(*d_log << " sink\n"); + + max_items_avail = 0; + for(int i = 0; i < d->ninputs (); i++) { + ninput_items[i] = d->input(i)->items_available(); + //if (ninput_items[i] == 0 && d->input(i)->done()) + if(ninput_items[i] < m->output_multiple() && d->input(i)->done()) + goto were_done; + + max_items_avail = std::max (max_items_avail, ninput_items[i]); + } + + // take a swag at how much output we can sink + noutput_items = (int) (max_items_avail * m->relative_rate ()); + noutput_items = round_down (noutput_items, m->output_multiple ()); + LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl); + LOG(*d_log << " noutput_items = " << noutput_items << std::endl); + + if(noutput_items == 0) { // we're blocked on input + LOG(*d_log << " BLKD_IN\n"); + goto next_block; + } + + goto try_again; // Jump to code shared with regular case. + } + + else { + // do the regular thing + ninput_items_required.resize(d->ninputs ()); + ninput_items.resize(d->ninputs ()); + input_items.resize(d->ninputs ()); + output_items.resize(d->noutputs ()); + + max_items_avail = 0; + for(int i = 0; i < d->ninputs (); i++) { + ninput_items[i] = d->input(i)->items_available (); + max_items_avail = std::max(max_items_avail, ninput_items[i]); + } + + // determine the minimum available output space + noutput_items = min_available_space(d, m->output_multiple ()); + if(ENABLE_LOGGING){ + *d_log << " regular "; + if(m->relative_rate() >= 1.0) + *d_log << "1:" << m->relative_rate() << std::endl; + else + *d_log << 1.0/m->relative_rate() << ":1\n"; + *d_log << " max_items_avail = " << max_items_avail << std::endl; + *d_log << " noutput_items = " << noutput_items << std::endl; + } + if(noutput_items == -1) // we're done + goto were_done; + + if(noutput_items == 0) { // we're output blocked + LOG(*d_log << " BLKD_OUT\n"); + goto next_block; + } + +#if 0 + // Compute best estimate of noutput_items that we can really use. + noutput_items = + std::min((unsigned)noutput_items, + std::max((unsigned)m->output_multiple(), + round_up((unsigned)(max_items_avail * m->relative_rate()), + m->output_multiple()))); + + LOG(*d_log << " revised noutput_items = " << noutput_items << std::endl); +#endif + + try_again: + if(m->fixed_rate()) { + // try to work it forward starting with max_items_avail. + // We want to try to consume all the input we've got. + int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail); + reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple()); + if(reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items) + noutput_items = reqd_noutput_items; + } + + // ask the block how much input they need to produce noutput_items + m->forecast(noutput_items, ninput_items_required); + + // See if we've got sufficient input available + int i; + for(i = 0; i < d->ninputs (); i++) + if(ninput_items_required[i] > ninput_items[i]) // not enough + break; + + if(i < d->ninputs()) { // not enough input on input[i] + // if we can, try reducing the size of our output request + if(noutput_items > m->output_multiple ()){ + noutput_items /= 2; + noutput_items = round_up (noutput_items, m->output_multiple ()); + goto try_again; + } + + // We're blocked on input + LOG(*d_log << " BLKD_IN\n"); + if(d->input(i)->done()) // If the upstream block is done, we're done + goto were_done; + + // Is it possible to ever fulfill this request? + if(ninput_items_required[i] > d->input(i)->max_possible_items_available ()) { + // Nope, never going to happen... + std::cerr << "\nsched: <block " << m->name() + << " (" << m->unique_id() << ")>" + << " is requesting more input data\n" + << " than we can provide.\n" + << " ninput_items_required = " + << ninput_items_required[i] << "\n" + << " max_possible_items_available = " + << d->input(i)->max_possible_items_available() << "\n" + << " If this is a filter, consider reducing the number of taps.\n"; + goto were_done; + } + + goto next_block; + } + + // We've got enough data on each input to produce noutput_items. + // Finish setting up the call to work. + for(int i = 0; i < d->ninputs (); i++) + input_items[i] = d->input(i)->read_pointer(); + + setup_call_to_work: + + for(int i = 0; i < d->noutputs (); i++) + output_items[i] = d->output(i)->write_pointer(); + + // Do the actual work of the block + int n = m->general_work(noutput_items, ninput_items, + input_items, output_items); + LOG(*d_log << " general_work: noutput_items = " << noutput_items + << " result = " << n << std::endl); + + if(n == -1) // block is done + goto were_done; + + d->produce_each(n); // advance write pointers + if(n > 0) + making_progress = true; + + goto next_block; + } + assert(0); + + were_done: + LOG(*d_log << " were_done\n"); + d->set_done (true); + nalive--; + + next_block: + if(++bi >= d_blocks.size ()) { + bi = 0; + made_progress_last_pass = making_progress; + making_progress = false; + } + } + + for(unsigned i = 0; i < d_blocks.size(); i++) // disable any drivers, etc. + d_blocks[i]->stop(); + } + +} /* namespace gr */ |