summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/lib/single_threaded_scheduler.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-runtime/lib/single_threaded_scheduler.cc')
-rw-r--r--gnuradio-runtime/lib/single_threaded_scheduler.cc363
1 files changed, 0 insertions, 363 deletions
diff --git a/gnuradio-runtime/lib/single_threaded_scheduler.cc b/gnuradio-runtime/lib/single_threaded_scheduler.cc
deleted file mode 100644
index c86d26ca3a..0000000000
--- a/gnuradio-runtime/lib/single_threaded_scheduler.cc
+++ /dev/null
@@ -1,363 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2004 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * GNU Radio is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3, or (at your option)
- * any later version.
- *
- * GNU Radio is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with GNU Radio; see the file COPYING. If not, write to
- * the Free Software Foundation, Inc., 51 Franklin Street,
- * Boston, MA 02110-1301, USA.
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "single_threaded_scheduler.h"
-#include <gnuradio/block.h>
-#include <gnuradio/block_detail.h>
-#include <gnuradio/buffer.h>
-#include <boost/thread.hpp>
-#include <boost/format.hpp>
-#include <iostream>
-#include <limits>
-#include <assert.h>
-#include <stdio.h>
-
-namespace gr {
-
- // must be defined to either 0 or 1
-#define ENABLE_LOGGING 0
-
-#if (ENABLE_LOGGING)
-#define LOG(x) do { x; } while(0)
-#else
-#define LOG(x) do {;} while(0)
-#endif
-
- static int which_scheduler = 0;
-
- single_threaded_scheduler_sptr
- make_single_threaded_scheduler(const std::vector<block_sptr> &blocks)
- {
- return single_threaded_scheduler_sptr
- (new single_threaded_scheduler(blocks));
- }
-
- single_threaded_scheduler::single_threaded_scheduler(const std::vector<block_sptr> &blocks)
- : d_blocks(blocks), d_enabled(true), d_log(0)
- {
- if(ENABLE_LOGGING) {
- std::string name = str(boost::format("sst-%d.log") % which_scheduler++);
- d_log = new std::ofstream(name.c_str());
- *d_log << "single_threaded_scheduler: "
- << d_blocks.size ()
- << " blocks\n";
- }
- }
-
- single_threaded_scheduler::~single_threaded_scheduler()
- {
- if(ENABLE_LOGGING)
- delete d_log;
- }
-
- void
- single_threaded_scheduler::run()
- {
- // d_enabled = true; // KLUDGE
- main_loop ();
- }
-
- void
- single_threaded_scheduler::stop()
- {
- if(0)
- std::cout << "gr_singled_threaded_scheduler::stop() "
- << this << std::endl;
- d_enabled = false;
- }
-
- inline static unsigned int
- round_up(unsigned int n, unsigned int multiple)
- {
- return ((n + multiple - 1) / multiple) * multiple;
- }
-
- inline static unsigned int
- round_down(unsigned int n, unsigned int multiple)
- {
- return (n / multiple) * multiple;
- }
-
- //
- // Return minimum available write space in all our downstream
- // buffers or -1 if we're output blocked and the output we're
- // blocked on is done.
- //
- static int
- min_available_space(block_detail *d, int output_multiple)
- {
- int min_space = std::numeric_limits<int>::max();
-
- for(int i = 0; i < d->noutputs (); i++) {
- int n = round_down (d->output(i)->space_available (), output_multiple);
- if(n == 0) { // We're blocked on output.
- if(d->output(i)->done()) { // Downstream is done, therefore we're done.
- return -1;
- }
- return 0;
- }
- min_space = std::min (min_space, n);
- }
- return min_space;
- }
-
- void
- single_threaded_scheduler::main_loop()
- {
- static const int DEFAULT_CAPACITY = 16;
-
- int noutput_items;
- gr_vector_int ninput_items_required(DEFAULT_CAPACITY);
- gr_vector_int ninput_items(DEFAULT_CAPACITY);
- gr_vector_const_void_star input_items(DEFAULT_CAPACITY);
- gr_vector_void_star output_items(DEFAULT_CAPACITY);
- unsigned int bi;
- unsigned int nalive;
- int max_items_avail;
- bool made_progress_last_pass;
- bool making_progress;
-
- for(unsigned i = 0; i < d_blocks.size (); i++)
- d_blocks[i]->detail()->set_done (false); // reset any done flags
-
- for(unsigned i = 0; i < d_blocks.size (); i++) // enable any drivers, etc.
- d_blocks[i]->start();
-
- bi = 0;
- made_progress_last_pass = true;
- making_progress = false;
-
- // Loop while there are still blocks alive
-
- nalive = d_blocks.size ();
- while(d_enabled && nalive > 0) {
- if(boost::this_thread::interruption_requested())
- break;
-
- block *m = d_blocks[bi].get ();
- block_detail *d = m->detail().get ();
-
- LOG(*d_log << std::endl << m);
-
- if(d->done ())
- goto next_block;
-
- if(d->source_p ()) {
- // Invoke sources as a last resort. As long as the previous
- // pass made progress, don't call a source.
- if(made_progress_last_pass) {
- LOG(*d_log << " Skipping source\n");
- goto next_block;
- }
-
- ninput_items_required.resize (0);
- ninput_items.resize (0);
- input_items.resize (0);
- output_items.resize (d->noutputs ());
-
- // determine the minimum available output space
- noutput_items = min_available_space (d, m->output_multiple ());
- LOG(*d_log << " source\n noutput_items = " << noutput_items << std::endl);
- if(noutput_items == -1) // we're done
- goto were_done;
-
- if(noutput_items == 0) { // we're output blocked
- LOG(*d_log << " BLKD_OUT\n");
- goto next_block;
- }
-
- goto setup_call_to_work; // jump to common code
- }
-
- else if(d->sink_p ()) {
- ninput_items_required.resize (d->ninputs ());
- ninput_items.resize (d->ninputs ());
- input_items.resize (d->ninputs ());
- output_items.resize (0);
- LOG(*d_log << " sink\n");
-
- max_items_avail = 0;
- for(int i = 0; i < d->ninputs (); i++) {
- ninput_items[i] = d->input(i)->items_available();
- //if (ninput_items[i] == 0 && d->input(i)->done())
- if(ninput_items[i] < m->output_multiple() && d->input(i)->done())
- goto were_done;
-
- max_items_avail = std::max (max_items_avail, ninput_items[i]);
- }
-
- // take a swag at how much output we can sink
- noutput_items = (int) (max_items_avail * m->relative_rate ());
- noutput_items = round_down (noutput_items, m->output_multiple ());
- LOG(*d_log << " max_items_avail = " << max_items_avail << std::endl);
- LOG(*d_log << " noutput_items = " << noutput_items << std::endl);
-
- if(noutput_items == 0) { // we're blocked on input
- LOG(*d_log << " BLKD_IN\n");
- goto next_block;
- }
-
- goto try_again; // Jump to code shared with regular case.
- }
-
- else {
- // do the regular thing
- ninput_items_required.resize(d->ninputs ());
- ninput_items.resize(d->ninputs ());
- input_items.resize(d->ninputs ());
- output_items.resize(d->noutputs ());
-
- max_items_avail = 0;
- for(int i = 0; i < d->ninputs (); i++) {
- ninput_items[i] = d->input(i)->items_available ();
- max_items_avail = std::max(max_items_avail, ninput_items[i]);
- }
-
- // determine the minimum available output space
- noutput_items = min_available_space(d, m->output_multiple ());
- if(ENABLE_LOGGING){
- *d_log << " regular ";
- if(m->relative_rate() >= 1.0)
- *d_log << "1:" << m->relative_rate() << std::endl;
- else
- *d_log << 1.0/m->relative_rate() << ":1\n";
- *d_log << " max_items_avail = " << max_items_avail << std::endl;
- *d_log << " noutput_items = " << noutput_items << std::endl;
- }
- if(noutput_items == -1) // we're done
- goto were_done;
-
- if(noutput_items == 0) { // we're output blocked
- LOG(*d_log << " BLKD_OUT\n");
- goto next_block;
- }
-
-#if 0
- // Compute best estimate of noutput_items that we can really use.
- noutput_items =
- std::min((unsigned)noutput_items,
- std::max((unsigned)m->output_multiple(),
- round_up((unsigned)(max_items_avail * m->relative_rate()),
- m->output_multiple())));
-
- LOG(*d_log << " revised noutput_items = " << noutput_items << std::endl);
-#endif
-
- try_again:
- if(m->fixed_rate()) {
- // try to work it forward starting with max_items_avail.
- // We want to try to consume all the input we've got.
- int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
- reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
- if(reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
- noutput_items = reqd_noutput_items;
- }
-
- // ask the block how much input they need to produce noutput_items
- m->forecast(noutput_items, ninput_items_required);
-
- // See if we've got sufficient input available
- int i;
- for(i = 0; i < d->ninputs (); i++)
- if(ninput_items_required[i] > ninput_items[i]) // not enough
- break;
-
- if(i < d->ninputs()) { // not enough input on input[i]
- // if we can, try reducing the size of our output request
- if(noutput_items > m->output_multiple ()){
- noutput_items /= 2;
- noutput_items = round_up (noutput_items, m->output_multiple ());
- goto try_again;
- }
-
- // We're blocked on input
- LOG(*d_log << " BLKD_IN\n");
- if(d->input(i)->done()) // If the upstream block is done, we're done
- goto were_done;
-
- // Is it possible to ever fulfill this request?
- if(ninput_items_required[i] > d->input(i)->max_possible_items_available ()) {
- // Nope, never going to happen...
- std::cerr << "\nsched: <block " << m->name()
- << " (" << m->unique_id() << ")>"
- << " is requesting more input data\n"
- << " than we can provide.\n"
- << " ninput_items_required = "
- << ninput_items_required[i] << "\n"
- << " max_possible_items_available = "
- << d->input(i)->max_possible_items_available() << "\n"
- << " If this is a filter, consider reducing the number of taps.\n";
- goto were_done;
- }
-
- goto next_block;
- }
-
- // We've got enough data on each input to produce noutput_items.
- // Finish setting up the call to work.
- for(int i = 0; i < d->ninputs (); i++)
- input_items[i] = d->input(i)->read_pointer();
-
- setup_call_to_work:
-
- for(int i = 0; i < d->noutputs (); i++)
- output_items[i] = d->output(i)->write_pointer();
-
- // Do the actual work of the block
- int n = m->general_work(noutput_items, ninput_items,
- input_items, output_items);
- LOG(*d_log << " general_work: noutput_items = " << noutput_items
- << " result = " << n << std::endl);
-
- if(n == -1) // block is done
- goto were_done;
-
- d->produce_each(n); // advance write pointers
- if(n > 0)
- making_progress = true;
-
- goto next_block;
- }
- assert(0);
-
- were_done:
- LOG(*d_log << " were_done\n");
- d->set_done (true);
- nalive--;
-
- next_block:
- if(++bi >= d_blocks.size ()) {
- bi = 0;
- made_progress_last_pass = making_progress;
- making_progress = false;
- }
- }
-
- for(unsigned i = 0; i < d_blocks.size(); i++) // disable any drivers, etc.
- d_blocks[i]->stop();
- }
-
-} /* namespace gr */