summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJohnathan Corgan <johnathan@corganlabs.com>2015-01-12 16:04:45 -0800
committerJohnathan Corgan <johnathan@corganlabs.com>2015-01-12 16:06:41 -0800
commit5de5b1a87f71bdbed9925f80648756c566c3a91b (patch)
tree9ce1394c23245358974d9b32338c46c35d49c72b
parent7bf8b05bde26bfb1e2df684e2ab55b878b25350c (diff)
zeromq: cleanup and made req_msg_source derive from gr::block
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/req_msg_source.h13
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.cc67
-rw-r--r--gr-zeromq/lib/req_msg_source_impl.h9
3 files changed, 39 insertions, 50 deletions
diff --git a/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h b/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h
index cf9183375f..05d80b8e7f 100644
--- a/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/req_msg_source.h
@@ -1,6 +1,6 @@
/* -*- c++ -*- */
/*
- * Copyright 2013 Free Software Foundation, Inc.
+ * Copyright 2013-2015 Free Software Foundation, Inc.
*
* This file is part of GNU Radio.
*
@@ -24,20 +24,20 @@
#define INCLUDED_ZEROMQ_REQ_MSG_SOURCE_H
#include <gnuradio/zeromq/api.h>
-#include <gnuradio/sync_block.h>
+#include <gnuradio/block.h>
namespace gr {
namespace zeromq {
/*!
- * \brief Receive messages on ZMQ REQ socket and source stream
+ * \brief Receive messages on ZMQ REQ socket output async messages
* \ingroup zeromq
*
* \details
- * This block will connect to a ZMQ REP socket, then produce all
- * incoming messages as streaming output.
+ * This block will connect to a ZMQ REP socket, then resend all
+ * incoming messages as asynchronous messages.
*/
- class ZEROMQ_API req_msg_source : virtual public gr::sync_block
+ class ZEROMQ_API req_msg_source : virtual public gr::block
{
public:
typedef boost::shared_ptr<req_msg_source> sptr;
@@ -45,7 +45,6 @@ namespace gr {
/*!
* \brief Return a shared_ptr to a new instance of zeromq::req_msg_source.
*
- *
* \param address ZMQ socket address specifier
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*
diff --git a/gr-zeromq/lib/req_msg_source_impl.cc b/gr-zeromq/lib/req_msg_source_impl.cc
index 2dda15208f..b30ef2679d 100644
--- a/gr-zeromq/lib/req_msg_source_impl.cc
+++ b/gr-zeromq/lib/req_msg_source_impl.cc
@@ -39,18 +39,21 @@ namespace gr {
}
req_msg_source_impl::req_msg_source_impl(char *address, int timeout)
- : gr::sync_block("req_msg_source",
- gr::io_signature::make(0, 0, 0),
- gr::io_signature::make(0, 0, 0)),
- d_timeout(timeout)
+ : gr::block("req_msg_source",
+ gr::io_signature::make(0, 0, 0),
+ gr::io_signature::make(0, 0, 0)),
+ d_timeout(timeout)
{
int major, minor, patch;
- zmq::version (&major, &minor, &patch);
+ zmq::version(&major, &minor, &patch);
+
if (major < 3) {
d_timeout = timeout*1000;
}
+
d_context = new zmq::context_t(1);
d_socket = new zmq::socket_t(*d_context, ZMQ_REQ);
+
int time = 0;
d_socket->setsockopt(ZMQ_LINGER, &time, sizeof(time));
d_socket->connect (address);
@@ -65,67 +68,55 @@ namespace gr {
delete d_context;
}
- bool req_msg_source_impl::start(){
+ bool req_msg_source_impl::start()
+ {
d_finished = false;
- d_thread = new boost::thread( boost::bind( &req_msg_source_impl::readloop , this ) );
+ d_thread = new boost::thread(boost::bind(&req_msg_source_impl::readloop, this));
return true;
}
- bool req_msg_source_impl::stop(){
+ bool req_msg_source_impl::stop()
+ {
d_finished = true;
d_thread->join();
return true;
}
- void req_msg_source_impl::readloop(){
+ void req_msg_source_impl::readloop()
+ {
while(!d_finished){
//std::cout << "readloop\n";
-
+
zmq::pollitem_t itemsout[] = { { *d_socket, 0, ZMQ_POLLOUT, 0 } };
- zmq::poll (&itemsout[0], 1, d_timeout);
-
+ zmq::poll(&itemsout[0], 1, d_timeout);
+
// If we got a reply, process
if (itemsout[0].revents & ZMQ_POLLOUT) {
// Request data, FIXME non portable?
int nmsg = 1;
zmq::message_t request(sizeof(int));
- memcpy ((void *) request.data (), &nmsg, sizeof(int));
+ memcpy((void *) request.data (), &nmsg, sizeof(int));
d_socket->send(request);
- //std::cout << "sent request...\n";
}
zmq::pollitem_t items[] = { { *d_socket, 0, ZMQ_POLLIN, 0 } };
zmq::poll (&items[0], 1, d_timeout);
- //std::cout << "rx response...\n";
// If we got a reply, process
if (items[0].revents & ZMQ_POLLIN) {
- //std::cout << "rx response... got data\n";
-
- // Receive data
- zmq::message_t msg;
- d_socket->recv(&msg);
+ // Receive data
+ zmq::message_t msg;
+ d_socket->recv(&msg);
- //std::cout << "got msg...\n";
+ std::string buf(static_cast<char*>(msg.data()), msg.size());
+ std::stringbuf sb(buf);
+ pmt::pmt_t m = pmt::deserialize(sb);
+ message_port_pub(pmt::mp("out"), m);
- std::string buf(static_cast<char*>(msg.data()), msg.size());
- std::stringbuf sb(buf);
- pmt::pmt_t m = pmt::deserialize(sb);
- //std::cout << m << "\n";
- message_port_pub(pmt::mp("out"), m);
-
- } else {
- usleep(100);
- }
+ } else {
+ usleep(100);
}
- }
-
- int
- req_msg_source_impl::work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items)
- {
- return noutput_items;
+ }
}
} /* namespace zeromq */
diff --git a/gr-zeromq/lib/req_msg_source_impl.h b/gr-zeromq/lib/req_msg_source_impl.h
index 3a691743b5..5835dd4b4d 100644
--- a/gr-zeromq/lib/req_msg_source_impl.h
+++ b/gr-zeromq/lib/req_msg_source_impl.h
@@ -35,19 +35,18 @@ namespace gr {
int d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ boost::thread *d_thread;
+
void readloop();
- boost::thread *d_thread;
public:
+ bool d_finished;
+
req_msg_source_impl(char *address, int timeout);
~req_msg_source_impl();
bool start();
bool stop();
- bool d_finished;
- int work(int noutput_items,
- gr_vector_const_void_star &input_items,
- gr_vector_void_star &output_items);
};
} // namespace zeromq