summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/pub_msg_sink_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/lib/pub_msg_sink_impl.cc')
-rw-r--r--gr-zeromq/lib/pub_msg_sink_impl.cc83
1 files changed, 40 insertions, 43 deletions
diff --git a/gr-zeromq/lib/pub_msg_sink_impl.cc b/gr-zeromq/lib/pub_msg_sink_impl.cc
index 5fc7164d2b..77ffc469cd 100644
--- a/gr-zeromq/lib/pub_msg_sink_impl.cc
+++ b/gr-zeromq/lib/pub_msg_sink_impl.cc
@@ -29,55 +29,52 @@
#include "tag_headers.h"
namespace gr {
- namespace zeromq {
+namespace zeromq {
- pub_msg_sink::sptr
- pub_msg_sink::make(char *address, int timeout)
- {
- return gnuradio::get_initial_sptr
- (new pub_msg_sink_impl(address, timeout));
+pub_msg_sink::sptr pub_msg_sink::make(char* address, int timeout)
+{
+ return gnuradio::get_initial_sptr(new pub_msg_sink_impl(address, timeout));
+}
+
+pub_msg_sink_impl::pub_msg_sink_impl(char* address, int timeout)
+ : gr::block("pub_msg_sink",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
+{
+ int major, minor, patch;
+ zmq::version(&major, &minor, &patch);
+ if (major < 3) {
+ d_timeout = timeout * 1000;
}
- pub_msg_sink_impl::pub_msg_sink_impl(char *address, int timeout)
- : gr::block("pub_msg_sink",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
- d_timeout(timeout)
- {
- int major, minor, patch;
- zmq::version (&major, &minor, &patch);
- if (major < 3) {
- d_timeout = timeout*1000;
- }
+ d_context = new zmq::context_t(1);
+ d_socket = new zmq::socket_t(*d_context, ZMQ_PUB);
+ int time = 0;
+ d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
+ d_socket->bind(address);
- d_context = new zmq::context_t(1);
- d_socket = new zmq::socket_t(*d_context, ZMQ_PUB);
- int time = 0;
- d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
- d_socket->bind(address);
+ message_port_register_in(pmt::mp("in"));
+ set_msg_handler(pmt::mp("in"), boost::bind(&pub_msg_sink_impl::handler, this, _1));
+}
- message_port_register_in(pmt::mp("in"));
- set_msg_handler( pmt::mp("in"),
- boost::bind(&pub_msg_sink_impl::handler, this, _1));
- }
+pub_msg_sink_impl::~pub_msg_sink_impl()
+{
+ d_socket->close();
+ delete d_socket;
+ delete d_context;
+}
- pub_msg_sink_impl::~pub_msg_sink_impl()
- {
- d_socket->close();
- delete d_socket;
- delete d_context;
- }
+void pub_msg_sink_impl::handler(pmt::pmt_t msg)
+{
+ std::stringbuf sb("");
+ pmt::serialize(msg, sb);
+ std::string s = sb.str();
+ zmq::message_t zmsg(s.size());
- void pub_msg_sink_impl::handler(pmt::pmt_t msg)
- {
- std::stringbuf sb("");
- pmt::serialize(msg, sb);
- std::string s = sb.str();
- zmq::message_t zmsg(s.size());
-
- memcpy(zmsg.data(), s.c_str(), s.size());
- d_socket->send(zmsg);
- }
+ memcpy(zmsg.data(), s.c_str(), s.size());
+ d_socket->send(zmsg);
+}
- } /* namespace zeromq */
+} /* namespace zeromq */
} /* namespace gr */