summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/req_source_impl.cc
diff options
context:
space:
mode:
authorSylvain Munaut <tnt@246tNt.com>2016-01-27 12:58:50 +0100
committerJohnathan Corgan <johnathan@corganlabs.com>2016-01-27 10:31:26 -0800
commit6e482c5bb6bf49f000f6b8d35a1ca84127e38c46 (patch)
treeee9e4e3323da08dfabe91257229b970a38dff066 /gr-zeromq/lib/req_source_impl.cc
parent393624c072417e0b8a74480979eeccdfb8278e77 (diff)
gr-zeromq: Big rework for performance and correctness
- Use class hierarchy trying to maximize code re-use. - Dont' drop samples on receive if the output buffer doesn't have enough space. - Don't drop tags on receive by putting tags in the future. - Better metadata creation/parsing avoiding copying lots data. - Always do as much work as possible in a single call to work() to avoid scheduler overhead as long as possible. - Allow setting the high watermark to avoid older version of zeromq's default of buffering infinite messages and causing a paging thrash to/from disk when the flow graph can't keep up. Signed-off-by: Sylvain Munaut <tnt@246tNt.com>
Diffstat (limited to 'gr-zeromq/lib/req_source_impl.cc')
-rw-r--r--gr-zeromq/lib/req_source_impl.cc107
1 files changed, 48 insertions, 59 deletions
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index f69d447f98..526736389e 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -32,38 +32,20 @@ namespace gr {
namespace zeromq {
req_source::sptr
- req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
+ req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
{
return gnuradio::get_initial_sptr
- (new req_source_impl(itemsize, vlen, address, timeout, pass_tags));
+ (new req_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
}
- req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
+ req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
: gr::sync_block("req_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+ base_source_impl(ZMQ_REQ, itemsize, vlen, address, timeout, pass_tags, hwm),
+ d_req_pending(false)
{
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
-
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->connect (address);
- }
-
- req_source_impl::~req_source_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ /* All is delegated */
}
int
@@ -71,49 +53,56 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- char *out = (char*)output_items[0];
+#if 0
+#endif
+ uint8_t *out = (uint8_t *) output_items[0];
+ bool first = true;
+ int done = 0;
+
+ /* Process as much as we can */
+ while (1)
+ {
+ if (has_pending())
+ {
+ /* Flush anything pending */
+ done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done);
+
+ /* No more space ? */
+ if (done == noutput_items)
+ break;
+ }
+ else
+ {
+ /* Send request if needed */
+ if (!d_req_pending)
+ {
+ /* The REP/REQ pattern state machine guarantees we can send at this point */
+ uint32_t req_len = noutput_items - done;
+ zmq::message_t request(sizeof(uint32_t));
+ memcpy ((void *) request.data (), &req_len, sizeof(uint32_t));
+ d_socket->send(request);
+
+ d_req_pending = true;
+ }
- zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
- zmq::poll (&itemsout[0], 1, d_timeout);
+ /* Try to get the next message */
+ if (!load_message(first))
+ break; /* No message, we're done for now */
- // If we got a reply, process
- if (itemsout[0].revents & ZMQ_POLLOUT) {
- // Request data, FIXME non portable?
- zmq::message_t request(sizeof(int));
- memcpy ((void *) request.data (), &noutput_items, sizeof(int));
- d_socket->send(request);
- }
+ /* Got response */
+ d_req_pending = false;
- zmq::pollitem_t itemsin[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&itemsin[0], 1, d_timeout);
-
- // If we got a reply, process
- if (itemsin[0].revents & ZMQ_POLLIN) {
- // Receive data
- zmq::message_t reply;
- d_socket->recv(&reply);
-
- // Deserialize header data / tags
- std::string buf(static_cast<char*>(reply.data()), reply.size());
-
- if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- buf = parse_tag_header(buf, rcv_offset, tags);
- for(size_t i=0; i<tags.size(); i++){
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
+ /* Not the first anymore */
+ first = false;
}
-
-
- // Copy to ouput buffer and return
- memcpy(out, (void *)&buf[0], buf.size());
- return buf.size()/(d_itemsize*d_vlen);
}
+ return done;
+
return 0;
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab