diff options
author | Johnathan Corgan <johnathan@corganlabs.com> | 2017-08-27 11:03:50 -0700 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2017-08-27 11:03:50 -0700 |
commit | 34933ac6bc9be665b916be146cf040197fc19229 (patch) | |
tree | db565567e9d2611866d257d701cb4460e5892b25 | |
parent | 6cd3fc6ae4f6c1aea308063aa6543ac465c95c8c (diff) | |
parent | 643fee9496b84061a01042c4926e4b5e98f1e498 (diff) |
Merge remote-tracking branch 'github/pr/1419'
-rw-r--r-- | gr-blocks/lib/socket_pdu_impl.h | 1 | ||||
-rw-r--r-- | gr-blocks/lib/tcp_connection.cc | 22 | ||||
-rw-r--r-- | gr-blocks/lib/tcp_connection.h | 4 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_socket_pdu.py | 28 |
4 files changed, 46 insertions, 9 deletions
diff --git a/gr-blocks/lib/socket_pdu_impl.h b/gr-blocks/lib/socket_pdu_impl.h index e45f6d4463..1df99fdeff 100644 --- a/gr-blocks/lib/socket_pdu_impl.h +++ b/gr-blocks/lib/socket_pdu_impl.h @@ -24,7 +24,6 @@ #define INCLUDED_BLOCKS_SOCKET_PDU_IMPL_H #include <gnuradio/blocks/socket_pdu.h> -#include "stream_pdu_base.h" #include "tcp_connection.h" namespace gr { diff --git a/gr-blocks/lib/tcp_connection.cc b/gr-blocks/lib/tcp_connection.cc index 3b0afa13fc..b28accccf7 100644 --- a/gr-blocks/lib/tcp_connection.cc +++ b/gr-blocks/lib/tcp_connection.cc @@ -54,16 +54,24 @@ namespace gr { tcp_connection::send(pmt::pmt_t vector) { size_t len = pmt::blob_length(vector); + + // Asio async_write() requires the buffer to remain valid until the handler is called. + boost::shared_ptr<char[]> txbuf(new char[len]); + + size_t temp = 0; + memcpy(txbuf.get(), pmt::uniform_vector_elements(vector, temp), len); + size_t offset = 0; - std::vector<char> txbuf(std::min(len, d_buf.size())); while (offset < len) { - size_t send_len = std::min((len - offset), txbuf.size()); - memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len); + // Limit the size of each write() to the MTU. + // FIXME: Note that this has the effect of breaking a large PDU into several smaller PDUs, each + // containing <= MTU bytes. Is this the desired behavior? + size_t send_len = std::min((len - offset), d_buf.size()); + boost::asio::async_write(d_socket, boost::asio::buffer(txbuf.get() + offset, send_len), + boost::bind(&tcp_connection::handle_write, this, txbuf, + boost::asio::placeholders::error, + boost::asio::placeholders::bytes_transferred)); offset += send_len; - boost::asio::async_write(d_socket, boost::asio::buffer(txbuf, send_len), - boost::bind(&tcp_connection::handle_write, this, - boost::asio::placeholders::error, - boost::asio::placeholders::bytes_transferred)); } } diff --git a/gr-blocks/lib/tcp_connection.h b/gr-blocks/lib/tcp_connection.h index 6eeb64e54f..eb4c0df285 100644 --- a/gr-blocks/lib/tcp_connection.h +++ b/gr-blocks/lib/tcp_connection.h @@ -25,6 +25,7 @@ #include <boost/array.hpp> #include <boost/asio.hpp> +#include <boost/shared_ptr.hpp> #include <pmt/pmt.h> namespace gr { @@ -53,7 +54,8 @@ namespace gr { void start(gr::basic_block *block); void send(pmt::pmt_t vector); void handle_read(const boost::system::error_code& error, size_t bytes_transferred); - void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { } + void handle_write(boost::shared_ptr<char[]> txbuf, const boost::system::error_code& error, + size_t bytes_transferred) { } }; } /* namespace blocks */ diff --git a/gr-blocks/python/blocks/qa_socket_pdu.py b/gr-blocks/python/blocks/qa_socket_pdu.py index db9f53c71e..5285585340 100755 --- a/gr-blocks/python/blocks/qa_socket_pdu.py +++ b/gr-blocks/python/blocks/qa_socket_pdu.py @@ -100,6 +100,34 @@ class qa_socket_pdu (gr_unittest.TestCase): #self.tb.connect(pdu_to_ts, head, sink) self.tb.run() + def test_004 (self): + # Test that the TCP server can stream PDUs <= the MTU size. + port = str(random.Random().randint(0, 30000) + 10000) + mtu = 10000 + srcdata = tuple([x % 256 for x in xrange(mtu)]) + data = pmt.init_u8vector(srcdata.__len__(), srcdata) + pdu_msg = pmt.cons(pmt.PMT_NIL, data) + + self.pdu_source = blocks.message_strobe(pdu_msg, 500) + self.pdu_send = blocks.socket_pdu("TCP_SERVER", "localhost", port, mtu) + self.pdu_recv = blocks.socket_pdu("TCP_CLIENT", "localhost", port, mtu) + self.pdu_sink = blocks.message_debug() + + self.tb.msg_connect(self.pdu_source, "strobe", self.pdu_send, "pdus") + self.tb.msg_connect(self.pdu_recv, "pdus", self.pdu_sink, "store") + + self.tb.start() + time.sleep(1) + self.tb.stop() + self.tb.wait() + + received = self.pdu_sink.get_message(0) + received_data = pmt.cdr(received) + msg_data = [] + for i in xrange(mtu): + msg_data.append(pmt.u8vector_ref(received_data, i)) + self.assertEqual(srcdata, tuple(msg_data)) + if __name__ == '__main__': gr_unittest.run(qa_socket_pdu, "qa_socket_pdu.xml") |