/* -*- c++ -*- */
/*
 * Copyright 2020 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "udp_sink_impl.h"
#include <gnuradio/io_signature.h>
#include <boost/array.hpp>

namespace gr {
namespace network {

udp_sink::sptr udp_sink::make(size_t itemsize,
                              size_t veclen,
                              const std::string& host,
                              int port,
                              int header_type,
                              int payloadsize,
                              bool send_eof)
{
    return gnuradio::make_block_sptr<udp_sink_impl>(
        itemsize, veclen, host, port, header_type, payloadsize, send_eof);
}

/*
 * The private constructor
 */
udp_sink_impl::udp_sink_impl(size_t itemsize,
                             size_t veclen,
                             const std::string& host,
                             int port,
                             int header_type,
                             int payloadsize,
                             bool send_eof)
    : gr::sync_block("udp_sink",
                     gr::io_signature::make(1, 1, itemsize * veclen),
                     gr::io_signature::make(0, 0, 0)),
      d_itemsize(itemsize),
      d_veclen(veclen),
      d_header_type(header_type),
      d_header_size(0),
      d_seq_num(0),
      d_payloadsize(payloadsize),
      b_send_eof(send_eof)
{
    // Lets set up the max payload size for the UDP packet based on the requested
    // payload size. Some important notes:  For a standard IP/UDP packet, say
    // crossing the Internet with a standard MTU, 1472 is the max UDP payload
    // size.  Larger values can be sent, however the IP stack will fragment the
    // packet.  This can cause additional network overhead as the packet gets
    // reassembled. Now for local nets that support jumbo frames, the max payload
    // size is 8972 (9000-the UDP 28-byte header) Same rules apply with
    // fragmentation.

    d_port = port;

    d_header_size = 0;

    switch (d_header_type) {
    case HEADERTYPE_SEQNUM:
        d_header_size = sizeof(header_seq_num);
        break;

    case HEADERTYPE_SEQPLUSSIZE:
        d_header_size = sizeof(header_seq_plus_size);
        break;

    case HEADERTYPE_NONE:
        d_header_size = 0;
        break;

    default:
        GR_LOG_ERROR(d_logger, "Unknown header type.");
        throw std::invalid_argument("Unknown UDP header type.");
        break;
    }

    if (d_payloadsize < 8) {
        GR_LOG_ERROR(d_logger,
                     "Payload size is too small.  Must be at "
                     "least 8 bytes once header/trailer adjustments are made.");
        throw std::invalid_argument(
            "Payload size is too small.  Must be at "
            "least 8 bytes once header/trailer adjustments are made.");
    }

    d_seq_num = 0;

    d_block_size = d_itemsize * d_veclen;

    d_precomp_datasize = d_payloadsize - d_header_size;
    d_precomp_data_overitemsize = d_precomp_datasize / d_itemsize;

    d_localbuffer = new char[d_payloadsize];

    long max_circ_buffer;

    // Let's keep it from getting too big
    if (d_payloadsize < 2000) {
        max_circ_buffer = d_payloadsize * 4000;
    } else {
        if (d_payloadsize < 5000)
            max_circ_buffer = d_payloadsize * 2000;
        else
            max_circ_buffer = d_payloadsize * 1500;
    }

    d_localqueue = new boost::circular_buffer<char>(max_circ_buffer);

    d_udpsocket = new boost::asio::ip::udp::socket(d_io_service);

    std::string s_port = (boost::format("%d") % port).str();
    std::string s_host = host.empty() ? std::string("localhost") : host;
    boost::asio::ip::udp::resolver resolver(d_io_service);
    boost::asio::ip::udp::resolver::query query(
        s_host, s_port, boost::asio::ip::resolver_query_base::passive);

    boost::system::error_code err;
    d_endpoint = *resolver.resolve(query, err);

    if (err) {
        throw std::runtime_error(std::string("[UDP Sink] Unable to resolve host/IP: ") +
                                 err.message());
    }

    if (host.find(":") != std::string::npos)
        is_ipv6 = true;
    else {
        // This block supports a check that a name rather than an IP is provided.
        // the endpoint is then checked after the resolver is done.
        if (d_endpoint.address().is_v6())
            is_ipv6 = true;
        else
            is_ipv6 = false;
    }

    if (is_ipv6) {
        d_udpsocket->open(boost::asio::ip::udp::v6());
    } else {
        d_udpsocket->open(boost::asio::ip::udp::v4());
    }

    int out_multiple = (d_payloadsize - d_header_size) / d_block_size;

    if (out_multiple == 1)
        out_multiple = 2; // Ensure we get pairs, for instance complex -> ichar pairs

    gr::block::set_output_multiple(out_multiple);
}

