From 6e482c5bb6bf49f000f6b8d35a1ca84127e38c46 Mon Sep 17 00:00:00 2001
From: Sylvain Munaut <tnt@246tNt.com>
Date: Wed, 27 Jan 2016 12:58:50 +0100
Subject: gr-zeromq: Big rework for performance and correctness

 - Use class hierarchy trying to maximize code re-use.
 - Dont' drop samples on receive if the output buffer doesn't have
   enough space.
 - Don't drop tags on receive by putting tags in the future.
 - Better metadata creation/parsing avoiding copying lots data.
 - Always do as much work as possible in a single call to work()
   to avoid scheduler overhead as long as possible.
 - Allow setting the  high watermark to avoid older version of
   zeromq's default of buffering infinite messages and causing a
   paging thrash to/from disk when the flow graph can't keep up.

Signed-off-by: Sylvain Munaut <tnt@246tNt.com>
---
 gr-zeromq/lib/req_source_impl.cc | 107 ++++++++++++++++++---------------------
 1 file changed, 48 insertions(+), 59 deletions(-)

(limited to 'gr-zeromq/lib/req_source_impl.cc')

diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index f69d447f98..526736389e 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -32,38 +32,20 @@ namespace gr {
   namespace zeromq {
 
     req_source::sptr
-    req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
+    req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
     {
       return gnuradio::get_initial_sptr
-        (new req_source_impl(itemsize, vlen, address, timeout, pass_tags));
+        (new req_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
     }
 
-    req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
+    req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
       : gr::sync_block("req_source",
                        gr::io_signature::make(0, 0, 0),
                        gr::io_signature::make(1, 1, itemsize * vlen)),
-        d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+        base_source_impl(ZMQ_REQ, itemsize, vlen, address, timeout, pass_tags, hwm),
+        d_req_pending(false)
     {
-      int major, minor, patch;
-      zmq::version (&major, &minor, &patch);
-
-      if (major < 3) {
-        d_timeout = timeout*1000;
-      }
-
-      d_context = new zmq::context_t(1);
-      d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
-
-      int time = 0;
-      d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
-      d_socket->connect (address);
-    }
-
-    req_source_impl::~req_source_impl()
-    {
-      d_socket->close();
-      delete d_socket;
-      delete d_context;
+      /* All is delegated */
     }
 
     int
@@ -71,49 +53,56 @@ namespace gr {
                           gr_vector_const_void_star &input_items,
                           gr_vector_void_star &output_items)
     {
-      char *out = (char*)output_items[0];
+#if 0
+#endif
+      uint8_t *out = (uint8_t *) output_items[0];
+      bool first = true;
+      int done = 0;
+
+      /* Process as much as we can */
+      while (1)
+      {
+        if (has_pending())
+        {
+          /* Flush anything pending */
+          done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done);
+
+          /* No more space ? */
+          if (done == noutput_items)
+            break;
+        }
+        else
+        {
+          /* Send request if needed */
+          if (!d_req_pending)
+          {
+            /* The REP/REQ pattern state machine guarantees we can send at this point */
+            uint32_t req_len = noutput_items - done;
+            zmq::message_t request(sizeof(uint32_t));
+            memcpy ((void *) request.data (), &req_len, sizeof(uint32_t));
+            d_socket->send(request);
+
+            d_req_pending = true;
+          }
 
-      zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
-      zmq::poll (&itemsout[0], 1, d_timeout);
+          /* Try to get the next message */
+          if (!load_message(first))
+            break;  /* No message, we're done for now */
 
-      //  If we got a reply, process
-      if (itemsout[0].revents & ZMQ_POLLOUT) {
-        // Request data, FIXME non portable?
-        zmq::message_t request(sizeof(int));
-        memcpy ((void *) request.data (), &noutput_items, sizeof(int));
-        d_socket->send(request);
-      }
+          /* Got response */
+          d_req_pending = false;
 
-      zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
-      zmq::poll(&itemsin[0], 1, d_timeout);
-
-      //  If we got a reply, process
-      if (itemsin[0].revents & ZMQ_POLLIN) {
-        // Receive data
-        zmq::message_t reply;
-        d_socket->recv(&reply);
-
-        // Deserialize header data / tags
-        std::string buf(static_cast<char*>(reply.data()), reply.size());
-
-        if(d_pass_tags){
-          uint64_t rcv_offset;
-          std::vector<gr::tag_t> tags;
-          buf = parse_tag_header(buf, rcv_offset, tags);
-          for(size_t i=0; i<tags.size(); i++){
-            tags[i].offset -= rcv_offset - nitems_written(0);
-            add_item_tag(0, tags[i]);
-          }
+          /* Not the first anymore */
+          first = false;
         }
-
-
-        // Copy to ouput buffer and return
-        memcpy(out, (void *)&buf[0], buf.size());
-        return buf.size()/(d_itemsize*d_vlen);
       }
 
+      return done;
+
       return 0;
     }
 
   } /* namespace zeromq */
 } /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab
-- 
cgit v1.2.3