summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/sub_source_impl.cc
diff options
context:
space:
mode:
authorSylvain Munaut <tnt@246tNt.com>2016-01-27 12:58:50 +0100
committerJohnathan Corgan <johnathan@corganlabs.com>2016-01-27 10:31:26 -0800
commit6e482c5bb6bf49f000f6b8d35a1ca84127e38c46 (patch)
treeee9e4e3323da08dfabe91257229b970a38dff066 /gr-zeromq/lib/sub_source_impl.cc
parent393624c072417e0b8a74480979eeccdfb8278e77 (diff)
gr-zeromq: Big rework for performance and correctness
- Use class hierarchy trying to maximize code re-use. - Dont' drop samples on receive if the output buffer doesn't have enough space. - Don't drop tags on receive by putting tags in the future. - Better metadata creation/parsing avoiding copying lots data. - Always do as much work as possible in a single call to work() to avoid scheduler overhead as long as possible. - Allow setting the high watermark to avoid older version of zeromq's default of buffering infinite messages and causing a paging thrash to/from disk when the flow graph can't keep up. Signed-off-by: Sylvain Munaut <tnt@246tNt.com>
Diffstat (limited to 'gr-zeromq/lib/sub_source_impl.cc')
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc94
1 files changed, 32 insertions, 62 deletions
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 1242688a90..9a2e0bfe15 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -32,40 +32,20 @@ namespace gr {
namespace zeromq {
sub_source::sptr
- sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
+ sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
{
return gnuradio::get_initial_sptr
- (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags));
+ (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
}
- sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
+ sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
: gr::sync_block("sub_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+ base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm)
{
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
-
+ /* Subscribe */
d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
- d_socket->connect (address);
- }
-
- /*
- * Our virtual destructor.
- */
- sub_source_impl::~sub_source_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
}
int
@@ -73,47 +53,37 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- char *out = (char*)output_items[0];
-
- zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
-
- // If we got a reply, process
- if (items[0].revents & ZMQ_POLLIN) {
-
- // Receive data
- zmq::message_t msg;
- d_socket->recv(&msg);
-
- // Deserialize header data / tags
- std::string buf(static_cast<char*>(msg.data()), msg.size());
-
- if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
-
- buf = parse_tag_header(buf, rcv_offset, tags);
-
- for(size_t i=0; i<tags.size(); i++) {
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
+ uint8_t *out = (uint8_t *) output_items[0];
+ bool first = true;
+ int done = 0;
+
+ /* Process as much as we can */
+ while (1)
+ {
+ if (has_pending())
+ {
+ /* Flush anything pending */
+ done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done);
+
+ /* No more space ? */
+ if (done == noutput_items)
+ break;
}
-
- // Copy to ouput buffer and return
- if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
- memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
- return noutput_items;
+ else
+ {
+ /* Try to get the next message */
+ if (!load_message(first))
+ break; /* No message, we're done for now */
+
+ /* Not the first anymore */
+ first = false;
}
- else {
- memcpy(out, (void *)&buf[0], buf.size());
- return buf.size()/(d_itemsize*d_vlen);
- }
- }
- else {
- return 0; // FIXME: someday when the scheduler does all the poll/selects
}
+
+ return done;
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab