diff options
author | Balint Seeber <balint@ettus.com> | 2014-03-27 01:27:33 -0700 |
---|---|---|
committer | Balint Seeber <balint@ettus.com> | 2014-03-27 11:33:38 -0700 |
commit | 8b8d7ee57ce4029411cde48810e2ec6eac7ae5f2 (patch) | |
tree | 5bfa18d6edd9b6ef65403e0a69890326620f8f1d | |
parent | 1092e685defd10692d3fa47435c716a88dfd8712 (diff) |
blocks: added 'MTU' and 'tcp_no_delay' params for 'socket_pdu' (and GRC option), applied MTU (buffer size) to TCP/UDP send, separate TCP/UDP server endpoint resolvers for empty/0.0.0.0 Host param (listen on all interfaces)
Whitespace clean-up.
-rw-r--r-- | gr-blocks/grc/blocks_socket_pdu.xml | 28 | ||||
-rw-r--r-- | gr-blocks/include/gnuradio/blocks/socket_pdu.h | 3 | ||||
-rw-r--r-- | gr-blocks/lib/socket_pdu_impl.cc | 136 | ||||
-rw-r--r-- | gr-blocks/lib/socket_pdu_impl.h | 5 |
4 files changed, 113 insertions, 59 deletions
diff --git a/gr-blocks/grc/blocks_socket_pdu.xml b/gr-blocks/grc/blocks_socket_pdu.xml index 1e897cfc4b..72dc38134c 100644 --- a/gr-blocks/grc/blocks_socket_pdu.xml +++ b/gr-blocks/grc/blocks_socket_pdu.xml @@ -8,7 +8,7 @@ <name>Socket PDU</name> <key>blocks_socket_pdu</key> <import>from gnuradio import blocks</import> - <make>blocks.socket_pdu($type, $host, $port, $mtu)</make> + <make>blocks.socket_pdu($type, $host, $port, $mtu, $tcp_no_delay)</make> <param> <name>Type</name> <key>type</key> @@ -49,6 +49,31 @@ <value>10000</value> <type>int</type> </param> + <param> + <name>TCP No Delay</name> + <key>tcp_no_delay</key> + <value>False</value> + <type>enum</type> + <hide> +#if (($type() == '"TCP_CLIENT"') or ($type() == '"TCP_SERVER"')) +#if (str($tcp_no_delay()) == 'False') +part +#else +none +#end if +#else +all +#end if +</hide> + <option> + <name>Enabled</name> + <key>True</key> + </option> + <option> + <name>Disabled</name> + <key>False</key> + </option> + </param> <sink> <name>pdus</name> <type>message</type> @@ -59,4 +84,5 @@ <type>message</type> <optional>1</optional> </source> + <doc>For server modes, leave Host blank to bind to all interfaces (equivalent to 0.0.0.0).</doc> </block> diff --git a/gr-blocks/include/gnuradio/blocks/socket_pdu.h b/gr-blocks/include/gnuradio/blocks/socket_pdu.h index 82a7632e7b..31468a3f43 100644 --- a/gr-blocks/include/gnuradio/blocks/socket_pdu.h +++ b/gr-blocks/include/gnuradio/blocks/socket_pdu.h @@ -45,8 +45,9 @@ namespace gr { * \param addr network address to use * \param port network port to use * \param MTU maximum transmission unit + * \param tcp_no_delay TCP No Delay option (set to True to disable Nagle algorithm) */ - static sptr make(std::string type, std::string addr, std::string port, int MTU=10000); + static sptr make(std::string type, std::string addr, std::string port, int MTU=10000, bool tcp_no_delay=false); }; } /* namespace blocks */ diff --git a/gr-blocks/lib/socket_pdu_impl.cc b/gr-blocks/lib/socket_pdu_impl.cc index 9daf8c30c9..3e483fb8a7 100644 --- a/gr-blocks/lib/socket_pdu_impl.cc +++ b/gr-blocks/lib/socket_pdu_impl.cc @@ -33,41 +33,56 @@ namespace gr { namespace blocks { socket_pdu::sptr - socket_pdu::make(std::string type, std::string addr, std::string port, int MTU) + socket_pdu::make(std::string type, std::string addr, std::string port, int MTU/*= 10000*/, bool tcp_no_delay/*= false*/) { - return gnuradio::get_initial_sptr(new socket_pdu_impl(type, addr, port, MTU)); + return gnuradio::get_initial_sptr(new socket_pdu_impl(type, addr, port, MTU, tcp_no_delay)); } - socket_pdu_impl::socket_pdu_impl(std::string type, std::string addr, std::string port, int MTU) - : block("socket_pdu", - io_signature::make (0, 0, 0), - io_signature::make (0, 0, 0)) + socket_pdu_impl::socket_pdu_impl(std::string type, std::string addr, std::string port, int MTU/*= 10000*/, bool tcp_no_delay/*= false*/) + : block("socket_pdu", + io_signature::make (0, 0, 0), + io_signature::make (0, 0, 0)), + d_tcp_no_delay(tcp_no_delay) { + d_rxbuf.resize(MTU); + message_port_register_in(PDU_PORT_ID); message_port_register_out(PDU_PORT_ID); - if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) { + if ((type == "TCP_SERVER") && ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces + int port_num = atoi(port.c_str()); + if (port_num == 0) + throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for TCP_SERVER"); + d_tcp_endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port_num); + } + else if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) { boost::asio::ip::tcp::resolver resolver(d_io_service); boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), addr, port); - d_tcp_endpoint = *resolver.resolve(query); + d_tcp_endpoint = *resolver.resolve(query); } - - if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) { - boost::asio::ip::udp::resolver resolver(d_io_service); + else if ((type == "UDP_SERVER") && ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces + int port_num = atoi(port.c_str()); + if (port_num == 0) + throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for UDP_SERVER"); + d_udp_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port_num); + } + else if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) { + boost::asio::ip::udp::resolver resolver(d_io_service); boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port); if (type == "UDP_SERVER") - d_udp_endpoint = *resolver.resolve(query); + d_udp_endpoint = *resolver.resolve(query); else - d_udp_endpoint_other = *resolver.resolve(query); + d_udp_endpoint_other = *resolver.resolve(query); } if (type == "TCP_SERVER") { d_acceptor_tcp.reset(new boost::asio::ip::tcp::acceptor(d_io_service, d_tcp_endpoint)); d_acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true)); + start_tcp_accept(); + set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::tcp_server_send, this, _1)); - } else if (type =="TCP_CLIENT") { boost::system::error_code error = boost::asio::error::host_not_found; @@ -75,34 +90,35 @@ namespace gr { d_tcp_socket->connect(d_tcp_endpoint, error); if (error) throw boost::system::system_error(error); + d_tcp_socket->set_option(boost::asio::ip::tcp::no_delay(d_tcp_no_delay)); set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::tcp_client_send, this, _1)); - d_tcp_socket->async_read_some( - boost::asio::buffer(d_rxbuf), - boost::bind(&socket_pdu_impl::handle_tcp_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred) - ); + d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf), + boost::bind(&socket_pdu_impl::handle_tcp_read, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); } else if (type =="UDP_SERVER") { d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service, d_udp_endpoint)); d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other, - boost::bind(&socket_pdu_impl::handle_udp_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + boost::bind(&socket_pdu_impl::handle_udp_read, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::udp_send, this, _1)); } else if (type =="UDP_CLIENT") { d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service, d_udp_endpoint)); d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other, - boost::bind(&socket_pdu_impl::handle_udp_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + boost::bind(&socket_pdu_impl::handle_udp_read, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); + set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::udp_send, this, _1)); } else - throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type"); + throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type"); d_thread = gr::thread::thread(boost::bind(&socket_pdu_impl::run_io_service, this)); d_started = true; @@ -112,14 +128,14 @@ namespace gr { socket_pdu_impl::handle_tcp_read(const boost::system::error_code& error, size_t bytes_transferred) { if (!error) { - pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const uint8_t *)&d_rxbuf[0]); - pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); - message_port_pub(PDU_PORT_ID, pdu); - - d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf), - boost::bind(&socket_pdu_impl::handle_tcp_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const uint8_t *)&d_rxbuf[0]); + pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); + message_port_pub(PDU_PORT_ID, pdu); + + d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf), + boost::bind(&socket_pdu_impl::handle_tcp_read, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); } else throw boost::system::system_error(error); @@ -128,11 +144,11 @@ namespace gr { void socket_pdu_impl::start_tcp_accept() { - tcp_connection::sptr new_connection = tcp_connection::make(d_acceptor_tcp->get_io_service()); + tcp_connection::sptr new_connection = tcp_connection::make(d_acceptor_tcp->get_io_service(), d_rxbuf.size(), d_tcp_no_delay); d_acceptor_tcp->async_accept(new_connection->socket(), - boost::bind(&socket_pdu_impl::handle_tcp_accept, this, - new_connection, boost::asio::placeholders::error)); + boost::bind(&socket_pdu_impl::handle_tcp_accept, this, + new_connection, boost::asio::placeholders::error)); } void @@ -147,12 +163,12 @@ namespace gr { socket_pdu_impl::handle_tcp_accept(tcp_connection::sptr new_connection, const boost::system::error_code& error) { if (!error) { - new_connection->start(this); - d_tcp_connections.push_back(new_connection); - start_tcp_accept(); + new_connection->start(this); + d_tcp_connections.push_back(new_connection); + start_tcp_accept(); } else - std::cout << error << std::endl; + std::cout << error << std::endl; } void @@ -160,22 +176,32 @@ namespace gr { { pmt::pmt_t vector = pmt::cdr(msg); size_t len = pmt::length(vector); - size_t offset(0); - boost::array<char, 10000> txbuf; - memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), len); - d_tcp_socket->send(boost::asio::buffer(txbuf,len)); + size_t offset = 0; + std::vector<char> txbuf(std::min(len, d_rxbuf.size())); + while (offset < len) { + size_t send_len = std::min((len - offset), txbuf.size()); + memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len); + offset += send_len; + d_tcp_socket->send(boost::asio::buffer(txbuf, send_len)); + } } void socket_pdu_impl::udp_send(pmt::pmt_t msg) { + if (d_udp_endpoint_other.address().to_string() == "0.0.0.0") + return; + pmt::pmt_t vector = pmt::cdr(msg); size_t len = pmt::length(vector); - size_t offset(0); - boost::array<char, 10000> txbuf; - memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), len); - if (d_udp_endpoint_other.address().to_string() != "0.0.0.0") - d_udp_socket->send_to(boost::asio::buffer(txbuf,len), d_udp_endpoint_other); + size_t offset = 0; + std::vector<char> txbuf(std::min(len, d_rxbuf.size())); + while (offset < len) { + size_t send_len = std::min((len - offset), txbuf.size()); + memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len); + offset += send_len; + d_udp_socket->send_to(boost::asio::buffer(txbuf, send_len), d_udp_endpoint_other); + } } void @@ -183,14 +209,14 @@ namespace gr { { if (!error) { pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_rxbuf[0]); - pmt::pmt_t pdu = pmt::cons( pmt::PMT_NIL, vector); + pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector); message_port_pub(PDU_PORT_ID, pdu); d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other, - boost::bind(&socket_pdu_impl::handle_udp_read, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); + boost::bind(&socket_pdu_impl::handle_udp_read, this, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); } } diff --git a/gr-blocks/lib/socket_pdu_impl.h b/gr-blocks/lib/socket_pdu_impl.h index 3099d90e00..2d5bc33518 100644 --- a/gr-blocks/lib/socket_pdu_impl.h +++ b/gr-blocks/lib/socket_pdu_impl.h @@ -34,13 +34,14 @@ namespace gr { { private: boost::asio::io_service d_io_service; - boost::array<char, 10000> d_rxbuf; + std::vector<char> d_rxbuf; void run_io_service() { d_io_service.run(); } // TCP specific boost::asio::ip::tcp::endpoint d_tcp_endpoint; std::vector<tcp_connection::sptr> d_tcp_connections; void handle_tcp_read(const boost::system::error_code& error, size_t bytes_transferred); + bool d_tcp_no_delay; // TCP server specific boost::shared_ptr<boost::asio::ip::tcp::acceptor> d_acceptor_tcp; @@ -60,7 +61,7 @@ namespace gr { void udp_send(pmt::pmt_t msg); public: - socket_pdu_impl(std::string type, std::string addr, std::string port, int MTU); + socket_pdu_impl(std::string type, std::string addr, std::string port, int MTU = 10000, bool tcp_no_delay = false); }; } /* namespace blocks */ |