diff options
Diffstat (limited to 'gnuradio-runtime/lib/single_threaded_scheduler.cc')
-rw-r--r-- | gnuradio-runtime/lib/single_threaded_scheduler.cc | 363 |
1 files changed, 0 insertions, 363 deletions
diff --git a/gnuradio-runtime/lib/single_threaded_scheduler.cc b/gnuradio-runtime/lib/single_threaded_scheduler.cc deleted file mode 100644 index c86d26ca3a..0000000000 --- a/gnuradio-runtime/lib/single_threaded_scheduler.cc +++ /dev/null @@ -1,363 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2004 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with GNU Radio; see the file COPYING. If not, write to - * the Free Software Foundation, Inc., 51 Franklin Street, - * Boston, MA 02110-1301, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include "single_threaded_scheduler.h" -#include <gnuradio/block.h> -#include <gnuradio/block_detail.h> -#include <gnuradio/buffer.h> -#include <boost/thread.hpp> -#include <boost/format.hpp> -#include <iostream> -#include <limits> -#include <assert.h> -#include <stdio.h> - -namespace gr { - - // must be defined to either 0 or 1 -#define ENABLE_LOGGING 0 - -#if (ENABLE_LOGGING) -#define LOG(x) do { x; } while(0) -#else -#define LOG(x) do {;} while(0) -#endif - - static int which_scheduler = 0; - - single_threaded_scheduler_sptr - make_single_threaded_scheduler(const std::vector<block_sptr> &blocks) - { - return single_threaded_scheduler_sptr - (new single_threaded_scheduler(blocks)); - } - - single_threaded_scheduler::single_threaded_scheduler(const std::vector<block_sptr> &blocks) - : d_blocks(blocks), d_enabled(true), d_log(0) - { - if(ENABLE_LOGGING) { - std::string name = str(boost::format("sst-%d.log") % which_scheduler++); - d_log = new std::ofstream(name.c_str()); - *d_log << "single_threaded_scheduler: " - << d_blocks.size () - << " blocks\n"; - } - } - - single_threaded_scheduler::~single_threaded_scheduler() - { - if(ENABLE_LOGGING) - delete d_log; - } - - void - single_threaded_scheduler::run() - { - // d_enabled = true; // KLUDGE - main_loop (); - } - - void - single_threaded_scheduler::stop() - { - if(0) - std::cout << "gr_singled_threaded_scheduler::stop() " - << this << std::endl; - d_enabled = false; - } - - inline static unsigned int - round_up(unsigned int n, unsigned int multiple) - { - return ((n + multiple - 1) / multiple) * multiple; - } - - inline static unsigned int - round_down(unsigned int n, unsigned int multiple) - { - return (n / multiple) * multiple; - } - - // - // Return minimum available write space in all our downstream - // buffers or -1 if we're output blocked and the output we're - // blocked on is done. - // - static int - min_available_space(block_detail *d, int output_multiple) - { - int min_space = std::numeric_limits<int>::max(); - - for(int i = 0; i < d->noutputs (); i++) { - int n = round_down (d->output(i)->space_available (), output_multiple); - if(n == 0) { // We're blocked on output. - if(d->output(i)->done()) { // Downstream is done, therefore we're done. - return -1; - } - return 0; - } - min_space = std::min (min_space, n); - } - return min_space; - } - - void - single_threaded_scheduler::main_loop() - { - static const int DEFAULT_CAPACITY = 16; - - int noutput_items; - gr_vector_int ninput_items_required(DEFAULT_CAPACITY); - gr_vector_int ninput_items(DEFAULT_CAPACITY); - gr_vector_const_void_star input_items(DEFAULT_CAPACITY); - gr_vector_void_star output_items(DEFAULT_CAPACITY); - unsigned int bi; - unsigned int nalive; - int max_items_avail; - bool made_progress_last_pass; - bool making_progress; - - for(unsigned i = 0; i < d_blocks.size (); i++) - d_blocks[i]->detail()->set_done (false); // reset any done flags - - for(unsigned i = 0; i < d_blocks.size (); i++) // enable any drivers, etc. - d_blocks[i]->start(); - - bi = 0; - made_progress_last_pass = true; - making_progress = false; - - // Loop while there are still blocks alive - - nalive = d_blocks.size (); - while(d_enabled && nalive > 0) { - if(boost::this_thread::interruption_requested()) - break; - - block *m = d_blocks[bi].get (); - block_detail *d = m->detail().get (); - - LOG(*d_log << std::endl << m); - - if(d->done ()) - goto next_block; - - if(d->source_p ()) { - // Invoke sources as a last resort. As long as the previous - // pass made progress, don't call a source. - if(made_progress_last_pass) { - LOG(*d_log << " Skipping source\n"); - goto next_block; - } - - ninput_items_required.resize (0); - ninput_items.resize (0); - input_items.resize (0); - output_items.resize (d->noutputs ()); - - // determine the minimum available output space - noutput_items = min_available_space (d, m->output_multiple ()); - LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl); - if(noutput_items == -1) // we're done - goto were_done; - - if(noutput_items == 0) { // we're output blocked - LOG(*d_log << " BLKD_OUT\n"); - goto next_block; - } - - goto setup_call_to_work; // jump to common code - } - - else if(d->sink_p ()) { - ninput_items_required.resize (d->ninputs ()); - ninput_items.resize (d->ninputs ()); - input_items.resize (d->ninputs ()); - output_items.resize (0); - LOG(*d_log << " sink\n"); - - max_items_avail = 0; - for(int i = 0; i < d->ninputs (); i++) { - ninput_items[i] = d->input(i)->items_available(); - //if (ninput_items[i] == 0 && d->input(i)->done()) - if(ninput_items[i] < m->output_multiple() && d->input(i)->done()) - goto were_done; - - max_items_avail = std::max (max_items_avail, ninput_items[i]); - } - - // take a swag at how much output we can sink - noutput_items = (int) (max_items_avail * m->relative_rate ()); - noutput_items = round_down (noutput_items, m->output_multiple ()); - LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl); - LOG(*d_log << " noutput_items = " << noutput_items << std::endl); - - if(noutput_items == 0) { // we're blocked on input - LOG(*d_log << " BLKD_IN\n"); - goto next_block; - } - - goto try_again; // Jump to code shared with regular case. - } - - else { - // do the regular thing - ninput_items_required.resize(d->ninputs ()); - ninput_items.resize(d->ninputs ()); - input_items.resize(d->ninputs ()); - output_items.resize(d->noutputs ()); - - max_items_avail = 0; - for(int i = 0; i < d->ninputs (); i++) { - ninput_items[i] = d->input(i)->items_available (); - max_items_avail = std::max(max_items_avail, ninput_items[i]); - } - - // determine the minimum available output space - noutput_items = min_available_space(d, m->output_multiple ()); - if(ENABLE_LOGGING){ - *d_log << " regular "; - if(m->relative_rate() >= 1.0) - *d_log << "1:" << m->relative_rate() << std::endl; - else - *d_log << 1.0/m->relative_rate() << ":1\n"; - *d_log << " max_items_avail = " << max_items_avail << std::endl; - *d_log << " noutput_items = " << noutput_items << std::endl; - } - if(noutput_items == -1) // we're done - goto were_done; - - if(noutput_items == 0) { // we're output blocked - LOG(*d_log << " BLKD_OUT\n"); - goto next_block; - } - -#if 0 - // Compute best estimate of noutput_items that we can really use. - noutput_items = - std::min((unsigned)noutput_items, - std::max((unsigned)m->output_multiple(), - round_up((unsigned)(max_items_avail * m->relative_rate()), - m->output_multiple()))); - - LOG(*d_log << " revised noutput_items = " << noutput_items << std::endl); -#endif - - try_again: - if(m->fixed_rate()) { - // try to work it forward starting with max_items_avail. - // We want to try to consume all the input we've got. - int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail); - reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple()); - if(reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items) - noutput_items = reqd_noutput_items; - } - - // ask the block how much input they need to produce noutput_items - m->forecast(noutput_items, ninput_items_required); - - // See if we've got sufficient input available - int i; - for(i = 0; i < d->ninputs (); i++) - if(ninput_items_required[i] > ninput_items[i]) // not enough - break; - - if(i < d->ninputs()) { // not enough input on input[i] - // if we can, try reducing the size of our output request - if(noutput_items > m->output_multiple ()){ - noutput_items /= 2; - noutput_items = round_up (noutput_items, m->output_multiple ()); - goto try_again; - } - - // We're blocked on input - LOG(*d_log << " BLKD_IN\n"); - if(d->input(i)->done()) // If the upstream block is done, we're done - goto were_done; - - // Is it possible to ever fulfill this request? - if(ninput_items_required[i] > d->input(i)->max_possible_items_available ()) { - // Nope, never going to happen... - std::cerr << "\nsched: <block " << m->name() - << " (" << m->unique_id() << ")>" - << " is requesting more input data\n" - << " than we can provide.\n" - << " ninput_items_required = " - << ninput_items_required[i] << "\n" - << " max_possible_items_available = " - << d->input(i)->max_possible_items_available() << "\n" - << " If this is a filter, consider reducing the number of taps.\n"; - goto were_done; - } - - goto next_block; - } - - // We've got enough data on each input to produce noutput_items. - // Finish setting up the call to work. - for(int i = 0; i < d->ninputs (); i++) - input_items[i] = d->input(i)->read_pointer(); - - setup_call_to_work: - - for(int i = 0; i < d->noutputs (); i++) - output_items[i] = d->output(i)->write_pointer(); - - // Do the actual work of the block - int n = m->general_work(noutput_items, ninput_items, - input_items, output_items); - LOG(*d_log << " general_work: noutput_items = " << noutput_items - << " result = " << n << std::endl); - - if(n == -1) // block is done - goto were_done; - - d->produce_each(n); // advance write pointers - if(n > 0) - making_progress = true; - - goto next_block; - } - assert(0); - - were_done: - LOG(*d_log << " were_done\n"); - d->set_done (true); - nalive--; - - next_block: - if(++bi >= d_blocks.size ()) { - bi = 0; - made_progress_last_pass = making_progress; - making_progress = false; - } - } - - for(unsigned i = 0; i < d_blocks.size(); i++) // disable any drivers, etc. - d_blocks[i]->stop(); - } - -} /* namespace gr */ |