diff options
Diffstat (limited to 'gnuradio-runtime/lib/gr_single_threaded_scheduler.cc')
-rw-r--r-- | gnuradio-runtime/lib/gr_single_threaded_scheduler.cc | 364 |
1 files changed, 0 insertions, 364 deletions
diff --git a/gnuradio-runtime/lib/gr_single_threaded_scheduler.cc b/gnuradio-runtime/lib/gr_single_threaded_scheduler.cc deleted file mode 100644 index 1bb9e9b0a8..0000000000 --- a/gnuradio-runtime/lib/gr_single_threaded_scheduler.cc +++ /dev/null @@ -1,364 +0,0 @@ -/* -*- c++ -*- */ -/* - * Copyright 2004 Free Software Foundation, Inc. - * - * This file is part of GNU Radio - * - * GNU Radio is free software; you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation; either version 3, or (at your option) - * any later version. - * - * GNU Radio is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the GNU General Public License - * along with GNU Radio; see the file COPYING. If not, write to - * the Free Software Foundation, Inc., 51 Franklin Street, - * Boston, MA 02110-1301, USA. - */ - -#ifdef HAVE_CONFIG_H -#include "config.h" -#endif - -#include <gr_single_threaded_scheduler.h> -#include <gr_block.h> -#include <gr_block_detail.h> -#include <gr_buffer.h> -#include <boost/thread.hpp> -#include <boost/format.hpp> -#include <iostream> -#include <limits> -#include <assert.h> -#include <stdio.h> - -// must be defined to either 0 or 1 -#define ENABLE_LOGGING 0 - -#if (ENABLE_LOGGING) -#define LOG(x) do { x; } while(0) -#else -#define LOG(x) do {;} while(0) -#endif - -static int which_scheduler = 0; - -gr_single_threaded_scheduler_sptr -gr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks) -{ - return - gr_single_threaded_scheduler_sptr (new gr_single_threaded_scheduler (blocks)); -} - -gr_single_threaded_scheduler::gr_single_threaded_scheduler ( - const std::vector<gr_block_sptr> &blocks) - : d_blocks (blocks), d_enabled (true), d_log(0) -{ - if (ENABLE_LOGGING){ - std::string name = str(boost::format("sst-%d.log") % which_scheduler++); - d_log = new std::ofstream(name.c_str()); - *d_log << "gr_single_threaded_scheduler: " - << d_blocks.size () - << " blocks\n"; - } -} - -gr_single_threaded_scheduler::~gr_single_threaded_scheduler () -{ - if (ENABLE_LOGGING) - delete d_log; -} - -void -gr_single_threaded_scheduler::run () -{ - // d_enabled = true; // KLUDGE - main_loop (); -} - -void -gr_single_threaded_scheduler::stop () -{ - if (0) - std::cout << "gr_singled_threaded_scheduler::stop() " - << this << std::endl; - d_enabled = false; -} - -inline static unsigned int -round_up (unsigned int n, unsigned int multiple) -{ - return ((n + multiple - 1) / multiple) * multiple; -} - -inline static unsigned int -round_down (unsigned int n, unsigned int multiple) -{ - return (n / multiple) * multiple; -} - -// -// Return minimum available write space in all our downstream buffers -// or -1 if we're output blocked and the output we're blocked -// on is done. -// -static int -min_available_space (gr_block_detail *d, int output_multiple) -{ - int min_space = std::numeric_limits<int>::max(); - - for (int i = 0; i < d->noutputs (); i++){ - int n = round_down (d->output(i)->space_available (), output_multiple); - if (n == 0){ // We're blocked on output. - if (d->output(i)->done()){ // Downstream is done, therefore we're done. - return -1; - } - return 0; - } - min_space = std::min (min_space, n); - } - return min_space; -} - -void -gr_single_threaded_scheduler::main_loop () -{ - static const int DEFAULT_CAPACITY = 16; - - int noutput_items; - gr_vector_int ninput_items_required (DEFAULT_CAPACITY); - gr_vector_int ninput_items (DEFAULT_CAPACITY); - gr_vector_const_void_star input_items (DEFAULT_CAPACITY); - gr_vector_void_star output_items (DEFAULT_CAPACITY); - unsigned int bi; - unsigned int nalive; - int max_items_avail; - bool made_progress_last_pass; - bool making_progress; - - for (unsigned i = 0; i < d_blocks.size (); i++) - d_blocks[i]->detail()->set_done (false); // reset any done flags - - for (unsigned i = 0; i < d_blocks.size (); i++) // enable any drivers, etc. - d_blocks[i]->start(); - - - bi = 0; - made_progress_last_pass = true; - making_progress = false; - - // Loop while there are still blocks alive - - nalive = d_blocks.size (); - while (d_enabled && nalive > 0){ - - if (boost::this_thread::interruption_requested()) - break; - - gr_block *m = d_blocks[bi].get (); - gr_block_detail *d = m->detail().get (); - - LOG(*d_log << std::endl << m); - - if (d->done ()) - goto next_block; - - if (d->source_p ()){ - // Invoke sources as a last resort. As long as the previous pass - // made progress, don't call a source. - if (made_progress_last_pass){ - LOG(*d_log << " Skipping source\n"); - goto next_block; - } - - ninput_items_required.resize (0); - ninput_items.resize (0); - input_items.resize (0); - output_items.resize (d->noutputs ()); - - // determine the minimum available output space - noutput_items = min_available_space (d, m->output_multiple ()); - LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl); - if (noutput_items == -1) // we're done - goto were_done; - - if (noutput_items == 0){ // we're output blocked - LOG(*d_log << " BLKD_OUT\n"); - goto next_block; - } - - goto setup_call_to_work; // jump to common code - } - - else if (d->sink_p ()){ - ninput_items_required.resize (d->ninputs ()); - ninput_items.resize (d->ninputs ()); - input_items.resize (d->ninputs ()); - output_items.resize (0); - LOG(*d_log << " sink\n"); - - max_items_avail = 0; - for (int i = 0; i < d->ninputs (); i++){ - ninput_items[i] = d->input(i)->items_available(); - //if (ninput_items[i] == 0 && d->input(i)->done()) - if (ninput_items[i] < m->output_multiple() && d->input(i)->done()) - goto were_done; - - max_items_avail = std::max (max_items_avail, ninput_items[i]); - } - - // take a swag at how much output we can sink - noutput_items = (int) (max_items_avail * m->relative_rate ()); - noutput_items = round_down (noutput_items, m->output_multiple ()); - LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl); - LOG(*d_log << " noutput_items = " << noutput_items << std::endl); - - if (noutput_items == 0){ // we're blocked on input - LOG(*d_log << " BLKD_IN\n"); - goto next_block; - } - - goto try_again; // Jump to code shared with regular case. - } - - else { - // do the regular thing - ninput_items_required.resize (d->ninputs ()); - ninput_items.resize (d->ninputs ()); - input_items.resize (d->ninputs ()); - output_items.resize (d->noutputs ()); - - max_items_avail = 0; - for (int i = 0; i < d->ninputs (); i++){ - ninput_items[i] = d->input(i)->items_available (); - max_items_avail = std::max (max_items_avail, ninput_items[i]); - } - - // determine the minimum available output space - noutput_items = min_available_space (d, m->output_multiple ()); - if (ENABLE_LOGGING){ - *d_log << " regular "; - if (m->relative_rate() >= 1.0) - *d_log << "1:" << m->relative_rate() << std::endl; - else - *d_log << 1.0/m->relative_rate() << ":1\n"; - *d_log << " max_items_avail = " << max_items_avail << std::endl; - *d_log << " noutput_items = " << noutput_items << std::endl; - } - if (noutput_items == -1) // we're done - goto were_done; - - if (noutput_items == 0){ // we're output blocked - LOG(*d_log << " BLKD_OUT\n"); - goto next_block; - } - -#if 0 - // Compute best estimate of noutput_items that we can really use. - noutput_items = - std::min ((unsigned) noutput_items, - std::max ((unsigned) m->output_multiple(), - round_up ((unsigned) (max_items_avail * m->relative_rate()), - m->output_multiple ()))); - - LOG(*d_log << " revised noutput_items = " << noutput_items << std::endl); -#endif - - try_again: - if (m->fixed_rate()){ - // try to work it forward starting with max_items_avail. - // We want to try to consume all the input we've got. - int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail); - reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple()); - if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items) - noutput_items = reqd_noutput_items; - } - - // ask the block how much input they need to produce noutput_items - m->forecast (noutput_items, ninput_items_required); - - // See if we've got sufficient input available - - int i; - for (i = 0; i < d->ninputs (); i++) - if (ninput_items_required[i] > ninput_items[i]) // not enough - break; - - if (i < d->ninputs ()){ // not enough input on input[i] - // if we can, try reducing the size of our output request - if (noutput_items > m->output_multiple ()){ - noutput_items /= 2; - noutput_items = round_up (noutput_items, m->output_multiple ()); - goto try_again; - } - - // We're blocked on input - LOG(*d_log << " BLKD_IN\n"); - if (d->input(i)->done()) // If the upstream block is done, we're done - goto were_done; - - // Is it possible to ever fulfill this request? - if (ninput_items_required[i] > d->input(i)->max_possible_items_available ()){ - // Nope, never going to happen... - std::cerr << "\nsched: <gr_block " << m->name() - << " (" << m->unique_id() << ")>" - << " is requesting more input data\n" - << " than we can provide.\n" - << " ninput_items_required = " - << ninput_items_required[i] << "\n" - << " max_possible_items_available = " - << d->input(i)->max_possible_items_available() << "\n" - << " If this is a filter, consider reducing the number of taps.\n"; - goto were_done; - } - - goto next_block; - } - - // We've got enough data on each input to produce noutput_items. - // Finish setting up the call to work. - - for (int i = 0; i < d->ninputs (); i++) - input_items[i] = d->input(i)->read_pointer(); - - setup_call_to_work: - - for (int i = 0; i < d->noutputs (); i++) - output_items[i] = d->output(i)->write_pointer(); - - // Do the actual work of the block - int n = m->general_work (noutput_items, ninput_items, - input_items, output_items); - LOG(*d_log << " general_work: noutput_items = " << noutput_items - << " result = " << n << std::endl); - - if (n == -1) // block is done - goto were_done; - - d->produce_each (n); // advance write pointers - if (n > 0) - making_progress = true; - - goto next_block; - } - assert (0); - - were_done: - LOG(*d_log << " were_done\n"); - d->set_done (true); - nalive--; - - next_block: - if (++bi >= d_blocks.size ()){ - bi = 0; - made_progress_last_pass = making_progress; - making_progress = false; - } - } - - for (unsigned i = 0; i < d_blocks.size (); i++) // disable any drivers, etc. - d_blocks[i]->stop(); -} |