diff options
Diffstat (limited to 'gr-network/lib/tcp_sink_impl.cc')
-rw-r--r-- | gr-network/lib/tcp_sink_impl.cc | 292 |
1 files changed, 292 insertions, 0 deletions
diff --git a/gr-network/lib/tcp_sink_impl.cc b/gr-network/lib/tcp_sink_impl.cc new file mode 100644 index 0000000000..aae7c429aa --- /dev/null +++ b/gr-network/lib/tcp_sink_impl.cc @@ -0,0 +1,292 @@ +/* -*- c++ -*- */ +/* + * Copyright 2020 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * SPDX-License-Identifier: GPL-3.0-or-later + * + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include "tcp_sink_impl.h" +#include <gnuradio/io_signature.h> + +#include <sstream> + +namespace gr { +namespace network { + +tcp_sink::sptr tcp_sink::make( + size_t itemsize, size_t veclen, const std::string& host, int port, int sinkmode) +{ + return gnuradio::get_initial_sptr( + new tcp_sink_impl(itemsize, veclen, host, port, sinkmode)); +} + +/* + * The private constructor + */ +tcp_sink_impl::tcp_sink_impl( + size_t itemsize, size_t veclen, const std::string& host, int port, int sinkmode) + : gr::sync_block("tcp_sink", + gr::io_signature::make(1, 1, itemsize * veclen), + gr::io_signature::make(0, 0, 0)), + d_itemsize(itemsize), + d_veclen(veclen), + d_host(host), + d_port(port), + d_sinkmode(sinkmode), + d_thread_running(false), + d_stop_thread(false), + d_listener_thread(NULL), + d_start_new_listener(false), + d_initial_connection(true) +{ + d_block_size = d_itemsize * d_veclen; + + if (d_sinkmode == TCPSINKMODE_CLIENT) { + // In this mode, we're connecting to a remote TCP service listener + // as a client. + std::stringstream msg; + + msg << "[TCP Sink] connecting to " << host << " on port " << port; + GR_LOG_INFO(d_logger, msg.str()); + + boost::system::error_code err; + d_tcpsocket = new boost::asio::ip::tcp::socket(d_io_service); + + std::string s_port = (boost::format("%d") % port).str(); + boost::asio::ip::tcp::resolver resolver(d_io_service); + boost::asio::ip::tcp::resolver::query query( + d_host, s_port, boost::asio::ip::resolver_query_base::passive); + + d_endpoint = *resolver.resolve(query, err); + + if (err) { + throw std::runtime_error( + std::string("[TCP Sink] Unable to resolve host/IP: ") + err.message()); + } + + if (d_host.find(":") != std::string::npos) + d_is_ipv6 = true; + else { + // This block supports a check that a name rather than an IP is provided. + // the endpoint is then checked after the resolver is done. + if (d_endpoint.address().is_v6()) + d_is_ipv6 = true; + else + d_is_ipv6 = false; + } + + d_tcpsocket->connect(d_endpoint, err); + if (err) { + throw std::runtime_error(std::string("[TCP Sink] Connection error: ") + + err.message()); + } + + d_connected = true; + + boost::asio::socket_base::keep_alive option(true); + d_tcpsocket->set_option(option); + } else { + // In this mode, we're starting a local port listener and waiting + // for inbound connections. + d_start_new_listener = true; + d_listener_thread = + new boost::thread(boost::bind(&tcp_sink_impl::run_listener, this)); + } +} + +void tcp_sink_impl::run_listener() +{ + d_thread_running = true; + + while (!d_stop_thread) { + // this will block + if (d_start_new_listener) { + d_start_new_listener = false; + connect(d_initial_connection); + d_initial_connection = false; + } else + usleep(10); + } + + d_thread_running = false; +} + +void tcp_sink_impl::accept_handler(boost::asio::ip::tcp::socket* new_connection, + const boost::system::error_code& error) +{ + if (!error) { + GR_LOG_INFO(d_logger, "Client connection received."); + + // Accept succeeded. + d_tcpsocket = new_connection; + + boost::asio::socket_base::keep_alive option(true); + d_tcpsocket->set_option(option); + d_connected = true; + + } else { + std::stringstream msg; + msg << "Error code " << error << " accepting TCP session."; + GR_LOG_ERROR(d_logger, msg.str()); + + // Boost made a copy so we have to clean up + delete new_connection; + + // safety settings. + d_connected = false; + d_tcpsocket = NULL; + } +} + +void tcp_sink_impl::connect(bool initial_connection) +{ + std::stringstream msg; + msg << "Waiting for connection on port " << d_port; + GR_LOG_INFO(d_logger, msg.str()); + + if (initial_connection) { + if (d_is_ipv6) + d_acceptor = new boost::asio::ip::tcp::acceptor( + d_io_service, + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v6(), d_port)); + else + d_acceptor = new boost::asio::ip::tcp::acceptor( + d_io_service, + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), d_port)); + } else { + d_io_service.reset(); + } + + if (d_tcpsocket) { + delete d_tcpsocket; + } + d_tcpsocket = NULL; + d_connected = false; + + boost::asio::ip::tcp::socket* tmpSocket = + new boost::asio::ip::tcp::socket(d_io_service); + d_acceptor->async_accept(*tmpSocket, + boost::bind(&tcp_sink_impl::accept_handler, + this, + tmpSocket, + boost::asio::placeholders::error)); + + d_io_service.run(); +} + +/* + * Our virtual destructor. + */ +tcp_sink_impl::~tcp_sink_impl() { stop(); } + +bool tcp_sink_impl::stop() +{ + if (d_thread_running) { + d_stop_thread = true; + } + + if (d_tcpsocket) { + d_tcpsocket->close(); + delete d_tcpsocket; + d_tcpsocket = NULL; + } + + d_io_service.reset(); + d_io_service.stop(); + + if (d_acceptor) { + delete d_acceptor; + d_acceptor = NULL; + } + + if (d_listener_thread) { + while (d_thread_running) + usleep(5); + + delete d_listener_thread; + d_listener_thread = NULL; + } + + return true; +} + +void tcp_sink_impl::check_for_disconnect() +{ + char buff[1]; + int bytes_read = + d_tcpsocket->receive(boost::asio::buffer(buff), d_tcpsocket->message_peek, ec); + if ((boost::asio::error::eof == ec) || (boost::asio::error::connection_reset == ec)) { + std::stringstream msg; + msg << "Disconnect detected on " << d_host << ":" << d_port << "."; + GR_LOG_INFO(d_logger, msg.str()); + + d_tcpsocket->close(); + delete d_tcpsocket; + d_tcpsocket = NULL; + + exit(1); + } else { + if (ec) { + std::stringstream msg; + msg << "Socket error " << ec << " detected."; + GR_LOG_ERROR(d_logger, msg.str()); + } + } +} + +int tcp_sink_impl::work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ + gr::thread::scoped_lock guard(d_setlock); + + if (!d_connected) + return noutput_items; + + unsigned int noi = noutput_items * d_block_size; + int bytes_written; + int bytes_remaining = noi; + + ec.clear(); + + char* p_buff; + p_buff = (char*)input_items[0]; + + while ((bytes_remaining > 0) && (!ec)) { + bytes_written = boost::asio::write( + *d_tcpsocket, boost::asio::buffer((const void*)p_buff, bytes_remaining), ec); + bytes_remaining -= bytes_written; + p_buff += bytes_written; + + if (ec == boost::asio::error::connection_reset || + ec == boost::asio::error::broken_pipe) { + + // Connection was reset + d_connected = false; + bytes_remaining = 0; + + if (d_sinkmode == TCPSINKMODE_CLIENT) { + GR_LOG_WARN(d_logger, + "Server closed the connection. Stopping processing."); + + return WORK_DONE; + } else { + GR_LOG_INFO(d_logger, "Client disconnected. Waiting for new connection."); + + // start waiting for another connection + d_start_new_listener = true; + } + } + } + + return noutput_items; +} +} /* namespace network */ +} /* namespace gr */ |