/* -*- c++ -*- */
/*
 * Copyright 2004 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * GNU Radio is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3, or (at your option)
 * any later version.
 *
 * GNU Radio is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNU Radio; see the file COPYING.  If not, write to
 * the Free Software Foundation, Inc., 51 Franklin Street,
 * Boston, MA 02110-1301, USA.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "single_threaded_scheduler.h"
#include <gnuradio/block.h>
#include <gnuradio/block_detail.h>
#include <gnuradio/buffer.h>
#include <boost/thread.hpp>
#include <boost/format.hpp>
#include <iostream>
#include <limits>
#include <assert.h>
#include <stdio.h>

namespace gr {

  // ENABLE_LOGGING is tested both by the preprocessor (#if below) and by
  // ordinary `if` statements in this file, so it must expand to exactly
  // 0 or 1.
#define ENABLE_LOGGING 0

  // LOG(stmt) runs `stmt` only when logging is compiled in; otherwise the
  // argument is discarded entirely (never evaluated).
#if ENABLE_LOGGING
#define LOG(x) do { x; } while(0)
#else
#define LOG(x) do { } while(0)
#endif

  // Running counter that gives each scheduler's log file a distinct name
  // (sst-0.log, sst-1.log, ...); only advanced when logging is enabled.
  static int which_scheduler = 0;

  // Factory: build a scheduler over `blocks` on the heap and return it
  // already owned by a single_threaded_scheduler_sptr.
  single_threaded_scheduler_sptr
  make_single_threaded_scheduler(const std::vector<block_sptr> &blocks)
  {
    single_threaded_scheduler_sptr sched(new single_threaded_scheduler(blocks));
    return sched;
  }

  // Store the block list and start out enabled.  When ENABLE_LOGGING is
  // set, also open a per-instance log file named sst-<N>.log and record
  // how many blocks this scheduler manages.
  single_threaded_scheduler::single_threaded_scheduler(const std::vector<block_sptr> &blocks)
    : d_blocks(blocks), d_enabled(true), d_log(0)
  {
    if(!ENABLE_LOGGING)
      return;                   // no log stream; d_log stays null

    const std::string logname =
      boost::str(boost::format("sst-%d.log") % which_scheduler++);
    d_log = new std::ofstream(logname.c_str());
    *d_log << "single_threaded_scheduler: " << d_blocks.size() << " blocks\n";
  }

  // Release the log stream.  The constructor only allocates d_log when
  // ENABLE_LOGGING is non-zero, so there is nothing to free otherwise.
  single_threaded_scheduler::~single_threaded_scheduler()
  {
    if(!ENABLE_LOGGING)
      return;
    delete d_log;
  }

  // Run the scheduler on the calling thread until main_loop() returns.
  // d_enabled is deliberately not reset here: once stop() has cleared it,
  // a later run() leaves main_loop() straight after it starts and stops
  // the blocks.
  void
  single_threaded_scheduler::run()
  {
    main_loop();
  }

  // Request that main_loop() exit.  The loop tests d_enabled each time
  // around, so it ends after finishing the block currently being serviced.
  // NOTE(review): no synchronization is visible here; if stop() is called
  // from another thread, confirm d_enabled's declared type makes this
  // write visible to main_loop().
  void
  single_threaded_scheduler::stop()
  {
    const bool trace_stop = false;      // set true to trace stop requests
    if(trace_stop) {
      std::cout << "gr_singled_threaded_scheduler::stop() "
                << this << std::endl;
    }
    d_enabled = false;
  }

  // Round n up to the nearest multiple of `multiple` (which must be
  // non-zero).
  //
  // Computed as "quotient, plus one if there is a remainder" instead of
  // ((n + multiple - 1) / multiple) * multiple.  The intermediate sum in
  // that textbook form wraps around for n close to UINT_MAX and then
  // yields a value smaller than n.  For example, round_up(UINT_MAX, 5)
  // gave 0 even though UINT_MAX is itself a multiple of 5.  A result that
  // is truly unrepresentable in unsigned int still wraps, as usual for
  // unsigned arithmetic.
  inline static unsigned int
  round_up(unsigned int n, unsigned int multiple)
  {
    unsigned int quotient = n / multiple;
    if(n % multiple != 0)
      quotient++;               // partial chunk -> next whole multiple
    return quotient * multiple;
  }

  // Round n down to the nearest multiple of `multiple` (which must be
  // non-zero) by discarding the remainder.
  inline static unsigned int
  round_down(unsigned int n, unsigned int multiple)
  {
    const unsigned int leftover = n % multiple;
    return n - leftover;
  }

  //
  // Scan the output buffers of `d` and return the smallest free space,
  // each value rounded down to a multiple of output_multiple.  The scan
  // stops at the first output with no usable space and returns:
  //   -1  if that output's buffer reports done(), meaning this block is done;
  //    0  otherwise, meaning the block is output-blocked for now.
  // A block with no outputs gets std::numeric_limits<int>::max().
  //
  static int
  min_available_space(block_detail *d, int output_multiple)
  {
    int smallest = std::numeric_limits<int>::max();

    for(int port = 0; port < d->noutputs(); port++) {
      const int space =
        round_down(d->output(port)->space_available(), output_multiple);

      if(space == 0)
        return d->output(port)->done() ? -1 : 0;

      if(space < smallest)
        smallest = space;
    }

    return smallest;
  }

  // Round-robin scheduling loop.
  //
  // Resets every block's done flag and calls start() on every block.  It
  // then walks d_blocks in order, calling general_work() on each block
  // that has enough input data and output space.  The loop ends when
  // stop() clears d_enabled, the thread is interrupted, or every block
  // has been marked done.  Finally it calls stop() on every block.
  //
  // Sources are run only as a last resort: a source is skipped unless the
  // previous full pass over d_blocks produced no output at all.
  // made_progress_last_pass starts out true, so sources are also skipped
  // on the very first pass.
  //
  // Control flow is a goto-based state machine.  Each block's handling
  // ends by jumping to `next_block` (try the next block) or to
  // `were_done` (retire this block, then fall through to next_block).
  //
  // NOTE(review): there is no try/catch, so if general_work() throws, the
  // blocks' stop() methods at the bottom are not called.
  void
  single_threaded_scheduler::main_loop()
  {
    static const int DEFAULT_CAPACITY = 16;

    int				noutput_items;
    gr_vector_int		ninput_items_required(DEFAULT_CAPACITY);
    gr_vector_int		ninput_items(DEFAULT_CAPACITY);
    gr_vector_const_void_star	input_items(DEFAULT_CAPACITY);
    gr_vector_void_star		output_items(DEFAULT_CAPACITY);
    unsigned int		bi;			// index of the block being serviced
    unsigned int		nalive;			// blocks not yet marked done
    int				max_items_avail;
    bool			made_progress_last_pass;	// any output on the previous full pass?
    bool			making_progress;	// any output so far on this pass?

    for(unsigned i = 0; i < d_blocks.size (); i++)
      d_blocks[i]->detail()->set_done (false);		// reset any done flags

    for(unsigned i = 0; i < d_blocks.size (); i++)	// enable any drivers, etc.
      d_blocks[i]->start();

    bi = 0;
    made_progress_last_pass = true;	// true => sources are skipped on the first pass
    making_progress = false;

    // Loop while there are still blocks alive

    nalive = d_blocks.size ();
    while(d_enabled && nalive > 0) {
      if(boost::this_thread::interruption_requested())
        break;

      block *m = d_blocks[bi].get ();
      block_detail *d = m->detail().get ();

      LOG(*d_log << std::endl << m);

      if(d->done ())
        goto next_block;

      if(d->source_p ()) {
        // Invoke sources as a last resort. As long as the previous
        // pass made progress, don't call a source.
        if(made_progress_last_pass) {
          LOG(*d_log << "  Skipping source\n");
          goto next_block;
        }

        // A source has no inputs, only outputs.
        ninput_items_required.resize (0);
        ninput_items.resize (0);
        input_items.resize (0);
        output_items.resize (d->noutputs ());

        // determine the minimum available output space
        noutput_items = min_available_space (d, m->output_multiple ());
        LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
        if(noutput_items == -1)    // we're done
          goto were_done;

        if(noutput_items == 0) {   // we're output blocked
          LOG(*d_log << "  BLKD_OUT\n");
          goto next_block;
        }

        // No input to forecast against: skip straight to fetching the
        // write pointers and calling general_work().
        goto setup_call_to_work;   // jump to common code
      }

      else if(d->sink_p ()) {
        // A sink has inputs but no outputs.
        ninput_items_required.resize (d->ninputs ());
        ninput_items.resize (d->ninputs ());
        input_items.resize (d->ninputs ());
        output_items.resize (0);
        LOG(*d_log << " sink\n");

        max_items_avail = 0;
        for(int i = 0; i < d->ninputs (); i++) {
          ninput_items[i] = d->input(i)->items_available();
          // Retire the sink once an upstream buffer is done and holds
          // fewer than output_multiple() items.
          //if (ninput_items[i] == 0 && d->input(i)->done())
          if(ninput_items[i] < m->output_multiple() && d->input(i)->done())
            goto were_done;

          max_items_avail = std::max (max_items_avail, ninput_items[i]);
        }

        // take a swag at how much output we can sink
        // (largest input backlog scaled by relative_rate(), rounded down
        // to a multiple of output_multiple()).
        noutput_items = (int) (max_items_avail * m->relative_rate ());
        noutput_items = round_down (noutput_items, m->output_multiple ());
        LOG(*d_log << "  max_items_avail = " << max_items_avail << std::endl);
        LOG(*d_log << "  noutput_items = " << noutput_items << std::endl);

        if(noutput_items == 0) {      // we're blocked on input
          LOG(*d_log << "  BLKD_IN\n");
          goto next_block;
        }

        goto try_again;    // Jump to code shared with regular case.
      }

      else {
        // do the regular thing
        ninput_items_required.resize(d->ninputs ());
        ninput_items.resize(d->ninputs ());
        input_items.resize(d->ninputs ());
        output_items.resize(d->noutputs ());

        max_items_avail = 0;
        for(int i = 0; i < d->ninputs (); i++) {
          ninput_items[i] = d->input(i)->items_available ();
          max_items_avail = std::max(max_items_avail, ninput_items[i]);
        }

        // determine the minimum available output space
        noutput_items = min_available_space(d, m->output_multiple ());
        if(ENABLE_LOGGING){
          *d_log << " regular ";
          if(m->relative_rate() >= 1.0)
            *d_log << "1:" << m->relative_rate() << std::endl;
          else
            *d_log << 1.0/m->relative_rate() << ":1\n";
          *d_log << "  max_items_avail = " << max_items_avail << std::endl;
          *d_log << "  noutput_items = " << noutput_items << std::endl;
        }
        if(noutput_items == -1)    // we're done
          goto were_done;

        if(noutput_items == 0) {   // we're output blocked
          LOG(*d_log << "  BLKD_OUT\n");
          goto next_block;
        }

#if 0
        // Compute best estimate of noutput_items that we can really use.
        noutput_items =
          std::min((unsigned)noutput_items,
                   std::max((unsigned)m->output_multiple(),
                              round_up((unsigned)(max_items_avail * m->relative_rate()),
                                       m->output_multiple())));

        LOG(*d_log << "  revised noutput_items = " << noutput_items << std::endl);
#endif

        // Shared by sinks and regular blocks: settle on an output request
        // that the available input can satisfy.
      try_again:
        if(m->fixed_rate()) {
          // try to work it forward starting with max_items_avail.
          // We want to try to consume all the input we've got.
          // Only adopted if it fits within the current noutput_items.
          int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
          reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
          if(reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
            noutput_items = reqd_noutput_items;
        }

        // ask the block how much input they need to produce noutput_items
        m->forecast(noutput_items, ninput_items_required);

        // See if we've got sufficient input available
        int i;
        for(i = 0; i < d->ninputs (); i++)
          if(ninput_items_required[i] > ninput_items[i])   // not enough
            break;

        if(i < d->ninputs()) {    // not enough input on input[i]
          // if we can, try reducing the size of our output request
          // (halve, then round up to output_multiple(); this strictly
          // shrinks the request until it reaches output_multiple()).
          if(noutput_items > m->output_multiple ()){
            noutput_items /= 2;
            noutput_items = round_up (noutput_items, m->output_multiple ());
            goto try_again;
          }

          // We're blocked on input
          LOG(*d_log << "  BLKD_IN\n");
          if(d->input(i)->done())    // If the upstream block is done, we're done
            goto were_done;

          // Is it possible to ever fulfill this request?
          if(ninput_items_required[i] > d->input(i)->max_possible_items_available ()) {
            // Nope, never going to happen...
            std::cerr << "\nsched: <block " << m->name()
                      << " (" << m->unique_id() << ")>"
                      << " is requesting more input data\n"
                      << "  than we can provide.\n"
                      << "  ninput_items_required = "
                      << ninput_items_required[i] << "\n"
                      << "  max_possible_items_available = "
                      << d->input(i)->max_possible_items_available() << "\n"
                      << "  If this is a filter, consider reducing the number of taps.\n";
            goto were_done;
          }

          goto next_block;
        }

        // We've got enough data on each input to produce noutput_items.
        // Finish setting up the call to work.
        for(int i = 0; i < d->ninputs (); i++)
          input_items[i] = d->input(i)->read_pointer();

        // Sources enter here directly, with empty input vectors.
      setup_call_to_work:

        for(int i = 0; i < d->noutputs (); i++)
          output_items[i] = d->output(i)->write_pointer();

        // Do the actual work of the block
        int n = m->general_work(noutput_items, ninput_items,
                                input_items, output_items);
        LOG(*d_log << "  general_work: noutput_items = " << noutput_items
            << " result = " << n << std::endl);

        if(n == -1)             // block is done
          goto were_done;

        d->produce_each(n);     // advance write pointers
        if(n > 0)
          making_progress = true;

        goto next_block;
      }
      // Unreachable: every branch above ends in a goto.
      assert(0);

      // Retire the current block, then fall through to advance to the next.
    were_done:
      LOG(*d_log << "  were_done\n");
      d->set_done (true);
      nalive--;

      // Advance round-robin.  When wrapping back to block 0, record whether
      // the pass just finished produced any output (this gates sources).
    next_block:
      if(++bi >= d_blocks.size ()) {
        bi = 0;
        made_progress_last_pass = making_progress;
        making_progress = false;
      }
    }

    for(unsigned i = 0; i < d_blocks.size(); i++)    // disable any drivers, etc.
      d_blocks[i]->stop();
  }

} /* namespace gr */