summaryrefslogtreecommitdiff
path: root/gr-blocks/lib/socket_pdu_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-blocks/lib/socket_pdu_impl.cc')
-rw-r--r--gr-blocks/lib/socket_pdu_impl.cc271
1 files changed, 0 insertions, 271 deletions
diff --git a/gr-blocks/lib/socket_pdu_impl.cc b/gr-blocks/lib/socket_pdu_impl.cc
deleted file mode 100644
index f30e944d7b..0000000000
--- a/gr-blocks/lib/socket_pdu_impl.cc
+++ /dev/null
@@ -1,271 +0,0 @@
-/* -*- c++ -*- */
-/*
- * Copyright 2013,2019 Free Software Foundation, Inc.
- *
- * This file is part of GNU Radio
- *
- * SPDX-License-Identifier: GPL-3.0-or-later
- *
- */
-
-#ifdef HAVE_CONFIG_H
-#include "config.h"
-#endif
-
-#include "socket_pdu_impl.h"
-#include "tcp_connection.h"
-#include <gnuradio/blocks/pdu.h>
-#include <gnuradio/io_signature.h>
-
-namespace gr {
-namespace blocks {
-
-socket_pdu::sptr socket_pdu::make(std::string type,
- std::string addr,
- std::string port,
- int MTU /*= 10000*/,
- bool tcp_no_delay /*= false*/)
-{
- return gnuradio::make_block_sptr<socket_pdu_impl>(
- type, addr, port, MTU, tcp_no_delay);
-}
-
-socket_pdu_impl::socket_pdu_impl(std::string type,
- std::string addr,
- std::string port,
- int MTU /*= 10000*/,
- bool tcp_no_delay /*= false*/)
- : block("socket_pdu", io_signature::make(0, 0, 0), io_signature::make(0, 0, 0)),
- d_tcp_no_delay(tcp_no_delay)
-{
- d_rxbuf.resize(MTU);
-
- message_port_register_in(pdu::pdu_port_id());
- message_port_register_out(pdu::pdu_port_id());
-
- if ((type == "TCP_SERVER") &&
- ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces
- int port_num = atoi(port.c_str());
- if (port_num == 0)
- throw std::invalid_argument(
- "gr::blocks:socket_pdu: invalid port for TCP_SERVER");
- d_tcp_endpoint =
- boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port_num);
- } else if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) {
- boost::asio::ip::tcp::resolver resolver(d_io_service);
- boost::asio::ip::tcp::resolver::query query(
- boost::asio::ip::tcp::v4(),
- addr,
- port,
- boost::asio::ip::resolver_query_base::passive);
- d_tcp_endpoint = *resolver.resolve(query);
- } else if ((type == "UDP_SERVER") &&
- ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces
- int port_num = atoi(port.c_str());
- if (port_num == 0)
- throw std::invalid_argument(
- "gr::blocks:socket_pdu: invalid port for UDP_SERVER");
- d_udp_endpoint =
- boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port_num);
- } else if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) {
- boost::asio::ip::udp::resolver resolver(d_io_service);
- boost::asio::ip::udp::resolver::query query(
- boost::asio::ip::udp::v4(),
- addr,
- port,
- boost::asio::ip::resolver_query_base::passive);
-
- if (type == "UDP_SERVER")
- d_udp_endpoint = *resolver.resolve(query);
- else
- d_udp_endpoint_other = *resolver.resolve(query);
- }
-
- if (type == "TCP_SERVER") {
- d_acceptor_tcp = std::make_shared<boost::asio::ip::tcp::acceptor>(d_io_service,
- d_tcp_endpoint);
- d_acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
-
- start_tcp_accept();
-
- set_msg_handler(pdu::pdu_port_id(),
- [this](pmt::pmt_t msg) { this->tcp_server_send(msg); });
- } else if (type == "TCP_CLIENT") {
- boost::system::error_code error = boost::asio::error::host_not_found;
- d_tcp_socket = std::make_shared<boost::asio::ip::tcp::socket>(d_io_service);
- d_tcp_socket->connect(d_tcp_endpoint, error);
- if (error)
- throw boost::system::system_error(error);
- d_tcp_socket->set_option(boost::asio::ip::tcp::no_delay(d_tcp_no_delay));
-
- set_msg_handler(pdu::pdu_port_id(),
- [this](pmt::pmt_t msg) { this->tcp_client_send(msg); });
-
- d_tcp_socket->async_read_some(
- boost::asio::buffer(d_rxbuf),
- boost::bind(&socket_pdu_impl::handle_tcp_read,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- } else if (type == "UDP_SERVER") {
- d_udp_socket =
- std::make_shared<boost::asio::ip::udp::socket>(d_io_service, d_udp_endpoint);
- d_udp_socket->async_receive_from(
- boost::asio::buffer(d_rxbuf),
- d_udp_endpoint_other,
- boost::bind(&socket_pdu_impl::handle_udp_read,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
-
- set_msg_handler(pdu::pdu_port_id(),
- [this](pmt::pmt_t msg) { this->udp_send(msg); });
- } else if (type == "UDP_CLIENT") {
- d_udp_socket =
- std::make_shared<boost::asio::ip::udp::socket>(d_io_service, d_udp_endpoint);
- d_udp_socket->async_receive_from(
- boost::asio::buffer(d_rxbuf),
- d_udp_endpoint_other,
- boost::bind(&socket_pdu_impl::handle_udp_read,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
-
- set_msg_handler(pdu::pdu_port_id(),
- [this](pmt::pmt_t msg) { this->udp_send(msg); });
- } else
- throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type");
-
- d_thread = gr::thread::thread(boost::bind(&socket_pdu_impl::run_io_service, this));
- d_started = true;
-}
-
-socket_pdu_impl::~socket_pdu_impl() { stop(); }
-
-bool socket_pdu_impl::stop()
-{
- if (d_started) {
- d_io_service.stop();
- d_thread.interrupt();
- d_thread.join();
- }
- d_started = false;
- return true;
-}
-
-void socket_pdu_impl::handle_tcp_read(const boost::system::error_code& error,
- size_t bytes_transferred)
-{
- if (!error) {
- pmt::pmt_t vector =
- pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_rxbuf[0]);
- pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
- message_port_pub(pdu::pdu_port_id(), pdu);
-
- d_tcp_socket->async_read_some(
- boost::asio::buffer(d_rxbuf),
- boost::bind(&socket_pdu_impl::handle_tcp_read,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- } else
- throw boost::system::system_error(error);
-}
-
-void socket_pdu_impl::start_tcp_accept()
-{
-#if (BOOST_VERSION >= 107000)
- tcp_connection::sptr new_connection =
- tcp_connection::make(d_io_service, d_rxbuf.size(), d_tcp_no_delay);
-#else
- tcp_connection::sptr new_connection = tcp_connection::make(
- d_acceptor_tcp->get_io_service(), d_rxbuf.size(), d_tcp_no_delay);
-#endif
-
- d_acceptor_tcp->async_accept(new_connection->socket(),
- boost::bind(&socket_pdu_impl::handle_tcp_accept,
- this,
- new_connection,
- boost::asio::placeholders::error));
-}
-
-void socket_pdu_impl::tcp_server_send(pmt::pmt_t msg)
-{
- pmt::pmt_t vector = pmt::cdr(msg);
- for (size_t i = 0; i < d_tcp_connections.size(); i++)
- d_tcp_connections[i]->send(vector);
-}
-
-void socket_pdu_impl::handle_tcp_accept(tcp_connection::sptr new_connection,
- const boost::system::error_code& error)
-{
- if (!error) {
- // Garbage collect closed sockets
- std::vector<tcp_connection::sptr>::iterator it = d_tcp_connections.begin();
- while (it != d_tcp_connections.end()) {
- if (!(**it).socket().is_open())
- it = d_tcp_connections.erase(it);
- else
- ++it;
- }
-
- new_connection->start(this);
- d_tcp_connections.push_back(new_connection);
- start_tcp_accept();
- } else
- std::cout << error << std::endl;
-}
-
-void socket_pdu_impl::tcp_client_send(pmt::pmt_t msg)
-{
- pmt::pmt_t vector = pmt::cdr(msg);
- size_t len = pmt::blob_length(vector);
- size_t offset = 0;
- std::vector<char> txbuf(std::min(len, d_rxbuf.size()));
- while (offset < len) {
- size_t send_len = std::min((len - offset), txbuf.size());
- memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len);
- offset += send_len;
- d_tcp_socket->send(boost::asio::buffer(txbuf, send_len));
- }
-}
-
-void socket_pdu_impl::udp_send(pmt::pmt_t msg)
-{
- if (d_udp_endpoint_other.address().to_string() == "0.0.0.0")
- return;
-
- pmt::pmt_t vector = pmt::cdr(msg);
- size_t len = pmt::blob_length(vector);
- size_t offset = 0;
- std::vector<char> txbuf(std::min(len, d_rxbuf.size()));
- while (offset < len) {
- size_t send_len = std::min((len - offset), txbuf.size());
- memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len);
- offset += send_len;
- d_udp_socket->send_to(boost::asio::buffer(txbuf, send_len), d_udp_endpoint_other);
- }
-}
-
-void socket_pdu_impl::handle_udp_read(const boost::system::error_code& error,
- size_t bytes_transferred)
-{
- if (!error) {
- pmt::pmt_t vector =
- pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_rxbuf[0]);
- pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
-
- message_port_pub(pdu::pdu_port_id(), pdu);
-
- d_udp_socket->async_receive_from(
- boost::asio::buffer(d_rxbuf),
- d_udp_endpoint_other,
- boost::bind(&socket_pdu_impl::handle_udp_read,
- this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
- }
-}
-
-} /* namespace blocks */
-} /* namespace gr */