summaryrefslogtreecommitdiff
path: root/gr-zeromq
diff options
context:
space:
mode:
authorTim O'Shea <tim.oshea753@gmail.com>2014-10-27 16:25:16 -0400
committerTim O'Shea <tim.oshea753@gmail.com>2014-10-27 16:25:16 -0400
commit6a3efa633309fff2834321dfbadb343064b0ab50 (patch)
tree3685ba3fd87ea670df6e1dd35f3ccf30d04bbeab /gr-zeromq
parentffad094471e749351aa09cbd149d53a7a0e77a61 (diff)
zmq: default to not pass tags (compatible wire format)
Diffstat (limited to 'gr-zeromq')
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/pub_sink.h2
-rw-r--r--gr-zeromq/include/gnuradio/zeromq/sub_source.h2
-rw-r--r--gr-zeromq/lib/pub_sink_impl.cc16
-rw-r--r--gr-zeromq/lib/pub_sink_impl.h3
-rw-r--r--gr-zeromq/lib/sub_source_impl.cc9
-rw-r--r--gr-zeromq/lib/sub_source_impl.h3
6 files changed, 22 insertions, 13 deletions
diff --git a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
index a60fb15c88..11f251ede2 100644
--- a/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
+++ b/gr-zeromq/include/gnuradio/zeromq/pub_sink.h
@@ -53,7 +53,7 @@ namespace gr {
* \param address ZMQ socket address specifier
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/include/gnuradio/zeromq/sub_source.h b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
index 9deaa7f3ff..f97dc5ac2c 100644
--- a/gr-zeromq/include/gnuradio/zeromq/sub_source.h
+++ b/gr-zeromq/include/gnuradio/zeromq/sub_source.h
@@ -51,7 +51,7 @@ namespace gr {
* \param timeout Receive timeout in seconds, default is 100ms, 1us increments
*
*/
- static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100);
+ static sptr make(size_t itemsize, size_t vlen, char *address, int timeout=100, bool pass_tags=false);
};
} // namespace zeromq
diff --git a/gr-zeromq/lib/pub_sink_impl.cc b/gr-zeromq/lib/pub_sink_impl.cc
index 8115d7213a..7b57e27ccc 100644
--- a/gr-zeromq/lib/pub_sink_impl.cc
+++ b/gr-zeromq/lib/pub_sink_impl.cc
@@ -33,17 +33,17 @@ namespace gr {
namespace zeromq {
pub_sink::sptr
- pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ pub_sink::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
{
return gnuradio::get_initial_sptr
(new pub_sink_impl(itemsize, vlen, address, timeout));
}
- pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout)
+ pub_sink_impl::pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
: gr::sync_block("pub_sink",
gr::io_signature::make(1, 1, itemsize * vlen),
gr::io_signature::make(0, 0, 0)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -72,9 +72,11 @@ namespace gr {
const char *in = (const char *)input_items[0];
// encode the current offset, # tags, and tags into header
+ size_t headlen(0);
+ std::stringstream ss;
+ if(d_pass_tags){
std::vector<gr::tag_t> tags;
get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0)+noutput_items);
- std::stringstream ss;
size_t ntags = tags.size();
ss.write( reinterpret_cast< const char* >( nitems_read(0) ), sizeof(uint64_t) ); // offset
ss.write( reinterpret_cast< const char* >( &ntags ), sizeof(size_t) ); // num tags
@@ -87,11 +89,13 @@ namespace gr {
pmt::serialize( tags[i].srcid, sb ); // srcid
ss.write( sb.str().c_str() , sb.str().length() ); // offset
}
- size_t headlen( ss.gcount() );
+ headlen = ss.gcount();
+ }
// create message copy and send
zmq::message_t msg(headlen + d_itemsize*d_vlen*noutput_items);
- memcpy((void*) msg.data(), ss.str().c_str(), ss.str().length() );
+ if(d_pass_tags)
+ memcpy((void*) msg.data(), ss.str().c_str(), ss.str().length() );
memcpy((uint8_t *)msg.data() + headlen, in, d_itemsize*d_vlen*noutput_items);
d_socket->send(msg);
diff --git a/gr-zeromq/lib/pub_sink_impl.h b/gr-zeromq/lib/pub_sink_impl.h
index 9c956ef2fa..049e5876f6 100644
--- a/gr-zeromq/lib/pub_sink_impl.h
+++ b/gr-zeromq/lib/pub_sink_impl.h
@@ -37,9 +37,10 @@ namespace gr {
float d_timeout;
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ pub_sink_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags=false);
~pub_sink_impl();
int work(int noutput_items,
diff --git a/gr-zeromq/lib/sub_source_impl.cc b/gr-zeromq/lib/sub_source_impl.cc
index 4cc9c25def..6c9da1bbbe 100644
--- a/gr-zeromq/lib/sub_source_impl.cc
+++ b/gr-zeromq/lib/sub_source_impl.cc
@@ -31,17 +31,17 @@ namespace gr {
namespace zeromq {
sub_source::sptr
- sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout)
+ sub_source::make(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
{
return gnuradio::get_initial_sptr
(new sub_source_impl(itemsize, vlen, address, timeout));
}
- sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout)
+ sub_source_impl::sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags)
: gr::sync_block("sub_source",
gr::io_signature::make(0, 0, 0),
gr::io_signature::make(1, 1, itemsize * vlen)),
- d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout)
+ d_itemsize(itemsize), d_vlen(vlen), d_timeout(timeout), d_pass_tags(pass_tags)
{
int major, minor, patch;
zmq::version (&major, &minor, &patch);
@@ -84,6 +84,8 @@ namespace gr {
// Deserialize header data / tags
std::istringstream iss( std::string(static_cast<char*>(msg.data()), msg.size()));
+
+ if(d_pass_tags){
uint64_t rcv_offset;
size_t rcv_ntags;
iss.read( (char*)&rcv_offset, sizeof(uint64_t ) );
@@ -99,6 +101,7 @@ namespace gr {
add_item_tag(0, new_tag_offset, key, val, src);
iss.str(sb.str());
}
+ }
// Pass sample data along
std::vector<char> samp(iss.gcount());
diff --git a/gr-zeromq/lib/sub_source_impl.h b/gr-zeromq/lib/sub_source_impl.h
index 44647527b1..f52829463e 100644
--- a/gr-zeromq/lib/sub_source_impl.h
+++ b/gr-zeromq/lib/sub_source_impl.h
@@ -37,9 +37,10 @@ namespace gr {
int d_timeout; // microseconds, -1 is blocking
zmq::context_t *d_context;
zmq::socket_t *d_socket;
+ bool d_pass_tags;
public:
- sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout);
+ sub_source_impl(size_t itemsize, size_t vlen, char *address, int timeout, bool pass_tags=false);
~sub_source_impl();
int work(int noutput_items,