/* -*- c++ -*- */
/*
 * Copyright 2004,2009,2010,2013 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include <gnuradio/block_detail.h>
#include <gnuradio/buffer.h>
#include <gnuradio/buffer_reader.h>
#include <gnuradio/logger.h>

namespace gr {

// Count of block_detail objects currently alive. The block_detail constructor
// increments it and the destructor decrements it.
static long s_ncurrently_allocated = 0;

// Returns the current value of the live block_detail counter.
long block_detail_ncurrently_allocated()
{
    const long live_count = s_ncurrently_allocated;
    return live_count;
}

// Builds the bookkeeping object for a block with `ninputs` input ports and
// `noutputs` output ports.
//
// The reader/buffer slot containers are sized to the port counts here and are
// populated later through set_input()/set_output(). All performance-counter
// statistics start at zero.
block_detail::block_detail(unsigned int ninputs, unsigned int noutputs)
    : d_produce_or(0),
      // port counts and per-port slots
      d_ninputs(ninputs),
      d_noutputs(noutputs),
      d_input(ninputs),
      d_output(noutputs),
      d_done(false),
      // noutput_items statistics
      d_ins_noutput_items(0),
      d_avg_noutput_items(0),
      d_var_noutput_items(0),
      d_total_noutput_items(0),
      // nproduced statistics
      d_ins_nproduced(0),
      d_avg_nproduced(0),
      d_var_nproduced(0),
      // per-port buffer fullness statistics
      d_ins_input_buffers_full(ninputs, 0),
      d_avg_input_buffers_full(ninputs, 0),
      d_var_input_buffers_full(ninputs, 0),
      d_ins_output_buffers_full(noutputs, 0),
      d_avg_output_buffers_full(noutputs, 0),
      d_var_output_buffers_full(noutputs, 0),
      // work-time statistics
      d_ins_work_time(0),
      d_avg_work_time(0),
      d_var_work_time(0),
      d_avg_throughput(0),
      d_pc_counter(0)
{
    // Track this instance in the file-level live-object counter.
    ++s_ncurrently_allocated;

    // Reference timestamp used by the throughput calculation.
    d_pc_start_time = gr::high_res_timer_now();

    gr::configure_default_loggers(d_logger, d_debug_logger, "block_detail");
}

// Destructor: no explicit cleanup is performed here beyond updating the
// live-object counter.
block_detail::~block_detail()
{
    --s_ncurrently_allocated;
}

// Stores `reader` in input slot `which`.
// Throws std::invalid_argument when `which` is not below the input count.
void block_detail::set_input(unsigned int which, buffer_reader_sptr reader)
{
    const bool slot_exists = which < d_ninputs;
    if (!slot_exists) {
        throw std::invalid_argument("block_detail::set_input");
    }

    d_input[which] = reader;
}

// Stores `buffer` in output slot `which`.
// Throws std::invalid_argument when `which` is not below the output count.
void block_detail::set_output(unsigned int which, buffer_sptr buffer)
{
    const bool slot_exists = which < d_noutputs;
    if (!slot_exists) {
        throw std::invalid_argument("block_detail::set_output");
    }

    d_output[which] = buffer;
}

// Factory: allocates a block_detail for the given port counts and hands back
// the owning smart pointer.
block_detail_sptr make_block_detail(unsigned int ninputs, unsigned int noutputs)
{
    block_detail_sptr detail(new block_detail(ninputs, noutputs));
    return detail;
}

// Records the done flag locally and forwards it to every output buffer and
// then to every input reader.
void block_detail::set_done(bool done)
{
    d_done = done;

    for (unsigned int out_idx = 0; out_idx < d_noutputs; ++out_idx) {
        d_output[out_idx]->set_done(done);
    }

    for (unsigned int in_idx = 0; in_idx < d_ninputs; ++in_idx) {
        d_input[in_idx]->set_done(done);
    }
}

// Remembers `how_many_items` as the last consumed count and, when it is
// positive, advances the read pointer of input `which_input` by that amount.
void block_detail::consume(int which_input, int how_many_items)
{
    d_consumed = how_many_items;

    // Non-positive counts are recorded but do not move the read pointer.
    if (how_many_items <= 0) {
        return;
    }

    input(which_input)->update_read_pointer(how_many_items);
}

// Returns the item count stored by the most recent consume()/consume_each().
int block_detail::consumed() const
{
    return d_consumed;
}

// Remembers `how_many_items` as the last consumed count and, when it is
// positive, advances the read pointer of every input by that amount.
void block_detail::consume_each(int how_many_items)
{
    d_consumed = how_many_items;

    // Non-positive counts are recorded but do not move any read pointer.
    if (how_many_items <= 0) {
        return;
    }

    for (int port = 0; port < ninputs(); ++port) {
        d_input[port]->update_read_pointer(how_many_items);
    }
}

// For a positive `how_many_items`: runs the output buffer's post_work hook,
// advances its write pointer, and ORs the count into d_produce_or.
// Non-positive counts are ignored.
void block_detail::produce(int which_output, int how_many_items)
{
    if (how_many_items <= 0) {
        return;
    }

    d_output[which_output]->post_work(how_many_items);
    d_output[which_output]->update_write_pointer(how_many_items);
    d_produce_or |= how_many_items;
}

// For a positive `how_many_items`: on every output, runs post_work and
// advances the write pointer; afterwards ORs the count into d_produce_or once.
// Non-positive counts are ignored.
void block_detail::produce_each(int how_many_items)
{
    if (how_many_items <= 0) {
        return;
    }

    for (int port = 0; port < noutputs(); ++port) {
        d_output[port]->post_work(how_many_items);
        d_output[port]->update_write_pointer(how_many_items);
    }
    d_produce_or |= how_many_items;
}

// Returns the nitems_read() counter of input `which_input`.
// Throws std::invalid_argument when the index is not below the input count.
uint64_t block_detail::nitems_read(unsigned int which_input)
{
    const bool valid_port = which_input < d_ninputs;
    if (!valid_port) {
        throw std::invalid_argument("block_detail::n_input_items");
    }

    return d_input[which_input]->nitems_read();
}

// Returns the nitems_written() counter of output `which_output`.
// Throws std::invalid_argument when the index is not below the output count.
uint64_t block_detail::nitems_written(unsigned int which_output)
{
    const bool valid_port = which_output < d_noutputs;
    if (!valid_port) {
        throw std::invalid_argument("block_detail::n_output_items");
    }

    return d_output[which_output]->nitems_written();
}

// Resets the item counter on every input reader, then on every output buffer.
void block_detail::reset_nitem_counters()
{
    // d_input / d_output are sized to d_ninputs / d_noutputs by the
    // constructor, so walking the containers visits the same slots.
    for (const auto& reader : d_input) {
        reader->reset_nitem_counter();
    }
    for (const auto& out_buf : d_output) {
        out_buf->reset_nitem_counter();
    }
}

// Prunes the tags of the buffer behind every input reader, using the largest
// 64-bit value as the pruning limit.
void block_detail::clear_tags()
{
    const uint64_t max_time = 0xFFFFFFFFFFFFFFFF; // from now to the end of time

    // d_input is sized to d_ninputs by the constructor.
    for (const auto& reader : d_input) {
        reader->buffer()->prune_tags(max_time);
    }
}

// Attaches `tag` to output `which_output`.
// Throws pmt::wrong_type when the tag's key is not a PMT symbol.
void block_detail::add_item_tag(unsigned int which_output, const tag_t& tag)
{
    const bool key_is_symbol = pmt::is_symbol(tag.key);
    if (!key_is_symbol) {
        throw pmt::wrong_type("block_detail::add_item_tag key", tag.key);
    }

    // Hand the tag to the output buffer, which stores it.
    d_output[which_output]->add_item_tag(tag);
}

// Removes `tag` from the buffer feeding input `which_input`; `id` is passed
// through to the buffer's remove_item_tag().
// Throws pmt::wrong_type when the tag's key is not a PMT symbol.
void block_detail::remove_item_tag(unsigned int which_input, const tag_t& tag, long id)
{
    if (!pmt::is_symbol(tag.key)) {
        // Report this function's own name (the text used to say add_item_tag).
        throw pmt::wrong_type("block_detail::remove_item_tag key", tag.key);
    }

    // Remove the tag from the upstream buffer's tag store.
    d_input[which_input]->buffer()->remove_item_tag(tag, id);
}

// Asks the reader on input `which_input` for its tags in the range described
// by `abs_start`/`abs_end`, writing them into `v`. `id` is forwarded
// unchanged to the reader.
void block_detail::get_tags_in_range(std::vector<tag_t>& v,
                                     unsigned int which_input,
                                     uint64_t abs_start,
                                     uint64_t abs_end,
                                     long id)
{
    // Pure delegation to the buffer reader.
    const auto& reader = d_input[which_input];
    reader->get_tags_in_range(v, abs_start, abs_end, id);
}

// Like the overload without `key`, but `v` is first emptied and then receives
// only those tags whose key compares pmt::eqv to `key`.
void block_detail::get_tags_in_range(std::vector<tag_t>& v,
                                     unsigned int which_input,
                                     uint64_t abs_start,
                                     uint64_t abs_end,
                                     const pmt::pmt_t& key,
                                     long id)
{
    v.clear();

    // Collect every tag in the range from the buffer reader first.
    std::vector<tag_t> candidates;
    d_input[which_input]->get_tags_in_range(candidates, abs_start, abs_end, id);

    // Keep only the tags carrying the requested key, in their original order.
    for (const tag_t& candidate : candidates) {
        if (pmt::eqv(key, candidate.key)) {
            v.push_back(candidate);
        }
    }
}

// Binds this block's thread to the processors listed in `mask`.
// Does nothing unless `threaded` is set. A std::runtime_error raised by the
// bind call is logged and swallowed.
void block_detail::set_processor_affinity(const std::vector<int>& mask)
{
    if (!threaded) {
        return;
    }

    try {
        gr::thread::thread_bind_to_processor(thread, mask);
    } catch (const std::runtime_error&) {
        GR_LOG_ERROR(d_logger, "set_processor_affinity: invalid mask.");
    }
}

// Removes any processor binding from this block's thread.
// Does nothing unless `threaded` is set.
void block_detail::unset_processor_affinity()
{
    if (!threaded) {
        return;
    }

    gr::thread::thread_unbind(thread);
}

// Returns the priority reported for this block's thread, or -1 when
// `threaded` is not set.
int block_detail::thread_priority()
{
    if (!threaded) {
        return -1;
    }

    return gr::thread::thread_priority(thread);
}

// Requests `priority` for this block's thread and returns whatever the
// thread helper returns, or -1 when `threaded` is not set.
int block_detail::set_thread_priority(int priority)
{
    if (!threaded) {
        return -1;
    }

    return gr::thread::set_thread_priority(thread, priority);
}

// Stamps the start of a work interval; stop_perf_counters() subtracts this
// stamp from its own to obtain the elapsed work time.
void block_detail::start_perf_counters()
{
    const auto started_at = gr::high_res_timer_now_perfmon();
    d_start_of_work = started_at;
}

// Closes the work interval opened by start_perf_counters() and folds one
// sample into the performance statistics.
//
// noutput_items / nproduced are recorded as given. For each statistic the
// "ins" member holds the latest sample, "avg" a running mean, and "var" an
// accumulated sum of squared deviations (the pc_*_var() getters divide it by
// d_pc_counter - 1). Buffer fullness is sampled per port while holding that
// buffer's mutex.
//
// When d_pc_counter is 0 (fresh object or after reset_perf_counters()), every
// statistic is re-seeded from this sample and the throughput reference time
// is restarted.
void block_detail::stop_perf_counters(int noutput_items, int nproduced)
{
    d_end_of_work = gr::high_res_timer_now_perfmon();
    gr::high_res_timer_type diff = d_end_of_work - d_start_of_work;

    if (d_pc_counter == 0) {
        // First sample: seed instantaneous/average values, zero the variances.
        d_ins_work_time = diff;
        d_avg_work_time = diff;
        d_var_work_time = 0;
        d_total_work_time = diff;
        d_ins_nproduced = nproduced;
        d_avg_nproduced = nproduced;
        d_var_nproduced = 0;
        d_ins_noutput_items = noutput_items;
        d_avg_noutput_items = noutput_items;
        d_var_noutput_items = 0;
        d_total_noutput_items = noutput_items;
        // Store the timestamp without a float round-trip: casting the timer
        // value to float first would discard its low-order bits and skew the
        // elapsed time computed from it below on later calls.
        d_pc_start_time = gr::high_res_timer_now();
        for (size_t i = 0; i < d_input.size(); i++) {
            buffer_reader_sptr in_buf = d_input[i];
            // Hold the reader's mutex while sampling its fill level.
            gr::thread::scoped_lock guard(*in_buf->mutex());
            // NOTE(review): no guard against a zero denominator here.
            float pfull = static_cast<float>(in_buf->items_available()) /
                          static_cast<float>(in_buf->max_possible_items_available());
            d_ins_input_buffers_full[i] = pfull;
            d_avg_input_buffers_full[i] = pfull;
            d_var_input_buffers_full[i] = 0;
        }
        for (size_t i = 0; i < d_output.size(); i++) {
            buffer_sptr out_buf = d_output[i];
            // Hold the buffer's mutex while sampling its fill level.
            gr::thread::scoped_lock guard(*out_buf->mutex());
            // Fullness is the complement of the free fraction.
            float pfull = 1.0f - static_cast<float>(out_buf->space_available()) /
                                     static_cast<float>(out_buf->bufsize());
            d_ins_output_buffers_full[i] = pfull;
            d_avg_output_buffers_full[i] = pfull;
            d_var_output_buffers_full[i] = 0;
        }
    } else {
        // Subsequent samples: incremental update of mean and squared-deviation
        // sum. `d` is the deviation of the new sample from the previous mean.
        // NOTE(review): the mean step divides by d_pc_counter, which at this
        // point counts the samples recorded *before* this one; confirm whether
        // d / (d_pc_counter + 1) was intended.
        float d = diff - d_avg_work_time;
        d_ins_work_time = diff;
        d_avg_work_time = d_avg_work_time + d / d_pc_counter;
        d_var_work_time = d_var_work_time + d * d;
        d_total_work_time += diff;

        d = nproduced - d_avg_nproduced;
        d_ins_nproduced = nproduced;
        d_avg_nproduced = d_avg_nproduced + d / d_pc_counter;
        d_var_nproduced = d_var_nproduced + d * d;

        d = noutput_items - d_avg_noutput_items;
        d_ins_noutput_items = noutput_items;
        d_avg_noutput_items = d_avg_noutput_items + d / d_pc_counter;
        d_var_noutput_items = d_var_noutput_items + d * d;
        d_total_noutput_items += noutput_items;

        // Throughput = total noutput_items / seconds since the reference time.
        d_pc_last_work_time = gr::high_res_timer_now();
        float monitor_time =
            static_cast<float>(d_pc_last_work_time - d_pc_start_time) /
            static_cast<float>(gr::high_res_timer_tps());
        d_avg_throughput = d_total_noutput_items / monitor_time;

        for (size_t i = 0; i < d_input.size(); i++) {
            buffer_reader_sptr in_buf = d_input[i];
            // Hold the reader's mutex while sampling its fill level.
            gr::thread::scoped_lock guard(*in_buf->mutex());
            float pfull = static_cast<float>(in_buf->items_available()) /
                          static_cast<float>(in_buf->max_possible_items_available());

            d = pfull - d_avg_input_buffers_full[i];
            d_ins_input_buffers_full[i] = pfull;
            d_avg_input_buffers_full[i] = d_avg_input_buffers_full[i] + d / d_pc_counter;
            d_var_input_buffers_full[i] = d_var_input_buffers_full[i] + d * d;
        }

        for (size_t i = 0; i < d_output.size(); i++) {
            buffer_sptr out_buf = d_output[i];
            // Hold the buffer's mutex while sampling its fill level.
            gr::thread::scoped_lock guard(*out_buf->mutex());
            float pfull = 1.0f - static_cast<float>(out_buf->space_available()) /
                                     static_cast<float>(out_buf->bufsize());

            d = pfull - d_avg_output_buffers_full[i];
            d_ins_output_buffers_full[i] = pfull;
            d_avg_output_buffers_full[i] =
                d_avg_output_buffers_full[i] + d / d_pc_counter;
            d_var_output_buffers_full[i] = d_var_output_buffers_full[i] + d * d;
        }
    }

    // One more sample recorded.
    d_pc_counter++;
}

// Zeroes the sample counter. The next stop_perf_counters() call then takes
// its first-sample path and re-seeds every statistic.
void block_detail::reset_perf_counters()
{
    d_pc_counter = 0;
}

// Latest noutput_items sample recorded by stop_perf_counters().
float block_detail::pc_noutput_items()
{
    return d_ins_noutput_items;
}

// Latest nproduced sample recorded by stop_perf_counters().
float block_detail::pc_nproduced()
{
    return d_ins_nproduced;
}

// Latest fullness sample for input `which`; 0 when `which` is out of range.
float block_detail::pc_input_buffers_full(size_t which)
{
    if (which >= d_ins_input_buffers_full.size()) {
        return 0;
    }
    return d_ins_input_buffers_full[which];
}

// Copy of the latest fullness samples for all inputs.
std::vector<float> block_detail::pc_input_buffers_full()
{
    std::vector<float> snapshot(d_ins_input_buffers_full);
    return snapshot;
}

// Latest fullness sample for output `which`; 0 when `which` is out of range.
float block_detail::pc_output_buffers_full(size_t which)
{
    if (which >= d_ins_output_buffers_full.size()) {
        return 0;
    }
    return d_ins_output_buffers_full[which];
}

// Copy of the latest fullness samples for all outputs.
std::vector<float> block_detail::pc_output_buffers_full()
{
    std::vector<float> snapshot(d_ins_output_buffers_full);
    return snapshot;
}

// Latest work-time sample (timer ticks between start_perf_counters() and
// stop_perf_counters()).
float block_detail::pc_work_time()
{
    return d_ins_work_time;
}

// Running mean of noutput_items maintained by stop_perf_counters().
float block_detail::pc_noutput_items_avg()
{
    return d_avg_noutput_items;
}

// Running mean of nproduced maintained by stop_perf_counters().
float block_detail::pc_nproduced_avg()
{
    return d_avg_nproduced;
}

// Running mean of fullness for input `which`; 0 when `which` is out of range.
float block_detail::pc_input_buffers_full_avg(size_t which)
{
    if (which >= d_avg_input_buffers_full.size()) {
        return 0;
    }
    return d_avg_input_buffers_full[which];
}

// Copy of the running fullness means for all inputs.
std::vector<float> block_detail::pc_input_buffers_full_avg()
{
    std::vector<float> snapshot(d_avg_input_buffers_full);
    return snapshot;
}

// Running mean of fullness for output `which`; 0 when `which` is out of range.
float block_detail::pc_output_buffers_full_avg(size_t which)
{
    if (which >= d_avg_output_buffers_full.size()) {
        return 0;
    }
    return d_avg_output_buffers_full[which];
}

// Copy of the running fullness means for all outputs.
std::vector<float> block_detail::pc_output_buffers_full_avg()
{
    std::vector<float> snapshot(d_avg_output_buffers_full);
    return snapshot;
}

// Running mean of the work time maintained by stop_perf_counters().
float block_detail::pc_work_time_avg()
{
    return d_avg_work_time;
}

// Accumulated squared deviation of noutput_items divided by
// (sample count - 1).
// NOTE(review): the divisor is not checked, so fewer than two recorded
// samples give a zero or negative divisor; confirm callers tolerate that.
float block_detail::pc_noutput_items_var()
{
    const auto divisor = d_pc_counter - 1;
    return d_var_noutput_items / divisor;
}

// Accumulated squared deviation of nproduced divided by (sample count - 1).
// NOTE(review): the divisor is not checked, so fewer than two recorded
// samples give a zero or negative divisor; confirm callers tolerate that.
float block_detail::pc_nproduced_var()
{
    const auto divisor = d_pc_counter - 1;
    return d_var_nproduced / divisor;
}

// Accumulated squared deviation of input `which`'s fullness divided by
// (sample count - 1); 0 when `which` is out of range.
// NOTE(review): the divisor is not checked, so fewer than two recorded
// samples give a zero or negative divisor; confirm callers tolerate that.
float block_detail::pc_input_buffers_full_var(size_t which)
{
    // Bounds-check against the vector that is actually indexed.
    if (which < d_var_input_buffers_full.size())
        return d_var_input_buffers_full[which] / (d_pc_counter - 1);
    else
        return 0;
}

std::vector<float> block_detail::pc_input_buffers_full_var()
{
    std::vector<float> var(d_avg_input_buffers_full.size(), 0);
    for (size_t i = 0; i < d_avg_input_buffers_full.size(); i++)
        var[i] = d_avg_input_buffers_full[i] / (d_pc_counter - 1);
    return var;
}

// Accumulated squared deviation of output `which`'s fullness divided by
// (sample count - 1); 0 when `which` is out of range.
// NOTE(review): the divisor is not checked, so fewer than two recorded
// samples give a zero or negative divisor; confirm callers tolerate that.
float block_detail::pc_output_buffers_full_var(size_t which)
{
    // Bounds-check against the vector that is actually indexed.
    if (which < d_var_output_buffers_full.size())
        return d_var_output_buffers_full[which] / (d_pc_counter - 1);
    else
        return 0;
}

std::vector<float> block_detail::pc_output_buffers_full_var()
{
    std::vector<float> var(d_avg_output_buffers_full.size(), 0);
    for (size_t i = 0; i < d_avg_output_buffers_full.size(); i++)
        var[i] = d_avg_output_buffers_full[i] / (d_pc_counter - 1);
    return var;
}

// Accumulated squared deviation of the work time divided by
// (sample count - 1).
// NOTE(review): the divisor is not checked, so fewer than two recorded
// samples give a zero or negative divisor; confirm callers tolerate that.
float block_detail::pc_work_time_var()
{
    const auto divisor = d_pc_counter - 1;
    return d_var_work_time / divisor;
}

// Sum of all work-time samples since the statistics were last seeded.
float block_detail::pc_work_time_total()
{
    return d_total_work_time;
}

// Average throughput as last computed by stop_perf_counters(): total
// noutput_items divided by the seconds elapsed since the reference time.
float block_detail::pc_throughput_avg()
{
    return d_avg_throughput;
}
} /* namespace gr */