summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/pull_source_impl.cc
diff options
context:
space:
mode:
authorJohnathan Corgan <johnathan@corganlabs.com>2015-01-12 16:51:39 -0800
committerJohnathan Corgan <johnathan@corganlabs.com>2015-01-12 16:51:39 -0800
commit0b65c1aa92356c9b3d848718b232176b96fda30e (patch)
tree44250ce926a32fedfd041e01e553c325faa9c057 /gr-zeromq/lib/pull_source_impl.cc
parent92dfbb61f226f38a514e11b0abbb2f8f3305e133 (diff)
zeromq: minor cleanup
Diffstat (limited to 'gr-zeromq/lib/pull_source_impl.cc')
-rw-r--r--gr-zeromq/lib/pull_source_impl.cc22
1 files changed, 11 insertions, 11 deletions
diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc
index 87b330a862..3215096f56 100644
--- a/gr-zeromq/lib/pull_source_impl.cc
+++ b/gr-zeromq/lib/pull_source_impl.cc
@@ -46,11 +46,14 @@ namespace gr {
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
+
int time = 0;
d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
d_socket->connect (address);
@@ -86,25 +89,22 @@ namespace gr {
// check header for tags...
std::string buf(static_cast<char*>(msg.data()), msg.size());
if(d_pass_tags){
- uint64_t rcv_offset;
- std::vector<gr::tag_t> tags;
- buf = parse_tag_header(buf, rcv_offset, tags);
- for(size_t i=0; i<tags.size(); i++){
- tags[i].offset -= rcv_offset - nitems_written(0);
- add_item_tag(0, tags[i]);
- }
- }
-
+ uint64_t rcv_offset;
+ std::vector<gr::tag_t> tags;
+ buf = parse_tag_header(buf, rcv_offset, tags);
+ for(size_t i=0; i<tags.size(); i++){
+ tags[i].offset -= rcv_offset - nitems_written(0);
+ add_item_tag(0, tags[i]);
+ }
+ }
// Copy to ouput buffer and return
if (buf.size() >= d_itemsize*d_vlen*noutput_items) {
memcpy(out, (void *)&buf[0], d_itemsize*d_vlen*noutput_items);
-
return noutput_items;
}
else {
memcpy(out, (void *)&buf[0], buf.size());
-
return buf.size()/(d_itemsize*d_vlen);
}
}