summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohnathan Corgan <johnathan@corganlabs.com>2017-08-27 11:04:47 -0700
committerJohnathan Corgan <johnathan@corganlabs.com>2017-08-27 11:04:47 -0700
commit765040724dfe02b9f6533c60de1a6d67a975f388 (patch)
tree586262690050a7ebbea679fac180d7441556a14b
parentbb10a47f37ad0d0fbe9ed4494595de384168a414 (diff)
parenta9b1f1a8747ae97db9e2473832e54159454649f5 (diff)
Merge branch 'next' into python3
-rw-r--r--gr-blocks/lib/socket_pdu_impl.h1
-rw-r--r--gr-blocks/lib/tcp_connection.cc22
-rw-r--r--gr-blocks/lib/tcp_connection.h4
-rw-r--r--gr-blocks/python/blocks/qa_socket_pdu.py28
-rw-r--r--gr-digital/grc/digital_correlate_access_code_tag_xx.xml4
-rw-r--r--gr-digital/include/gnuradio/digital/correlate_access_code_tag_bb.h2
-rw-r--r--gr-digital/include/gnuradio/digital/correlate_access_code_tag_ff.h2
-rw-r--r--gr-digital/lib/correlate_access_code_tag_bb_impl.cc4
-rw-r--r--gr-digital/lib/correlate_access_code_tag_bb_impl.h4
-rw-r--r--gr-digital/lib/correlate_access_code_tag_ff_impl.cc4
-rw-r--r--gr-digital/lib/correlate_access_code_tag_ff_impl.h4
11 files changed, 70 insertions, 9 deletions
diff --git a/gr-blocks/lib/socket_pdu_impl.h b/gr-blocks/lib/socket_pdu_impl.h
index e45f6d4463..1df99fdeff 100644
--- a/gr-blocks/lib/socket_pdu_impl.h
+++ b/gr-blocks/lib/socket_pdu_impl.h
@@ -24,7 +24,6 @@
#define INCLUDED_BLOCKS_SOCKET_PDU_IMPL_H
#include <gnuradio/blocks/socket_pdu.h>
-#include "stream_pdu_base.h"
#include "tcp_connection.h"
namespace gr {
diff --git a/gr-blocks/lib/tcp_connection.cc b/gr-blocks/lib/tcp_connection.cc
index 3b0afa13fc..b28accccf7 100644
--- a/gr-blocks/lib/tcp_connection.cc
+++ b/gr-blocks/lib/tcp_connection.cc
@@ -54,16 +54,24 @@ namespace gr {
tcp_connection::send(pmt::pmt_t vector)
{
size_t len = pmt::blob_length(vector);
+
+ // Asio async_write() requires the buffer to remain valid until the handler is called.
+ boost::shared_ptr<char[]> txbuf(new char[len]);
+
+ size_t temp = 0;
+ memcpy(txbuf.get(), pmt::uniform_vector_elements(vector, temp), len);
+
size_t offset = 0;
- std::vector<char> txbuf(std::min(len, d_buf.size()));
while (offset < len) {
- size_t send_len = std::min((len - offset), txbuf.size());
- memcpy(&txbuf[0], pmt::uniform_vector_elements(vector, offset), send_len);
+ // Limit the size of each write() to the MTU.
+ // FIXME: Note that this has the effect of breaking a large PDU into several smaller PDUs, each
+ // containing <= MTU bytes. Is this the desired behavior?
+ size_t send_len = std::min((len - offset), d_buf.size());
+ boost::asio::async_write(d_socket, boost::asio::buffer(txbuf.get() + offset, send_len),
+ boost::bind(&tcp_connection::handle_write, this, txbuf,
+ boost::asio::placeholders::error,
+ boost::asio::placeholders::bytes_transferred));
offset += send_len;
- boost::asio::async_write(d_socket, boost::asio::buffer(txbuf, send_len),
- boost::bind(&tcp_connection::handle_write, this,
- boost::asio::placeholders::error,
- boost::asio::placeholders::bytes_transferred));
}
}
diff --git a/gr-blocks/lib/tcp_connection.h b/gr-blocks/lib/tcp_connection.h
index 6eeb64e54f..eb4c0df285 100644
--- a/gr-blocks/lib/tcp_connection.h
+++ b/gr-blocks/lib/tcp_connection.h
@@ -25,6 +25,7 @@
#include <boost/array.hpp>
#include <boost/asio.hpp>
+#include <boost/shared_ptr.hpp>
#include <pmt/pmt.h>
namespace gr {
@@ -53,7 +54,8 @@ namespace gr {
void start(gr::basic_block *block);
void send(pmt::pmt_t vector);
void handle_read(const boost::system::error_code& error, size_t bytes_transferred);
- void handle_write(const boost::system::error_code& error, size_t bytes_transferred) { }
+ void handle_write(boost::shared_ptr<char[]> txbuf, const boost::system::error_code& error,
+ size_t bytes_transferred) { }
};
} /* namespace blocks */
diff --git a/gr-blocks/python/blocks/qa_socket_pdu.py b/gr-blocks/python/blocks/qa_socket_pdu.py
index 60da3d6bdd..2e033491d3 100644
--- a/gr-blocks/python/blocks/qa_socket_pdu.py
+++ b/gr-blocks/python/blocks/qa_socket_pdu.py
@@ -101,6 +101,34 @@ class qa_socket_pdu (gr_unittest.TestCase):
#self.tb.connect(pdu_to_ts, head, sink)
self.tb.run()
+ def test_004 (self):
+ # Test that the TCP server can stream PDUs <= the MTU size.
+ port = str(random.Random().randint(0, 30000) + 10000)
+ mtu = 10000
+ srcdata = tuple([x % 256 for x in xrange(mtu)])
+ data = pmt.init_u8vector(srcdata.__len__(), srcdata)
+ pdu_msg = pmt.cons(pmt.PMT_NIL, data)
+
+ self.pdu_source = blocks.message_strobe(pdu_msg, 500)
+ self.pdu_send = blocks.socket_pdu("TCP_SERVER", "localhost", port, mtu)
+ self.pdu_recv = blocks.socket_pdu("TCP_CLIENT", "localhost", port, mtu)
+ self.pdu_sink = blocks.message_debug()
+
+ self.tb.msg_connect(self.pdu_source, "strobe", self.pdu_send, "pdus")
+ self.tb.msg_connect(self.pdu_recv, "pdus", self.pdu_sink, "store")
+
+ self.tb.start()
+ time.sleep(1)
+ self.tb.stop()
+ self.tb.wait()
+
+ received = self.pdu_sink.get_message(0)
+ received_data = pmt.cdr(received)
+ msg_data = []
+ for i in xrange(mtu):
+ msg_data.append(pmt.u8vector_ref(received_data, i))
+ self.assertEqual(srcdata, tuple(msg_data))
+
if __name__ == '__main__':
gr_unittest.run(qa_socket_pdu, "qa_socket_pdu.xml")
diff --git a/gr-digital/grc/digital_correlate_access_code_tag_xx.xml b/gr-digital/grc/digital_correlate_access_code_tag_xx.xml
index 83ccb422ea..c7c137187f 100644
--- a/gr-digital/grc/digital_correlate_access_code_tag_xx.xml
+++ b/gr-digital/grc/digital_correlate_access_code_tag_xx.xml
@@ -9,6 +9,10 @@
<key>digital_correlate_access_code_tag_xx</key>
<import>from gnuradio import digital</import>
<make>digital.correlate_access_code_tag_$(type.fcn)($access_code, $threshold, $tagname)</make>
+ <callback>set_access_code($access_code)</callback>
+ <callback>set_threshold($threshold)</callback>
+ <callback>set_tagname($tagname)</callback>
+
<param>
<name>IO Type</name>
<key>type</key>
diff --git a/gr-digital/include/gnuradio/digital/correlate_access_code_tag_bb.h b/gr-digital/include/gnuradio/digital/correlate_access_code_tag_bb.h
index 475c038dc7..d064c45709 100644
--- a/gr-digital/include/gnuradio/digital/correlate_access_code_tag_bb.h
+++ b/gr-digital/include/gnuradio/digital/correlate_access_code_tag_bb.h
@@ -63,6 +63,8 @@ namespace gr {
* e.g., "010101010111000100"
*/
virtual bool set_access_code(const std::string &access_code) = 0;
+ virtual void set_threshold(int threshold) = 0;
+ virtual void set_tagname(const std::string &tagname) = 0;
};
} /* namespace digital */
diff --git a/gr-digital/include/gnuradio/digital/correlate_access_code_tag_ff.h b/gr-digital/include/gnuradio/digital/correlate_access_code_tag_ff.h
index 93e89d6a6e..9eaad08c69 100644
--- a/gr-digital/include/gnuradio/digital/correlate_access_code_tag_ff.h
+++ b/gr-digital/include/gnuradio/digital/correlate_access_code_tag_ff.h
@@ -64,6 +64,8 @@ namespace gr {
* e.g., "010101010111000100"
*/
virtual bool set_access_code(const std::string &access_code) = 0;
+ virtual void set_threshold(int threshold) = 0;
+ virtual void set_tagname(const std::string &tagname) = 0;
};
} /* namespace digital */
diff --git a/gr-digital/lib/correlate_access_code_tag_bb_impl.cc b/gr-digital/lib/correlate_access_code_tag_bb_impl.cc
index 753efa7a51..b7e93719fc 100644
--- a/gr-digital/lib/correlate_access_code_tag_bb_impl.cc
+++ b/gr-digital/lib/correlate_access_code_tag_bb_impl.cc
@@ -73,6 +73,8 @@ namespace gr {
correlate_access_code_tag_bb_impl::set_access_code(
const std::string &access_code)
{
+ gr::thread::scoped_lock l(d_mutex_access_code);
+
d_len = access_code.length(); // # of bytes in string
if(d_len > 64)
return false;
@@ -96,6 +98,8 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
+ gr::thread::scoped_lock l(d_mutex_access_code);
+
const unsigned char *in = (const unsigned char*)input_items[0];
unsigned char *out = (unsigned char*)output_items[0];
diff --git a/gr-digital/lib/correlate_access_code_tag_bb_impl.h b/gr-digital/lib/correlate_access_code_tag_bb_impl.h
index df558dd17e..7f6d58b5fc 100644
--- a/gr-digital/lib/correlate_access_code_tag_bb_impl.h
+++ b/gr-digital/lib/correlate_access_code_tag_bb_impl.h
@@ -42,6 +42,8 @@ namespace gr {
pmt::pmt_t d_key, d_me; //d_key is the tag name, d_me is the block name + unique ID
+ gr::thread::mutex d_mutex_access_code;
+
public:
correlate_access_code_tag_bb_impl(const std::string &access_code,
int threshold,
@@ -53,6 +55,8 @@ namespace gr {
gr_vector_void_star &output_items);
bool set_access_code(const std::string &access_code);
+ void set_threshold(int threshold) { d_threshold = threshold; };
+ void set_tagname(const std::string &tag_name) { d_key = pmt::string_to_symbol(tag_name); };
};
} /* namespace digital */
diff --git a/gr-digital/lib/correlate_access_code_tag_ff_impl.cc b/gr-digital/lib/correlate_access_code_tag_ff_impl.cc
index 6efacbb08b..1aa3d5aca9 100644
--- a/gr-digital/lib/correlate_access_code_tag_ff_impl.cc
+++ b/gr-digital/lib/correlate_access_code_tag_ff_impl.cc
@@ -74,6 +74,8 @@ namespace gr {
correlate_access_code_tag_ff_impl::set_access_code(
const std::string &access_code)
{
+ gr::thread::scoped_lock l(d_mutex_access_code);
+
d_len = access_code.length(); // # of bytes in string
if(d_len > 64)
return false;
@@ -97,6 +99,8 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
+ gr::thread::scoped_lock l(d_mutex_access_code);
+
const float *in = (const float*)input_items[0];
float *out = (float*)output_items[0];
diff --git a/gr-digital/lib/correlate_access_code_tag_ff_impl.h b/gr-digital/lib/correlate_access_code_tag_ff_impl.h
index 67d3ba2d6d..3b5946228a 100644
--- a/gr-digital/lib/correlate_access_code_tag_ff_impl.h
+++ b/gr-digital/lib/correlate_access_code_tag_ff_impl.h
@@ -42,6 +42,8 @@ namespace gr {
pmt::pmt_t d_key, d_me; //d_key is the tag name, d_me is the block name + unique ID
+ gr::thread::mutex d_mutex_access_code;
+
public:
correlate_access_code_tag_ff_impl(const std::string &access_code,
int threshold,
@@ -53,6 +55,8 @@ namespace gr {
gr_vector_void_star &output_items);
bool set_access_code(const std::string &access_code);
+ void set_threshold(int threshold) { d_threshold = threshold; };
+ void set_tagname(const std::string &tag_name) { d_key = pmt::string_to_symbol(tag_name); };
};
} /* namespace digital */