/* -*- c++ -*- */
/*
 * Copyright 2004,2009-2011,2013 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *
 */

#ifndef INCLUDED_GR_RUNTIME_BUFFER_H
#define INCLUDED_GR_RUNTIME_BUFFER_H

#include <gnuradio/api.h>
#include <gnuradio/custom_lock.h>
#include <gnuradio/logger.h>
#include <gnuradio/runtime_types.h>
#include <gnuradio/tags.h>
#include <gnuradio/thread/thread.h>
#include <gnuradio/transfer_type.h>

#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <memory>
#include <vector>


namespace gr {

class vmcircbuf;
class buffer_reader;
class buffer_reader_sm;

/*!
 * \brief Mapping type of a buffer, as reported by buffer::get_mapping_type().
 */
enum class buffer_mapping_type {
    double_mapped,
    single_mapped,
};

/*!
 * \brief Type-erased callable taking (void*, const void*, std::size_t) and
 * returning void*; the signature has the same shape as std::memcpy.
 */
using mem_func_t = std::function<void*(void*, const void*, std::size_t)>;

/*!
 * \brief Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
 *
 * The total size of the buffer will be rounded up to a system
 * dependent boundary. This is typically the system page size, but
 * under MS Windows it is 64KB.
 *
 * \param nitems is the minimum number of items the buffer will hold.
 * \param sizeof_item is the size of an item in bytes.
 * \param downstream_lcm_nitems is the least common multiple of the items to
 *                              read by downstream block(s)
 * \param downstream_max_out_mult is the maximum output multiple of all
 *                                downstream blocks
 * \param link is the block that writes to this buffer; defaults to an empty
 *             block_sptr.
 * \param buf_owner is the block that owns the buffer, which may or may not
 *                  be the same as the block that writes to this buffer;
 *                  defaults to an empty block_sptr.
 * \return the allocated buffer as a buffer_sptr.
 */
GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
                                       size_t sizeof_item,
                                       uint64_t downstream_lcm_nitems,
                                       uint32_t downstream_max_out_mult,
                                       block_sptr link = block_sptr(),
                                       block_sptr buf_owner = block_sptr());

/*!
 * \brief Single writer, multiple reader fifo.
 * \ingroup internal
 *
 * This is an abstract base class: space_available(), post_work(),
 * index_add() and index_sub() are pure virtual and must be provided by a
 * concrete buffer implementation.
 */
class GR_RUNTIME_API buffer : public custom_lock_if
{
public:
    gr::logger_ptr d_logger;
    gr::logger_ptr d_debug_logger;

    ~buffer() override;

    /*!
     * \brief return the buffer's mapping type
     */
    buffer_mapping_type get_mapping_type() const { return d_buf_map_type; }

    /*!
     * \brief return number of items worth of space available for writing
     */
    virtual int space_available() = 0;

    /*!
     * \brief return size of this buffer in items
     */
    unsigned int bufsize() const { return d_bufsize; }

    /*!
     * \brief return the base address of the buffer
     */
    const char* base() const { return static_cast<const char*>(d_base); }

    /*!
     * \brief return pointer to write buffer.
     *
     * The return value points at space that can hold at least
     * space_available() items.
     */
    virtual void* write_pointer();

    /*!
     * \brief return pointer to read buffer.
     *
     * The return value points to at least items_available() items.
     *
     * \param read_index index (in items) from which reading starts
     */
    virtual const void* _read_pointer(unsigned int read_index);

    /*!
     * \brief tell buffer that we wrote \p nitems into it
     */
    void update_write_pointer(int nitems);

    /*!
     * \brief set the buffer's "done" flag, as reported by done()
     */
    void set_done(bool done);

    /*!
     * \brief return the current value of the "done" flag
     */
    bool done() const { return d_done; }

    /*!
     * \brief Return the block that writes to this buffer.
     *
     * The block is held internally as a std::weak_ptr; constructing the
     * returned shared pointer from it throws std::bad_weak_ptr if the
     * block has already been destroyed.
     */
    block_sptr link() const { return block_sptr(d_link); }

    /*!
     * \brief return the number of readers attached to this buffer
     */
    size_t nreaders() const { return d_readers.size(); }

    /*!
     * \brief return the reader at position \p index (not bounds checked)
     */
    buffer_reader* reader(size_t index) { return d_readers[index]; }

    /*!
     * \brief return a pointer to the mutex guarding this buffer's state
     */
    gr::thread::mutex* mutex() { return &d_mutex; }

    /*!
     * \brief return the absolute number of items written so far
     */
    uint64_t nitems_written() const { return d_abs_write_offset; }

    /*!
     * \brief reset the write index and the absolute write offset to zero
     */
    void reset_nitem_counter()
    {
        d_write_index = 0;
        d_abs_write_offset = 0;
    }

