summaryrefslogtreecommitdiff
path: root/gr-zeromq/lib/sub_msg_source_impl.cc
diff options
context:
space:
mode:
authorJohnathan Corgan <johnathan@corganlabs.com>2015-01-12 13:46:33 -0800
committerJohnathan Corgan <johnathan@corganlabs.com>2015-01-12 13:46:33 -0800
commit9bf7123e477772bcb5fc53d3139e75c4d63a044a (patch)
tree61bfa9d6d5ca23f531a8e2f99a7ead339bca05bc /gr-zeromq/lib/sub_msg_source_impl.cc
parent3827ff325ef1bcdbeedb46b7b4efff4f897da0ae (diff)
zeromq: cleanup and convert sub_msg_source to derive from gr::block
Diffstat (limited to 'gr-zeromq/lib/sub_msg_source_impl.cc')
-rw-r--r--gr-zeromq/lib/sub_msg_source_impl.cc65
1 files changed, 29 insertions, 36 deletions
diff --git a/gr-zeromq/lib/sub_msg_source_impl.cc b/gr-zeromq/lib/sub_msg_source_impl.cc
index f7a9bc9c26..b016405d40 100644
--- a/gr-zeromq/lib/sub_msg_source_impl.cc
+++ b/gr-zeromq/lib/sub_msg_source_impl.cc
@@ -39,19 +39,21 @@ namespace gr {
}
sub_msg_source_impl::sub_msg_source_impl(char *address, int timeout)
- : gr::sync_block("sub_msg_source",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
+ : gr::block("sub_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
d_timeout(timeout)
{
int major, minor, patch;
- zmq::version (&major, &minor, &patch);
+ zmq::version(&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_SUB);
- //int time = 0;
+
d_socket->setsockopt(ZMQ_SUBSCRIBE, "", 0);
d_socket->connect (address);
@@ -65,52 +67,43 @@ namespace gr {
delete d_context;
}
- bool sub_msg_source_impl::start(){
+ bool sub_msg_source_impl::start()
+ {
d_finished = false;
- d_thread = new boost::thread( boost::bind( &sub_msg_source_impl::readloop , this ) );
+ d_thread = new boost::thread(boost::bind(&sub_msg_source_impl::readloop, this));
return true;
}
- bool sub_msg_source_impl::stop(){
+ bool sub_msg_source_impl::stop()
+ {
d_finished = true;
- d_thread->join();
+ d_thread->join();
return true;
}
- void sub_msg_source_impl::readloop(){
+ void sub_msg_source_impl::readloop()
+ {
while(!d_finished){
- //std::cout << "readloop\n";
-
+
zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
- zmq::poll (&items[0], 1, d_timeout);
+ zmq::poll(&items[0], 1, d_timeout);
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
- // Receive data
- zmq::message_t msg;
- d_socket->recv(&msg);
-
- //std::cout << "got msg...\n";
-
- std::string buf(static_cast<char*>(msg.data()), msg.size());
- std::stringbuf sb(buf);
- pmt::pmt_t m = pmt::deserialize(sb);
- //std::cout << m << "\n";
- message_port_pub(pmt::mp("out"), m);
-
- } else {
- usleep(100);
- }
- }
- }
+ // Receive data
+ zmq::message_t msg;
+ d_socket->recv(&msg);
- int
- sub_msg_source_impl::work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
- return noutput_items;
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+
+ message_port_pub(pmt::mp("out"), m);
+ } else {
+ usleep(100);
+ }
+ }
}
} /* namespace zeromq */