/* -*- c++ -*- */ /* * Copyright 2013 Free Software Foundation, Inc. * * This file is part of GNU Radio * * GNU Radio is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 3, or (at your option) * any later version. * * GNU Radio is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with GNU Radio; see the file COPYING. If not, write to * the Free Software Foundation, Inc., 51 Franklin Street, * Boston, MA 02110-1301, USA. */ #ifdef HAVE_CONFIG_H #include "config.h" #endif #include "socket_pdu_impl.h" #include "tcp_connection.h" #include <gnuradio/io_signature.h> #include <gnuradio/blocks/pdu.h> namespace gr { namespace blocks { socket_pdu::sptr socket_pdu::make(std::string type, std::string addr, std::string port, int MTU/*= 10000*/, bool tcp_no_delay/*= false*/) { return gnuradio::get_initial_sptr(new socket_pdu_impl(type, addr, port, MTU, tcp_no_delay)); } socket_pdu_impl::socket_pdu_impl(std::string type, std::string addr, std::string port, int MTU/*= 10000*/, bool tcp_no_delay/*= false*/) : block("socket_pdu", io_signature::make (0, 0, 0), io_signature::make (0, 0, 0)), d_tcp_no_delay(tcp_no_delay) { d_rxbuf.resize(MTU); message_port_register_in(PDU_PORT_ID); message_port_register_out(PDU_PORT_ID); if ((type == "TCP_SERVER") && ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces int port_num = atoi(port.c_str()); if (port_num == 0) throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for TCP_SERVER"); d_tcp_endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port_num); } else if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) { boost::asio::ip::tcp::resolver resolver(d_io_service); boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), addr, port, boost::asio::ip::resolver_query_base::passive); d_tcp_endpoint = *resolver.resolve(query); } else if ((type == "UDP_SERVER") && ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces int port_num = atoi(port.c_str()); if (port_num == 0) throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for UDP_SERVER"); d_udp_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port_num); } else if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) { boost::asio::ip::udp::resolver resolver(d_io_service); boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port, boost::asio::ip::resolver_query_base::passive); if (type == "UDP_SERVER") d_udp_endpoint = *resolver.resolve(query); else d_udp_endpoint_other = *resolver.resolve(query); } if (type == "TCP_SERVER") { d_acceptor_tcp.reset(new boost::asio::ip::tcp::acceptor(d_io_service, d_tcp_endpoint)); d_acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); start_tcp_accept(); set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::tcp_server_send, this, _1)); } else if (type =="TCP_CLIENT") { boost::system::error_code error = boost::asio::error::host_not_found; d_tcp_socket.reset(new boost::asio::ip::tcp::socket(d_io_service)); d_tcp_socket->connect(d_tcp_endpoint, error); if (error) throw boost::system::system_error(error); d_tcp_socket->set_option(boost::asio::ip::tcp::no_delay(d_tcp_no_delay)); set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::tcp_client_send, this, _1)); d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf), boost::bind(&socket_pdu_impl::handle_tcp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } else if (type =="UDP_SERVER") { d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service, d_udp_endpoint)); d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other, boost::bind(&socket_pdu_impl::handle_udp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::udp_send, this, _1)); } else if (type =="UDP_CLIENT") { d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service, d_udp_endpoint)); d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other, boost::bind(&socket_pdu_impl::handle_udp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::udp_send, this, _1)); } else throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type"); d_thread = gr::thread::thread(boost::bind(&socket_pdu_impl::run_io_service, this)); d_started = true; } socket_pdu_impl::~socket_pdu_impl() { stop(); } bool socket_pdu_impl::stop() { if (d_started) { d_io_service.stop(); d_thread.interrupt(); d_thread.join(); } d_started = false; return true; } void socket_pdu_impl::handle_tcp_read(const boost::system::error_code& error, size_t bytes_transferred) { if (!error) { pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const uint8_t *)&d_rxbuf[0]); pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); message_port_pub(PDU_PORT_ID, pdu); d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf), boost::bind(&socket_pdu_impl::handle_tcp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } else throw boost::system::system_error(error); } void socket_pdu_impl::start_tcp_accept() { tcp_connection::sptr new_connection = tcp_connection::make(d_acceptor_tcp->get_io_service(), d_rxbuf.size(), d_tcp_no_delay); d_acceptor_tcp->async_accept(new_connection->socket(), boost::bind(&socket_pdu_impl::handle_tcp_accept, this, new_connection, boost::asio::placeholders::error)); } void socket_pdu_impl::tcp_server_send(pmt::pmt_t msg) { pmt::pmt_t vector = pmt::cdr(msg); for(size_t i = 0; i < d_tcp_connections.size(); i++) d_tcp_connections[i]->send(vector); } void socket_pdu_impl::handle_tcp_accept(tcp_connection::sptr new_connection, const boost::system::error_code& error) { if (!error) { new_connection->start(this); d_tcp_connections.push_back(new_connection); start_tcp_accept(); } else std::cout << error << std::endl; } void socket_pdu_impl::tcp_client_send(pmt::pmt_t msg) { pmt::pmt_t vector = pmt::cdr(msg); size_t len = pmt::blob_length(vector); size_t offset = 0; std::vector<char> txbuf(std::min(len, d_rxbuf.size())); while (offset < len) { size_t send_len = std::min((len - offset), txbuf.size()); memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len); offset += send_len; d_tcp_socket->send(boost::asio::buffer(txbuf, send_len)); } } void socket_pdu_impl::udp_send(pmt::pmt_t msg) { if (d_udp_endpoint_other.address().to_string() == "0.0.0.0") return; pmt::pmt_t vector = pmt::cdr(msg); size_t len = pmt::blob_length(vector); size_t offset = 0; std::vector<char> txbuf(std::min(len, d_rxbuf.size())); while (offset < len) { size_t send_len = std::min((len - offset), txbuf.size()); memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len); offset += send_len; d_udp_socket->send_to(boost::asio::buffer(txbuf, send_len), d_udp_endpoint_other); } } void socket_pdu_impl::handle_udp_read(const boost::system::error_code& error, size_t bytes_transferred) { if (!error) { pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_rxbuf[0]); pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); message_port_pub(PDU_PORT_ID, pdu); d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other, boost::bind(&socket_pdu_impl::handle_udp_read, this, boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred)); } } } /* namespace blocks */ }/* namespace gr */