    /*!
     * \brief return the size of one item in bytes
     */
    size_t get_sizeof_item() const { return d_sizeof_item; }

    /*!
     * \brief return the least common multiple of the items read by the
     * downstream block(s)
     */
    uint64_t get_downstream_lcm_nitems() const { return d_downstream_lcm_nitems; }

    /*!
     * \brief return the maximum output multiple of the reader blocks
     */
    uint32_t get_max_reader_output_multiple() const
    {
        return d_max_reader_output_multiple;
    }

    /*!
     * \brief Record the history requirement of a reader block.
     *
     * Keeps the largest \p history seen so far and refreshes the
     * "has history" flag (true when that maximum exceeds 1).
     *
     * \param history history requirement of the reader block
     * \param delay   not used by this base implementation
     */
    virtual void update_reader_block_history(unsigned history, [[maybe_unused]] int delay)
    {
        d_max_reader_history = std::max(d_max_reader_history, history);
        d_has_history = (d_max_reader_history > 1);
    }

    /*!
     * \brief  Adds a new tag to the buffer.
     *
     * \param tag        the new tag
     */
    void add_item_tag(const tag_t& tag);

    /*!
     * \brief  Removes an existing tag from the buffer.
     *
     * If no such tag is found, does nothing.
     * Note: Doesn't actually physically delete the tag, but
     * marks it as deleted. For the user, this has the same effect:
     * Any subsequent calls to get_tags_in_range() will not return
     * the tag.
     *
     * \param tag        the tag that needs to be removed
     * \param id         the unique ID of the block calling this function
     */
    void remove_item_tag(const tag_t& tag, long id);

    /*!
     * \brief  Removes all tags before \p max_time from buffer
     *
     * \param max_time        the time (item number) to trim up until.
     */
    void prune_tags(uint64_t max_time);

    /*!
     * \brief return an iterator to the first stored tag
     */
    std::multimap<uint64_t, tag_t>::iterator get_tags_begin()
    {
        return d_item_tags.begin();
    }

    /*!
     * \brief return the past-the-end iterator of the stored tags
     */
    std::multimap<uint64_t, tag_t>::iterator get_tags_end() { return d_item_tags.end(); }

    /*!
     * \brief return an iterator to the first tag whose key is not less than \p x
     */
    std::multimap<uint64_t, tag_t>::iterator get_tags_lower_bound(uint64_t x)
    {
        return d_item_tags.lower_bound(x);
    }

    /*!
     * \brief return an iterator to the first tag whose key is greater than \p x
     */
    std::multimap<uint64_t, tag_t>::iterator get_tags_upper_bound(uint64_t x)
    {
        return d_item_tags.upper_bound(x);
    }

    /*!
     * \brief Function to be executed after this object's owner completes the
     * call to general_work()
     */
    virtual void post_work(int nitems) = 0;

    /*!
     * \brief Returns true when the current thread is ready to call the callback,
     * false otherwise. Note if input_blocked_callback is overridden then this
     * function should also be overridden.
     *
     * This base implementation always returns false.
     */
    virtual bool input_blkd_cb_ready([[maybe_unused]] int items_required,
                                     [[maybe_unused]] unsigned read_index)
    {
        return false;
    }

    /*!
     * \brief Callback function that the scheduler will call when it determines
     * that the input is blocked. Override this function if needed.
     *
     * This base implementation does nothing and returns false.
     */
    virtual bool input_blocked_callback([[maybe_unused]] int items_required,
                                        [[maybe_unused]] int items_avail,
                                        [[maybe_unused]] unsigned read_index)
    {
        return false;
    }

    /*!
     * \brief Returns true if the current thread is ready to execute
     * output_blocked_callback(), false otherwise. Note if the default
     * output_blocked_callback is overridden this function should also be
     * overridden.
     *
     * This base implementation always returns false.
     */
    virtual bool output_blkd_cb_ready([[maybe_unused]] int output_multiple)
    {
        return false;
    }

    /*!
     * \brief Callback function that the scheduler will call when it determines
     * that the output is blocked. Override this function if needed.
     *
     * This base implementation does nothing and returns false.
     */
    virtual bool output_blocked_callback([[maybe_unused]] int output_multiple,
                                         [[maybe_unused]] bool force = false)
    {
        return false;
    }

    /*!
     * \brief Increment the number of active pointers for this buffer.
     *
     * Takes the buffer mutex and blocks on the condition variable for as
     * long as the callback flag is set before incrementing the counter.
     */
    void increment_active()
    {
        gr::thread::scoped_lock lock(d_mutex);

        d_cv.wait(lock, [this]() { return d_callback_flag == false; });
        ++d_active_pointer_counter;
    }

