/* -*- c++ -*- */
/*
 * Copyright 2020 Free Software Foundation, Inc.
 *
 * This file is part of GNU Radio
 *
 * SPDX-License-Identifier: GPL-3.0-or-later
 *
 */

#ifdef HAVE_CONFIG_H
#include "config.h"
#endif

#include "tcp_sink_impl.h"
#include <gnuradio/io_signature.h>

#include <sstream>

namespace gr {
namespace network {

tcp_sink::sptr tcp_sink::make(
    size_t itemsize, size_t veclen, const std::string& host, int port, int sinkmode)
{
    return gnuradio::make_block_sptr<tcp_sink_impl>(
        itemsize, veclen, host, port, sinkmode);
}

/*
 * The private constructor
 */
tcp_sink_impl::tcp_sink_impl(
    size_t itemsize, size_t veclen, const std::string& host, int port, int sinkmode)
    : gr::sync_block("tcp_sink",
                     gr::io_signature::make(1, 1, itemsize * veclen),
                     gr::io_signature::make(0, 0, 0)),
      d_itemsize(itemsize),
      d_veclen(veclen),
      d_host(host),
      d_port(port),
      d_sinkmode(sinkmode),
      d_thread_running(false),
      d_stop_thread(false),
      d_listener_thread(NULL),
      d_start_new_listener(false),
      d_initial_connection(true)
{
    d_block_size = d_itemsize * d_veclen;

    if (d_sinkmode == TCPSINKMODE_CLIENT) {
        // In this mode, we're connecting to a remote TCP service listener
        // as a client.
        std::stringstream msg;

        msg << "[TCP Sink] connecting to " << host << " on port " << port;
        GR_LOG_INFO(d_logger, msg.str());

        boost::system::error_code err;
        d_tcpsocket = new boost::asio::ip::tcp::socket(d_io_service);

        std::string s_port = (boost::format("%d") % port).str();
        boost::asio::ip::tcp::resolver resolver(d_io_service);
        boost::asio::ip::tcp::resolver::query query(
            d_host, s_port, boost::asio::ip::resolver_query_base::passive);

        d_endpoint = *resolver.resolve(query, err);

        if (err) {
            throw std::runtime_error(
                std::string("[TCP Sink] Unable to resolve host/IP: ") + err.message());
        }

        if (d_host.find(":") != std::string::npos)
            d_is_ipv6 = true;
        else {
            // This block supports a check that a name rather than an IP is provided.
            // the endpoint is then checked after the resolver is done.
            if (d_endpoint.address().is_v6())
                d_is_ipv6 = true;
            else
                d_is_ipv6 = false;
        }

        d_tcpsocket->connect(d_endpoint, err);
        if (err) {
            throw std::runtime_error(std::string("[TCP Sink] Connection error: ") +
                                     err.message());
        }

        d_connected = true;

        boost::asio::socket_base::keep_alive option(true);
        d_tcpsocket->set_option(option);
    } else {
        // In this mode, we're starting a local port listener and waiting
        // for inbound connections.
        d_start_new_listener = true;
        d_listener_thread =
            new boost::thread(boost::bind(&tcp_sink_impl::run_listener, this));
    }
}

void tcp_sink_impl::run_listener()
{
    d_thread_running = true;

    while (!d_stop_thread) {
        // this will block
        if (d_start_new_listener) {
            d_start_new_listener = false;
            connect(d_initial_connection);
            d_initial_connection = false;
        } else
            usleep(10);
    }

    d_thread_running = false;
}

void tcp_sink_impl::accept_handler(boost::asio::ip::tcp::socket* new_connection,
                                   const boost::system::error_code& error)
{
    if (!error) {
        GR_LOG_INFO(d_logger, "Client connection received.");

        // Accept succeeded.
        d_tcpsocket = new_connection;

        boost::asio::socket_base::keep_alive option(true);
        d_tcpsocket->set_option(option);
        d_connected = true;

    } else {
        std::stringstream msg;
        msg << "Error code " << error << " accepting TCP session.";
        GR_LOG_ERROR(d_logger, msg.str());

        // Boost made a copy so we have to clean up
        delete new_connection;

        // safety settings.
        d_connected = false;
        d_tcpsocket = NULL;
    }
}

void tcp_sink_impl::connect(bool initial_connection)
{
    std::stringstream msg;
    msg << "Waiting for connection on port " << d_port;
    GR_LOG_INFO(d_logger, msg.str());

    if (initial_connection) {
        if (d_is_ipv6)
            d_acceptor = new boost::asio::ip::tcp::acceptor(
                d_io_service,
                boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v6(), d_port));
        else
            d_acceptor = new boost::asio::ip::tcp::acceptor(
                d_io_service,
                boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), d_port));
    } else {
        d_io_service.reset();
    }

    if (d_tcpsocket) {
        delete d_tcpsocket;
    }
    d_tcpsocket = NULL;
    d_connected = false;

    boost::asio::ip::tcp::socket* tmpSocket =
        new boost::asio::ip::tcp::socket(d_io_service);
    d_acceptor->async_accept(*tmpSocket,
                             boost::bind(&tcp_sink_impl::accept_handler,
                                         this,
                                         tmpSocket,
                                         boost::asio::placeholders::error));

    d_io_service.run();
}

/*
 * Our virtual destructor.
 */
tcp_sink_impl::~tcp_sink_impl() { stop(); }

bool tcp_sink_impl::stop()
{
    if (d_thread_running) {
        d_stop_thread = true;
    }

    if (d_tcpsocket) {
        d_tcpsocket->close();
        delete d_tcpsocket;
        d_tcpsocket = NULL;
    }

    d_io_service.reset();
    d_io_service.stop();

    if (d_acceptor) {
        delete d_acceptor;
        d_acceptor = NULL;
    }

    if (d_listener_thread) {
        while (d_thread_running)
            usleep(5);

        delete d_listener_thread;
        d_listener_thread = NULL;
    }

    return true;
}

int tcp_sink_impl::work(int noutput_items,
                        gr_vector_const_void_star& input_items,
                        gr_vector_void_star& output_items)
{
    gr::thread::scoped_lock guard(d_setlock);

    if (!d_connected)
        return noutput_items;

    unsigned int noi = noutput_items * d_block_size;
    int bytes_written;
    int bytes_remaining = noi;

    ec.clear();

    char* p_buff;
    p_buff = (char*)input_items[0];

    while ((bytes_remaining > 0) && (!ec)) {
        bytes_written = boost::asio::write(
            *d_tcpsocket, boost::asio::buffer((const void*)p_buff, bytes_remaining), ec);
        bytes_remaining -= bytes_written;
        p_buff += bytes_written;

        if (ec == boost::asio::error::connection_reset ||
            ec == boost::asio::error::broken_pipe) {

            // Connection was reset
            d_connected = false;
            bytes_remaining = 0;

            if (d_sinkmode == TCPSINKMODE_CLIENT) {
                GR_LOG_WARN(d_logger,
                            "Server closed the connection.  Stopping processing.");

                return WORK_DONE;
            } else {
                GR_LOG_INFO(d_logger, "Client disconnected. Waiting for new connection.");

                // start waiting for another connection
                d_start_new_listener = true;
            }
        }
    }

    return noutput_items;
}
} /* namespace network */
} /* namespace gr */