summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/lib')
-rw-r--r--gr-zeromq/lib/base_impl.cc269
-rw-r--r--gr-zeromq/lib/base_impl.h82
-rw-r--r--gr-zeromq/lib/pub_msg_sink_impl.cc83
-rw-r--r--gr-zeromq/lib/pub_msg_sink_impl.h33
-rw-r--r--gr-zeromq/lib/pub_sink_impl.cc54
-rw-r--r--gr-zeromq/lib/pub_sink_impl.h33
-rw-r--r--gr-zeromq/lib/pull_msg_source_impl.cc114
-rw-r--r--gr-zeromq/lib/pull_msg_source_impl.h43
-rw-r--r--gr-zeromq/lib/pull_source_impl.cc87
-rw-r--r--gr-zeromq/lib/pull_source_impl.h33
-rw-r--r--gr-zeromq/lib/push_msg_sink_impl.cc83
-rw-r--r--gr-zeromq/lib/push_msg_sink_impl.h33
-rw-r--r--gr-zeromq/lib/push_sink_impl.cc58
-rw-r--r--gr-zeromq/lib/push_sink_impl.h35
-rw-r--r--gr-zeromq/lib/rep_msg_sink_impl.cc159
-rw-r--r--gr-zeromq/lib/rep_msg_sink_impl.h43
-rw-r--r--gr-zeromq/lib/rep_sink_impl.cc76
-rw-r--r--gr-zeromq/lib/rep_sink_impl.h27
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.cc144
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.h43
-rw-r--r--gr-zeromq/lib/req_source_impl.cc133
-rw-r--r--gr-zeromq/lib/req_source_impl.h32
-rw-r--r--gr-zeromq/lib/sub_msg_source_impl.cc122
-rw-r--r--gr-zeromq/lib/sub_msg_source_impl.h43
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc89
-rw-r--r--gr-zeromq/lib/sub_source_impl.h36
-rw-r--r--gr-zeromq/lib/tag_headers.cc118
-rw-r--r--gr-zeromq/lib/tag_headers.h12
-rw-r--r--gr-zeromq/lib/zmq_common_impl.h3
29 files changed, 1078 insertions, 1042 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc
index 662f7b1596..b3a107c815 100644
--- a/gr-zeromq/lib/base_impl.cc
+++ b/gr-zeromq/lib/base_impl.cc
@@ -29,200 +29,207 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags)
- : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags)
- {
- /* "Fix" timeout value (ms for new API, us for old API) */
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
+base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags)
+ : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags)
+{
+ /* "Fix" timeout value (ms for new API, us for old API) */
+ int major, minor, patch;
+ zmq::version(&major, &minor, &patch);
- if (major < 3) {
+ if (major < 3) {
d_timeout *= 1000;
- }
-
- /* Create context & socket */
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, type);
- }
-
- base_impl::~base_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
- }
-
- std::string
- base_impl::last_endpoint()
- {
- char addr[256];
- size_t addr_len = sizeof(addr);
- d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
- return std::string(addr, addr_len-1);
}
-
- base_sink_impl::base_sink_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- : base_impl(type, itemsize, vlen, timeout, pass_tags)
- {
- /* Set high watermark */
- if (hwm >= 0) {
+ /* Create context & socket */
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, type);
+}
+
+base_impl::~base_impl()
+{
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+}
+
+std::string base_impl::last_endpoint()
+{
+ char addr[256];
+ size_t addr_len = sizeof(addr);
+ d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
+ return std::string(addr, addr_len - 1);
+}
+
+
+base_sink_impl::base_sink_impl(int type,
+ size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm)
+ : base_impl(type, itemsize, vlen, timeout, pass_tags)
+{
+ /* Set high watermark */
+ if (hwm >= 0) {
#ifdef ZMQ_SNDHWM
d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm));
#else // major < 3
uint64_t tmp = hwm;
d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
#endif
- }
-
- /* Bind */
- d_socket->bind(address);
}
- int
- base_sink_impl::send_message(const void *in_buf, const int in_nitems, const uint64_t in_offset)
- {
- /* Meta-data header */
- std::string header("");
- if(d_pass_tags){
+ /* Bind */
+ d_socket->bind(address);
+}
+
+int base_sink_impl::send_message(const void* in_buf,
+ const int in_nitems,
+ const uint64_t in_offset)
+{
+ /* Meta-data header */
+ std::string header("");
+ if (d_pass_tags) {
std::vector<gr::tag_t> tags;
get_tags_in_range(tags, 0, in_offset, in_offset + in_nitems);
header = gen_tag_header(in_offset, tags);
- }
+ }
- /* Create message */
- size_t payload_len = in_nitems * d_vsize;
- size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len;
- zmq::message_t msg(msg_len);
+ /* Create message */
+ size_t payload_len = in_nitems * d_vsize;
+ size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len;
+ zmq::message_t msg(msg_len);
- if(d_pass_tags){
+ if (d_pass_tags) {
memcpy(msg.data(), header.c_str(), header.length());
memcpy((uint8_t*)msg.data() + header.length(), in_buf, payload_len);
- } else {
+ } else {
memcpy(msg.data(), in_buf, payload_len);
- }
+ }
- /* Send */
+ /* Send */
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(msg, zmq::send_flags::none);
+ d_socket->send(msg, zmq::send_flags::none);
#else
- d_socket->send(msg);
+ d_socket->send(msg);
#endif
- /* Report back */
- return in_nitems;
- }
-
- base_source_impl::base_source_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- : base_impl(type, itemsize, vlen, timeout, pass_tags),
- d_consumed_bytes(0), d_consumed_items(0)
- {
- /* Set high watermark */
- if (hwm >= 0) {
+ /* Report back */
+ return in_nitems;
+}
+
+base_source_impl::base_source_impl(int type,
+ size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm)
+ : base_impl(type, itemsize, vlen, timeout, pass_tags),
+ d_consumed_bytes(0),
+ d_consumed_items(0)
+{
+ /* Set high watermark */
+ if (hwm >= 0) {
#ifdef ZMQ_RCVHWM
d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm));
#else // major < 3
uint64_t tmp = hwm;
d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp));
#endif
- }
-
- /* Connect */
- d_socket->connect(address);
}
- bool
- base_source_impl::has_pending()
- {
- return d_msg.size() > d_consumed_bytes;
- }
+ /* Connect */
+ d_socket->connect(address);
+}
- int
- base_source_impl::flush_pending(void *out_buf, const int out_nitems, const uint64_t out_offset)
- {
- /* How much to copy in this call */
- int to_copy_items = std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize));
- int to_copy_bytes = d_vsize * to_copy_items;
+bool base_source_impl::has_pending() { return d_msg.size() > d_consumed_bytes; }
- /* Copy actual data */
- memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, to_copy_bytes);
+int base_source_impl::flush_pending(void* out_buf,
+ const int out_nitems,
+ const uint64_t out_offset)
+{
+ /* How much to copy in this call */
+ int to_copy_items =
+ std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize));
+ int to_copy_bytes = d_vsize * to_copy_items;
- /* Add tags matching this segment of samples */
- for (unsigned int i=0; i<d_tags.size(); i++)
- {
+ /* Copy actual data */
+ memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, to_copy_bytes);
+
+ /* Add tags matching this segment of samples */
+ for (unsigned int i = 0; i < d_tags.size(); i++) {
if ((d_tags[i].offset >= (uint64_t)d_consumed_items) &&
- (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items))
- {
- gr::tag_t nt = d_tags[i];
- nt.offset += out_offset - d_consumed_items;
- add_item_tag(0, nt);
+ (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items)) {
+ gr::tag_t nt = d_tags[i];
+ nt.offset += out_offset - d_consumed_items;
+ add_item_tag(0, nt);
}
- }
+ }
- /* Update pointer */
- d_consumed_items += to_copy_items;
- d_consumed_bytes += to_copy_bytes;
+ /* Update pointer */
+ d_consumed_items += to_copy_items;
+ d_consumed_bytes += to_copy_bytes;
- return to_copy_items;
- }
+ return to_copy_items;
+}
- bool
- base_source_impl::load_message(bool wait)
- {
- /* Poll for input */
- zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } };
- zmq::poll(&items[0], 1, wait ? d_timeout : 0);
+bool base_source_impl::load_message(bool wait)
+{
+ /* Poll for input */
+ zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::poll(&items[0], 1, wait ? d_timeout : 0);
- if (!(items[0].revents & ZMQ_POLLIN))
+ if (!(items[0].revents & ZMQ_POLLIN))
return false;
- /* Is this the start or continuation of a multi-part message? */
- int64_t more = 0;
- size_t more_len = sizeof(more);
- d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len);
+ /* Is this the start or continuation of a multi-part message? */
+ int64_t more = 0;
+ size_t more_len = sizeof(more);
+ d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len);
- /* Reset */
- d_msg.rebuild();
- d_tags.clear();
- d_consumed_items = 0;
- d_consumed_bytes = 0;
+ /* Reset */
+ d_msg.rebuild();
+ d_tags.clear();
+ d_consumed_items = 0;
+ d_consumed_bytes = 0;
- /* Get the message */
+ /* Get the message */
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(d_msg);
+ d_socket->recv(d_msg);
#else
- d_socket->recv(&d_msg);
+ d_socket->recv(&d_msg);
#endif
- /* Parse header from the first (or only) message of a multi-part message */
- if (d_pass_tags && !more)
- {
+ /* Parse header from the first (or only) message of a multi-part message */
+ if (d_pass_tags && !more) {
uint64_t rcv_offset;
/* Parse header */
d_consumed_bytes = parse_tag_header(d_msg, rcv_offset, d_tags);
/* Fixup the tags offset to be relative to the start of this message */
- for (unsigned int i=0; i<d_tags.size(); i++) {
- d_tags[i].offset -= rcv_offset;
+ for (unsigned int i = 0; i < d_tags.size(); i++) {
+ d_tags[i].offset -= rcv_offset;
}
- }
+ }
- /* Each message must contain an integer multiple of data vectors */
- if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0)
- {
+ /* Each message must contain an integer multiple of data vectors */
+ if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) {
throw std::runtime_error(
boost::str(boost::format("Incompatible vector sizes: "
- "need a multiple of %1% bytes per message") % d_vsize));
- }
-
- /* We got one ! */
- return true;
+ "need a multiple of %1% bytes per message") %
+ d_vsize));
}
- } /* namespace zeromq */
+ /* We got one ! */
+ return true;
+}
+
+} /* namespace zeromq */
} /* namespace gr */
// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h
index 68f0fa2ea5..3783a4e375 100644
--- a/gr-zeromq/lib/base_impl.h
+++ b/gr-zeromq/lib/base_impl.h
@@ -27,49 +27,61 @@
#include "zmq_common_impl.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- class base_impl : public virtual gr::sync_block
- {
- public:
- base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags);
- virtual ~base_impl();
+class base_impl : public virtual gr::sync_block
+{
+public:
+ base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags);
+ virtual ~base_impl();
- protected:
- std::string last_endpoint();
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- size_t d_vsize;
- int d_timeout;
- bool d_pass_tags;
- };
+protected:
+ std::string last_endpoint();
+ zmq::context_t* d_context;
+ zmq::socket_t* d_socket;
+ size_t d_vsize;
+ int d_timeout;
+ bool d_pass_tags;
+};
- class base_sink_impl : public base_impl
- {
- public:
- base_sink_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm);
+class base_sink_impl : public base_impl
+{
+public:
+ base_sink_impl(int type,
+ size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm);
- protected:
- int send_message(const void *in_buf, const int in_nitems, const uint64_t in_offset);
- };
+protected:
+ int send_message(const void* in_buf, const int in_nitems, const uint64_t in_offset);
+};
- class base_source_impl : public base_impl
- {
- public:
- base_source_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm);
+class base_source_impl : public base_impl
+{
+public:
+ base_source_impl(int type,
+ size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm);
- protected:
- zmq::message_t d_msg;
- std::vector<gr::tag_t> d_tags;
- size_t d_consumed_bytes;
- int d_consumed_items;
+protected:
+ zmq::message_t d_msg;
+ std::vector<gr::tag_t> d_tags;
+ size_t d_consumed_bytes;
+ int d_consumed_items;
- bool has_pending();
- int flush_pending(void *out_buf, const int out_nitems, const uint64_t out_offset);
- bool load_message(bool wait);
- };
+ bool has_pending();
+ int flush_pending(void* out_buf, const int out_nitems, const uint64_t out_offset);
+ bool load_message(bool wait);
+};
- } // namespace zeromq
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_BASE_IMPL_H */
diff --git a/gr-zeromq/lib/pub_msg_sink_impl.cc b/gr-zeromq/lib/pub_msg_sink_impl.cc
index 5fc7164d2b..77ffc469cd 100644
--- a/gr-zeromq/lib/pub_msg_sink_impl.cc
+++ b/gr-zeromq/lib/pub_msg_sink_impl.cc
@@ -29,55 +29,52 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- pub_msg_sink::sptr
- pub_msg_sink::make(char *address, int timeout)
- {
- return gnuradio::get_initial_sptr
- (new pub_msg_sink_impl(address, timeout));
+pub_msg_sink::sptr pub_msg_sink::make(char* address, int timeout)
+{
+ return gnuradio::get_initial_sptr(new pub_msg_sink_impl(address, timeout));
+}
+
+pub_msg_sink_impl::pub_msg_sink_impl(char* address, int timeout)
+ : gr::block("pub_msg_sink",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+{
+ int major, minor, patch;
+ zmq::version(&major, &minor, &patch);
+ if (major < 3) {
+ d_timeout = timeout * 1000;
}
- pub_msg_sink_impl::pub_msg_sink_impl(char *address, int timeout)
- : gr::block("pub_msg_sink",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
- d_timeout(timeout)
- {
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
- if (major < 3) {
- d_timeout = timeout*1000;
- }
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_PUB);
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->bind(address);
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PUB);
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->bind(address);
+ message_port_register_in(pmt::mp("in"));
+ set_msg_handler(pmt::mp("in"), boost::bind(&pub_msg_sink_impl::handler, this, _1));
+}
- message_port_register_in(pmt::mp("in"));
- set_msg_handler( pmt::mp("in"),
- boost::bind(&pub_msg_sink_impl::handler, this, _1));
- }
+pub_msg_sink_impl::~pub_msg_sink_impl()
+{
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+}
- pub_msg_sink_impl::~pub_msg_sink_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
- }
+void pub_msg_sink_impl::handler(pmt::pmt_t msg)
+{
+ std::stringbuf sb("");
+ pmt::serialize(msg, sb);
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
- void pub_msg_sink_impl::handler(pmt::pmt_t msg)
- {
- std::stringbuf sb("");
- pmt::serialize(msg, sb);
- std::string s = sb.str();
- zmq::message_t zmsg(s.size());
-
- memcpy(zmsg.data(), s.c_str(), s.size());
- d_socket->send(zmsg);
- }
+ memcpy(zmsg.data(), s.c_str(), s.size());
+ d_socket->send(zmsg);
+}
- } /* namespace zeromq */
+} /* namespace zeromq */
} /* namespace gr */
diff --git a/gr-zeromq/lib/pub_msg_sink_impl.h b/gr-zeromq/lib/pub_msg_sink_impl.h
index 0cedfed482..cfd81ca399 100644
--- a/gr-zeromq/lib/pub_msg_sink_impl.h
+++ b/gr-zeromq/lib/pub_msg_sink_impl.h
@@ -27,29 +27,30 @@
#include <zmq.hpp>
namespace gr {
- namespace zeromq {
+namespace zeromq {
- class pub_msg_sink_impl : public pub_msg_sink
- {
- private:
- float d_timeout;
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
+class pub_msg_sink_impl : public pub_msg_sink
+{
+private:
+ float d_timeout;
+ zmq::context_t* d_context;
+ zmq::socket_t* d_socket;
- public:
- pub_msg_sink_impl(char *address, int timeout);
- ~pub_msg_sink_impl();
+public:
+ pub_msg_sink_impl(char* address, int timeout);
+ ~pub_msg_sink_impl();
- void handler(pmt::pmt_t msg);
- std::string last_endpoint() override {
+ void handler(pmt::pmt_t msg);
+ std::string last_endpoint() override
+ {
char addr[256];
size_t addr_len = sizeof(addr);
d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
- return std::string(addr, addr_len-1);
- }
- };
+ return std::string(addr, addr_len - 1);
+ }
+};
- } // namespace zeromq
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H */
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index b602bc83a6..209b9e7f8c 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -29,33 +29,33 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
-
- pub_sink::sptr
- pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- {
- return gnuradio::get_initial_sptr
- (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
- }
-
- pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- : gr::sync_block("pub_sink",
- gr::io_signature::make(1, 1, itemsize * vlen),
- gr::io_signature::make(0, 0, 0)),
- base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm)
- {
- /* All is delegated */
- }
-
- int
- pub_sink_impl::work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
- return send_message(input_items[0], noutput_items, nitems_read(0));
- }
-
- } /* namespace zeromq */
+namespace zeromq {
+
+pub_sink::sptr pub_sink::make(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+{
+ return gnuradio::get_initial_sptr(
+ new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+}
+
+pub_sink_impl::pub_sink_impl(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+ : gr::sync_block("pub_sink",
+ gr::io_signature::make(1, 1, itemsize * vlen),
+ gr::io_signature::make(0, 0, 0)),
+ base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm)
+{
+ /* All is delegated */
+}
+
+int pub_sink_impl::work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items)
+{
+ return send_message(input_items[0], noutput_items, nitems_read(0));
+}
+
+} /* namespace zeromq */
} /* namespace gr */
// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h
index 78b5c1e1bd..6e921d9067 100644
--- a/gr-zeromq/lib/pub_sink_impl.h
+++ b/gr-zeromq/lib/pub_sink_impl.h
@@ -29,20 +29,25 @@
#include "base_impl.h"
namespace gr {
- namespace zeromq {
-
- class pub_sink_impl : public pub_sink, public base_sink_impl
- {
- public:
- pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm);
-
- int work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
- std::string last_endpoint() override {return base_sink_impl::last_endpoint();}
- };
-
- } // namespace zeromq
+namespace zeromq {
+
+class pub_sink_impl : public pub_sink, public base_sink_impl
+{
+public:
+ pub_sink_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm);
+
+ int work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items);
+ std::string last_endpoint() override { return base_sink_impl::last_endpoint(); }
+};
+
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_PUB_SINK_IMPL_H */
diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc
index 48ad2bd5b1..100c7be5a3 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.cc
+++ b/gr-zeromq/lib/pull_msg_source_impl.cc
@@ -31,88 +31,86 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- pull_msg_source::sptr
- pull_msg_source::make(char *address, int timeout)
- {
- return gnuradio::get_initial_sptr
- (new pull_msg_source_impl(address, timeout));
- }
+pull_msg_source::sptr pull_msg_source::make(char* address, int timeout)
+{
+ return gnuradio::get_initial_sptr(new pull_msg_source_impl(address, timeout));
+}
- pull_msg_source_impl::pull_msg_source_impl(char *address, int timeout)
- : gr::block("pull_msg_source",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
+pull_msg_source_impl::pull_msg_source_impl(char* address, int timeout)
+ : gr::block("pull_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
d_timeout(timeout),
d_port(pmt::mp("out"))
- {
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
+{
+ int major, minor, patch;
+ zmq::version(&major, &minor, &patch);
- if (major < 3) {
- d_timeout = timeout*1000;
- }
+ if (major < 3) {
+ d_timeout = timeout * 1000;
+ }
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_PULL);
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->connect (address);
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->connect(address);
- message_port_register_out(d_port);
- }
+ message_port_register_out(d_port);
+}
- pull_msg_source_impl::~pull_msg_source_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
- }
+pull_msg_source_impl::~pull_msg_source_impl()
+{
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+}
- bool pull_msg_source_impl::start()
- {
- d_finished = false;
- d_thread = new boost::thread(boost::bind(&pull_msg_source_impl::readloop, this));
- return true;
- }
+bool pull_msg_source_impl::start()
+{
+ d_finished = false;
+ d_thread = new boost::thread(boost::bind(&pull_msg_source_impl::readloop, this));
+ return true;
+}
- bool pull_msg_source_impl::stop()
- {
- d_finished = true;
- d_thread->join();
- return true;
- }
+bool pull_msg_source_impl::stop()
+{
+ d_finished = true;
+ d_thread->join();
+ return true;
+}
- void pull_msg_source_impl::readloop()
- {
- while(!d_finished){
+void pull_msg_source_impl::readloop()
+{
+ while (!d_finished) {
- zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
+ zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::poll(&items[0], 1, d_timeout);
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
- // Receive data
- zmq::message_t msg;
+ // Receive data
+ zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(msg);
+ d_socket->recv(msg);
#else
- d_socket->recv(&msg);
+ d_socket->recv(&msg);
#endif
- std::string buf(static_cast<char*>(msg.data()), msg.size());
- std::stringbuf sb(buf);
- pmt::pmt_t m = pmt::deserialize(sb);
- message_port_pub(d_port, m);
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ message_port_pub(d_port, m);
} else {
- boost::this_thread::sleep(boost::posix_time::microseconds(100));
+ boost::this_thread::sleep(boost::posix_time::microseconds(100));
}
- }
}
+}
- } /* namespace zeromq */
+} /* namespace zeromq */
} /* namespace gr */
diff --git a/gr-zeromq/lib/pull_msg_source_impl.h b/gr-zeromq/lib/pull_msg_source_impl.h
index db01e8d038..235d80f655 100644
--- a/gr-zeromq/lib/pull_msg_source_impl.h
+++ b/gr-zeromq/lib/pull_msg_source_impl.h
@@ -27,37 +27,38 @@
#include "zmq_common_impl.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- class pull_msg_source_impl : public pull_msg_source
- {
- private:
- int d_timeout; // microseconds, -1 is blocking
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- boost::thread *d_thread;
- const pmt::pmt_t d_port;
+class pull_msg_source_impl : public pull_msg_source
+{
+private:
+ int d_timeout; // microseconds, -1 is blocking
+ zmq::context_t* d_context;
+ zmq::socket_t* d_socket;
+ boost::thread* d_thread;
+ const pmt::pmt_t d_port;
- void readloop();
+ void readloop();
- public:
- bool d_finished;
+public:
+ bool d_finished;
- pull_msg_source_impl(char *address, int timeout);
- ~pull_msg_source_impl();
+ pull_msg_source_impl(char* address, int timeout);
+ ~pull_msg_source_impl();
- bool start();
- bool stop();
+ bool start();
+ bool stop();
- std::string last_endpoint() override {
+ std::string last_endpoint() override
+ {
char addr[256];
size_t addr_len = sizeof(addr);
d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
- return std::string(addr, addr_len-1);
- }
- };
+ return std::string(addr, addr_len - 1);
+ }
+};
- } // namespace zeromq
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_PULL_MSG_SOURCE_IMPL_H */
diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc
index 4045dd7b66..58a875f363 100644
--- a/gr-zeromq/lib/pull_source_impl.cc
+++ b/gr-zeromq/lib/pull_source_impl.cc
@@ -29,60 +29,57 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- pull_source::sptr
- pull_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- {
- return gnuradio::get_initial_sptr
- (new pull_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
- }
+pull_source::sptr pull_source::make(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+{
+ return gnuradio::get_initial_sptr(
+ new pull_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+}
- pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- : gr::sync_block("pull_source",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(1, 1, itemsize * vlen)),
- base_source_impl(ZMQ_PULL, itemsize, vlen, address, timeout, pass_tags, hwm)
- {
- /* All is delegated */
- }
+pull_source_impl::pull_source_impl(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+ : gr::sync_block("pull_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(1, 1, itemsize * vlen)),
+ base_source_impl(ZMQ_PULL, itemsize, vlen, address, timeout, pass_tags, hwm)
+{
+ /* All is delegated */
+}
- int
- pull_source_impl::work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
- uint8_t *out = (uint8_t *) output_items[0];
- bool first = true;
- int done = 0;
+int pull_source_impl::work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items)
+{
+ uint8_t* out = (uint8_t*)output_items[0];
+ bool first = true;
+ int done = 0;
- /* Process as much as we can */
- while (1)
- {
- if (has_pending())
- {
- /* Flush anything pending */
- done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done);
+ /* Process as much as we can */
+ while (1) {
+ if (has_pending()) {
+ /* Flush anything pending */
+ done += flush_pending(
+ out + (done * d_vsize), noutput_items - done, nitems_written(0) + done);
- /* No more space ? */
- if (done == noutput_items)
- break;
- }
- else
- {
- /* Try to get the next message */
- if (!load_message(first))
- break; /* No message, we're done for now */
+ /* No more space ? */
+ if (done == noutput_items)
+ break;
+ } else {
+ /* Try to get the next message */
+ if (!load_message(first))
+ break; /* No message, we're done for now */
- /* Not the first anymore */
- first = false;
+ /* Not the first anymore */
+ first = false;
}
- }
-
- return done;
}
- } /* namespace zeromq */
+ return done;
+}
+
+} /* namespace zeromq */
} /* namespace gr */
// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/pull_source_impl.h b/gr-zeromq/lib/pull_source_impl.h
index c08ab0521b..a27b3f9c46 100644
--- a/gr-zeromq/lib/pull_source_impl.h
+++ b/gr-zeromq/lib/pull_source_impl.h
@@ -29,20 +29,25 @@
#include "base_impl.h"
namespace gr {
- namespace zeromq {
-
- class pull_source_impl : public pull_source, public base_source_impl
- {
- public:
- pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm);
-
- int work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
- std::string last_endpoint() override {return base_source_impl::last_endpoint();}
- };
-
- } // namespace zeromq
+namespace zeromq {
+
+class pull_source_impl : public pull_source, public base_source_impl
+{
+public:
+ pull_source_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm);
+
+ int work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items);
+ std::string last_endpoint() override { return base_source_impl::last_endpoint(); }
+};
+
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_PULL_SOURCE_IMPL_H */
diff --git a/gr-zeromq/lib/push_msg_sink_impl.cc b/gr-zeromq/lib/push_msg_sink_impl.cc
index e9cc5bc435..0052e09210 100644
--- a/gr-zeromq/lib/push_msg_sink_impl.cc
+++ b/gr-zeromq/lib/push_msg_sink_impl.cc
@@ -29,57 +29,54 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- push_msg_sink::sptr
- push_msg_sink::make(char *address, int timeout)
- {
- return gnuradio::get_initial_sptr
- (new push_msg_sink_impl(address, timeout));
- }
+push_msg_sink::sptr push_msg_sink::make(char* address, int timeout)
+{
+ return gnuradio::get_initial_sptr(new push_msg_sink_impl(address, timeout));
+}
- push_msg_sink_impl::push_msg_sink_impl(char *address, int timeout)
- : gr::block("push_msg_sink",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
- d_timeout(timeout)
- {
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
+push_msg_sink_impl::push_msg_sink_impl(char* address, int timeout)
+ : gr::block("push_msg_sink",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+{
+ int major, minor, patch;
+ zmq::version(&major, &minor, &patch);
- if (major < 3) {
- d_timeout = timeout*1000;
- }
+ if (major < 3) {
+ d_timeout = timeout * 1000;
+ }
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH);
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->bind(address);
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->bind(address);
- message_port_register_in(pmt::mp("in"));
- set_msg_handler(pmt::mp("in"),
- boost::bind(&push_msg_sink_impl::handler, this, _1));
- }
+ message_port_register_in(pmt::mp("in"));
+ set_msg_handler(pmt::mp("in"), boost::bind(&push_msg_sink_impl::handler, this, _1));
+}
- push_msg_sink_impl::~push_msg_sink_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
- }
+push_msg_sink_impl::~push_msg_sink_impl()
+{
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+}
- void push_msg_sink_impl::handler(pmt::pmt_t msg)
- {
- std::stringbuf sb("");
- pmt::serialize( msg, sb );
- std::string s = sb.str();
- zmq::message_t zmsg(s.size());
+void push_msg_sink_impl::handler(pmt::pmt_t msg)
+{
+ std::stringbuf sb("");
+ pmt::serialize(msg, sb);
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
- memcpy(zmsg.data(), s.c_str(), s.size());
- d_socket->send(zmsg);
- }
+ memcpy(zmsg.data(), s.c_str(), s.size());
+ d_socket->send(zmsg);
+}
- } /* namespace zeromq */
+} /* namespace zeromq */
} /* namespace gr */
diff --git a/gr-zeromq/lib/push_msg_sink_impl.h b/gr-zeromq/lib/push_msg_sink_impl.h
index 0d0503cd55..4d3017f782 100644
--- a/gr-zeromq/lib/push_msg_sink_impl.h
+++ b/gr-zeromq/lib/push_msg_sink_impl.h
@@ -27,29 +27,30 @@
#include <zmq.hpp>
namespace gr {
- namespace zeromq {
+namespace zeromq {
- class push_msg_sink_impl : public push_msg_sink
- {
- private:
- float d_timeout;
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
+class push_msg_sink_impl : public push_msg_sink
+{
+private:
+ float d_timeout;
+ zmq::context_t* d_context;
+ zmq::socket_t* d_socket;
- public:
- push_msg_sink_impl(char *address, int timeout);
- ~push_msg_sink_impl();
+public:
+ push_msg_sink_impl(char* address, int timeout);
+ ~push_msg_sink_impl();
- void handler(pmt::pmt_t msg);
- std::string last_endpoint() override {
+ void handler(pmt::pmt_t msg);
+ std::string last_endpoint() override
+ {
char addr[256];
size_t addr_len = sizeof(addr);
d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
- return std::string(addr, addr_len-1);
- }
- };
+ return std::string(addr, addr_len - 1);
+ }
+};
- } // namespace zeromq
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_ZMQ_PUSH_MSG_SINK_IMPL_H */
diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc
index 5f9670e4ca..431970eef5 100644
--- a/gr-zeromq/lib/push_sink_impl.cc
+++ b/gr-zeromq/lib/push_sink_impl.cc
@@ -29,42 +29,42 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- push_sink::sptr
- push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- {
- return gnuradio::get_initial_sptr
- (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
- }
+push_sink::sptr push_sink::make(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+{
+ return gnuradio::get_initial_sptr(
+ new push_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+}
- push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- : gr::sync_block("push_sink",
- gr::io_signature::make(1, 1, itemsize * vlen),
- gr::io_signature::make(0, 0, 0)),
- base_sink_impl(ZMQ_PUSH, itemsize, vlen, address, timeout, pass_tags, hwm)
- {
- /* All is delegated */
- }
+push_sink_impl::push_sink_impl(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+ : gr::sync_block("push_sink",
+ gr::io_signature::make(1, 1, itemsize * vlen),
+ gr::io_signature::make(0, 0, 0)),
+ base_sink_impl(ZMQ_PUSH, itemsize, vlen, address, timeout, pass_tags, hwm)
+{
+ /* All is delegated */
+}
- int
- push_sink_impl::work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
- // Poll with a timeout (FIXME: scheduler can't wait for us)
- zmq::pollitem_t itemsout[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLOUT, 0 } };
- zmq::poll(&itemsout[0], 1, d_timeout);
+int push_sink_impl::work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items)
+{
+ // Poll with a timeout (FIXME: scheduler can't wait for us)
+ zmq::pollitem_t itemsout[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLOUT, 0 } };
+ zmq::poll(&itemsout[0], 1, d_timeout);
- // If we can send something, do it
- if (itemsout[0].revents & ZMQ_POLLOUT)
+ // If we can send something, do it
+ if (itemsout[0].revents & ZMQ_POLLOUT)
return send_message(input_items[0], noutput_items, nitems_read(0));
- // If not, do nothing
- return 0;
- }
+ // If not, do nothing
+ return 0;
+}
- } /* namespace zeromq */
+} /* namespace zeromq */
} /* namespace gr */
// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h
index 73480e1047..bfa3dacdc4 100644
--- a/gr-zeromq/lib/push_sink_impl.h
+++ b/gr-zeromq/lib/push_sink_impl.h
@@ -29,21 +29,26 @@
#include "base_impl.h"
namespace gr {
- namespace zeromq {
-
- class push_sink_impl : public push_sink, public base_sink_impl
- {
- public:
- push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm);
-
- int work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
-
- std::string last_endpoint() override {return base_sink_impl::last_endpoint();}
- };
-
- } // namespace zeromq
+namespace zeromq {
+
+class push_sink_impl : public push_sink, public base_sink_impl
+{
+public:
+ push_sink_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm);
+
+ int work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items);
+
+ std::string last_endpoint() override { return base_sink_impl::last_endpoint(); }
+};
+
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_ZMQ_PUSH_SINK_IMPL_H */
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc
index 1a4696aee5..7466f77474 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.cc
+++ b/gr-zeromq/lib/rep_msg_sink_impl.cc
@@ -29,99 +29,100 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- rep_msg_sink::sptr
- rep_msg_sink::make(char *address, int timeout)
- {
- return gnuradio::get_initial_sptr
- (new rep_msg_sink_impl(address, timeout));
- }
+rep_msg_sink::sptr rep_msg_sink::make(char* address, int timeout)
+{
+ return gnuradio::get_initial_sptr(new rep_msg_sink_impl(address, timeout));
+}
- rep_msg_sink_impl::rep_msg_sink_impl(char *address, int timeout)
- : gr::block("rep_msg_sink",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
+rep_msg_sink_impl::rep_msg_sink_impl(char* address, int timeout)
+ : gr::block("rep_msg_sink",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
d_timeout(timeout),
d_port(pmt::mp("in"))
- {
- int major, minor, patch;
- zmq::version(&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
-
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->bind (address);
-
- message_port_register_in(d_port);
- }
+{
+ int major, minor, patch;
+ zmq::version(&major, &minor, &patch);
- rep_msg_sink_impl::~rep_msg_sink_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ if (major < 3) {
+ d_timeout = timeout * 1000;
}
- bool rep_msg_sink_impl::start()
- {
- d_finished = false;
- d_thread = new boost::thread(boost::bind(&rep_msg_sink_impl::readloop, this));
- return true;
- }
-
- bool rep_msg_sink_impl::stop()
- {
- d_finished = true;
- d_thread->join();
- return true;
- }
-
- void rep_msg_sink_impl::readloop()
- {
- while(!d_finished) {
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_REP);
+
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->bind(address);
+
+ message_port_register_in(d_port);
+}
+
+rep_msg_sink_impl::~rep_msg_sink_impl()
+{
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+}
+
+bool rep_msg_sink_impl::start()
+{
+ d_finished = false;
+ d_thread = new boost::thread(boost::bind(&rep_msg_sink_impl::readloop, this));
+ return true;
+}
+
+bool rep_msg_sink_impl::stop()
+{
+ d_finished = true;
+ d_thread->join();
+ return true;
+}
+
+void rep_msg_sink_impl::readloop()
+{
+ while (!d_finished) {
// while we have data, wait for query...
- while(!empty_p(d_port)) {
+ while (!empty_p(d_port)) {
- // wait for query...
- zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
+ // wait for query...
+ zmq::pollitem_t items[] = {
+ { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 }
+ };
+ zmq::poll(&items[0], 1, d_timeout);
- // If we got a reply, process
- if (items[0].revents & ZMQ_POLLIN) {
+ // If we got a reply, process
+ if (items[0].revents & ZMQ_POLLIN) {
- // receive data request
- zmq::message_t request;
+ // receive data request
+ zmq::message_t request;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(request);
+ d_socket->recv(request);
#else
- d_socket->recv(&request);
+ d_socket->recv(&request);
#endif
- int req_output_items = *(static_cast<int*>(request.data()));
- if(req_output_items != 1)
- throw std::runtime_error("Request was not 1 msg for rep/req request!!");
-
- // create message copy and send
- pmt::pmt_t msg = delete_head_nowait(d_port);
- std::stringbuf sb("");
- pmt::serialize( msg, sb );
- std::string s = sb.str();
- zmq::message_t zmsg(s.size());
- memcpy( zmsg.data(), s.c_str(), s.size() );
- d_socket->send(zmsg);
- } // if req
- } // while !empty
-
- } // while !d_finished
- }
-
- } /* namespace zeromq */
+ int req_output_items = *(static_cast<int*>(request.data()));
+ if (req_output_items != 1)
+ throw std::runtime_error(
+ "Request was not 1 msg for rep/req request!!");
+
+ // create message copy and send
+ pmt::pmt_t msg = delete_head_nowait(d_port);
+ std::stringbuf sb("");
+ pmt::serialize(msg, sb);
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
+ memcpy(zmsg.data(), s.c_str(), s.size());
+ d_socket->send(zmsg);
+ } // if req
+ } // while !empty
+
+ } // while !d_finished
+}
+
+} /* namespace zeromq */
} /* namespace gr */
diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h
index 61895e6c32..cfd17ed9a3 100644
--- a/gr-zeromq/lib/rep_msg_sink_impl.h
+++ b/gr-zeromq/lib/rep_msg_sink_impl.h
@@ -27,37 +27,38 @@
#include "zmq_common_impl.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- class rep_msg_sink_impl : public rep_msg_sink
- {
- private:
- int d_timeout;
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- boost::thread *d_thread;
- bool d_finished;
+class rep_msg_sink_impl : public rep_msg_sink
+{
+private:
+ int d_timeout;
+ zmq::context_t* d_context;
+ zmq::socket_t* d_socket;
+ boost::thread* d_thread;
+ bool d_finished;
- const pmt::pmt_t d_port;
+ const pmt::pmt_t d_port;
- void readloop();
+ void readloop();
- public:
- rep_msg_sink_impl(char *address, int timeout);
- ~rep_msg_sink_impl();
+public:
+ rep_msg_sink_impl(char* address, int timeout);
+ ~rep_msg_sink_impl();
- bool start();
- bool stop();
+ bool start();
+ bool stop();
- std::string last_endpoint() override {
+ std::string last_endpoint() override
+ {
char addr[256];
size_t addr_len = sizeof(addr);
d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
- return std::string(addr, addr_len-1);
- }
- };
+ return std::string(addr, addr_len - 1);
+ }
+};
- } // namespace zeromq
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_REP_MSG_SINK_IMPL_H */
diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc
index f402310cfc..c3b6d8c192 100644
--- a/gr-zeromq/lib/rep_sink_impl.cc
+++ b/gr-zeromq/lib/rep_sink_impl.cc
@@ -29,45 +29,44 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- rep_sink::sptr
- rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- {
- return gnuradio::get_initial_sptr
- (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
- }
+rep_sink::sptr rep_sink::make(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+{
+ return gnuradio::get_initial_sptr(
+ new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+}
- rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- : gr::sync_block("rep_sink",
- gr::io_signature::make(1, 1, itemsize * vlen),
- gr::io_signature::make(0, 0, 0)),
- base_sink_impl(ZMQ_REP, itemsize, vlen, address, timeout, pass_tags, hwm)
- {
- /* All is delegated */
- }
+rep_sink_impl::rep_sink_impl(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+ : gr::sync_block("rep_sink",
+ gr::io_signature::make(1, 1, itemsize * vlen),
+ gr::io_signature::make(0, 0, 0)),
+ base_sink_impl(ZMQ_REP, itemsize, vlen, address, timeout, pass_tags, hwm)
+{
+ /* All is delegated */
+}
- int
- rep_sink_impl::work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
- const uint8_t *in = (const uint8_t *) input_items[0];
- bool first = true;
- int done = 0;
+int rep_sink_impl::work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items)
+{
+ const uint8_t* in = (const uint8_t*)input_items[0];
+ bool first = true;
+ int done = 0;
- /* Process as much as we can */
- while (1)
- {
+ /* Process as much as we can */
+ while (1) {
/* Wait for a small time (FIXME: scheduler can't wait for us) */
- /* We only wait if its the first iteration, for the others we'll
- * let the scheduler retry */
- zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ /* We only wait if its the first iteration, for the others we'll
+ * let the scheduler retry */
+ zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
zmq::poll(&items[0], 1, first ? d_timeout : 0);
- /* If we don't have anything, we're done */
+ /* If we don't have anything, we're done */
if (!(items[0].revents & ZMQ_POLLIN))
- break;
+ break;
/* Get and parse the request */
zmq::message_t request;
@@ -78,10 +77,9 @@ namespace gr {
#endif
int nitems_send = noutput_items - done;
- if (request.size() >= sizeof(uint32_t))
- {
- int req = (int)*(static_cast<uint32_t*>(request.data()));
- nitems_send = std::min(nitems_send, req);
+ if (request.size() >= sizeof(uint32_t)) {
+ int req = (int)*(static_cast<uint32_t*>(request.data()));
+ nitems_send = std::min(nitems_send, req);
}
/* Delegate the actual send */
@@ -89,11 +87,11 @@ namespace gr {
/* Not the first anymore */
first = false;
- }
-
- return done;
}
- } /* namespace zeromq */
+
+ return done;
+}
+} /* namespace zeromq */
} /* namespace gr */
// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h
index cfd1c09637..2fcf9853c6 100644
--- a/gr-zeromq/lib/rep_sink_impl.h
+++ b/gr-zeromq/lib/rep_sink_impl.h
@@ -27,20 +27,25 @@
#include "base_impl.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- class rep_sink_impl : public rep_sink, public base_sink_impl
- {
- public:
- rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm);
+class rep_sink_impl : public rep_sink, public base_sink_impl
+{
+public:
+ rep_sink_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm);
- int work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
- std::string last_endpoint() override {return base_sink_impl::last_endpoint();}
- };
+ int work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items);
+ std::string last_endpoint() override { return base_sink_impl::last_endpoint(); }
+};
- } // namespace zeromq
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_REP_SINK_IMPL_H */
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
index 6c80a77f27..6d44aa3cf0 100644
--- a/gr-zeromq/lib/req_msg_source_impl.cc
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -31,104 +31,104 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- req_msg_source::sptr
- req_msg_source::make(char *address, int timeout)
- {
- return gnuradio::get_initial_sptr
- (new req_msg_source_impl(address, timeout));
- }
+req_msg_source::sptr req_msg_source::make(char* address, int timeout)
+{
+ return gnuradio::get_initial_sptr(new req_msg_source_impl(address, timeout));
+}
- req_msg_source_impl::req_msg_source_impl(char *address, int timeout)
- : gr::block("req_msg_source",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
+req_msg_source_impl::req_msg_source_impl(char* address, int timeout)
+ : gr::block("req_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
d_timeout(timeout),
d_port(pmt::mp("out"))
- {
- int major, minor, patch;
- zmq::version(&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
-
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->connect (address);
-
- message_port_register_out(d_port);
- }
+{
+ int major, minor, patch;
+ zmq::version(&major, &minor, &patch);
- req_msg_source_impl::~req_msg_source_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ if (major < 3) {
+ d_timeout = timeout * 1000;
}
- bool req_msg_source_impl::start()
- {
- d_finished = false;
- d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this));
- return true;
- }
-
- bool req_msg_source_impl::stop()
- {
- d_finished = true;
- d_thread->join();
- return true;
- }
-
- void req_msg_source_impl::readloop()
- {
- while(!d_finished){
- //std::cout << "readloop\n";
-
- zmq::pollitem_t itemsout[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLOUT, 0 } };
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
+
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->connect(address);
+
+ message_port_register_out(d_port);
+}
+
+req_msg_source_impl::~req_msg_source_impl()
+{
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+}
+
+bool req_msg_source_impl::start()
+{
+ d_finished = false;
+ d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this));
+ return true;
+}
+
+bool req_msg_source_impl::stop()
+{
+ d_finished = true;
+ d_thread->join();
+ return true;
+}
+
+void req_msg_source_impl::readloop()
+{
+ while (!d_finished) {
+ // std::cout << "readloop\n";
+
+ zmq::pollitem_t itemsout[] = {
+ { static_cast<void*>(*d_socket), 0, ZMQ_POLLOUT, 0 }
+ };
zmq::poll(&itemsout[0], 1, d_timeout);
// If we got a reply, process
if (itemsout[0].revents & ZMQ_POLLOUT) {
- // Request data, FIXME non portable?
- int nmsg = 1;
- zmq::message_t request(sizeof(int));
- memcpy((void *) request.data (), &nmsg, sizeof(int));
+ // Request data, FIXME non portable?
+ int nmsg = 1;
+ zmq::message_t request(sizeof(int));
+ memcpy((void*)request.data(), &nmsg, sizeof(int));
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->send(request, zmq::send_flags::none);
+ d_socket->send(request, zmq::send_flags::none);
#else
- d_socket->send(request);
+ d_socket->send(request);
#endif
}
- zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
+ zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ zmq::poll(&items[0], 1, d_timeout);
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
- // Receive data
- zmq::message_t msg;
+ // Receive data
+ zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(msg);
+ d_socket->recv(msg);
#else
- d_socket->recv(&msg);
+ d_socket->recv(&msg);
#endif
- std::string buf(static_cast<char*>(msg.data()), msg.size());
- std::stringbuf sb(buf);
- pmt::pmt_t m = pmt::deserialize(sb);
- message_port_pub(d_port, m);
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ message_port_pub(d_port, m);
} else {
- boost::this_thread::sleep(boost::posix_time::microseconds(100));
+ boost::this_thread::sleep(boost::posix_time::microseconds(100));
}
- }
}
+}
- } /* namespace zeromq */
+} /* namespace zeromq */
} /* namespace gr */
diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h
index 7fb3e74e70..9610229f5d 100644
--- a/gr-zeromq/lib/req_msg_source_impl.h
+++ b/gr-zeromq/lib/req_msg_source_impl.h
@@ -27,37 +27,38 @@
#include "zmq_common_impl.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- class req_msg_source_impl : public req_msg_source
- {
- private:
- int d_timeout;
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- boost::thread *d_thread;
- const pmt::pmt_t d_port;
+class req_msg_source_impl : public req_msg_source
+{
+private:
+ int d_timeout;
+ zmq::context_t* d_context;
+ zmq::socket_t* d_socket;
+ boost::thread* d_thread;
+ const pmt::pmt_t d_port;
- void readloop();
+ void readloop();
- public:
- bool d_finished;
+public:
+ bool d_finished;
- req_msg_source_impl(char *address, int timeout);
- ~req_msg_source_impl();
+ req_msg_source_impl(char* address, int timeout);
+ ~req_msg_source_impl();
- bool start();
- bool stop();
+ bool start();
+ bool stop();
- std::string last_endpoint() override {
+ std::string last_endpoint() override
+ {
char addr[256];
size_t addr_len = sizeof(addr);
d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
- return std::string(addr, addr_len-1);
- }
- };
+ return std::string(addr, addr_len - 1);
+ }
+};
- } // namespace zeromq
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_REQ_MSG_SOURCE_IMPL_H */
diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc
index 526736389e..5a498cf9d2 100644
--- a/gr-zeromq/lib/req_source_impl.cc
+++ b/gr-zeromq/lib/req_source_impl.cc
@@ -29,80 +29,77 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
-
- req_source::sptr
- req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- {
- return gnuradio::get_initial_sptr
- (new req_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
- }
-
- req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- : gr::sync_block("req_source",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(1, 1, itemsize * vlen)),
- base_source_impl(ZMQ_REQ, itemsize, vlen, address, timeout, pass_tags, hwm),
- d_req_pending(false)
- {
- /* All is delegated */
- }
-
- int
- req_source_impl::work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
+namespace zeromq {
+
+req_source::sptr req_source::make(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+{
+ return gnuradio::get_initial_sptr(
+ new req_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+}
+
+req_source_impl::req_source_impl(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+ : gr::sync_block("req_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(1, 1, itemsize * vlen)),
+ base_source_impl(ZMQ_REQ, itemsize, vlen, address, timeout, pass_tags, hwm),
+ d_req_pending(false)
+{
+ /* All is delegated */
+}
+
+int req_source_impl::work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items)
+{
#if 0
#endif
- uint8_t *out = (uint8_t *) output_items[0];
- bool first = true;
- int done = 0;
-
- /* Process as much as we can */
- while (1)
- {
- if (has_pending())
- {
- /* Flush anything pending */
- done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done);
-
- /* No more space ? */
- if (done == noutput_items)
- break;
- }
- else
- {
- /* Send request if needed */
- if (!d_req_pending)
- {
- /* The REP/REQ pattern state machine guarantees we can send at this point */
- uint32_t req_len = noutput_items - done;
- zmq::message_t request(sizeof(uint32_t));
- memcpy ((void *) request.data (), &req_len, sizeof(uint32_t));
- d_socket->send(request);
-
- d_req_pending = true;
- }
-
- /* Try to get the next message */
- if (!load_message(first))
- break; /* No message, we're done for now */
-
- /* Got response */
- d_req_pending = false;
-
- /* Not the first anymore */
- first = false;
+ uint8_t* out = (uint8_t*)output_items[0];
+ bool first = true;
+ int done = 0;
+
+ /* Process as much as we can */
+ while (1) {
+ if (has_pending()) {
+ /* Flush anything pending */
+ done += flush_pending(
+ out + (done * d_vsize), noutput_items - done, nitems_written(0) + done);
+
+ /* No more space ? */
+ if (done == noutput_items)
+ break;
+ } else {
+ /* Send request if needed */
+ if (!d_req_pending) {
+ /* The REP/REQ pattern state machine guarantees we can send at this point
+ */
+ uint32_t req_len = noutput_items - done;
+ zmq::message_t request(sizeof(uint32_t));
+ memcpy((void*)request.data(), &req_len, sizeof(uint32_t));
+ d_socket->send(request);
+
+ d_req_pending = true;
+ }
+
+ /* Try to get the next message */
+ if (!load_message(first))
+ break; /* No message, we're done for now */
+
+ /* Got response */
+ d_req_pending = false;
+
+ /* Not the first anymore */
+ first = false;
}
- }
+ }
- return done;
+ return done;
- return 0;
- }
+ return 0;
+}
- } /* namespace zeromq */
+} /* namespace zeromq */
} /* namespace gr */
// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h
index ec87832518..6e2d71f565 100644
--- a/gr-zeromq/lib/req_source_impl.h
+++ b/gr-zeromq/lib/req_source_impl.h
@@ -29,23 +29,29 @@
#include "base_impl.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- class req_source_impl : public req_source, public base_source_impl
- {
- public:
- req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm);
+class req_source_impl : public req_source, public base_source_impl
+{
+public:
+ req_source_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm);
- int work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
+ int work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items);
- std::string last_endpoint() override {return base_source_impl::last_endpoint();}
- private:
- bool d_req_pending;
- };
+ std::string last_endpoint() override { return base_source_impl::last_endpoint(); }
- } // namespace zeromq
+private:
+ bool d_req_pending;
+};
+
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_REQ_SOURCE_IMPL_H */
diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc
index 3a08499b12..3a9fb906da 100644
--- a/gr-zeromq/lib/sub_msg_source_impl.cc
+++ b/gr-zeromq/lib/sub_msg_source_impl.cc
@@ -31,86 +31,84 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- sub_msg_source::sptr
- sub_msg_source::make(char *address, int timeout)
- {
- return gnuradio::get_initial_sptr
- (new sub_msg_source_impl(address, timeout));
- }
+sub_msg_source::sptr sub_msg_source::make(char* address, int timeout)
+{
+ return gnuradio::get_initial_sptr(new sub_msg_source_impl(address, timeout));
+}
- sub_msg_source_impl::sub_msg_source_impl(char *address, int timeout)
- : gr::block("sub_msg_source",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
+sub_msg_source_impl::sub_msg_source_impl(char* address, int timeout)
+ : gr::block("sub_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
d_timeout(timeout),
d_port(pmt::mp("out"))
- {
- int major, minor, patch;
- zmq::version(&major, &minor, &patch);
-
- if (major < 3) {
- d_timeout = timeout*1000;
- }
-
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
-
- d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
- d_socket->connect (address);
-
- message_port_register_out(d_port);
- }
+{
+ int major, minor, patch;
+ zmq::version(&major, &minor, &patch);
- sub_msg_source_impl::~sub_msg_source_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
+ if (major < 3) {
+ d_timeout = timeout * 1000;
}
- bool sub_msg_source_impl::start()
- {
- d_finished = false;
- d_thread = new boost::thread(boost::bind(&sub_msg_source_impl::readloop, this));
- return true;
- }
-
- bool sub_msg_source_impl::stop()
- {
- d_finished = true;
- d_thread->join();
- return true;
- }
-
- void sub_msg_source_impl::readloop()
- {
- while(!d_finished){
-
- zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } };
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
+
+ d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
+ d_socket->connect(address);
+
+ message_port_register_out(d_port);
+}
+
+sub_msg_source_impl::~sub_msg_source_impl()
+{
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+}
+
+bool sub_msg_source_impl::start()
+{
+ d_finished = false;
+ d_thread = new boost::thread(boost::bind(&sub_msg_source_impl::readloop, this));
+ return true;
+}
+
+bool sub_msg_source_impl::stop()
+{
+ d_finished = true;
+ d_thread->join();
+ return true;
+}
+
+void sub_msg_source_impl::readloop()
+{
+ while (!d_finished) {
+
+ zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } };
zmq::poll(&items[0], 1, d_timeout);
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
- // Receive data
- zmq::message_t msg;
+ // Receive data
+ zmq::message_t msg;
#if USE_NEW_CPPZMQ_SEND_RECV
- d_socket->recv(msg);
+ d_socket->recv(msg);
#else
- d_socket->recv(&msg);
+ d_socket->recv(&msg);
#endif
- std::string buf(static_cast<char*>(msg.data()), msg.size());
- std::stringbuf sb(buf);
- pmt::pmt_t m = pmt::deserialize(sb);
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
- message_port_pub(d_port, m);
+ message_port_pub(d_port, m);
} else {
- boost::this_thread::sleep(boost::posix_time::microseconds(100));
+ boost::this_thread::sleep(boost::posix_time::microseconds(100));
}
- }
}
+}
- } /* namespace zeromq */
+} /* namespace zeromq */
} /* namespace gr */
diff --git a/gr-zeromq/lib/sub_msg_source_impl.h b/gr-zeromq/lib/sub_msg_source_impl.h
index 4ffce4ba5f..f83b6d4639 100644
--- a/gr-zeromq/lib/sub_msg_source_impl.h
+++ b/gr-zeromq/lib/sub_msg_source_impl.h
@@ -27,37 +27,38 @@
#include "zmq_common_impl.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- class sub_msg_source_impl : public sub_msg_source
- {
- private:
- int d_timeout; // microseconds, -1 is blocking
- zmq::context_t *d_context;
- zmq::socket_t *d_socket;
- boost::thread *d_thread;
- const pmt::pmt_t d_port;
+class sub_msg_source_impl : public sub_msg_source
+{
+private:
+ int d_timeout; // microseconds, -1 is blocking
+ zmq::context_t* d_context;
+ zmq::socket_t* d_socket;
+ boost::thread* d_thread;
+ const pmt::pmt_t d_port;
- void readloop();
+ void readloop();
- public:
- bool d_finished;
+public:
+ bool d_finished;
- sub_msg_source_impl(char *address, int timeout);
- ~sub_msg_source_impl();
+ sub_msg_source_impl(char* address, int timeout);
+ ~sub_msg_source_impl();
- bool start();
- bool stop();
+ bool start();
+ bool stop();
- std::string last_endpoint() override {
+ std::string last_endpoint() override
+ {
char addr[256];
size_t addr_len = sizeof(addr);
d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len);
- return std::string(addr, addr_len-1);
- }
- };
+ return std::string(addr, addr_len - 1);
+ }
+};
- } // namespace zeromq
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H */
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 9a2e0bfe15..d55ca116f0 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -29,61 +29,58 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- sub_source::sptr
- sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- {
- return gnuradio::get_initial_sptr
- (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
- }
+sub_source::sptr sub_source::make(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+{
+ return gnuradio::get_initial_sptr(
+ new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm));
+}
- sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm)
- : gr::sync_block("sub_source",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(1, 1, itemsize * vlen)),
- base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm)
- {
- /* Subscribe */
- d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
- }
+sub_source_impl::sub_source_impl(
+ size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm)
+ : gr::sync_block("sub_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(1, 1, itemsize * vlen)),
+ base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm)
+{
+ /* Subscribe */
+ d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
+}
- int
- sub_source_impl::work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
- uint8_t *out = (uint8_t *) output_items[0];
- bool first = true;
- int done = 0;
+int sub_source_impl::work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items)
+{
+ uint8_t* out = (uint8_t*)output_items[0];
+ bool first = true;
+ int done = 0;
- /* Process as much as we can */
- while (1)
- {
- if (has_pending())
- {
- /* Flush anything pending */
- done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done);
+ /* Process as much as we can */
+ while (1) {
+ if (has_pending()) {
+ /* Flush anything pending */
+ done += flush_pending(
+ out + (done * d_vsize), noutput_items - done, nitems_written(0) + done);
- /* No more space ? */
- if (done == noutput_items)
- break;
- }
- else
- {
- /* Try to get the next message */
- if (!load_message(first))
- break; /* No message, we're done for now */
+ /* No more space ? */
+ if (done == noutput_items)
+ break;
+ } else {
+ /* Try to get the next message */
+ if (!load_message(first))
+ break; /* No message, we're done for now */
- /* Not the first anymore */
- first = false;
+ /* Not the first anymore */
+ first = false;
}
- }
-
- return done;
}
- } /* namespace zeromq */
+ return done;
+}
+
+} /* namespace zeromq */
} /* namespace gr */
// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h
index 727d76d6a2..890907b802 100644
--- a/gr-zeromq/lib/sub_source_impl.h
+++ b/gr-zeromq/lib/sub_source_impl.h
@@ -29,22 +29,26 @@
#include "base_impl.h"
namespace gr {
- namespace zeromq {
-
- class sub_source_impl : public sub_source, public base_source_impl
- {
- public:
- sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm);
-
- int work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
-
- std::string last_endpoint() override {return base_source_impl::last_endpoint();}
- };
-
- } // namespace zeromq
+namespace zeromq {
+
+class sub_source_impl : public sub_source, public base_source_impl
+{
+public:
+ sub_source_impl(size_t itemsize,
+ size_t vlen,
+ char* address,
+ int timeout,
+ bool pass_tags,
+ int hwm);
+
+ int work(int noutput_items,
+ gr_vector_const_void_star& input_items,
+ gr_vector_void_star& output_items);
+
+ std::string last_endpoint() override { return base_source_impl::last_endpoint(); }
+};
+
+} // namespace zeromq
} // namespace gr
#endif /* INCLUDED_ZEROMQ_SUB_SOURCE_IMPL_H */
-
diff --git a/gr-zeromq/lib/tag_headers.cc b/gr-zeromq/lib/tag_headers.cc
index 5a5a417420..3b59bebf77 100644
--- a/gr-zeromq/lib/tag_headers.cc
+++ b/gr-zeromq/lib/tag_headers.cc
@@ -26,86 +26,84 @@
#include <cstring>
#include <zmq.hpp>
-#define GR_HEADER_MAGIC 0x5FF0
+#define GR_HEADER_MAGIC 0x5FF0
#define GR_HEADER_VERSION 0x01
namespace gr {
- namespace zeromq {
+namespace zeromq {
- struct membuf: std::streambuf
+struct membuf : std::streambuf {
+ membuf(void* b, size_t len)
{
- membuf(void *b, size_t len)
- {
- char *bc = static_cast<char*>(b);
- this->setg(bc, bc, bc+len);
- }
- };
-
- std::string
- gen_tag_header(uint64_t offset, std::vector<gr::tag_t> &tags)
- {
- std::stringbuf sb("");
- std::ostream ss(&sb);
-
- uint16_t header_magic = GR_HEADER_MAGIC;
- uint8_t header_version = GR_HEADER_VERSION;
- uint64_t ntags = (uint64_t)tags.size();
-
- ss.write( (const char*)&header_magic, sizeof(uint16_t) );
- ss.write( (const char*)&header_version, sizeof(uint8_t) );
- ss.write( (const char*)&offset, sizeof(uint64_t) );
- ss.write( (const char*)&ntags, sizeof(uint64_t) );
-
- for(size_t i=0; i<tags.size(); i++)
- {
- ss.write( (const char *)&tags[i].offset, sizeof(uint64_t) );
- pmt::serialize( tags[i].key, sb );
- pmt::serialize( tags[i].value, sb );
- pmt::serialize( tags[i].srcid, sb );
- }
-
- return sb.str();
+ char* bc = static_cast<char*>(b);
+ this->setg(bc, bc, bc + len);
+ }
+};
+
+std::string gen_tag_header(uint64_t offset, std::vector<gr::tag_t>& tags)
+{
+ std::stringbuf sb("");
+ std::ostream ss(&sb);
+
+ uint16_t header_magic = GR_HEADER_MAGIC;
+ uint8_t header_version = GR_HEADER_VERSION;
+ uint64_t ntags = (uint64_t)tags.size();
+
+ ss.write((const char*)&header_magic, sizeof(uint16_t));
+ ss.write((const char*)&header_version, sizeof(uint8_t));
+ ss.write((const char*)&offset, sizeof(uint64_t));
+ ss.write((const char*)&ntags, sizeof(uint64_t));
+
+ for (size_t i = 0; i < tags.size(); i++) {
+ ss.write((const char*)&tags[i].offset, sizeof(uint64_t));
+ pmt::serialize(tags[i].key, sb);
+ pmt::serialize(tags[i].value, sb);
+ pmt::serialize(tags[i].srcid, sb);
}
- size_t
- parse_tag_header(zmq::message_t &msg, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out)
- {
- membuf sb(msg.data(), msg.size());
- std::istream iss(&sb);
+ return sb.str();
+}
- size_t min_len = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t);
- if (msg.size() < min_len)
+size_t parse_tag_header(zmq::message_t& msg,
+ uint64_t& offset_out,
+ std::vector<gr::tag_t>& tags_out)
+{
+ membuf sb(msg.data(), msg.size());
+ std::istream iss(&sb);
+
+ size_t min_len =
+ sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t);
+ if (msg.size() < min_len)
throw std::runtime_error("incoming zmq msg too small to hold gr tag header!");
- uint16_t header_magic;
- uint8_t header_version;
- uint64_t rcv_ntags;
+ uint16_t header_magic;
+ uint8_t header_version;
+ uint64_t rcv_ntags;
- iss.read( (char*)&header_magic, sizeof(uint16_t) );
- iss.read( (char*)&header_version, sizeof(uint8_t) );
+ iss.read((char*)&header_magic, sizeof(uint16_t));
+ iss.read((char*)&header_version, sizeof(uint8_t));
- if(header_magic != GR_HEADER_MAGIC)
+ if (header_magic != GR_HEADER_MAGIC)
throw std::runtime_error("gr header magic does not match!");
- if(header_version != 1)
+ if (header_version != 1)
throw std::runtime_error("gr header version too high!");
- iss.read( (char*)&offset_out, sizeof(uint64_t) );
- iss.read( (char*)&rcv_ntags, sizeof(uint64_t) );
+ iss.read((char*)&offset_out, sizeof(uint64_t));
+ iss.read((char*)&rcv_ntags, sizeof(uint64_t));
- for(size_t i=0; i<rcv_ntags; i++)
- {
+ for (size_t i = 0; i < rcv_ntags; i++) {
gr::tag_t newtag;
- sb.sgetn( (char*)&(newtag.offset), sizeof(uint64_t) );
- newtag.key = pmt::deserialize( sb );
- newtag.value = pmt::deserialize( sb );
- newtag.srcid = pmt::deserialize( sb );
+ sb.sgetn((char*)&(newtag.offset), sizeof(uint64_t));
+ newtag.key = pmt::deserialize(sb);
+ newtag.value = pmt::deserialize(sb);
+ newtag.srcid = pmt::deserialize(sb);
tags_out.push_back(newtag);
- }
-
- return msg.size() - sb.in_avail();
}
- } /* namespace zeromq */
+
+ return msg.size() - sb.in_avail();
+}
+} /* namespace zeromq */
} /* namespace gr */
// vim: ts=2 sw=2 expandtab
diff --git a/gr-zeromq/lib/tag_headers.h b/gr-zeromq/lib/tag_headers.h
index dede5e93a6..a2d0129736 100644
--- a/gr-zeromq/lib/tag_headers.h
+++ b/gr-zeromq/lib/tag_headers.h
@@ -30,12 +30,14 @@
#include <zmq.hpp>
namespace gr {
- namespace zeromq {
+namespace zeromq {
- std::string gen_tag_header(uint64_t offset, std::vector<gr::tag_t> &tags);
- size_t parse_tag_header(zmq::message_t &msg, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out);
-
- } /* namespace zeromq */
+std::string gen_tag_header(uint64_t offset, std::vector<gr::tag_t>& tags);
+size_t parse_tag_header(zmq::message_t& msg,
+ uint64_t& offset_out,
+ std::vector<gr::tag_t>& tags_out);
+
+} /* namespace zeromq */
} /* namespace gr */
#endif /* ZEROMQ_TAG_HEADERS_H */
diff --git a/gr-zeromq/lib/zmq_common_impl.h b/gr-zeromq/lib/zmq_common_impl.h
index e73a3a17d3..8fb934fbdc 100644
--- a/gr-zeromq/lib/zmq_common_impl.h
+++ b/gr-zeromq/lib/zmq_common_impl.h
@@ -25,7 +25,8 @@
#include <zmq.hpp>
-#if defined (CPPZMQ_VERSION) && defined (ZMQ_MAKE_VERSION) && CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1)
+#if defined(CPPZMQ_VERSION) && defined(ZMQ_MAKE_VERSION) && \
+ CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1)
#define USE_NEW_CPPZMQ_SEND_RECV 1
#else
#define USE_NEW_CPPZMQ_SEND_RECV 0