    /*!
     * \brief Decrement the number of active pointers for this buffer and signal
     * anyone waiting when the count reaches zero.
     */
    void decrement_active()
    {
        gr::thread::scoped_lock lock(d_mutex);

        if (--d_active_pointer_counter == 0)
            d_cv.notify_all();
    }

    /*!
     * \brief "on_lock" function from the custom_lock_if.
     */
    void on_lock(gr::thread::scoped_lock& lock) override;

    /*!
     * \brief "on_unlock" function from the custom_lock_if.
     */
    void on_unlock() override;

    friend std::ostream& operator<<(std::ostream& os, const buffer& buf);

    // -------------------------------------------------------------------------

    /*!
     * \brief Assign buffer's transfer_type
     */
    void set_transfer_type(const transfer_type& type);

private:
    friend class buffer_reader;
    friend class buffer_reader_sm;

    // Must match the make_buffer() factory declared for this class exactly;
    // a differing parameter list would befriend an unrelated overload.
    friend GR_RUNTIME_API buffer_sptr make_buffer(int nitems,
                                                  size_t sizeof_item,
                                                  uint64_t downstream_lcm_nitems,
                                                  uint32_t downstream_max_out_mult,
                                                  block_sptr link,
                                                  block_sptr buf_owner);
    friend GR_RUNTIME_API buffer_reader_sptr buffer_add_reader(buffer_sptr buf,
                                                               int nzero_preload,
                                                               block_sptr link,
                                                               int delay);

protected:
    char* d_base;           // base address of buffer inside d_vmcircbuf.
    unsigned int d_bufsize; // in items
    buffer_mapping_type d_buf_map_type;

    // Keep track of maximum sample delay of any reader; Only prune tags past this.
    unsigned d_max_reader_delay;

    // Keep track of the maximum sample history requirements of all blocks that
    // consume from this buffer
    unsigned d_max_reader_history;

    // Indicates if d_max_reader_history > 1
    bool d_has_history;

    size_t d_sizeof_item; // in bytes
    std::vector<buffer_reader*> d_readers;
    std::weak_ptr<block> d_link; // block that writes to this buffer

    //
    // The mutex protects d_write_index, d_abs_write_offset, d_done, d_item_tags
    // and the d_read_index's and d_abs_read_offset's in the buffer readers.
    // Also d_callback_flag and d_active_pointer_counter.
    //
    gr::thread::mutex d_mutex;
    unsigned int d_write_index;  // in items [0,d_bufsize)
    uint64_t d_abs_write_offset; // num items written since the start
    bool d_done;
    std::multimap<uint64_t, tag_t> d_item_tags;
    uint64_t d_last_min_items_read;
    //
    gr::thread::condition_variable d_cv;
    bool d_callback_flag;
    uint32_t d_active_pointer_counter;

    uint64_t d_downstream_lcm_nitems;
    uint64_t d_write_multiple;
    uint32_t d_max_reader_output_multiple;

    transfer_type d_transfer_type;

    /*!
     * \brief  Increment read or write index for this buffer
     */
    virtual unsigned index_add(unsigned a, unsigned b) = 0;

    /*!
     * \brief  Decrement read or write index for this buffer
     */
    virtual unsigned index_sub(unsigned a, unsigned b) = 0;

    /*!
     * \brief Allocation hook for derived classes.
     *
     * This base implementation allocates nothing and returns false.
     */
    virtual bool allocate_buffer([[maybe_unused]] int nitems) { return false; }

    /*!
     * \brief constructor is protected.  Use make_buffer to create instances.
     *
     * Allocate a buffer that holds at least \p nitems of size \p sizeof_item.
     *
     * \param buftype is the mapping type of the buffer (see
     *                buffer_mapping_type).
     * \param nitems is the minimum number of items the buffer will hold.
     * \param sizeof_item is the size of an item in bytes.
     * \param downstream_lcm_nitems is the least common multiple of the items to
     *                              read by downstream block(s)
     * \param downstream_max_out_mult is the maximum output multiple of all
     *                                downstream blocks
     * \param link is the block that writes to this buffer.
     *
     * The total size of the buffer will be rounded up to a system
     * dependent boundary.  This is typically the system page size, but
     * under MS windows is 64KB.
     */
    buffer(buffer_mapping_type buftype,
           int nitems,
           size_t sizeof_item,
           uint64_t downstream_lcm_nitems,
           uint32_t downstream_max_out_mult,
           block_sptr link);

    /*!
     * \brief disassociate \p reader from this buffer
     */
    void drop_reader(buffer_reader* reader);
};

//! \brief Return the number of buffers currently allocated.
GR_RUNTIME_API long buffer_ncurrently_allocated();

} /* namespace gr */

#endif /* INCLUDED_GR_RUNTIME_BUFFER_H */