summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/rep_sink_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/lib/rep_sink_impl.cc')
-rw-r--r--gr-zeromq/lib/rep_sink_impl.cc91
1 files changed, 35 insertions, 56 deletions
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index 034a5b0f83..ac6fc9c8b1 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -32,38 +32,19 @@ namespace gr {
namespace zeromq {
rep_sink::sptr
- rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
+ rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
{
return gnuradio::get_initial_sptr
- (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags));
+ (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
}
- rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
+ rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
: gr::sync_block("rep_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+ base_sink_impl(ZMQ_REP, itemsize, vlen, address, timeout, pass_tags, hwm)
{
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
-
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->bind (address);
- }
-
- rep_sink_impl::~rep_sink_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ /* All is delegated */
}
int
@@ -71,46 +52,44 @@ namespace gr {
gr_vector_const_void_star &input_items,
gr_vector_void_star &output_items)
{
- const char *in = (const char *) input_items[0];
-
- zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&items[0], 1, d_timeout);
-
- // If we got a reply, process
- if (items[0].revents & ZMQ_POLLIN) {
- // receive data request
+ const uint8_t *in = (const uint8_t *) input_items[0];
+ bool first = true;
+ int done = 0;
+
+ /* Process as much as we can */
+ while (1)
+ {
+ /* Wait for a small time (FIXME: scheduler can't wait for us) */
+ /* We only wait if its the first iteration, for the others we'll
+ * let the scheduler retry */
+ zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
+ zmq::poll(&items[0], 1, first ? d_timeout : 0);
+
+ /* If we dont have anything, we're done */
+ if (!(items[0].revents & ZMQ_POLLIN))
+ break;
+
+ /* Get and parse the request */
zmq::message_t request;
d_socket->recv(&request);
- int req_output_items = *(static_cast<int*>(request.data()));
- int nitems_send = std::min(noutput_items, req_output_items);
- // encode the current offset, # tags, and tags into header
- std::string header("");
- if(d_pass_tags){
- uint64_t offset = nitems_read(0);
- std::vector<gr::tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
- header = gen_tag_header( offset, tags );
+ int nitems_send = noutput_items - done;
+ if (request.size() >= sizeof(uint32_t))
+ {
+ int req = (int)*(static_cast<uint32_t*>(request.data()));
+ nitems_send = std::min(nitems_send, req);
}
+ /* Delegate the actual send */
+ done += send_message(in + (done * d_vsize), nitems_send, nitems_read(0) + done);
- // create message copy and send
- int payloadlen = d_itemsize * d_vlen * noutput_items;
- int msglen = d_pass_tags ? payloadlen + header.length() : payloadlen;
- zmq::message_t msg(msglen);
-
- if(d_pass_tags){
- memcpy((void*) msg.data(), header.c_str(), header.length() );
- memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*noutput_items);
- } else {
- memcpy((uint8_t *)msg.data(), in, d_itemsize*d_vlen*noutput_items);
- }
- d_socket->send(msg);
-
- return nitems_send;
+ /* Not the first anymore */
+ first = false;
}
- return 0;
+ return done;
}
} /* namespace zeromq */
} /* namespace gr */
+
+// vim: ts=2 sw=2 expandtab