/* -*- c++ -*- */
/*
 * Copyright 2004 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * GNU Radio is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3, or (at your option)
 * any later version.
 *
 * GNU Radio is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with GNU Radio; see the file COPYING.  If not, write to
 * the Free Software Foundation, Inc., 51 Franklin Street,
 * Boston, MA 02110-1301, USA.
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gr_single_threaded_scheduler.h>
#include <gr_block.h>
#include <gr_block_detail.h>
#include <gr_buffer.h>
#include <boost/thread.hpp>
#include <boost/format.hpp>
#include <iostream>
#include <limits>
#include <assert.h>
#include <stdio.h>

// Compile-time switch for the scheduler trace log.
// Must be defined to either 0 or 1: it is tested both by the
// preprocessor (the #if just below) and by ordinary
// `if (ENABLE_LOGGING)` statements in this file, so it has to expand
// to an integer constant.
#define ENABLE_LOGGING 0

// LOG(x) executes the statement x when logging is enabled and expands
// to an empty statement otherwise (x is then dropped by the
// preprocessor and never compiled).  The do { } while(0) wrapper makes
// `LOG(...);` behave as a single statement, e.g. as the body of an
// unbraced if.
#if (ENABLE_LOGGING)
#define LOG(x) do { x; } while(0)
#else
#define LOG(x) do {;} while(0)
#endif

// Counter the constructor uses to number the per-scheduler log files
// ("sst-<n>.log"); it is only incremented when logging is enabled.
static int which_scheduler  = 0;

// Factory: allocates a scheduler for `blocks` with new and wraps the
// raw pointer in a gr_single_threaded_scheduler_sptr.
gr_single_threaded_scheduler_sptr
gr_make_single_threaded_scheduler (const std::vector<gr_block_sptr> &blocks)
{
  gr_single_threaded_scheduler *raw = new gr_single_threaded_scheduler (blocks);
  return gr_single_threaded_scheduler_sptr (raw);
}

// Stores a copy of `blocks`, starts out enabled, and -- only when
// ENABLE_LOGGING is non-zero -- opens a trace file named
// "sst-<n>.log" (n taken from the file-level which_scheduler counter)
// and writes the block count to it.  With logging off, d_log stays 0.
gr_single_threaded_scheduler::gr_single_threaded_scheduler (
    const std::vector<gr_block_sptr> &blocks)
  : d_blocks (blocks), d_enabled (true), d_log(0)
{
  if (!ENABLE_LOGGING)
    return;

  // Post-increment: this scheduler gets the current number, the next
  // one constructed gets the following number.
  const std::string log_name =
    (boost::format("sst-%d.log") % which_scheduler++).str();

  d_log = new std::ofstream(log_name.c_str());
  (*d_log) << "gr_single_threaded_scheduler: ";
  (*d_log) << d_blocks.size ();
  (*d_log) << " blocks\n";
}

// Releases the trace stream allocated by the constructor.  Nothing is
// deleted when logging is compiled out.
gr_single_threaded_scheduler::~gr_single_threaded_scheduler ()
{
  if (!ENABLE_LOGGING)
    return;

  delete d_log;
}

void
gr_single_threaded_scheduler::run ()
{
  // d_enabled = true;		// KLUDGE
  main_loop ();
}

// Requests termination by clearing d_enabled, which main_loop() tests
// at the top of every iteration.
void
gr_single_threaded_scheduler::stop ()
{
  // Flip to true to trace calls to stop() on stdout while debugging.
  static const bool trace_stop = false;

  if (trace_stop){
    std::cout << "gr_singled_threaded_scheduler::stop() " << this << std::endl;
  }

  d_enabled = false;
}

// Smallest multiple of `multiple` that is >= n, computed in unsigned
// arithmetic.  `multiple` must be non-zero.
inline static unsigned int
round_up (unsigned int n, unsigned int multiple)
{
  const unsigned int bumped = n + multiple - 1;
  return bumped - (bumped % multiple);
}

// Largest multiple of `multiple` that is <= n, computed in unsigned
// arithmetic.  `multiple` must be non-zero.
inline static unsigned int
round_down (unsigned int n, unsigned int multiple)
{
  return n - (n % multiple);
}

//
// Scan the output buffers of `d` in port order and report how much can
// be written to all of them, in whole units of output_multiple.
//
// Returns:
//   > 0  the smallest (rounded-down) space over all outputs
//    0   the first output found with no usable space is not done:
//        we're output blocked
//   -1   the first output found with no usable space is done:
//        downstream is finished, therefore so are we
//
// If `d` has no outputs the loop never runs and the result is
// std::numeric_limits<int>::max().
//
static int
min_available_space (gr_block_detail *d, int output_multiple)
{
  int	smallest = std::numeric_limits<int>::max();

  for (int port = 0; port < d->noutputs (); ++port){
    const int usable =
      round_down (d->output(port)->space_available (), output_multiple);

    if (usable != 0){
      if (usable < smallest)
	smallest = usable;
      continue;
    }

    // Not even one output_multiple fits on this port.
    return d->output(port)->done() ? -1 : 0;
  }

  return smallest;
}

//
// Scheduler core.  Walks d_blocks round-robin and calls general_work()
// on every block that currently has enough input items and output
// space, until one of the following holds:
//   - d_enabled is false (see stop()),
//   - every block has been marked done (nalive reaches 0),
//   - boost reports an interruption request for the calling thread.
//
// Before the loop every block has its done flag cleared and start()
// called; after the loop stop() is called on every block.
//
// Each iteration handles one block and takes one of three paths,
// selected by gr_block_detail::source_p() / sink_p():
//   source   no inputs; sized purely by output space; only run when the
//            previous full pass over d_blocks made no progress.
//   sink     no outputs; sized from the items available on its inputs.
//   regular  sized by output space, then checked against the inputs.
// The sink and regular paths share the code at `try_again`; all three
// share the code at `setup_call_to_work`.  Every path ends in a goto
// to `were_done` (mark block finished) or `next_block` (advance).
//
void
gr_single_threaded_scheduler::main_loop ()
{
  // Initial element count of the scratch vectors below.  Each vector
  // is resize()d to the current block's port counts before it is used.
  static const int DEFAULT_CAPACITY = 16;

  // noutput_items            size of the request passed to forecast/general_work
  // ninput_items_required[]  filled in by forecast()
  // ninput_items[]           items_available() per input
  // input_items[]            read pointers handed to general_work()
  // output_items[]           write pointers handed to general_work()
  // bi                       index of the current block in d_blocks
  // nalive                   number of blocks not yet marked done
  // max_items_avail          largest ninput_items[] entry for this block
  // made_progress_last_pass  making_progress latched at the end of a pass
  // making_progress          some general_work() in this pass returned > 0
  int				noutput_items;
  gr_vector_int			ninput_items_required (DEFAULT_CAPACITY);
  gr_vector_int			ninput_items (DEFAULT_CAPACITY);
  gr_vector_const_void_star	input_items (DEFAULT_CAPACITY);
  gr_vector_void_star		output_items (DEFAULT_CAPACITY);
  unsigned int			bi;
  unsigned int			nalive;
  int				max_items_avail;
  bool				made_progress_last_pass;
  bool				making_progress;

  for (unsigned i = 0; i < d_blocks.size (); i++)
    d_blocks[i]->detail()->set_done (false);		// reset any done flags

  for (unsigned i = 0; i < d_blocks.size (); i++)	// enable any drivers, etc.
    d_blocks[i]->start();


  // made_progress_last_pass starts out true, so sources are skipped
  // during the very first pass (see the source path below).
  bi = 0;
  made_progress_last_pass = true;
  making_progress = false;

  // Loop while there are still blocks alive

  nalive = d_blocks.size ();
  while (d_enabled && nalive > 0){

    // Checked once per block visit, before any work is done for it.
    if (boost::this_thread::interruption_requested())
      break;

    gr_block		*m = d_blocks[bi].get ();
    gr_block_detail	*d = m->detail().get ();

    LOG(*d_log << std::endl << m);

    // Blocks already marked done are skipped; nalive was decremented
    // when they were marked.
    if (d->done ())
      goto next_block;

    if (d->source_p ()){
      // Invoke sources as a last resort.  As long as the previous pass
      // made progress, don't call a source.
      if (made_progress_last_pass){
	LOG(*d_log << "  Skipping source\n");
	goto next_block;
      }

      // No inputs on this path: the input-side vectors are emptied.
      ninput_items_required.resize (0);
      ninput_items.resize (0);
      input_items.resize (0);
      output_items.resize (d->noutputs ());

      // determine the minimum available output space
      noutput_items = min_available_space (d, m->output_multiple ());
      LOG(*d_log << " source\n  noutput_items = " << noutput_items << std::endl);
      if (noutput_items == -1)		// we're done
	goto were_done;

      if (noutput_items == 0){		// we're output blocked
	LOG(*d_log << "  BLKD_OUT\n");
	goto next_block;
      }

      // Skips forecast() and the input checks entirely.
      goto setup_call_to_work;		// jump to common code
    }

    else if (d->sink_p ()){
      // No outputs on this path: output_items is emptied and
      // min_available_space() is not consulted.
      ninput_items_required.resize (d->ninputs ());
      ninput_items.resize (d->ninputs ());
      input_items.resize (d->ninputs ());
      output_items.resize (0);
      LOG(*d_log << " sink\n");

      max_items_avail = 0;
      for (int i = 0; i < d->ninputs (); i++){
	ninput_items[i] = d->input(i)->items_available();
	// The sink is finished as soon as any input is done and holds
	// fewer than output_multiple() items.
	//if (ninput_items[i] == 0 && d->input(i)->done())
	if (ninput_items[i] < m->output_multiple() && d->input(i)->done())
	  goto were_done;

	max_items_avail = std::max (max_items_avail, ninput_items[i]);
      }

      // take a swag at how much output we can sink
      // (largest input backlog scaled by relative_rate(), truncated to
      // int, then rounded down to a multiple of output_multiple())
      noutput_items = (int) (max_items_avail * m->relative_rate ());
      noutput_items = round_down (noutput_items, m->output_multiple ());
      LOG(*d_log << "  max_items_avail = " << max_items_avail << std::endl);
      LOG(*d_log << "  noutput_items = " << noutput_items << std::endl);

      if (noutput_items == 0){	// we're blocked on input
	LOG(*d_log << "  BLKD_IN\n");
	goto next_block;
      }

      // Jumps into the else-block below, at the try_again label.
      goto try_again;		// Jump to code shared with regular case.
    }

    else {
      // do the regular thing
      ninput_items_required.resize (d->ninputs ());
      ninput_items.resize (d->ninputs ());
      input_items.resize (d->ninputs ());
      output_items.resize (d->noutputs ());

      // Unlike the sink path there is no done-check on the inputs
      // here; that happens after forecast(), below.
      max_items_avail = 0;
      for (int i = 0; i < d->ninputs (); i++){
	ninput_items[i] = d->input(i)->items_available ();
	max_items_avail = std::max (max_items_avail, ninput_items[i]);
      }

      // determine the minimum available output space
      noutput_items = min_available_space (d, m->output_multiple ());
      // Plain `if` rather than LOG(): the body is several statements.
      if (ENABLE_LOGGING){
	*d_log << " regular ";
	if (m->relative_rate() >= 1.0)
	  *d_log << "1:" << m->relative_rate() << std::endl;
	else
	  *d_log << 1.0/m->relative_rate() << ":1\n";
	*d_log << "  max_items_avail = " << max_items_avail << std::endl;
	*d_log << "  noutput_items = " << noutput_items << std::endl;
      }
      if (noutput_items == -1)		// we're done
	goto were_done;

      if (noutput_items == 0){		// we're output blocked
	LOG(*d_log << "  BLKD_OUT\n");
	goto next_block;
      }

      // Disabled alternative estimate of noutput_items; the
      // fixed_rate() handling at try_again is what runs instead.
#if 0
      // Compute best estimate of noutput_items that we can really use.
      noutput_items =
	std::min ((unsigned) noutput_items,
		  std::max ((unsigned) m->output_multiple(),
			    round_up ((unsigned) (max_items_avail * m->relative_rate()),
				      m->output_multiple ())));

      LOG(*d_log << "  revised noutput_items = " << noutput_items << std::endl);
#endif

      // Entered by fall-through (regular path), by goto from the sink
      // path, and by the halving retry further down.  On entry
      // noutput_items is the candidate request size and
      // ninput_items[] / max_items_avail describe the current inputs.
    try_again:
      if (m->fixed_rate()){
	// try to work it forward starting with max_items_avail.
	// We want to try to consume all the input we've got.
	// The block's own conversion is rounded up to output_multiple()
	// and adopted only if it is positive and does not exceed the
	// current noutput_items.
	int reqd_noutput_items = m->fixed_rate_ninput_to_noutput(max_items_avail);
	reqd_noutput_items = round_up(reqd_noutput_items, m->output_multiple());
	if (reqd_noutput_items > 0 && reqd_noutput_items <= noutput_items)
	  noutput_items = reqd_noutput_items;
      }

      // ask the block how much input they need to produce noutput_items
      m->forecast (noutput_items, ninput_items_required);

      // See if we've got sufficient input available

      // NOTE: `goto setup_call_to_work` from the source path jumps
      // over this declaration; that is well-formed only because it has
      // no initializer.  After the loop, i is the index of the first
      // starved input, or d->ninputs() if none is starved.
      int i;
      for (i = 0; i < d->ninputs (); i++)
	if (ninput_items_required[i] > ninput_items[i])	// not enough
	  break;

      if (i < d->ninputs ()){			// not enough input on input[i]
	// if we can, try reducing the size of our output request
	// (halve, round up to output_multiple(), re-run forecast();
	// retries stop once noutput_items <= output_multiple())
	if (noutput_items > m->output_multiple ()){
	  noutput_items /= 2;
	  noutput_items = round_up (noutput_items, m->output_multiple ());
	  goto try_again;
	}

	// We're blocked on input
	LOG(*d_log << "  BLKD_IN\n");
	if (d->input(i)->done())    // If the upstream block is done, we're done
	  goto were_done;

	// Is it possible to ever fulfill this request?
	if (ninput_items_required[i] > d->input(i)->max_possible_items_available ()){
	  // Nope, never going to happen...
	  // Report on stderr and retire the block instead of revisiting
	  // it forever.
	  std::cerr << "\nsched: <gr_block " << m->name()
		    << " (" << m->unique_id() << ")>"
		    << " is requesting more input data\n"
		    << "  than we can provide.\n"
		    << "  ninput_items_required = "
		    << ninput_items_required[i] << "\n"
		    << "  max_possible_items_available = "
		    << d->input(i)->max_possible_items_available() << "\n"
		    << "  If this is a filter, consider reducing the number of taps.\n";
	  goto were_done;
	}

	goto next_block;
      }

      // We've got enough data on each input to produce noutput_items.
      // Finish setting up the call to work.

      for (int i = 0; i < d->ninputs (); i++)
	input_items[i] = d->input(i)->read_pointer();

      // Common tail for all three paths.  Sources arrive here by goto
      // with the input-side vectors resized to 0.
    setup_call_to_work:

      for (int i = 0; i < d->noutputs (); i++)
	output_items[i] = d->output(i)->write_pointer();

      // Do the actual work of the block
      int n = m->general_work (noutput_items, ninput_items,
			       input_items, output_items);
      LOG(*d_log << "  general_work: noutput_items = " << noutput_items
	  << " result = " << n << std::endl);

      if (n == -1)		// block is done
	goto were_done;

      // Only a strictly positive return counts as progress for the
      // "skip sources" heuristic.
      d->produce_each (n);	// advance write pointers
      if (n > 0)
	making_progress = true;

      goto next_block;
    }
    // Not reachable: every branch of the if/else chain above leaves
    // through a goto.
    assert (0);

    // Mark the current block finished and drop it from the live count,
    // then fall through to advance to the next block.
  were_done:
    LOG(*d_log << "  were_done\n");
    d->set_done (true);
    nalive--;

    // Advance the round-robin index.  On wrap-around a full pass has
    // completed: latch whether it made progress and reset the flag.
  next_block:
    if (++bi >= d_blocks.size ()){
      bi = 0;
      made_progress_last_pass = making_progress;
      making_progress = false;
    }
  }

  for (unsigned i = 0; i < d_blocks.size (); i++)	// disable any drivers, etc.
    d_blocks[i]->stop();
}