summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--gr-blocks/grc/blocks_socket_pdu.xml28
-rw-r--r--gr-blocks/include/gnuradio/blocks/socket_pdu.h3
-rw-r--r--gr-blocks/lib/socket_pdu_impl.cc136
-rw-r--r--gr-blocks/lib/socket_pdu_impl.h5
4 files changed, 113 insertions, 59 deletions
diff --git a/gr-blocks/grc/blocks_socket_pdu.xml b/gr-blocks/grc/blocks_socket_pdu.xml
index 1e897cfc4b..72dc38134c 100644
--- a/gr-blocks/grc/blocks_socket_pdu.xml
+++ b/gr-blocks/grc/blocks_socket_pdu.xml
@@ -8,7 +8,7 @@
<name>Socket PDU</name>
<key>blocks_socket_pdu</key>
<import>from gnuradio import blocks</import>
- <make>blocks.socket_pdu($type, $host, $port, $mtu)</make>
+ <make>blocks.socket_pdu($type, $host, $port, $mtu, $tcp_no_delay)</make>
<param>
<name>Type</name>
<key>type</key>
@@ -49,6 +49,31 @@
<value>10000</value>
<type>int</type>
</param>
+ <param>
+ <name>TCP No Delay</name>
+ <key>tcp_no_delay</key>
+ <value>False</value>
+ <type>enum</type>
+ <hide>
+#if (($type() == '"TCP_CLIENT"') or ($type() == '"TCP_SERVER"'))
+#if (str($tcp_no_delay()) == 'False')
+part
+#else
+none
+#end if
+#else
+all
+#end if
+</hide>
+ <option>
+ <name>Enabled</name>
+ <key>True</key>
+ </option>
+ <option>
+ <name>Disabled</name>
+ <key>False</key>
+ </option>
+ </param>
<sink>
<name>pdus</name>
<type>message</type>
@@ -59,4 +84,5 @@
<type>message</type>
<optional>1</optional>
</source>
+ <doc>For server modes, leave Host blank to bind to all interfaces (equivalent to 0.0.0.0).</doc>
</block>
diff --git a/gr-blocks/include/gnuradio/blocks/socket_pdu.h b/gr-blocks/include/gnuradio/blocks/socket_pdu.h
index 82a7632e7b..31468a3f43 100644
--- a/gr-blocks/include/gnuradio/blocks/socket_pdu.h
+++ b/gr-blocks/include/gnuradio/blocks/socket_pdu.h
@@ -45,8 +45,9 @@ namespace gr {
* \param addr network address to use
* \param port network port to use
* \param MTU maximum transmission unit
+ * \param tcp_no_delay TCP No Delay option (set to True to disable Nagle algorithm)
*/
- static sptr make(std::string type, std::string addr, std::string port, int MTU=10000);
+ static sptr make(std::string type, std::string addr, std::string port, int MTU=10000, bool tcp_no_delay=false);
};
} /* namespace blocks */
diff --git a/gr-blocks/lib/socket_pdu_impl.cc b/gr-blocks/lib/socket_pdu_impl.cc
index 9daf8c30c9..3e483fb8a7 100644
--- a/gr-blocks/lib/socket_pdu_impl.cc
+++ b/gr-blocks/lib/socket_pdu_impl.cc
@@ -33,41 +33,56 @@ namespace gr {
namespace blocks {
socket_pdu::sptr
- socket_pdu::make(std::string type, std::string addr, std::string port, int MTU)
+ socket_pdu::make(std::string type, std::string addr, std::string port, int MTU/*= 10000*/, bool tcp_no_delay/*= false*/)
{
- return gnuradio::get_initial_sptr(new socket_pdu_impl(type, addr, port, MTU));
+ return gnuradio::get_initial_sptr(new socket_pdu_impl(type, addr, port, MTU, tcp_no_delay));
}
- socket_pdu_impl::socket_pdu_impl(std::string type, std::string addr, std::string port, int MTU)
- : block("socket_pdu",
- io_signature::make (0, 0, 0),
- io_signature::make (0, 0, 0))
+ socket_pdu_impl::socket_pdu_impl(std::string type, std::string addr, std::string port, int MTU/*= 10000*/, bool tcp_no_delay/*= false*/)
+ : block("socket_pdu",
+ io_signature::make (0, 0, 0),
+ io_signature::make (0, 0, 0)),
+ d_tcp_no_delay(tcp_no_delay)
{
+ d_rxbuf.resize(MTU);
+
message_port_register_in(PDU_PORT_ID);
message_port_register_out(PDU_PORT_ID);
- if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) {
+ if ((type == "TCP_SERVER") && ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces
+ int port_num = atoi(port.c_str());
+ if (port_num == 0)
+ throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for TCP_SERVER");
+ d_tcp_endpoint = boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port_num);
+ }
+ else if ((type == "TCP_SERVER") || (type == "TCP_CLIENT")) {
boost::asio::ip::tcp::resolver resolver(d_io_service);
boost::asio::ip::tcp::resolver::query query(boost::asio::ip::tcp::v4(), addr, port);
- d_tcp_endpoint = *resolver.resolve(query);
+ d_tcp_endpoint = *resolver.resolve(query);
}
-
- if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) {
- boost::asio::ip::udp::resolver resolver(d_io_service);
+ else if ((type == "UDP_SERVER") && ((addr.empty()) || (addr == "0.0.0.0"))) { // Bind on all interfaces
+ int port_num = atoi(port.c_str());
+ if (port_num == 0)
+ throw std::invalid_argument("gr::blocks:socket_pdu: invalid port for UDP_SERVER");
+ d_udp_endpoint = boost::asio::ip::udp::endpoint(boost::asio::ip::udp::v4(), port_num);
+ }
+ else if ((type == "UDP_SERVER") || (type == "UDP_CLIENT")) {
+ boost::asio::ip::udp::resolver resolver(d_io_service);
boost::asio::ip::udp::resolver::query query(boost::asio::ip::udp::v4(), addr, port);
if (type == "UDP_SERVER")
- d_udp_endpoint = *resolver.resolve(query);
+ d_udp_endpoint = *resolver.resolve(query);
else
- d_udp_endpoint_other = *resolver.resolve(query);
+ d_udp_endpoint_other = *resolver.resolve(query);
}
if (type == "TCP_SERVER") {
d_acceptor_tcp.reset(new boost::asio::ip::tcp::acceptor(d_io_service, d_tcp_endpoint));
d_acceptor_tcp->set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
+
start_tcp_accept();
+
set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::tcp_server_send, this, _1));
-
}
else if (type =="TCP_CLIENT") {
boost::system::error_code error = boost::asio::error::host_not_found;
@@ -75,34 +90,35 @@ namespace gr {
d_tcp_socket->connect(d_tcp_endpoint, error);
if (error)
throw boost::system::system_error(error);
+ d_tcp_socket->set_option(boost::asio::ip::tcp::no_delay(d_tcp_no_delay));
set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::tcp_client_send, this, _1));
- d_tcp_socket->async_read_some(
- boost::asio::buffer(d_rxbuf),
- boost::bind(&socket_pdu_impl::handle_tcp_read, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred)
- );
+ d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf),
+ boost::bind(&socket_pdu_impl::handle_tcp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
else if (type =="UDP_SERVER") {
d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service, d_udp_endpoint));
d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other,
- boost::bind(&socket_pdu_impl::handle_udp_read, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ boost::bind(&socket_pdu_impl::handle_udp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+
set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::udp_send, this, _1));
}
else if (type =="UDP_CLIENT") {
d_udp_socket.reset(new boost::asio::ip::udp::socket(d_io_service, d_udp_endpoint));
d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other,
- boost::bind(&socket_pdu_impl::handle_udp_read, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ boost::bind(&socket_pdu_impl::handle_udp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
+
set_msg_handler(PDU_PORT_ID, boost::bind(&socket_pdu_impl::udp_send, this, _1));
}
else
- throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type");
+ throw std::runtime_error("gr::blocks:socket_pdu: unknown socket type");
d_thread = gr::thread::thread(boost::bind(&socket_pdu_impl::run_io_service, this));
d_started = true;
@@ -112,14 +128,14 @@ namespace gr {
socket_pdu_impl::handle_tcp_read(const boost::system::error_code& error, size_t bytes_transferred)
{
if (!error) {
- pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const uint8_t *)&d_rxbuf[0]);
- pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
- message_port_pub(PDU_PORT_ID, pdu);
-
- d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf),
- boost::bind(&socket_pdu_impl::handle_tcp_read, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const uint8_t *)&d_rxbuf[0]);
+ pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
+ message_port_pub(PDU_PORT_ID, pdu);
+
+ d_tcp_socket->async_read_some(boost::asio::buffer(d_rxbuf),
+ boost::bind(&socket_pdu_impl::handle_tcp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
else
throw boost::system::system_error(error);
@@ -128,11 +144,11 @@ namespace gr {
void
socket_pdu_impl::start_tcp_accept()
{
- tcp_connection::sptr new_connection = tcp_connection::make(d_acceptor_tcp->get_io_service());
+ tcp_connection::sptr new_connection = tcp_connection::make(d_acceptor_tcp->get_io_service(), d_rxbuf.size(), d_tcp_no_delay);
d_acceptor_tcp->async_accept(new_connection->socket(),
- boost::bind(&socket_pdu_impl::handle_tcp_accept, this,
- new_connection, boost::asio::placeholders::error));
+ boost::bind(&socket_pdu_impl::handle_tcp_accept, this,
+ new_connection, boost::asio::placeholders::error));
}
void
@@ -147,12 +163,12 @@ namespace gr {
socket_pdu_impl::handle_tcp_accept(tcp_connection::sptr new_connection, const boost::system::error_code& error)
{
if (!error) {
- new_connection->start(this);
- d_tcp_connections.push_back(new_connection);
- start_tcp_accept();
+ new_connection->start(this);
+ d_tcp_connections.push_back(new_connection);
+ start_tcp_accept();
}
else
- std::cout << error << std::endl;
+ std::cout << error << std::endl;
}
void
@@ -160,22 +176,32 @@ namespace gr {
{
pmt::pmt_t vector = pmt::cdr(msg);
size_t len = pmt::length(vector);
- size_t offset(0);
- boost::array<char, 10000> txbuf;
- memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), len);
- d_tcp_socket->send(boost::asio::buffer(txbuf,len));
+ size_t offset = 0;
+ std::vector<char> txbuf(std::min(len, d_rxbuf.size()));
+ while (offset < len) {
+ size_t send_len = std::min((len - offset), txbuf.size());
+ memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len);
+ offset += send_len;
+ d_tcp_socket->send(boost::asio::buffer(txbuf, send_len));
+ }
}
void
socket_pdu_impl::udp_send(pmt::pmt_t msg)
{
+ if (d_udp_endpoint_other.address().to_string() == "0.0.0.0")
+ return;
+
pmt::pmt_t vector = pmt::cdr(msg);
size_t len = pmt::length(vector);
- size_t offset(0);
- boost::array<char, 10000> txbuf;
- memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), len);
- if (d_udp_endpoint_other.address().to_string() != "0.0.0.0")
- d_udp_socket->send_to(boost::asio::buffer(txbuf,len), d_udp_endpoint_other);
+ size_t offset = 0;
+ std::vector<char> txbuf(std::min(len, d_rxbuf.size()));
+ while (offset < len) {
+ size_t send_len = std::min((len - offset), txbuf.size());
+ memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len);
+ offset += send_len;
+ d_udp_socket->send_to(boost::asio::buffer(txbuf, send_len), d_udp_endpoint_other);
+ }
}
void
@@ -183,14 +209,14 @@ namespace gr {
{
if (!error) {
pmt::pmt_t vector = pmt::init_u8vector(bytes_transferred, (const uint8_t*)&d_rxbuf[0]);
- pmt::pmt_t pdu = pmt::cons( pmt::PMT_NIL, vector);
+ pmt::pmt_t pdu = pmt::cons(pmt::PMT_NIL, vector);
message_port_pub(PDU_PORT_ID, pdu);
d_udp_socket->async_receive_from(boost::asio::buffer(d_rxbuf), d_udp_endpoint_other,
- boost::bind(&socket_pdu_impl::handle_udp_read, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
+ boost::bind(&socket_pdu_impl::handle_udp_read, this,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
}
}
diff --git a/gr-blocks/lib/socket_pdu_impl.h b/gr-blocks/lib/socket_pdu_impl.h
index 3099d90e00..2d5bc33518 100644
--- a/gr-blocks/lib/socket_pdu_impl.h
+++ b/gr-blocks/lib/socket_pdu_impl.h
@@ -34,13 +34,14 @@ namespace gr {
{
private:
boost::asio::io_service d_io_service;
- boost::array<char, 10000> d_rxbuf;
+ std::vector<char> d_rxbuf;
void run_io_service() { d_io_service.run(); }
// TCP specific
boost::asio::ip::tcp::endpoint d_tcp_endpoint;
std::vector<tcp_connection::sptr> d_tcp_connections;
void handle_tcp_read(const boost::system::error_code& error, size_t bytes_transferred);
+ bool d_tcp_no_delay;
// TCP server specific
boost::shared_ptr<boost::asio::ip::tcp::acceptor> d_acceptor_tcp;
@@ -60,7 +61,7 @@ namespace gr {
void udp_send(pmt::pmt_t msg);
public:
- socket_pdu_impl(std::string type, std::string addr, std::string port, int MTU);
+ socket_pdu_impl(std::string type, std::string addr, std::string port, int MTU = 10000, bool tcp_no_delay = false);
};
} /* namespace blocks */