diff options
Diffstat (limited to 'gr-zeromq/lib')
29 files changed, 1078 insertions, 1042 deletions
diff --git a/gr-zeromq/lib/base_impl.cc b/gr-zeromq/lib/base_impl.cc index 662f7b1596..b3a107c815 100644 --- a/gr-zeromq/lib/base_impl.cc +++ b/gr-zeromq/lib/base_impl.cc @@ -29,200 +29,207 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags) - : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags) - { - /* "Fix" timeout value (ms for new API, us for old API) */ - int major, minor, patch; - zmq::version (&major, &minor, &patch); +base_impl::base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags) + : d_vsize(itemsize * vlen), d_timeout(timeout), d_pass_tags(pass_tags) +{ + /* "Fix" timeout value (ms for new API, us for old API) */ + int major, minor, patch; + zmq::version(&major, &minor, &patch); - if (major < 3) { + if (major < 3) { d_timeout *= 1000; - } - - /* Create context & socket */ - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, type); - } - - base_impl::~base_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; - } - - std::string - base_impl::last_endpoint() - { - char addr[256]; - size_t addr_len = sizeof(addr); - d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); - return std::string(addr, addr_len-1); } - - base_sink_impl::base_sink_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - : base_impl(type, itemsize, vlen, timeout, pass_tags) - { - /* Set high watermark */ - if (hwm >= 0) { + /* Create context & socket */ + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, type); +} + +base_impl::~base_impl() +{ + d_socket->close(); + delete d_socket; + delete d_context; +} + +std::string base_impl::last_endpoint() +{ + char addr[256]; + size_t addr_len = sizeof(addr); + d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); + return std::string(addr, addr_len - 1); +} + + +base_sink_impl::base_sink_impl(int type, + size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm) + : base_impl(type, itemsize, vlen, timeout, pass_tags) +{ + /* Set high watermark */ + if (hwm >= 0) { #ifdef ZMQ_SNDHWM d_socket->setsockopt(ZMQ_SNDHWM, &hwm, sizeof(hwm)); #else // major < 3 uint64_t tmp = hwm; d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); #endif - } - - /* Bind */ - d_socket->bind(address); } - int - base_sink_impl::send_message(const void *in_buf, const int in_nitems, const uint64_t in_offset) - { - /* Meta-data header */ - std::string header(""); - if(d_pass_tags){ + /* Bind */ + d_socket->bind(address); +} + +int base_sink_impl::send_message(const void* in_buf, + const int in_nitems, + const uint64_t in_offset) +{ + /* Meta-data header */ + std::string header(""); + if (d_pass_tags) { std::vector<gr::tag_t> tags; get_tags_in_range(tags, 0, in_offset, in_offset + in_nitems); header = gen_tag_header(in_offset, tags); - } + } - /* Create message */ - size_t payload_len = in_nitems * d_vsize; - size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len; - zmq::message_t msg(msg_len); + /* Create message */ + size_t payload_len = in_nitems * d_vsize; + size_t msg_len = d_pass_tags ? payload_len + header.length() : payload_len; + zmq::message_t msg(msg_len); - if(d_pass_tags){ + if (d_pass_tags) { memcpy(msg.data(), header.c_str(), header.length()); memcpy((uint8_t*)msg.data() + header.length(), in_buf, payload_len); - } else { + } else { memcpy(msg.data(), in_buf, payload_len); - } + } - /* Send */ + /* Send */ #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(msg, zmq::send_flags::none); + d_socket->send(msg, zmq::send_flags::none); #else - d_socket->send(msg); + d_socket->send(msg); #endif - /* Report back */ - return in_nitems; - } - - base_source_impl::base_source_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - : base_impl(type, itemsize, vlen, timeout, pass_tags), - d_consumed_bytes(0), d_consumed_items(0) - { - /* Set high watermark */ - if (hwm >= 0) { + /* Report back */ + return in_nitems; +} + +base_source_impl::base_source_impl(int type, + size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm) + : base_impl(type, itemsize, vlen, timeout, pass_tags), + d_consumed_bytes(0), + d_consumed_items(0) +{ + /* Set high watermark */ + if (hwm >= 0) { #ifdef ZMQ_RCVHWM d_socket->setsockopt(ZMQ_RCVHWM, &hwm, sizeof(hwm)); #else // major < 3 uint64_t tmp = hwm; d_socket->setsockopt(ZMQ_HWM, &tmp, sizeof(tmp)); #endif - } - - /* Connect */ - d_socket->connect(address); } - bool - base_source_impl::has_pending() - { - return d_msg.size() > d_consumed_bytes; - } + /* Connect */ + d_socket->connect(address); +} - int - base_source_impl::flush_pending(void *out_buf, const int out_nitems, const uint64_t out_offset) - { - /* How much to copy in this call */ - int to_copy_items = std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize)); - int to_copy_bytes = d_vsize * to_copy_items; +bool base_source_impl::has_pending() { return d_msg.size() > d_consumed_bytes; } - /* Copy actual data */ - memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, to_copy_bytes); +int base_source_impl::flush_pending(void* out_buf, + const int out_nitems, + const uint64_t out_offset) +{ + /* How much to copy in this call */ + int to_copy_items = + std::min(out_nitems, (int)((d_msg.size() - d_consumed_bytes) / d_vsize)); + int to_copy_bytes = d_vsize * to_copy_items; - /* Add tags matching this segment of samples */ - for (unsigned int i=0; i<d_tags.size(); i++) - { + /* Copy actual data */ + memcpy(out_buf, (uint8_t*)d_msg.data() + d_consumed_bytes, to_copy_bytes); + + /* Add tags matching this segment of samples */ + for (unsigned int i = 0; i < d_tags.size(); i++) { if ((d_tags[i].offset >= (uint64_t)d_consumed_items) && - (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items)) - { - gr::tag_t nt = d_tags[i]; - nt.offset += out_offset - d_consumed_items; - add_item_tag(0, nt); + (d_tags[i].offset < (uint64_t)d_consumed_items + to_copy_items)) { + gr::tag_t nt = d_tags[i]; + nt.offset += out_offset - d_consumed_items; + add_item_tag(0, nt); } - } + } - /* Update pointer */ - d_consumed_items += to_copy_items; - d_consumed_bytes += to_copy_bytes; + /* Update pointer */ + d_consumed_items += to_copy_items; + d_consumed_bytes += to_copy_bytes; - return to_copy_items; - } + return to_copy_items; +} - bool - base_source_impl::load_message(bool wait) - { - /* Poll for input */ - zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } }; - zmq::poll(&items[0], 1, wait ? d_timeout : 0); +bool base_source_impl::load_message(bool wait) +{ + /* Poll for input */ + zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + zmq::poll(&items[0], 1, wait ? d_timeout : 0); - if (!(items[0].revents & ZMQ_POLLIN)) + if (!(items[0].revents & ZMQ_POLLIN)) return false; - /* Is this the start or continuation of a multi-part message? */ - int64_t more = 0; - size_t more_len = sizeof(more); - d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len); + /* Is this the start or continuation of a multi-part message? */ + int64_t more = 0; + size_t more_len = sizeof(more); + d_socket->getsockopt(ZMQ_RCVMORE, &more, &more_len); - /* Reset */ - d_msg.rebuild(); - d_tags.clear(); - d_consumed_items = 0; - d_consumed_bytes = 0; + /* Reset */ + d_msg.rebuild(); + d_tags.clear(); + d_consumed_items = 0; + d_consumed_bytes = 0; - /* Get the message */ + /* Get the message */ #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(d_msg); + d_socket->recv(d_msg); #else - d_socket->recv(&d_msg); + d_socket->recv(&d_msg); #endif - /* Parse header from the first (or only) message of a multi-part message */ - if (d_pass_tags && !more) - { + /* Parse header from the first (or only) message of a multi-part message */ + if (d_pass_tags && !more) { uint64_t rcv_offset; /* Parse header */ d_consumed_bytes = parse_tag_header(d_msg, rcv_offset, d_tags); /* Fixup the tags offset to be relative to the start of this message */ - for (unsigned int i=0; i<d_tags.size(); i++) { - d_tags[i].offset -= rcv_offset; + for (unsigned int i = 0; i < d_tags.size(); i++) { + d_tags[i].offset -= rcv_offset; } - } + } - /* Each message must contain an integer multiple of data vectors */ - if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) - { + /* Each message must contain an integer multiple of data vectors */ + if ((d_msg.size() - d_consumed_bytes) % d_vsize != 0) { throw std::runtime_error( boost::str(boost::format("Incompatible vector sizes: " - "need a multiple of %1% bytes per message") % d_vsize)); - } - - /* We got one ! */ - return true; + "need a multiple of %1% bytes per message") % + d_vsize)); } - } /* namespace zeromq */ + /* We got one ! */ + return true; +} + +} /* namespace zeromq */ } /* namespace gr */ // vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/base_impl.h b/gr-zeromq/lib/base_impl.h index 68f0fa2ea5..3783a4e375 100644 --- a/gr-zeromq/lib/base_impl.h +++ b/gr-zeromq/lib/base_impl.h @@ -27,49 +27,61 @@ #include "zmq_common_impl.h" namespace gr { - namespace zeromq { +namespace zeromq { - class base_impl : public virtual gr::sync_block - { - public: - base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags); - virtual ~base_impl(); +class base_impl : public virtual gr::sync_block +{ +public: + base_impl(int type, size_t itemsize, size_t vlen, int timeout, bool pass_tags); + virtual ~base_impl(); - protected: - std::string last_endpoint(); - zmq::context_t *d_context; - zmq::socket_t *d_socket; - size_t d_vsize; - int d_timeout; - bool d_pass_tags; - }; +protected: + std::string last_endpoint(); + zmq::context_t* d_context; + zmq::socket_t* d_socket; + size_t d_vsize; + int d_timeout; + bool d_pass_tags; +}; - class base_sink_impl : public base_impl - { - public: - base_sink_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); +class base_sink_impl : public base_impl +{ +public: + base_sink_impl(int type, + size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm); - protected: - int send_message(const void *in_buf, const int in_nitems, const uint64_t in_offset); - }; +protected: + int send_message(const void* in_buf, const int in_nitems, const uint64_t in_offset); +}; - class base_source_impl : public base_impl - { - public: - base_source_impl(int type, size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); +class base_source_impl : public base_impl +{ +public: + base_source_impl(int type, + size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm); - protected: - zmq::message_t d_msg; - std::vector<gr::tag_t> d_tags; - size_t d_consumed_bytes; - int d_consumed_items; +protected: + zmq::message_t d_msg; + std::vector<gr::tag_t> d_tags; + size_t d_consumed_bytes; + int d_consumed_items; - bool has_pending(); - int flush_pending(void *out_buf, const int out_nitems, const uint64_t out_offset); - bool load_message(bool wait); - }; + bool has_pending(); + int flush_pending(void* out_buf, const int out_nitems, const uint64_t out_offset); + bool load_message(bool wait); +}; - } // namespace zeromq +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_BASE_IMPL_H */ diff --git a/gr-zeromq/lib/pub_msg_sink_impl.cc b/gr-zeromq/lib/pub_msg_sink_impl.cc index 5fc7164d2b..77ffc469cd 100644 --- a/gr-zeromq/lib/pub_msg_sink_impl.cc +++ b/gr-zeromq/lib/pub_msg_sink_impl.cc @@ -29,55 +29,52 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - pub_msg_sink::sptr - pub_msg_sink::make(char *address, int timeout) - { - return gnuradio::get_initial_sptr - (new pub_msg_sink_impl(address, timeout)); +pub_msg_sink::sptr pub_msg_sink::make(char* address, int timeout) +{ + return gnuradio::get_initial_sptr(new pub_msg_sink_impl(address, timeout)); +} + +pub_msg_sink_impl::pub_msg_sink_impl(char* address, int timeout) + : gr::block("pub_msg_sink", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), + d_timeout(timeout) +{ + int major, minor, patch; + zmq::version(&major, &minor, &patch); + if (major < 3) { + d_timeout = timeout * 1000; } - pub_msg_sink_impl::pub_msg_sink_impl(char *address, int timeout) - : gr::block("pub_msg_sink", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(0, 0, 0)), - d_timeout(timeout) - { - int major, minor, patch; - zmq::version (&major, &minor, &patch); - if (major < 3) { - d_timeout = timeout*1000; - } + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_PUB); + int time = 0; + d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket->bind(address); - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_PUB); - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->bind(address); + message_port_register_in(pmt::mp("in")); + set_msg_handler(pmt::mp("in"), boost::bind(&pub_msg_sink_impl::handler, this, _1)); +} - message_port_register_in(pmt::mp("in")); - set_msg_handler( pmt::mp("in"), - boost::bind(&pub_msg_sink_impl::handler, this, _1)); - } +pub_msg_sink_impl::~pub_msg_sink_impl() +{ + d_socket->close(); + delete d_socket; + delete d_context; +} - pub_msg_sink_impl::~pub_msg_sink_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; - } +void pub_msg_sink_impl::handler(pmt::pmt_t msg) +{ + std::stringbuf sb(""); + pmt::serialize(msg, sb); + std::string s = sb.str(); + zmq::message_t zmsg(s.size()); - void pub_msg_sink_impl::handler(pmt::pmt_t msg) - { - std::stringbuf sb(""); - pmt::serialize(msg, sb); - std::string s = sb.str(); - zmq::message_t zmsg(s.size()); - - memcpy(zmsg.data(), s.c_str(), s.size()); - d_socket->send(zmsg); - } + memcpy(zmsg.data(), s.c_str(), s.size()); + d_socket->send(zmsg); +} - } /* namespace zeromq */ +} /* namespace zeromq */ } /* namespace gr */ diff --git a/gr-zeromq/lib/pub_msg_sink_impl.h b/gr-zeromq/lib/pub_msg_sink_impl.h index 0cedfed482..cfd81ca399 100644 --- a/gr-zeromq/lib/pub_msg_sink_impl.h +++ b/gr-zeromq/lib/pub_msg_sink_impl.h @@ -27,29 +27,30 @@ #include <zmq.hpp> namespace gr { - namespace zeromq { +namespace zeromq { - class pub_msg_sink_impl : public pub_msg_sink - { - private: - float d_timeout; - zmq::context_t *d_context; - zmq::socket_t *d_socket; +class pub_msg_sink_impl : public pub_msg_sink +{ +private: + float d_timeout; + zmq::context_t* d_context; + zmq::socket_t* d_socket; - public: - pub_msg_sink_impl(char *address, int timeout); - ~pub_msg_sink_impl(); +public: + pub_msg_sink_impl(char* address, int timeout); + ~pub_msg_sink_impl(); - void handler(pmt::pmt_t msg); - std::string last_endpoint() override { + void handler(pmt::pmt_t msg); + std::string last_endpoint() override + { char addr[256]; size_t addr_len = sizeof(addr); d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); - return std::string(addr, addr_len-1); - } - }; + return std::string(addr, addr_len - 1); + } +}; - } // namespace zeromq +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_PUB_MSG_SINK_IMPL_H */ diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc index b602bc83a6..209b9e7f8c 100644 --- a/gr-zeromq/lib/pub_sink_impl.cc +++ b/gr-zeromq/lib/pub_sink_impl.cc @@ -29,33 +29,33 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { - - pub_sink::sptr - pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - { - return gnuradio::get_initial_sptr - (new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); - } - - pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - : gr::sync_block("pub_sink", - gr::io_signature::make(1, 1, itemsize * vlen), - gr::io_signature::make(0, 0, 0)), - base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm) - { - /* All is delegated */ - } - - int - pub_sink_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - return send_message(input_items[0], noutput_items, nitems_read(0)); - } - - } /* namespace zeromq */ +namespace zeromq { + +pub_sink::sptr pub_sink::make( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +{ + return gnuradio::get_initial_sptr( + new pub_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); +} + +pub_sink_impl::pub_sink_impl( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) + : gr::sync_block("pub_sink", + gr::io_signature::make(1, 1, itemsize * vlen), + gr::io_signature::make(0, 0, 0)), + base_sink_impl(ZMQ_PUB, itemsize, vlen, address, timeout, pass_tags, hwm) +{ + /* All is delegated */ +} + +int pub_sink_impl::work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ + return send_message(input_items[0], noutput_items, nitems_read(0)); +} + +} /* namespace zeromq */ } /* namespace gr */ // vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h index 78b5c1e1bd..6e921d9067 100644 --- a/gr-zeromq/lib/pub_sink_impl.h +++ b/gr-zeromq/lib/pub_sink_impl.h @@ -29,20 +29,25 @@ #include "base_impl.h" namespace gr { - namespace zeromq { - - class pub_sink_impl : public pub_sink, public base_sink_impl - { - public: - pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); - - int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); - std::string last_endpoint() override {return base_sink_impl::last_endpoint();} - }; - - } // namespace zeromq +namespace zeromq { + +class pub_sink_impl : public pub_sink, public base_sink_impl +{ +public: + pub_sink_impl(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm); + + int work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items); + std::string last_endpoint() override { return base_sink_impl::last_endpoint(); } +}; + +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_PUB_SINK_IMPL_H */ diff --git a/gr-zeromq/lib/pull_msg_source_impl.cc b/gr-zeromq/lib/pull_msg_source_impl.cc index 48ad2bd5b1..100c7be5a3 100644 --- a/gr-zeromq/lib/pull_msg_source_impl.cc +++ b/gr-zeromq/lib/pull_msg_source_impl.cc @@ -31,88 +31,86 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - pull_msg_source::sptr - pull_msg_source::make(char *address, int timeout) - { - return gnuradio::get_initial_sptr - (new pull_msg_source_impl(address, timeout)); - } +pull_msg_source::sptr pull_msg_source::make(char* address, int timeout) +{ + return gnuradio::get_initial_sptr(new pull_msg_source_impl(address, timeout)); +} - pull_msg_source_impl::pull_msg_source_impl(char *address, int timeout) - : gr::block("pull_msg_source", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(0, 0, 0)), +pull_msg_source_impl::pull_msg_source_impl(char* address, int timeout) + : gr::block("pull_msg_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), d_timeout(timeout), d_port(pmt::mp("out")) - { - int major, minor, patch; - zmq::version (&major, &minor, &patch); +{ + int major, minor, patch; + zmq::version(&major, &minor, &patch); - if (major < 3) { - d_timeout = timeout*1000; - } + if (major < 3) { + d_timeout = timeout * 1000; + } - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_PULL); + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_PULL); - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->connect (address); + int time = 0; + d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket->connect(address); - message_port_register_out(d_port); - } + message_port_register_out(d_port); +} - pull_msg_source_impl::~pull_msg_source_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; - } +pull_msg_source_impl::~pull_msg_source_impl() +{ + d_socket->close(); + delete d_socket; + delete d_context; +} - bool pull_msg_source_impl::start() - { - d_finished = false; - d_thread = new boost::thread(boost::bind(&pull_msg_source_impl::readloop, this)); - return true; - } +bool pull_msg_source_impl::start() +{ + d_finished = false; + d_thread = new boost::thread(boost::bind(&pull_msg_source_impl::readloop, this)); + return true; +} - bool pull_msg_source_impl::stop() - { - d_finished = true; - d_thread->join(); - return true; - } +bool pull_msg_source_impl::stop() +{ + d_finished = true; + d_thread->join(); + return true; +} - void pull_msg_source_impl::readloop() - { - while(!d_finished){ +void pull_msg_source_impl::readloop() +{ + while (!d_finished) { - zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&items[0], 1, d_timeout); + zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + zmq::poll(&items[0], 1, d_timeout); // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t msg; + // Receive data + zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(msg); + d_socket->recv(msg); #else - d_socket->recv(&msg); + d_socket->recv(&msg); #endif - std::string buf(static_cast<char*>(msg.data()), msg.size()); - std::stringbuf sb(buf); - pmt::pmt_t m = pmt::deserialize(sb); - message_port_pub(d_port, m); + std::string buf(static_cast<char*>(msg.data()), msg.size()); + std::stringbuf sb(buf); + pmt::pmt_t m = pmt::deserialize(sb); + message_port_pub(d_port, m); } else { - boost::this_thread::sleep(boost::posix_time::microseconds(100)); + boost::this_thread::sleep(boost::posix_time::microseconds(100)); } - } } +} - } /* namespace zeromq */ +} /* namespace zeromq */ } /* namespace gr */ diff --git a/gr-zeromq/lib/pull_msg_source_impl.h b/gr-zeromq/lib/pull_msg_source_impl.h index db01e8d038..235d80f655 100644 --- a/gr-zeromq/lib/pull_msg_source_impl.h +++ b/gr-zeromq/lib/pull_msg_source_impl.h @@ -27,37 +27,38 @@ #include "zmq_common_impl.h" namespace gr { - namespace zeromq { +namespace zeromq { - class pull_msg_source_impl : public pull_msg_source - { - private: - int d_timeout; // microseconds, -1 is blocking - zmq::context_t *d_context; - zmq::socket_t *d_socket; - boost::thread *d_thread; - const pmt::pmt_t d_port; +class pull_msg_source_impl : public pull_msg_source +{ +private: + int d_timeout; // microseconds, -1 is blocking + zmq::context_t* d_context; + zmq::socket_t* d_socket; + boost::thread* d_thread; + const pmt::pmt_t d_port; - void readloop(); + void readloop(); - public: - bool d_finished; +public: + bool d_finished; - pull_msg_source_impl(char *address, int timeout); - ~pull_msg_source_impl(); + pull_msg_source_impl(char* address, int timeout); + ~pull_msg_source_impl(); - bool start(); - bool stop(); + bool start(); + bool stop(); - std::string last_endpoint() override { + std::string last_endpoint() override + { char addr[256]; size_t addr_len = sizeof(addr); d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); - return std::string(addr, addr_len-1); - } - }; + return std::string(addr, addr_len - 1); + } +}; - } // namespace zeromq +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_PULL_MSG_SOURCE_IMPL_H */ diff --git a/gr-zeromq/lib/pull_source_impl.cc b/gr-zeromq/lib/pull_source_impl.cc index 4045dd7b66..58a875f363 100644 --- a/gr-zeromq/lib/pull_source_impl.cc +++ b/gr-zeromq/lib/pull_source_impl.cc @@ -29,60 +29,57 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - pull_source::sptr - pull_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - { - return gnuradio::get_initial_sptr - (new pull_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); - } +pull_source::sptr pull_source::make( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +{ + return gnuradio::get_initial_sptr( + new pull_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); +} - pull_source_impl::pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - : gr::sync_block("pull_source", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(1, 1, itemsize * vlen)), - base_source_impl(ZMQ_PULL, itemsize, vlen, address, timeout, pass_tags, hwm) - { - /* All is delegated */ - } +pull_source_impl::pull_source_impl( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) + : gr::sync_block("pull_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(1, 1, itemsize * vlen)), + base_source_impl(ZMQ_PULL, itemsize, vlen, address, timeout, pass_tags, hwm) +{ + /* All is delegated */ +} - int - pull_source_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - uint8_t *out = (uint8_t *) output_items[0]; - bool first = true; - int done = 0; +int pull_source_impl::work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ + uint8_t* out = (uint8_t*)output_items[0]; + bool first = true; + int done = 0; - /* Process as much as we can */ - while (1) - { - if (has_pending()) - { - /* Flush anything pending */ - done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done); + /* Process as much as we can */ + while (1) { + if (has_pending()) { + /* Flush anything pending */ + done += flush_pending( + out + (done * d_vsize), noutput_items - done, nitems_written(0) + done); - /* No more space ? */ - if (done == noutput_items) - break; - } - else - { - /* Try to get the next message */ - if (!load_message(first)) - break; /* No message, we're done for now */ + /* No more space ? */ + if (done == noutput_items) + break; + } else { + /* Try to get the next message */ + if (!load_message(first)) + break; /* No message, we're done for now */ - /* Not the first anymore */ - first = false; + /* Not the first anymore */ + first = false; } - } - - return done; } - } /* namespace zeromq */ + return done; +} + +} /* namespace zeromq */ } /* namespace gr */ // vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/pull_source_impl.h b/gr-zeromq/lib/pull_source_impl.h index c08ab0521b..a27b3f9c46 100644 --- a/gr-zeromq/lib/pull_source_impl.h +++ b/gr-zeromq/lib/pull_source_impl.h @@ -29,20 +29,25 @@ #include "base_impl.h" namespace gr { - namespace zeromq { - - class pull_source_impl : public pull_source, public base_source_impl - { - public: - pull_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); - - int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); - std::string last_endpoint() override {return base_source_impl::last_endpoint();} - }; - - } // namespace zeromq +namespace zeromq { + +class pull_source_impl : public pull_source, public base_source_impl +{ +public: + pull_source_impl(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm); + + int work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items); + std::string last_endpoint() override { return base_source_impl::last_endpoint(); } +}; + +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_PULL_SOURCE_IMPL_H */ diff --git a/gr-zeromq/lib/push_msg_sink_impl.cc b/gr-zeromq/lib/push_msg_sink_impl.cc index e9cc5bc435..0052e09210 100644 --- a/gr-zeromq/lib/push_msg_sink_impl.cc +++ b/gr-zeromq/lib/push_msg_sink_impl.cc @@ -29,57 +29,54 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - push_msg_sink::sptr - push_msg_sink::make(char *address, int timeout) - { - return gnuradio::get_initial_sptr - (new push_msg_sink_impl(address, timeout)); - } +push_msg_sink::sptr push_msg_sink::make(char* address, int timeout) +{ + return gnuradio::get_initial_sptr(new push_msg_sink_impl(address, timeout)); +} - push_msg_sink_impl::push_msg_sink_impl(char *address, int timeout) - : gr::block("push_msg_sink", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(0, 0, 0)), - d_timeout(timeout) - { - int major, minor, patch; - zmq::version (&major, &minor, &patch); +push_msg_sink_impl::push_msg_sink_impl(char* address, int timeout) + : gr::block("push_msg_sink", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), + d_timeout(timeout) +{ + int major, minor, patch; + zmq::version(&major, &minor, &patch); - if (major < 3) { - d_timeout = timeout*1000; - } + if (major < 3) { + d_timeout = timeout * 1000; + } - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH); + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_PUSH); - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->bind(address); + int time = 0; + d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket->bind(address); - message_port_register_in(pmt::mp("in")); - set_msg_handler(pmt::mp("in"), - boost::bind(&push_msg_sink_impl::handler, this, _1)); - } + message_port_register_in(pmt::mp("in")); + set_msg_handler(pmt::mp("in"), boost::bind(&push_msg_sink_impl::handler, this, _1)); +} - push_msg_sink_impl::~push_msg_sink_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; - } +push_msg_sink_impl::~push_msg_sink_impl() +{ + d_socket->close(); + delete d_socket; + delete d_context; +} - void push_msg_sink_impl::handler(pmt::pmt_t msg) - { - std::stringbuf sb(""); - pmt::serialize( msg, sb ); - std::string s = sb.str(); - zmq::message_t zmsg(s.size()); +void push_msg_sink_impl::handler(pmt::pmt_t msg) +{ + std::stringbuf sb(""); + pmt::serialize(msg, sb); + std::string s = sb.str(); + zmq::message_t zmsg(s.size()); - memcpy(zmsg.data(), s.c_str(), s.size()); - d_socket->send(zmsg); - } + memcpy(zmsg.data(), s.c_str(), s.size()); + d_socket->send(zmsg); +} - } /* namespace zeromq */ +} /* namespace zeromq */ } /* namespace gr */ diff --git a/gr-zeromq/lib/push_msg_sink_impl.h b/gr-zeromq/lib/push_msg_sink_impl.h index 0d0503cd55..4d3017f782 100644 --- a/gr-zeromq/lib/push_msg_sink_impl.h +++ b/gr-zeromq/lib/push_msg_sink_impl.h @@ -27,29 +27,30 @@ #include <zmq.hpp> namespace gr { - namespace zeromq { +namespace zeromq { - class push_msg_sink_impl : public push_msg_sink - { - private: - float d_timeout; - zmq::context_t *d_context; - zmq::socket_t *d_socket; +class push_msg_sink_impl : public push_msg_sink +{ +private: + float d_timeout; + zmq::context_t* d_context; + zmq::socket_t* d_socket; - public: - push_msg_sink_impl(char *address, int timeout); - ~push_msg_sink_impl(); +public: + push_msg_sink_impl(char* address, int timeout); + ~push_msg_sink_impl(); - void handler(pmt::pmt_t msg); - std::string last_endpoint() override { + void handler(pmt::pmt_t msg); + std::string last_endpoint() override + { char addr[256]; size_t addr_len = sizeof(addr); d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); - return std::string(addr, addr_len-1); - } - }; + return std::string(addr, addr_len - 1); + } +}; - } // namespace zeromq +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_ZMQ_PUSH_MSG_SINK_IMPL_H */ diff --git a/gr-zeromq/lib/push_sink_impl.cc b/gr-zeromq/lib/push_sink_impl.cc index 5f9670e4ca..431970eef5 100644 --- a/gr-zeromq/lib/push_sink_impl.cc +++ b/gr-zeromq/lib/push_sink_impl.cc @@ -29,42 +29,42 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - push_sink::sptr - push_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - { - return gnuradio::get_initial_sptr - (new push_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); - } +push_sink::sptr push_sink::make( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +{ + return gnuradio::get_initial_sptr( + new push_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); +} - push_sink_impl::push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - : gr::sync_block("push_sink", - gr::io_signature::make(1, 1, itemsize * vlen), - gr::io_signature::make(0, 0, 0)), - base_sink_impl(ZMQ_PUSH, itemsize, vlen, address, timeout, pass_tags, hwm) - { - /* All is delegated */ - } +push_sink_impl::push_sink_impl( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) + : gr::sync_block("push_sink", + gr::io_signature::make(1, 1, itemsize * vlen), + gr::io_signature::make(0, 0, 0)), + base_sink_impl(ZMQ_PUSH, itemsize, vlen, address, timeout, pass_tags, hwm) +{ + /* All is delegated */ +} - int - push_sink_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - // Poll with a timeout (FIXME: scheduler can't wait for us) - zmq::pollitem_t itemsout[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLOUT, 0 } }; - zmq::poll(&itemsout[0], 1, d_timeout); +int push_sink_impl::work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ + // Poll with a timeout (FIXME: scheduler can't wait for us) + zmq::pollitem_t itemsout[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLOUT, 0 } }; + zmq::poll(&itemsout[0], 1, d_timeout); - // If we can send something, do it - if (itemsout[0].revents & ZMQ_POLLOUT) + // If we can send something, do it + if (itemsout[0].revents & ZMQ_POLLOUT) return send_message(input_items[0], noutput_items, nitems_read(0)); - // If not, do nothing - return 0; - } + // If not, do nothing + return 0; +} - } /* namespace zeromq */ +} /* namespace zeromq */ } /* namespace gr */ // vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/push_sink_impl.h b/gr-zeromq/lib/push_sink_impl.h index 73480e1047..bfa3dacdc4 100644 --- a/gr-zeromq/lib/push_sink_impl.h +++ b/gr-zeromq/lib/push_sink_impl.h @@ -29,21 +29,26 @@ #include "base_impl.h" namespace gr { - namespace zeromq { - - class push_sink_impl : public push_sink, public base_sink_impl - { - public: - push_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); - - int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); - - std::string last_endpoint() override {return base_sink_impl::last_endpoint();} - }; - - } // namespace zeromq +namespace zeromq { + +class push_sink_impl : public push_sink, public base_sink_impl +{ +public: + push_sink_impl(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm); + + int work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items); + + std::string last_endpoint() override { return base_sink_impl::last_endpoint(); } +}; + +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_ZMQ_PUSH_SINK_IMPL_H */ diff --git a/gr-zeromq/lib/rep_msg_sink_impl.cc b/gr-zeromq/lib/rep_msg_sink_impl.cc index 1a4696aee5..7466f77474 100644 --- a/gr-zeromq/lib/rep_msg_sink_impl.cc +++ b/gr-zeromq/lib/rep_msg_sink_impl.cc @@ -29,99 +29,100 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - rep_msg_sink::sptr - rep_msg_sink::make(char *address, int timeout) - { - return gnuradio::get_initial_sptr - (new rep_msg_sink_impl(address, timeout)); - } +rep_msg_sink::sptr rep_msg_sink::make(char* address, int timeout) +{ + return gnuradio::get_initial_sptr(new rep_msg_sink_impl(address, timeout)); +} - rep_msg_sink_impl::rep_msg_sink_impl(char *address, int timeout) - : gr::block("rep_msg_sink", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(0, 0, 0)), +rep_msg_sink_impl::rep_msg_sink_impl(char* address, int timeout) + : gr::block("rep_msg_sink", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), d_timeout(timeout), d_port(pmt::mp("in")) - { - int major, minor, patch; - zmq::version(&major, &minor, &patch); - - if (major < 3) { - d_timeout = timeout*1000; - } - - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_REP); - - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->bind (address); - - message_port_register_in(d_port); - } +{ + int major, minor, patch; + zmq::version(&major, &minor, &patch); - rep_msg_sink_impl::~rep_msg_sink_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + if (major < 3) { + d_timeout = timeout * 1000; } - bool rep_msg_sink_impl::start() - { - d_finished = false; - d_thread = new boost::thread(boost::bind(&rep_msg_sink_impl::readloop, this)); - return true; - } - - bool rep_msg_sink_impl::stop() - { - d_finished = true; - d_thread->join(); - return true; - } - - void rep_msg_sink_impl::readloop() - { - while(!d_finished) { + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_REP); + + int time = 0; + d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket->bind(address); + + message_port_register_in(d_port); +} + +rep_msg_sink_impl::~rep_msg_sink_impl() +{ + d_socket->close(); + delete d_socket; + delete d_context; +} + +bool rep_msg_sink_impl::start() +{ + d_finished = false; + d_thread = new boost::thread(boost::bind(&rep_msg_sink_impl::readloop, this)); + return true; +} + +bool rep_msg_sink_impl::stop() +{ + d_finished = true; + d_thread->join(); + return true; +} + +void rep_msg_sink_impl::readloop() +{ + while (!d_finished) { // while we have data, wait for query... - while(!empty_p(d_port)) { + while (!empty_p(d_port)) { - // wait for query... - zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&items[0], 1, d_timeout); + // wait for query... + zmq::pollitem_t items[] = { + { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } + }; + zmq::poll(&items[0], 1, d_timeout); - // If we got a reply, process - if (items[0].revents & ZMQ_POLLIN) { + // If we got a reply, process + if (items[0].revents & ZMQ_POLLIN) { - // receive data request - zmq::message_t request; + // receive data request + zmq::message_t request; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(request); + d_socket->recv(request); #else - d_socket->recv(&request); + d_socket->recv(&request); #endif - int req_output_items = *(static_cast<int*>(request.data())); - if(req_output_items != 1) - throw std::runtime_error("Request was not 1 msg for rep/req request!!"); - - // create message copy and send - pmt::pmt_t msg = delete_head_nowait(d_port); - std::stringbuf sb(""); - pmt::serialize( msg, sb ); - std::string s = sb.str(); - zmq::message_t zmsg(s.size()); - memcpy( zmsg.data(), s.c_str(), s.size() ); - d_socket->send(zmsg); - } // if req - } // while !empty - - } // while !d_finished - } - - } /* namespace zeromq */ + int req_output_items = *(static_cast<int*>(request.data())); + if (req_output_items != 1) + throw std::runtime_error( + "Request was not 1 msg for rep/req request!!"); + + // create message copy and send + pmt::pmt_t msg = delete_head_nowait(d_port); + std::stringbuf sb(""); + pmt::serialize(msg, sb); + std::string s = sb.str(); + zmq::message_t zmsg(s.size()); + memcpy(zmsg.data(), s.c_str(), s.size()); + d_socket->send(zmsg); + } // if req + } // while !empty + + } // while !d_finished +} + +} /* namespace zeromq */ } /* namespace gr */ diff --git a/gr-zeromq/lib/rep_msg_sink_impl.h b/gr-zeromq/lib/rep_msg_sink_impl.h index 61895e6c32..cfd17ed9a3 100644 --- a/gr-zeromq/lib/rep_msg_sink_impl.h +++ b/gr-zeromq/lib/rep_msg_sink_impl.h @@ -27,37 +27,38 @@ #include "zmq_common_impl.h" namespace gr { - namespace zeromq { +namespace zeromq { - class rep_msg_sink_impl : public rep_msg_sink - { - private: - int d_timeout; - zmq::context_t *d_context; - zmq::socket_t *d_socket; - boost::thread *d_thread; - bool d_finished; +class rep_msg_sink_impl : public rep_msg_sink +{ +private: + int d_timeout; + zmq::context_t* d_context; + zmq::socket_t* d_socket; + boost::thread* d_thread; + bool d_finished; - const pmt::pmt_t d_port; + const pmt::pmt_t d_port; - void readloop(); + void readloop(); - public: - rep_msg_sink_impl(char *address, int timeout); - ~rep_msg_sink_impl(); +public: + rep_msg_sink_impl(char* address, int timeout); + ~rep_msg_sink_impl(); - bool start(); - bool stop(); + bool start(); + bool stop(); - std::string last_endpoint() override { + std::string last_endpoint() override + { char addr[256]; size_t addr_len = sizeof(addr); d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); - return std::string(addr, addr_len-1); - } - }; + return std::string(addr, addr_len - 1); + } +}; - } // namespace zeromq +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_REP_MSG_SINK_IMPL_H */ diff --git a/gr-zeromq/lib/rep_sink_impl.cc b/gr-zeromq/lib/rep_sink_impl.cc index f402310cfc..c3b6d8c192 100644 --- a/gr-zeromq/lib/rep_sink_impl.cc +++ b/gr-zeromq/lib/rep_sink_impl.cc @@ -29,45 +29,44 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - rep_sink::sptr - rep_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - { - return gnuradio::get_initial_sptr - (new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); - } +rep_sink::sptr rep_sink::make( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +{ + return gnuradio::get_initial_sptr( + new rep_sink_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); +} - rep_sink_impl::rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - : gr::sync_block("rep_sink", - gr::io_signature::make(1, 1, itemsize * vlen), - gr::io_signature::make(0, 0, 0)), - base_sink_impl(ZMQ_REP, itemsize, vlen, address, timeout, pass_tags, hwm) - { - /* All is delegated */ - } +rep_sink_impl::rep_sink_impl( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) + : gr::sync_block("rep_sink", + gr::io_signature::make(1, 1, itemsize * vlen), + gr::io_signature::make(0, 0, 0)), + base_sink_impl(ZMQ_REP, itemsize, vlen, address, timeout, pass_tags, hwm) +{ + /* All is delegated */ +} - int - rep_sink_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - const uint8_t *in = (const uint8_t *) input_items[0]; - bool first = true; - int done = 0; +int rep_sink_impl::work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ + const uint8_t* in = (const uint8_t*)input_items[0]; + bool first = true; + int done = 0; - /* Process as much as we can */ - while (1) - { + /* Process as much as we can */ + while (1) { /* Wait for a small time (FIXME: scheduler can't wait for us) */ - /* We only wait if its the first iteration, for the others we'll - * let the scheduler retry */ - zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + /* We only wait if its the first iteration, for the others we'll + * let the scheduler retry */ + zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; zmq::poll(&items[0], 1, first ? d_timeout : 0); - /* If we don't have anything, we're done */ + /* If we don't have anything, we're done */ if (!(items[0].revents & ZMQ_POLLIN)) - break; + break; /* Get and parse the request */ zmq::message_t request; @@ -78,10 +77,9 @@ namespace gr { #endif int nitems_send = noutput_items - done; - if (request.size() >= sizeof(uint32_t)) - { - int req = (int)*(static_cast<uint32_t*>(request.data())); - nitems_send = std::min(nitems_send, req); + if (request.size() >= sizeof(uint32_t)) { + int req = (int)*(static_cast<uint32_t*>(request.data())); + nitems_send = std::min(nitems_send, req); } /* Delegate the actual send */ @@ -89,11 +87,11 @@ namespace gr { /* Not the first anymore */ first = false; - } - - return done; } - } /* namespace zeromq */ + + return done; +} +} /* namespace zeromq */ } /* namespace gr */ // vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/rep_sink_impl.h b/gr-zeromq/lib/rep_sink_impl.h index cfd1c09637..2fcf9853c6 100644 --- a/gr-zeromq/lib/rep_sink_impl.h +++ b/gr-zeromq/lib/rep_sink_impl.h @@ -27,20 +27,25 @@ #include "base_impl.h" namespace gr { - namespace zeromq { +namespace zeromq { - class rep_sink_impl : public rep_sink, public base_sink_impl - { - public: - rep_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); +class rep_sink_impl : public rep_sink, public base_sink_impl +{ +public: + rep_sink_impl(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm); - int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); - std::string last_endpoint() override {return base_sink_impl::last_endpoint();} - }; + int work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items); + std::string last_endpoint() override { return base_sink_impl::last_endpoint(); } +}; - } // namespace zeromq +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_REP_SINK_IMPL_H */ diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc index 6c80a77f27..6d44aa3cf0 100644 --- a/gr-zeromq/lib/req_msg_source_impl.cc +++ b/gr-zeromq/lib/req_msg_source_impl.cc @@ -31,104 +31,104 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - req_msg_source::sptr - req_msg_source::make(char *address, int timeout) - { - return gnuradio::get_initial_sptr - (new req_msg_source_impl(address, timeout)); - } +req_msg_source::sptr req_msg_source::make(char* address, int timeout) +{ + return gnuradio::get_initial_sptr(new req_msg_source_impl(address, timeout)); +} - req_msg_source_impl::req_msg_source_impl(char *address, int timeout) - : gr::block("req_msg_source", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(0, 0, 0)), +req_msg_source_impl::req_msg_source_impl(char* address, int timeout) + : gr::block("req_msg_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), d_timeout(timeout), d_port(pmt::mp("out")) - { - int major, minor, patch; - zmq::version(&major, &minor, &patch); - - if (major < 3) { - d_timeout = timeout*1000; - } - - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); - - int time = 0; - d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); - d_socket->connect (address); - - message_port_register_out(d_port); - } +{ + int major, minor, patch; + zmq::version(&major, &minor, &patch); - req_msg_source_impl::~req_msg_source_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + if (major < 3) { + d_timeout = timeout * 1000; } - bool req_msg_source_impl::start() - { - d_finished = false; - d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this)); - return true; - } - - bool req_msg_source_impl::stop() - { - d_finished = true; - d_thread->join(); - return true; - } - - void req_msg_source_impl::readloop() - { - while(!d_finished){ - //std::cout << "readloop\n"; - - zmq::pollitem_t itemsout[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLOUT, 0 } }; + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_REQ); + + int time = 0; + d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time)); + d_socket->connect(address); + + message_port_register_out(d_port); +} + +req_msg_source_impl::~req_msg_source_impl() +{ + d_socket->close(); + delete d_socket; + delete d_context; +} + +bool req_msg_source_impl::start() +{ + d_finished = false; + d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this)); + return true; +} + +bool req_msg_source_impl::stop() +{ + d_finished = true; + d_thread->join(); + return true; +} + +void req_msg_source_impl::readloop() +{ + while (!d_finished) { + // std::cout << "readloop\n"; + + zmq::pollitem_t itemsout[] = { + { static_cast<void*>(*d_socket), 0, ZMQ_POLLOUT, 0 } + }; zmq::poll(&itemsout[0], 1, d_timeout); // If we got a reply, process if (itemsout[0].revents & ZMQ_POLLOUT) { - // Request data, FIXME non portable? - int nmsg = 1; - zmq::message_t request(sizeof(int)); - memcpy((void *) request.data (), &nmsg, sizeof(int)); + // Request data, FIXME non portable? + int nmsg = 1; + zmq::message_t request(sizeof(int)); + memcpy((void*)request.data(), &nmsg, sizeof(int)); #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->send(request, zmq::send_flags::none); + d_socket->send(request, zmq::send_flags::none); #else - d_socket->send(request); + d_socket->send(request); #endif } - zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } }; - zmq::poll (&items[0], 1, d_timeout); + zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + zmq::poll(&items[0], 1, d_timeout); // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t msg; + // Receive data + zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(msg); + d_socket->recv(msg); #else - d_socket->recv(&msg); + d_socket->recv(&msg); #endif - std::string buf(static_cast<char*>(msg.data()), msg.size()); - std::stringbuf sb(buf); - pmt::pmt_t m = pmt::deserialize(sb); - message_port_pub(d_port, m); + std::string buf(static_cast<char*>(msg.data()), msg.size()); + std::stringbuf sb(buf); + pmt::pmt_t m = pmt::deserialize(sb); + message_port_pub(d_port, m); } else { - boost::this_thread::sleep(boost::posix_time::microseconds(100)); + boost::this_thread::sleep(boost::posix_time::microseconds(100)); } - } } +} - } /* namespace zeromq */ +} /* namespace zeromq */ } /* namespace gr */ diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h index 7fb3e74e70..9610229f5d 100644 --- a/gr-zeromq/lib/req_msg_source_impl.h +++ b/gr-zeromq/lib/req_msg_source_impl.h @@ -27,37 +27,38 @@ #include "zmq_common_impl.h" namespace gr { - namespace zeromq { +namespace zeromq { - class req_msg_source_impl : public req_msg_source - { - private: - int d_timeout; - zmq::context_t *d_context; - zmq::socket_t *d_socket; - boost::thread *d_thread; - const pmt::pmt_t d_port; +class req_msg_source_impl : public req_msg_source +{ +private: + int d_timeout; + zmq::context_t* d_context; + zmq::socket_t* d_socket; + boost::thread* d_thread; + const pmt::pmt_t d_port; - void readloop(); + void readloop(); - public: - bool d_finished; +public: + bool d_finished; - req_msg_source_impl(char *address, int timeout); - ~req_msg_source_impl(); + req_msg_source_impl(char* address, int timeout); + ~req_msg_source_impl(); - bool start(); - bool stop(); + bool start(); + bool stop(); - std::string last_endpoint() override { + std::string last_endpoint() override + { char addr[256]; size_t addr_len = sizeof(addr); d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); - return std::string(addr, addr_len-1); - } - }; + return std::string(addr, addr_len - 1); + } +}; - } // namespace zeromq +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_REQ_MSG_SOURCE_IMPL_H */ diff --git a/gr-zeromq/lib/req_source_impl.cc b/gr-zeromq/lib/req_source_impl.cc index 526736389e..5a498cf9d2 100644 --- a/gr-zeromq/lib/req_source_impl.cc +++ b/gr-zeromq/lib/req_source_impl.cc @@ -29,80 +29,77 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { - - req_source::sptr - req_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - { - return gnuradio::get_initial_sptr - (new req_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); - } - - req_source_impl::req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - : gr::sync_block("req_source", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(1, 1, itemsize * vlen)), - base_source_impl(ZMQ_REQ, itemsize, vlen, address, timeout, pass_tags, hwm), - d_req_pending(false) - { - /* All is delegated */ - } - - int - req_source_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { +namespace zeromq { + +req_source::sptr req_source::make( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +{ + return gnuradio::get_initial_sptr( + new req_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); +} + +req_source_impl::req_source_impl( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) + : gr::sync_block("req_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(1, 1, itemsize * vlen)), + base_source_impl(ZMQ_REQ, itemsize, vlen, address, timeout, pass_tags, hwm), + d_req_pending(false) +{ + /* All is delegated */ +} + +int req_source_impl::work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ #if 0 #endif - uint8_t *out = (uint8_t *) output_items[0]; - bool first = true; - int done = 0; - - /* Process as much as we can */ - while (1) - { - if (has_pending()) - { - /* Flush anything pending */ - done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done); - - /* No more space ? */ - if (done == noutput_items) - break; - } - else - { - /* Send request if needed */ - if (!d_req_pending) - { - /* The REP/REQ pattern state machine guarantees we can send at this point */ - uint32_t req_len = noutput_items - done; - zmq::message_t request(sizeof(uint32_t)); - memcpy ((void *) request.data (), &req_len, sizeof(uint32_t)); - d_socket->send(request); - - d_req_pending = true; - } - - /* Try to get the next message */ - if (!load_message(first)) - break; /* No message, we're done for now */ - - /* Got response */ - d_req_pending = false; - - /* Not the first anymore */ - first = false; + uint8_t* out = (uint8_t*)output_items[0]; + bool first = true; + int done = 0; + + /* Process as much as we can */ + while (1) { + if (has_pending()) { + /* Flush anything pending */ + done += flush_pending( + out + (done * d_vsize), noutput_items - done, nitems_written(0) + done); + + /* No more space ? */ + if (done == noutput_items) + break; + } else { + /* Send request if needed */ + if (!d_req_pending) { + /* The REP/REQ pattern state machine guarantees we can send at this point + */ + uint32_t req_len = noutput_items - done; + zmq::message_t request(sizeof(uint32_t)); + memcpy((void*)request.data(), &req_len, sizeof(uint32_t)); + d_socket->send(request); + + d_req_pending = true; + } + + /* Try to get the next message */ + if (!load_message(first)) + break; /* No message, we're done for now */ + + /* Got response */ + d_req_pending = false; + + /* Not the first anymore */ + first = false; } - } + } - return done; + return done; - return 0; - } + return 0; +} - } /* namespace zeromq */ +} /* namespace zeromq */ } /* namespace gr */ // vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/req_source_impl.h b/gr-zeromq/lib/req_source_impl.h index ec87832518..6e2d71f565 100644 --- a/gr-zeromq/lib/req_source_impl.h +++ b/gr-zeromq/lib/req_source_impl.h @@ -29,23 +29,29 @@ #include "base_impl.h" namespace gr { - namespace zeromq { +namespace zeromq { - class req_source_impl : public req_source, public base_source_impl - { - public: - req_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); +class req_source_impl : public req_source, public base_source_impl +{ +public: + req_source_impl(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm); - int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); + int work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items); - std::string last_endpoint() override {return base_source_impl::last_endpoint();} - private: - bool d_req_pending; - }; + std::string last_endpoint() override { return base_source_impl::last_endpoint(); } - } // namespace zeromq +private: + bool d_req_pending; +}; + +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_REQ_SOURCE_IMPL_H */ diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc index 3a08499b12..3a9fb906da 100644 --- a/gr-zeromq/lib/sub_msg_source_impl.cc +++ b/gr-zeromq/lib/sub_msg_source_impl.cc @@ -31,86 +31,84 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - sub_msg_source::sptr - sub_msg_source::make(char *address, int timeout) - { - return gnuradio::get_initial_sptr - (new sub_msg_source_impl(address, timeout)); - } +sub_msg_source::sptr sub_msg_source::make(char* address, int timeout) +{ + return gnuradio::get_initial_sptr(new sub_msg_source_impl(address, timeout)); +} - sub_msg_source_impl::sub_msg_source_impl(char *address, int timeout) - : gr::block("sub_msg_source", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(0, 0, 0)), +sub_msg_source_impl::sub_msg_source_impl(char* address, int timeout) + : gr::block("sub_msg_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(0, 0, 0)), d_timeout(timeout), d_port(pmt::mp("out")) - { - int major, minor, patch; - zmq::version(&major, &minor, &patch); - - if (major < 3) { - d_timeout = timeout*1000; - } - - d_context = new zmq::context_t(1); - d_socket = new zmq::socket_t(*d_context, ZMQ_SUB); - - d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); - d_socket->connect (address); - - message_port_register_out(d_port); - } +{ + int major, minor, patch; + zmq::version(&major, &minor, &patch); - sub_msg_source_impl::~sub_msg_source_impl() - { - d_socket->close(); - delete d_socket; - delete d_context; + if (major < 3) { + d_timeout = timeout * 1000; } - bool sub_msg_source_impl::start() - { - d_finished = false; - d_thread = new boost::thread(boost::bind(&sub_msg_source_impl::readloop, this)); - return true; - } - - bool sub_msg_source_impl::stop() - { - d_finished = true; - d_thread->join(); - return true; - } - - void sub_msg_source_impl::readloop() - { - while(!d_finished){ - - zmq::pollitem_t items[] = { { static_cast<void *>(*d_socket), 0, ZMQ_POLLIN, 0 } }; + d_context = new zmq::context_t(1); + d_socket = new zmq::socket_t(*d_context, ZMQ_SUB); + + d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); + d_socket->connect(address); + + message_port_register_out(d_port); +} + +sub_msg_source_impl::~sub_msg_source_impl() +{ + d_socket->close(); + delete d_socket; + delete d_context; +} + +bool sub_msg_source_impl::start() +{ + d_finished = false; + d_thread = new boost::thread(boost::bind(&sub_msg_source_impl::readloop, this)); + return true; +} + +bool sub_msg_source_impl::stop() +{ + d_finished = true; + d_thread->join(); + return true; +} + +void sub_msg_source_impl::readloop() +{ + while (!d_finished) { + + zmq::pollitem_t items[] = { { static_cast<void*>(*d_socket), 0, ZMQ_POLLIN, 0 } }; zmq::poll(&items[0], 1, d_timeout); // If we got a reply, process if (items[0].revents & ZMQ_POLLIN) { - // Receive data - zmq::message_t msg; + // Receive data + zmq::message_t msg; #if USE_NEW_CPPZMQ_SEND_RECV - d_socket->recv(msg); + d_socket->recv(msg); #else - d_socket->recv(&msg); + d_socket->recv(&msg); #endif - std::string buf(static_cast<char*>(msg.data()), msg.size()); - std::stringbuf sb(buf); - pmt::pmt_t m = pmt::deserialize(sb); + std::string buf(static_cast<char*>(msg.data()), msg.size()); + std::stringbuf sb(buf); + pmt::pmt_t m = pmt::deserialize(sb); - message_port_pub(d_port, m); + message_port_pub(d_port, m); } else { - boost::this_thread::sleep(boost::posix_time::microseconds(100)); + boost::this_thread::sleep(boost::posix_time::microseconds(100)); } - } } +} - } /* namespace zeromq */ +} /* namespace zeromq */ } /* namespace gr */ diff --git a/gr-zeromq/lib/sub_msg_source_impl.h b/gr-zeromq/lib/sub_msg_source_impl.h index 4ffce4ba5f..f83b6d4639 100644 --- a/gr-zeromq/lib/sub_msg_source_impl.h +++ b/gr-zeromq/lib/sub_msg_source_impl.h @@ -27,37 +27,38 @@ #include "zmq_common_impl.h" namespace gr { - namespace zeromq { +namespace zeromq { - class sub_msg_source_impl : public sub_msg_source - { - private: - int d_timeout; // microseconds, -1 is blocking - zmq::context_t *d_context; - zmq::socket_t *d_socket; - boost::thread *d_thread; - const pmt::pmt_t d_port; +class sub_msg_source_impl : public sub_msg_source +{ +private: + int d_timeout; // microseconds, -1 is blocking + zmq::context_t* d_context; + zmq::socket_t* d_socket; + boost::thread* d_thread; + const pmt::pmt_t d_port; - void readloop(); + void readloop(); - public: - bool d_finished; +public: + bool d_finished; - sub_msg_source_impl(char *address, int timeout); - ~sub_msg_source_impl(); + sub_msg_source_impl(char* address, int timeout); + ~sub_msg_source_impl(); - bool start(); - bool stop(); + bool start(); + bool stop(); - std::string last_endpoint() override { + std::string last_endpoint() override + { char addr[256]; size_t addr_len = sizeof(addr); d_socket->getsockopt(ZMQ_LAST_ENDPOINT, addr, &addr_len); - return std::string(addr, addr_len-1); - } - }; + return std::string(addr, addr_len - 1); + } +}; - } // namespace zeromq +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_SUB_MSG_SOURCE_IMPL_H */ diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc index 9a2e0bfe15..d55ca116f0 100644 --- a/gr-zeromq/lib/sub_source_impl.cc +++ b/gr-zeromq/lib/sub_source_impl.cc @@ -29,61 +29,58 @@ #include "tag_headers.h" namespace gr { - namespace zeromq { +namespace zeromq { - sub_source::sptr - sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - { - return gnuradio::get_initial_sptr - (new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); - } +sub_source::sptr sub_source::make( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) +{ + return gnuradio::get_initial_sptr( + new sub_source_impl(itemsize, vlen, address, timeout, pass_tags, hwm)); +} - sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm) - : gr::sync_block("sub_source", - gr::io_signature::make(0, 0, 0), - gr::io_signature::make(1, 1, itemsize * vlen)), - base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm) - { - /* Subscribe */ - d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); - } +sub_source_impl::sub_source_impl( + size_t itemsize, size_t vlen, char* address, int timeout, bool pass_tags, int hwm) + : gr::sync_block("sub_source", + gr::io_signature::make(0, 0, 0), + gr::io_signature::make(1, 1, itemsize * vlen)), + base_source_impl(ZMQ_SUB, itemsize, vlen, address, timeout, pass_tags, hwm) +{ + /* Subscribe */ + d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0); +} - int - sub_source_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) - { - uint8_t *out = (uint8_t *) output_items[0]; - bool first = true; - int done = 0; +int sub_source_impl::work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items) +{ + uint8_t* out = (uint8_t*)output_items[0]; + bool first = true; + int done = 0; - /* Process as much as we can */ - while (1) - { - if (has_pending()) - { - /* Flush anything pending */ - done += flush_pending(out + (done * d_vsize), noutput_items - done, nitems_written(0) + done); + /* Process as much as we can */ + while (1) { + if (has_pending()) { + /* Flush anything pending */ + done += flush_pending( + out + (done * d_vsize), noutput_items - done, nitems_written(0) + done); - /* No more space ? */ - if (done == noutput_items) - break; - } - else - { - /* Try to get the next message */ - if (!load_message(first)) - break; /* No message, we're done for now */ + /* No more space ? */ + if (done == noutput_items) + break; + } else { + /* Try to get the next message */ + if (!load_message(first)) + break; /* No message, we're done for now */ - /* Not the first anymore */ - first = false; + /* Not the first anymore */ + first = false; } - } - - return done; } - } /* namespace zeromq */ + return done; +} + +} /* namespace zeromq */ } /* namespace gr */ // vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h index 727d76d6a2..890907b802 100644 --- a/gr-zeromq/lib/sub_source_impl.h +++ b/gr-zeromq/lib/sub_source_impl.h @@ -29,22 +29,26 @@ #include "base_impl.h" namespace gr { - namespace zeromq { - - class sub_source_impl : public sub_source, public base_source_impl - { - public: - sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags, int hwm); - - int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); - - std::string last_endpoint() override {return base_source_impl::last_endpoint();} - }; - - } // namespace zeromq +namespace zeromq { + +class sub_source_impl : public sub_source, public base_source_impl +{ +public: + sub_source_impl(size_t itemsize, + size_t vlen, + char* address, + int timeout, + bool pass_tags, + int hwm); + + int work(int noutput_items, + gr_vector_const_void_star& input_items, + gr_vector_void_star& output_items); + + std::string last_endpoint() override { return base_source_impl::last_endpoint(); } +}; + +} // namespace zeromq } // namespace gr #endif /* INCLUDED_ZEROMQ_SUB_SOURCE_IMPL_H */ - diff --git a/gr-zeromq/lib/tag_headers.cc b/gr-zeromq/lib/tag_headers.cc index 5a5a417420..3b59bebf77 100644 --- a/gr-zeromq/lib/tag_headers.cc +++ b/gr-zeromq/lib/tag_headers.cc @@ -26,86 +26,84 @@ #include <cstring> #include <zmq.hpp> -#define GR_HEADER_MAGIC 0x5FF0 +#define GR_HEADER_MAGIC 0x5FF0 #define GR_HEADER_VERSION 0x01 namespace gr { - namespace zeromq { +namespace zeromq { - struct membuf: std::streambuf +struct membuf : std::streambuf { + membuf(void* b, size_t len) { - membuf(void *b, size_t len) - { - char *bc = static_cast<char*>(b); - this->setg(bc, bc, bc+len); - } - }; - - std::string - gen_tag_header(uint64_t offset, std::vector<gr::tag_t> &tags) - { - std::stringbuf sb(""); - std::ostream ss(&sb); - - uint16_t header_magic = GR_HEADER_MAGIC; - uint8_t header_version = GR_HEADER_VERSION; - uint64_t ntags = (uint64_t)tags.size(); - - ss.write( (const char*)&header_magic, sizeof(uint16_t) ); - ss.write( (const char*)&header_version, sizeof(uint8_t) ); - ss.write( (const char*)&offset, sizeof(uint64_t) ); - ss.write( (const char*)&ntags, sizeof(uint64_t) ); - - for(size_t i=0; i<tags.size(); i++) - { - ss.write( (const char *)&tags[i].offset, sizeof(uint64_t) ); - pmt::serialize( tags[i].key, sb ); - pmt::serialize( tags[i].value, sb ); - pmt::serialize( tags[i].srcid, sb ); - } - - return sb.str(); + char* bc = static_cast<char*>(b); + this->setg(bc, bc, bc + len); + } +}; + +std::string gen_tag_header(uint64_t offset, std::vector<gr::tag_t>& tags) +{ + std::stringbuf sb(""); + std::ostream ss(&sb); + + uint16_t header_magic = GR_HEADER_MAGIC; + uint8_t header_version = GR_HEADER_VERSION; + uint64_t ntags = (uint64_t)tags.size(); + + ss.write((const char*)&header_magic, sizeof(uint16_t)); + ss.write((const char*)&header_version, sizeof(uint8_t)); + ss.write((const char*)&offset, sizeof(uint64_t)); + ss.write((const char*)&ntags, sizeof(uint64_t)); + + for (size_t i = 0; i < tags.size(); i++) { + ss.write((const char*)&tags[i].offset, sizeof(uint64_t)); + pmt::serialize(tags[i].key, sb); + pmt::serialize(tags[i].value, sb); + pmt::serialize(tags[i].srcid, sb); } - size_t - parse_tag_header(zmq::message_t &msg, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out) - { - membuf sb(msg.data(), msg.size()); - std::istream iss(&sb); + return sb.str(); +} - size_t min_len = sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t); - if (msg.size() < min_len) +size_t parse_tag_header(zmq::message_t& msg, + uint64_t& offset_out, + std::vector<gr::tag_t>& tags_out) +{ + membuf sb(msg.data(), msg.size()); + std::istream iss(&sb); + + size_t min_len = + sizeof(uint16_t) + sizeof(uint8_t) + sizeof(uint64_t) + sizeof(uint64_t); + if (msg.size() < min_len) throw std::runtime_error("incoming zmq msg too small to hold gr tag header!"); - uint16_t header_magic; - uint8_t header_version; - uint64_t rcv_ntags; + uint16_t header_magic; + uint8_t header_version; + uint64_t rcv_ntags; - iss.read( (char*)&header_magic, sizeof(uint16_t) ); - iss.read( (char*)&header_version, sizeof(uint8_t) ); + iss.read((char*)&header_magic, sizeof(uint16_t)); + iss.read((char*)&header_version, sizeof(uint8_t)); - if(header_magic != GR_HEADER_MAGIC) + if (header_magic != GR_HEADER_MAGIC) throw std::runtime_error("gr header magic does not match!"); - if(header_version != 1) + if (header_version != 1) throw std::runtime_error("gr header version too high!"); - iss.read( (char*)&offset_out, sizeof(uint64_t) ); - iss.read( (char*)&rcv_ntags, sizeof(uint64_t) ); + iss.read((char*)&offset_out, sizeof(uint64_t)); + iss.read((char*)&rcv_ntags, sizeof(uint64_t)); - for(size_t i=0; i<rcv_ntags; i++) - { + for (size_t i = 0; i < rcv_ntags; i++) { gr::tag_t newtag; - sb.sgetn( (char*)&(newtag.offset), sizeof(uint64_t) ); - newtag.key = pmt::deserialize( sb ); - newtag.value = pmt::deserialize( sb ); - newtag.srcid = pmt::deserialize( sb ); + sb.sgetn((char*)&(newtag.offset), sizeof(uint64_t)); + newtag.key = pmt::deserialize(sb); + newtag.value = pmt::deserialize(sb); + newtag.srcid = pmt::deserialize(sb); tags_out.push_back(newtag); - } - - return msg.size() - sb.in_avail(); } - } /* namespace zeromq */ + + return msg.size() - sb.in_avail(); +} +} /* namespace zeromq */ } /* namespace gr */ // vim: ts=2 sw=2 expandtab diff --git a/gr-zeromq/lib/tag_headers.h b/gr-zeromq/lib/tag_headers.h index dede5e93a6..a2d0129736 100644 --- a/gr-zeromq/lib/tag_headers.h +++ b/gr-zeromq/lib/tag_headers.h @@ -30,12 +30,14 @@ #include <zmq.hpp> namespace gr { - namespace zeromq { +namespace zeromq { - std::string gen_tag_header(uint64_t offset, std::vector<gr::tag_t> &tags); - size_t parse_tag_header(zmq::message_t &msg, uint64_t &offset_out, std::vector<gr::tag_t> &tags_out); - - } /* namespace zeromq */ +std::string gen_tag_header(uint64_t offset, std::vector<gr::tag_t>& tags); +size_t parse_tag_header(zmq::message_t& msg, + uint64_t& offset_out, + std::vector<gr::tag_t>& tags_out); + +} /* namespace zeromq */ } /* namespace gr */ #endif /* ZEROMQ_TAG_HEADERS_H */ diff --git a/gr-zeromq/lib/zmq_common_impl.h b/gr-zeromq/lib/zmq_common_impl.h index e73a3a17d3..8fb934fbdc 100644 --- a/gr-zeromq/lib/zmq_common_impl.h +++ b/gr-zeromq/lib/zmq_common_impl.h @@ -25,7 +25,8 @@ #include <zmq.hpp> -#if defined (CPPZMQ_VERSION) && defined (ZMQ_MAKE_VERSION) && CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1) +#if defined(CPPZMQ_VERSION) && defined(ZMQ_MAKE_VERSION) && \ + CPPZMQ_VERSION >= ZMQ_MAKE_VERSION(4, 3, 1) #define USE_NEW_CPPZMQ_SEND_RECV 1 #else #define USE_NEW_CPPZMQ_SEND_RECV 0 |