summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/rep_sink_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/lib/rep_sink_impl.cc')
-rw-r--r--gr-zeromq/lib/rep_sink_impl.cc19
1 files changed, 11 insertions, 8 deletions
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index 88ed6c11c0..85f9a786c9 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -46,11 +46,14 @@ namespace gr {
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
+
int time = 0;
d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
d_socket->bind (address);
@@ -71,7 +74,7 @@ namespace gr {
const char *in = (const char *) input_items[0];
zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
+ zmq::poll(&items[0], 1, d_timeout);
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
@@ -80,20 +83,20 @@ namespace gr {
d_socket->recv(&request);
int req_output_items = *(static_cast<int*>(request.data()));
int nitems_send = std::min(noutput_items, req_output_items);
-
+
// encode the current offset, # tags, and tags into header
std::string header("");
if(d_pass_tags){
- uint64_t offset = nitems_read(0);
- std::vector<gr::tag_t> tags;
- get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
- header = gen_tag_header( offset, tags );
- }
+ uint64_t offset = nitems_read(0);
+ std::vector<gr::tag_t> tags;
+ get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
+ header = gen_tag_header( offset, tags );
+ }
// create message copy and send
zmq::message_t msg(header.length() + d_itemsize*d_vlen*nitems_send);
if(d_pass_tags)
- memcpy((void*) msg.data(), header.c_str(), header.length() );
+ memcpy((void*) msg.data(), header.c_str(), header.length() );
memcpy((uint8_t *)msg.data() + header.length(), in, d_itemsize*d_vlen*nitems_send);
d_socket->send(msg);