/*
 * Our virtual destructor.
 */
udp_sink_impl::~udp_sink_impl() { stop(); }

bool udp_sink_impl::stop()
{
    if (d_udpsocket) {
        gr::thread::scoped_lock guard(d_setlock);

        if (b_send_eof) {
            // Send a few zero-length packets to signal receiver we are done
            boost::array<char, 0> send_buf;
            for (int i = 0; i < 3; i++)
                d_udpsocket->send_to(boost::asio::buffer(send_buf), d_endpoint);
        }

        d_udpsocket->close();
        d_udpsocket = NULL;

        d_io_service.reset();
        d_io_service.stop();
    }

    if (d_localbuffer) {
        delete[] d_localbuffer;
        d_localbuffer = NULL;
    }

    if (d_localqueue) {
        delete d_localqueue;
        d_localqueue = NULL;
    }

    return true;
}

void udp_sink_impl::build_header()
{
    switch (d_header_type) {
    case HEADERTYPE_SEQNUM: {
        d_seq_num++;
        header_seq_num seq_header;
        seq_header.seqnum = d_seq_num;
        memcpy((void*)d_tmpheaderbuff, (void*)&seq_header, d_header_size);
    } break;

    case HEADERTYPE_SEQPLUSSIZE: {
        d_seq_num++;
        header_seq_plus_size seq_header_plus_size;
        seq_header_plus_size.seqnum = d_seq_num;
        seq_header_plus_size.length = d_payloadsize;
        memcpy((void*)d_tmpheaderbuff, (void*)&seq_header_plus_size, d_header_size);
    } break;
    }
}

int udp_sink_impl::work(int noutput_items,
                        gr_vector_const_void_star& input_items,
                        gr_vector_void_star& output_items)
{
    gr::thread::scoped_lock guard(d_setlock);

    long num_bytes_to_transmit = noutput_items * d_block_size;
    const char* in = (const char*)input_items[0];

    // Build a long local queue to pull from so we can break it up easier
    for (int i = 0; i < num_bytes_to_transmit; i++) {
        d_localqueue->push_back(in[i]);
    }

    // Local boost buffer for transmitting
    std::vector<boost::asio::const_buffer> transmitbuffer;

    // Let's see how many blocks are in the buffer
    int bytes_available = d_localqueue->size();
    long blocks_available = bytes_available / d_precomp_datasize;

    for (int cur_block = 0; cur_block < blocks_available; cur_block++) {
        // Clear the next transmit buffer
        transmitbuffer.clear();

        // build our next header if we need it
        if (d_header_type != HEADERTYPE_NONE) {
            build_header();

            transmitbuffer.push_back(
                boost::asio::buffer((const void*)d_tmpheaderbuff, d_header_size));
        }

        // Fill the data buffer
        for (int i = 0; i < d_precomp_datasize; i++) {
            d_localbuffer[i] = d_localqueue->at(0);
            d_localqueue->pop_front();
        }

        // Set up for transmit
        transmitbuffer.push_back(
            boost::asio::buffer((const void*)d_localbuffer, d_precomp_datasize));

        // Send
        d_udpsocket->send_to(transmitbuffer, d_endpoint);
    }

    int itemsreturned = blocks_available * d_precomp_data_overitemsize;

    return itemsreturned;
}

} /* namespace network */
} /* namespace gr */