diff options
author | Marcus Müller <mmueller@gnuradio.org> | 2019-08-07 21:45:12 +0200 |
---|---|---|
committer | Marcus Müller <marcus@hostalia.de> | 2019-08-09 23:04:28 +0200 |
commit | f7bbf2c1d8d780294f3e016aff239ca35eb6516e (patch) | |
tree | e09ab6112e02b2215b2d59ac24d3d6ea2edac745 /gr-blocks/lib/socket_pdu_impl.cc | |
parent | 78431dc6941e3acc67c858277dfe4a0ed583643c (diff) |
Tree: clang-format without the include sorting
Diffstat (limited to 'gr-blocks/lib/socket_pdu_impl.cc')
-rw-r--r-- | gr-blocks/lib/socket_pdu_impl.cc | 335 |
1 files changed, 180 insertions, 155 deletions
diff --git a/gr-blocks/lib/socket_pdu_impl.cc b/gr-blocks/lib/socket_pdu_impl.cc index 168d74ebb2..b3a96d350a 100644 --- a/gr-blocks/lib/socket_pdu_impl.cc +++ b/gr-blocks/lib/socket_pdu_impl.cc @@ -30,65 +30,79 @@ #include <gnuradio/blocks/pdu.h> namespace gr { - namespace blocks { - - socket_pdu::sptr - socket_pdu::make(std::string type, std::string addr, std::string port, int MTU/*= 10000*/, bool tcp_no_delay/*= false*/) - { - return gnuradio::get_initial_sptr(new socket_pdu_impl(type, addr, port, MTU, tcp_no_delay)); - } - - socket_pdu_impl::socket_pdu_impl(std::string type, std::string addr, std::string port, int MTU/*= 10000*/, bool tcp_no_delay/*= false*/) - : block("socket_pdu", - io_signature::make (0, 0, 0), - io_signature::make (0, 0, 0)), +namespace blocks { + +socket_pdu::sptr socket_pdu::make(std::string type, + std::string addr, + std::string port, + int MTU /*= 10000*/, + bool tcp_no_delay /*= false*/) +{ + return gnuradio::get_initial_sptr( + new socket_pdu_impl(type, addr, port, MTU, tcp_no_delay)); +} + +socket_pdu_impl::socket_pdu_impl(std::string type, + std::string addr, + std::string port, + int MTU /*= 10000*/, + bool tcp_no_delay /*= false*/) + : block("socket_pdu", io_signature::make(0, 0, 0), io_signature::make(0, 0, 0)), d_tcp_no_delay(tcp_no_delay) - { - d_rxbuf.resize(MTU); +{ + d_rxbuf.resize(MTU); - message_port_register_in(pdu::pdu_port_id()); - message_port_register_out(pdu::pdu_port_id()); + message_port_register_in(pdu::pdu_port_id()); + message_port_register_out(pdu::pdu_port_id()); - if ((type == "TCP_SERVER") && ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces + if ((type == "TCP_SERVER") && + ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces int port_num = atoi(port.c_str()); if (port_num == 0) - throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for TCP_SERVER"); - d_tcp_endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port_num); - } - else if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) { + throw std::invalid_argument( + "gr::blocks:socket_pdu: invalid port for TCP_SERVER"); + d_tcp_endpoint = + boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port_num); + } else if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) { boost::asio::ip::tcp::resolver resolver(d_io_service); - boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), - addr, port, - boost::asio::ip::resolver_query_base::passive); + boost::asio::ip::tcp::resolver::query query( + boost::asio::ip::tcp::v4(), + addr, + port, + boost::asio::ip::resolver_query_base::passive); d_tcp_endpoint = *resolver.resolve(query); - } - else if ((type == "UDP_SERVER") && ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces + } else if ((type == "UDP_SERVER") && + ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces int port_num = atoi(port.c_str()); if (port_num == 0) - throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for UDP_SERVER"); - d_udp_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port_num); - } - else if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) { + throw std::invalid_argument( + "gr::blocks:socket_pdu: invalid port for UDP_SERVER"); + d_udp_endpoint = + boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port_num); + } else if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) { boost::asio::ip::udp::resolver resolver(d_io_service); - boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), - addr, port, - boost::asio::ip::resolver_query_base::passive); + boost::asio::ip::udp::resolver::query query( + boost::asio::ip::udp::v4(), + addr, + port, + boost::asio::ip::resolver_query_base::passive); if (type == "UDP_SERVER") - d_udp_endpoint = *resolver.resolve(query); + d_udp_endpoint = *resolver.resolve(query); else - d_udp_endpoint_other = *resolver.resolve(query); - } + d_udp_endpoint_other = *resolver.resolve(query); + } - if (type == "TCP_SERVER") { - d_acceptor_tcp.reset(new boost::asio::ip::tcp::acceptor(d_io_service, d_tcp_endpoint)); + if (type == "TCP_SERVER") { + d_acceptor_tcp.reset( + new boost::asio::ip::tcp::acceptor(d_io_service, d_tcp_endpoint)); d_acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); start_tcp_accept(); - set_msg_handler(pdu::pdu_port_id(), boost::bind(&socket_pdu_impl::tcp_server_send, this, _1)); - } - else if (type =="TCP_CLIENT") { + set_msg_handler(pdu::pdu_port_id(), + boost::bind(&socket_pdu_impl::tcp_server_send, this, _1)); + } else if (type == "TCP_CLIENT") { boost::system::error_code error = boost::asio::error::host_not_found; d_tcp_socket.reset(new boost::asio::ip::tcp::socket(d_io_service)); d_tcp_socket->connect(d_tcp_endpoint, error); @@ -96,163 +110,174 @@ namespace gr { throw boost::system::system_error(error); d_tcp_socket->set_option(boost::asio::ip::tcp::no_delay(d_tcp_no_delay)); - set_msg_handler(pdu::pdu_port_id(), boost::bind(&socket_pdu_impl::tcp_client_send, this, _1)); - - d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf), - boost::bind(&socket_pdu_impl::handle_tcp_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - } - else if (type =="UDP_SERVER") { - d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service, d_udp_endpoint)); - d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other, - boost::bind(&socket_pdu_impl::handle_udp_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - - set_msg_handler(pdu::pdu_port_id(), boost::bind(&socket_pdu_impl::udp_send, this, _1)); - } - else if (type =="UDP_CLIENT") { - d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service, d_udp_endpoint)); - d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other, - boost::bind(&socket_pdu_impl::handle_udp_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - - set_msg_handler(pdu::pdu_port_id(), boost::bind(&socket_pdu_impl::udp_send, this, _1)); - } - else + set_msg_handler(pdu::pdu_port_id(), + boost::bind(&socket_pdu_impl::tcp_client_send, this, _1)); + + d_tcp_socket->async_read_some( + boost::asio::buffer(d_rxbuf), + boost::bind(&socket_pdu_impl::handle_tcp_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } else if (type == "UDP_SERVER") { + d_udp_socket.reset( + new boost::asio::ip::udp::socket(d_io_service, d_udp_endpoint)); + d_udp_socket->async_receive_from( + boost::asio::buffer(d_rxbuf), + d_udp_endpoint_other, + boost::bind(&socket_pdu_impl::handle_udp_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + + set_msg_handler(pdu::pdu_port_id(), + boost::bind(&socket_pdu_impl::udp_send, this, _1)); + } else if (type == "UDP_CLIENT") { + d_udp_socket.reset( + new boost::asio::ip::udp::socket(d_io_service, d_udp_endpoint)); + d_udp_socket->async_receive_from( + boost::asio::buffer(d_rxbuf), + d_udp_endpoint_other, + boost::bind(&socket_pdu_impl::handle_udp_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + + set_msg_handler(pdu::pdu_port_id(), + boost::bind(&socket_pdu_impl::udp_send, this, _1)); + } else throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type"); - d_thread = gr::thread::thread(boost::bind(&socket_pdu_impl::run_io_service, this)); - d_started = true; - } + d_thread = gr::thread::thread(boost::bind(&socket_pdu_impl::run_io_service, this)); + d_started = true; +} - socket_pdu_impl::~socket_pdu_impl() - { - stop(); - } +socket_pdu_impl::~socket_pdu_impl() { stop(); } - bool - socket_pdu_impl::stop() - { - if (d_started) { +bool socket_pdu_impl::stop() +{ + if (d_started) { d_io_service.stop(); d_thread.interrupt(); d_thread.join(); - } - d_started = false; - return true; } - - void - socket_pdu_impl::handle_tcp_read(const boost::system::error_code& error, size_t bytes_transferred) - { - if (!error) { - pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const uint8_t *)&d_rxbuf[0]); + d_started = false; + return true; +} + +void socket_pdu_impl::handle_tcp_read(const boost::system::error_code& error, + size_t bytes_transferred) +{ + if (!error) { + pmt::pmt_t vector = + pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_rxbuf[0]); pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); message_port_pub(pdu::pdu_port_id(), pdu); - d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf), - boost::bind(&socket_pdu_impl::handle_tcp_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - } - else + d_tcp_socket->async_read_some( + boost::asio::buffer(d_rxbuf), + boost::bind(&socket_pdu_impl::handle_tcp_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + } else throw boost::system::system_error(error); - } +} - void - socket_pdu_impl::start_tcp_accept() - { +void socket_pdu_impl::start_tcp_accept() +{ #if (BOOST_VERSION >= 107000) - tcp_connection::sptr new_connection = tcp_connection::make(d_io_service, d_rxbuf.size(), d_tcp_no_delay); + tcp_connection::sptr new_connection = + tcp_connection::make(d_io_service, d_rxbuf.size(), d_tcp_no_delay); #else - tcp_connection::sptr new_connection = tcp_connection::make(d_acceptor_tcp->get_io_service(), d_rxbuf.size(), d_tcp_no_delay); + tcp_connection::sptr new_connection = tcp_connection::make( + d_acceptor_tcp->get_io_service(), d_rxbuf.size(), d_tcp_no_delay); #endif - d_acceptor_tcp->async_accept(new_connection->socket(), - boost::bind(&socket_pdu_impl::handle_tcp_accept, this, - new_connection, boost::asio::placeholders::error)); - } - - void - socket_pdu_impl::tcp_server_send(pmt::pmt_t msg) - { - pmt::pmt_t vector = pmt::cdr(msg); - for(size_t i = 0; i < d_tcp_connections.size(); i++) + d_acceptor_tcp->async_accept(new_connection->socket(), + boost::bind(&socket_pdu_impl::handle_tcp_accept, + this, + new_connection, + boost::asio::placeholders::error)); +} + +void socket_pdu_impl::tcp_server_send(pmt::pmt_t msg) +{ + pmt::pmt_t vector = pmt::cdr(msg); + for (size_t i = 0; i < d_tcp_connections.size(); i++) d_tcp_connections[i]->send(vector); - } +} - void - socket_pdu_impl::handle_tcp_accept(tcp_connection::sptr new_connection, const boost::system::error_code& error) - { - if (!error) { +void socket_pdu_impl::handle_tcp_accept(tcp_connection::sptr new_connection, + const boost::system::error_code& error) +{ + if (!error) { // Garbage collect closed sockets std::vector<tcp_connection::sptr>::iterator it = d_tcp_connections.begin(); - while(it != d_tcp_connections.end()) { - if (! (**it).socket().is_open()) - it = d_tcp_connections.erase(it); - else - ++it; + while (it != d_tcp_connections.end()) { + if (!(**it).socket().is_open()) + it = d_tcp_connections.erase(it); + else + ++it; } new_connection->start(this); d_tcp_connections.push_back(new_connection); start_tcp_accept(); - } - else + } else std::cout << error << std::endl; - } - - void - socket_pdu_impl::tcp_client_send(pmt::pmt_t msg) - { - pmt::pmt_t vector = pmt::cdr(msg); - size_t len = pmt::blob_length(vector); - size_t offset = 0; - std::vector<char> txbuf(std::min(len, d_rxbuf.size())); - while (offset < len) { +} + +void socket_pdu_impl::tcp_client_send(pmt::pmt_t msg) +{ + pmt::pmt_t vector = pmt::cdr(msg); + size_t len = pmt::blob_length(vector); + size_t offset = 0; + std::vector<char> txbuf(std::min(len, d_rxbuf.size())); + while (offset < len) { size_t send_len = std::min((len - offset), txbuf.size()); memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len); offset += send_len; d_tcp_socket->send(boost::asio::buffer(txbuf, send_len)); - } } +} - void - socket_pdu_impl::udp_send(pmt::pmt_t msg) - { - if (d_udp_endpoint_other.address().to_string() == "0.0.0.0") +void socket_pdu_impl::udp_send(pmt::pmt_t msg) +{ + if (d_udp_endpoint_other.address().to_string() == "0.0.0.0") return; - pmt::pmt_t vector = pmt::cdr(msg); - size_t len = pmt::blob_length(vector); - size_t offset = 0; - std::vector<char> txbuf(std::min(len, d_rxbuf.size())); - while (offset < len) { + pmt::pmt_t vector = pmt::cdr(msg); + size_t len = pmt::blob_length(vector); + size_t offset = 0; + std::vector<char> txbuf(std::min(len, d_rxbuf.size())); + while (offset < len) { size_t send_len = std::min((len - offset), txbuf.size()); memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len); offset += send_len; d_udp_socket->send_to(boost::asio::buffer(txbuf, send_len), d_udp_endpoint_other); - } } - - void - socket_pdu_impl::handle_udp_read(const boost::system::error_code& error, size_t bytes_transferred) - { - if (!error) { - pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_rxbuf[0]); +} + +void socket_pdu_impl::handle_udp_read(const boost::system::error_code& error, + size_t bytes_transferred) +{ + if (!error) { + pmt::pmt_t vector = + pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_rxbuf[0]); pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); message_port_pub(pdu::pdu_port_id(), pdu); - d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other, - boost::bind(&socket_pdu_impl::handle_udp_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); - } + d_udp_socket->async_receive_from( + boost::asio::buffer(d_rxbuf), + d_udp_endpoint_other, + boost::bind(&socket_pdu_impl::handle_udp_read, + this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); } +} - } /* namespace blocks */ -}/* namespace gr */ +} /* namespace blocks */ +} /* namespace gr */ |