diff options
author | Martin Braun <martin.braun@kit.edu> | 2013-03-15 02:12:20 -0700 |
---|---|---|
committer | Johnathan Corgan <johnathan@corganlabs.com> | 2013-03-15 02:13:55 -0700 |
commit | f968b36d4ed2b194111585605f9a1b5367638fb3 (patch) | |
tree | c5a18a3412aa3a4b2c89ede3a68d08ba7980401f /gnuradio-core/src | |
parent | 86d05c838e2cfeb9a9c73a0e19668a45cc8b333c (diff) |
Squash/rebased martin/ofdm-master onto trial merge branch
Conflicts:
gr-blocks/include/blocks/CMakeLists.txt
Diffstat (limited to 'gnuradio-core/src')
20 files changed, 554 insertions, 58 deletions
diff --git a/gnuradio-core/src/lib/gengen/gr_vector_sink_X.cc.t b/gnuradio-core/src/lib/gengen/gr_vector_sink_X.cc.t index a9e3a0a3ea..2b8207c027 100644 --- a/gnuradio-core/src/lib/gengen/gr_vector_sink_X.cc.t +++ b/gnuradio-core/src/lib/gengen/gr_vector_sink_X.cc.t @@ -28,6 +28,7 @@ #include <@NAME@.h> #include <algorithm> #include <gr_io_signature.h> +#include <iostream> @NAME@::@NAME@ (int vlen) @@ -46,7 +47,9 @@ int @TYPE@ *iptr = (@TYPE@ *) input_items[0]; for (int i = 0; i < noutput_items * d_vlen; i++) d_data.push_back (iptr[i]); - + std::vector<gr_tag_t> tags; + get_tags_in_range(tags, 0, nitems_read(0), nitems_read(0) + noutput_items); + d_tags.insert(d_tags.end(), tags.begin(), tags.end()); return noutput_items; } @@ -62,3 +65,9 @@ std::vector<@TYPE@> { return d_data; } + +std::vector<gr_tag_t> +@NAME@::tags () const +{ + return d_tags; +} diff --git a/gnuradio-core/src/lib/gengen/gr_vector_sink_X.h.t b/gnuradio-core/src/lib/gengen/gr_vector_sink_X.h.t index b9126dc7b2..b7de1d101c 100644 --- a/gnuradio-core/src/lib/gengen/gr_vector_sink_X.h.t +++ b/gnuradio-core/src/lib/gengen/gr_vector_sink_X.h.t @@ -43,6 +43,7 @@ gr_make_@BASE_NAME@ (int vlen = 1); class GR_CORE_API @NAME@ : public gr_sync_block { friend GR_CORE_API @NAME@_sptr gr_make_@BASE_NAME@ (int vlen); std::vector<@TYPE@> d_data; + std::vector<gr_tag_t> d_tags; int d_vlen; @NAME@ (int vlen); @@ -54,6 +55,7 @@ class GR_CORE_API @NAME@ : public gr_sync_block { void reset() {d_data.clear();} void clear() {reset(); } // deprecated std::vector<@TYPE@> data () const; + std::vector<gr_tag_t> tags () const; }; #endif diff --git a/gnuradio-core/src/lib/gengen/gr_vector_sink_X.i.t b/gnuradio-core/src/lib/gengen/gr_vector_sink_X.i.t index d4a9409114..ee0ebf378b 100644 --- a/gnuradio-core/src/lib/gengen/gr_vector_sink_X.i.t +++ b/gnuradio-core/src/lib/gengen/gr_vector_sink_X.i.t @@ -35,5 +35,6 @@ class @NAME@ : public gr_sync_block { void clear(); // deprecated void reset(); std::vector<@TYPE@> data () const; + std::vector<gr_tag_t> tags () const; }; diff --git a/gnuradio-core/src/lib/gengen/gr_vector_source_X.cc.t b/gnuradio-core/src/lib/gengen/gr_vector_source_X.cc.t index 9f68f9cf14..19272ee24b 100644 --- a/gnuradio-core/src/lib/gengen/gr_vector_source_X.cc.t +++ b/gnuradio-core/src/lib/gengen/gr_vector_source_X.cc.t @@ -30,20 +30,39 @@ #include <gr_io_signature.h> #include <stdexcept> - -@NAME@::@NAME@ (const std::vector<@TYPE@> &data, bool repeat, int vlen) +@NAME@::@NAME@ (const std::vector<@TYPE@> &data, bool repeat, int vlen, const std::vector<gr_tag_t> &tags) : gr_sync_block ("@BASE_NAME@", gr_make_io_signature (0, 0, 0), gr_make_io_signature (1, 1, sizeof (@TYPE@) * vlen)), d_data (data), d_repeat (repeat), d_offset (0), - d_vlen (vlen) + d_vlen (vlen), + d_tags (tags), + d_tagpos (0) { + if (tags.size() == 0) { + d_settags = 0; + } else { + d_settags = 1; + set_output_multiple(data.size() / vlen); + } if ((data.size() % vlen) != 0) throw std::invalid_argument("data length must be a multiple of vlen"); } +void +@NAME@::set_data (const std::vector<@TYPE@> &data, const std::vector<gr_tag_t> &tags){ + d_data = data; + d_tags = tags; + rewind(); + if (tags.size() == 0) { + d_settags = false; + } else { + d_settags = true; + } +} + int @NAME@::work (int noutput_items, gr_vector_const_void_star &input_items, @@ -54,36 +73,52 @@ int if (d_repeat){ unsigned int size = d_data.size (); unsigned int offset = d_offset; - if (size == 0) return -1; - for (int i = 0; i < noutput_items*d_vlen; i++){ - optr[i] = d_data[offset++]; - if (offset >= size) - offset = 0; + if (d_settags) { + int n_outputitems_per_vector = d_data.size() / d_vlen; + for (int i = 0; i < noutput_items; i += n_outputitems_per_vector) { + // FIXME do proper vector copy + memcpy((void *) optr, (const void *) &d_data[0], size * sizeof (@TYPE@)); + optr += size; + for (unsigned t = 0; t < d_tags.size(); t++) { + add_item_tag(0, nitems_written(0)+i+d_tags[t].offset, d_tags[t].key, d_tags[t].value); + } + } + } else { + for (int i = 0; i < noutput_items*d_vlen; i++){ + optr[i] = d_data[offset++]; + if (offset >= size) { + offset = 0; + } + } } + + d_offset = offset; return noutput_items; - } - - else { + } else { if (d_offset >= d_data.size ()) return -1; // Done! unsigned n = std::min ((unsigned) d_data.size () - d_offset, - (unsigned) noutput_items*d_vlen); - for (unsigned i = 0; i < n; i++) + (unsigned) noutput_items*d_vlen); + for (unsigned i = 0; i < n; i++) { optr[i] = d_data[d_offset + i]; - + } + for (unsigned t = 0; t < d_tags.size(); t++) { + if ((d_tags[t].offset >= d_offset) && (d_tags[t].offset < d_offset+n)) + add_item_tag(0, d_tags[t].offset, d_tags[t].key, d_tags[t].value); + } d_offset += n; return n/d_vlen; } } @NAME@_sptr -gr_make_@BASE_NAME@ (const std::vector<@TYPE@> &data, bool repeat, int vlen) +gr_make_@BASE_NAME@ (const std::vector<@TYPE@> &data, bool repeat, int vlen, const std::vector<gr_tag_t> &tags) { - return gnuradio::get_initial_sptr(new @NAME@ (data, repeat, vlen)); + return gnuradio::get_initial_sptr(new @NAME@ (data, repeat, vlen, tags)); } diff --git a/gnuradio-core/src/lib/gengen/gr_vector_source_X.h.t b/gnuradio-core/src/lib/gengen/gr_vector_source_X.h.t index fe02c1346f..592467778a 100644 --- a/gnuradio-core/src/lib/gengen/gr_vector_source_X.h.t +++ b/gnuradio-core/src/lib/gengen/gr_vector_source_X.h.t @@ -38,24 +38,27 @@ typedef boost::shared_ptr<@NAME@> @NAME@_sptr; class @NAME@ : public gr_sync_block { friend GR_CORE_API @NAME@_sptr - gr_make_@BASE_NAME@ (const std::vector<@TYPE@> &data, bool repeat, int vlen); + gr_make_@BASE_NAME@ (const std::vector<@TYPE@> &data, bool repeat, int vlen, const std::vector<gr_tag_t> &tags); std::vector<@TYPE@> d_data; bool d_repeat; unsigned int d_offset; int d_vlen; + bool d_settags; + std::vector<gr_tag_t> d_tags; + unsigned int d_tagpos; - @NAME@ (const std::vector<@TYPE@> &data, bool repeat, int vlen); + @NAME@ (const std::vector<@TYPE@> &data, bool repeat, int vlen, const std::vector<gr_tag_t> &tags); public: void rewind() {d_offset=0;} virtual int work (int noutput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items); - void set_data(const std::vector<@TYPE@> &data){ d_data = data; rewind(); } + void set_data(const std::vector<@TYPE@> &data, const std::vector<gr_tag_t> &tags); }; GR_CORE_API @NAME@_sptr -gr_make_@BASE_NAME@ (const std::vector<@TYPE@> &data, bool repeat = false, int vlen = 1); +gr_make_@BASE_NAME@ (const std::vector<@TYPE@> &data, bool repeat = false, int vlen = 1, const std::vector<gr_tag_t> &tags = std::vector<gr_tag_t>()); #endif diff --git a/gnuradio-core/src/lib/gengen/gr_vector_source_X.i.t b/gnuradio-core/src/lib/gengen/gr_vector_source_X.i.t index 4986c68a35..c661ca6d3f 100644 --- a/gnuradio-core/src/lib/gengen/gr_vector_source_X.i.t +++ b/gnuradio-core/src/lib/gengen/gr_vector_source_X.i.t @@ -25,13 +25,13 @@ GR_SWIG_BLOCK_MAGIC(gr,@BASE_NAME@); @NAME@_sptr -gr_make_@BASE_NAME@ (const std::vector<@TYPE@> &data, bool repeat = false, int vlen = 1) +gr_make_@BASE_NAME@ (const std::vector<@TYPE@> &data, bool repeat = false, int vlen = 1, const std::vector<gr_tag_t> &tags=std::vector<gr_tag_t>()) throw(std::invalid_argument); class @NAME@ : public gr_sync_block { public: void rewind(); - void set_data(const std::vector<@TYPE@> &data); + void set_data(const std::vector<@TYPE@> &data, const std::vector<gr_tag_t> &tags); private: - @NAME@ (const std::vector<@TYPE@> &data, int vlen); + @NAME@ (const std::vector<@TYPE@> &data, bool repeat, int vlen, const std::vector<gr_tag_t> &tags); }; diff --git a/gnuradio-core/src/lib/io/gr_message_sink.cc b/gnuradio-core/src/lib/io/gr_message_sink.cc index ae0b5c7649..3816f5da48 100644 --- a/gnuradio-core/src/lib/io/gr_message_sink.cc +++ b/gnuradio-core/src/lib/io/gr_message_sink.cc @@ -34,7 +34,6 @@ #include <stdexcept> #include <string.h> - // public constructor that returns a shared_ptr gr_message_sink_sptr @@ -42,12 +41,25 @@ gr_make_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block) { return gnuradio::get_initial_sptr(new gr_message_sink(itemsize, msgq, dont_block)); } +gr_message_sink_sptr +gr_make_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block, const std::string& lengthtagname) +{ + return gnuradio::get_initial_sptr(new gr_message_sink(itemsize, msgq, dont_block, lengthtagname)); +} gr_message_sink::gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block) : gr_sync_block("message_sink", gr_make_io_signature(1, 1, itemsize), gr_make_io_signature(0, 0, 0)), - d_itemsize(itemsize), d_msgq(msgq), d_dont_block(dont_block) + d_itemsize(itemsize), d_msgq(msgq), d_dont_block(dont_block), d_tags(false), d_items_read(0) +{ +} + +gr_message_sink::gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block, const std::string& lengthtagname) + : gr_sync_block("message_sink", + gr_make_io_signature(1, 1, itemsize), + gr_make_io_signature(0, 0, 0)), + d_itemsize(itemsize), d_msgq(msgq), d_dont_block(dont_block), d_tags(true), d_lengthtagname(lengthtagname), d_items_read(0) { } @@ -62,18 +74,46 @@ gr_message_sink::work(int noutput_items, { const char *in = (const char *) input_items[0]; - // if we'd block, drop the data on the floor and say everything is OK - if (d_dont_block && d_msgq->full_p()) - return noutput_items; + if (d_tags) { + long packet_length = 0; + std::vector<gr_tag_t> tags; + this->get_tags_in_range(tags, 0, d_items_read, d_items_read+1); + //const size_t ninput_items = noutput_items; //assumption for sync block, this can change + for (unsigned int i = 0; i < tags.size(); i++) { + if (pmt::pmt_symbol_to_string(tags[i].key) == d_lengthtagname) { + packet_length = pmt::pmt_to_long(tags[i].value); + } + } + assert(packet_length != 0); - // build a message to hold whatever we've got - gr_message_sptr msg = gr_make_message(0, // msg type - d_itemsize, // arg1 for other end - noutput_items, // arg2 for other end (redundant) - noutput_items * d_itemsize); // len of msg - memcpy(msg->msg(), in, noutput_items * d_itemsize); - - d_msgq->handle(msg); // send it - - return noutput_items; + // FIXME run this multiple times if input_items >= N * packet_length + if (noutput_items >= packet_length ) { + // If the message queue is full we drop the packet. + if (!d_msgq->full_p()) { + gr_message_sptr msg = gr_make_message(0, // msg type + d_itemsize, // arg1 for other end + packet_length, // arg2 for other end (redundant) + packet_length * d_itemsize); // len of msg + memcpy(msg->msg(), in, packet_length * d_itemsize); + d_msgq->handle(msg); // send it + } + d_items_read += packet_length; + return packet_length; + } else { + return 0; + } + } else { + // If the queue if full we drop all the data we got. + if (!d_msgq->full_p()) { + // build a message to hold whatever we've got + gr_message_sptr msg = gr_make_message(0, // msg type + d_itemsize, // arg1 for other end + noutput_items, // arg2 for other end (redundant) + noutput_items * d_itemsize); // len of msg + memcpy(msg->msg(), in, noutput_items * d_itemsize); + + d_msgq->handle(msg); // send it + } + return noutput_items; + } } diff --git a/gnuradio-core/src/lib/io/gr_message_sink.h b/gnuradio-core/src/lib/io/gr_message_sink.h index 84005694a1..2db84cff44 100644 --- a/gnuradio-core/src/lib/io/gr_message_sink.h +++ b/gnuradio-core/src/lib/io/gr_message_sink.h @@ -27,13 +27,23 @@ #include <gr_sync_block.h> #include <gr_message.h> #include <gr_msg_queue.h> +#include <string> class gr_message_sink; typedef boost::shared_ptr<gr_message_sink> gr_message_sink_sptr; -GR_CORE_API gr_message_sink_sptr gr_make_message_sink (size_t itemsize, - gr_msg_queue_sptr msgq, - bool dont_block); +GR_CORE_API gr_message_sink_sptr gr_make_message_sink ( + size_t itemsize, + gr_msg_queue_sptr msgq, + bool dont_block +); + +GR_CORE_API gr_message_sink_sptr gr_make_message_sink ( + size_t itemsize, + gr_msg_queue_sptr msgq, + bool dont_block, + const std::string& lengthtagname +); /*! * \brief Gather received items into messages and insert into msgq @@ -45,12 +55,21 @@ class GR_CORE_API gr_message_sink : public gr_sync_block size_t d_itemsize; gr_msg_queue_sptr d_msgq; bool d_dont_block; + bool d_tags; + std::string d_lengthtagname; + uint64_t d_items_read; friend GR_CORE_API gr_message_sink_sptr - gr_make_message_sink(size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block); + gr_make_message_sink(size_t itemsize, gr_msg_queue_sptr msgq, + bool dont_block); + friend GR_CORE_API gr_message_sink_sptr + gr_make_message_sink(size_t itemsize, gr_msg_queue_sptr msgq, + bool dont_block, const std::string& lengthtagname); protected: gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block); + gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block, + const std::string& lengthtagname); public: ~gr_message_sink (); diff --git a/gnuradio-core/src/lib/io/gr_message_sink.i b/gnuradio-core/src/lib/io/gr_message_sink.i index 8415cbd66d..b22eff18c0 100644 --- a/gnuradio-core/src/lib/io/gr_message_sink.i +++ b/gnuradio-core/src/lib/io/gr_message_sink.i @@ -22,14 +22,21 @@ GR_SWIG_BLOCK_MAGIC(gr,message_sink); +#include <string> + gr_message_sink_sptr gr_make_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block); +gr_message_sink_sptr gr_make_message_sink (size_t itemsize, + gr_msg_queue_sptr msgq, + bool dont_block, + const std::string& lengthtagname); class gr_message_sink : public gr_sync_block { protected: gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block); + gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block, const std::string& lengthtagname); public: ~gr_message_sink (); diff --git a/gnuradio-core/src/lib/io/gr_message_source.cc b/gnuradio-core/src/lib/io/gr_message_source.cc index fb3da89a8b..0b79bb526b 100644 --- a/gnuradio-core/src/lib/io/gr_message_source.cc +++ b/gnuradio-core/src/lib/io/gr_message_source.cc @@ -36,7 +36,6 @@ // public constructor that returns a shared_ptr - gr_message_source_sptr gr_make_message_source(size_t itemsize, int msgq_limit) { @@ -50,11 +49,19 @@ gr_make_message_source(size_t itemsize, gr_msg_queue_sptr msgq) return gnuradio::get_initial_sptr(new gr_message_source(itemsize, msgq)); } +// public constructor that takes existing message queue and adds messages length tags. +gr_message_source_sptr +gr_make_message_source(size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname) +{ + return gnuradio::get_initial_sptr(new gr_message_source(itemsize, msgq, lengthtagname)); +} + gr_message_source::gr_message_source (size_t itemsize, int msgq_limit) : gr_sync_block("message_source", gr_make_io_signature(0, 0, 0), gr_make_io_signature(1, 1, itemsize)), - d_itemsize(itemsize), d_msgq(gr_make_msg_queue(msgq_limit)), d_msg_offset(0), d_eof(false) + d_itemsize(itemsize), d_msgq(gr_make_msg_queue(msgq_limit)), d_msg_offset(0), d_eof(false), + d_tags(false) { } @@ -62,11 +69,19 @@ gr_message_source::gr_message_source (size_t itemsize, gr_msg_queue_sptr msgq) : gr_sync_block("message_source", gr_make_io_signature(0, 0, 0), gr_make_io_signature(1, 1, itemsize)), - d_itemsize(itemsize), d_msgq(msgq), d_msg_offset(0), d_eof(false) + d_itemsize(itemsize), d_msgq(msgq), d_msg_offset(0), d_eof(false), d_tags(false) { } gr_message_source::~gr_message_source() +{} + +gr_message_source::gr_message_source(size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname) + : gr_sync_block("message_source", + gr_make_io_signature(0, 0, 0), + gr_make_io_signature(1, 1, itemsize)), + d_itemsize(itemsize), d_msgq(msgq), d_msg_offset(0), d_eof(false), + d_tags(true), d_lengthtagname(lengthtagname) { } @@ -86,15 +101,21 @@ gr_message_source::work(int noutput_items, int mm = std::min(noutput_items - nn, (int)((d_msg->length() - d_msg_offset) / d_itemsize)); memcpy (out, &(d_msg->msg()[d_msg_offset]), mm * d_itemsize); + if (d_tags && (d_msg_offset == 0)) { + const uint64_t offset = this->nitems_written(0) + nn; + pmt::pmt_t key = pmt::pmt_string_to_symbol(d_lengthtagname); + pmt::pmt_t value = pmt::pmt_from_long(d_msg->length()); + this->add_item_tag(0, offset, key, value); + } nn += mm; out += mm * d_itemsize; d_msg_offset += mm * d_itemsize; assert(d_msg_offset <= d_msg->length()); if (d_msg_offset == d_msg->length()){ - if (d_msg->type() == 1) // type == 1 sets EOF - d_eof = true; - d_msg.reset(); + if (d_msg->type() == 1) // type == 1 sets EOF + d_eof = true; + d_msg.reset(); } } else { @@ -102,17 +123,17 @@ gr_message_source::work(int noutput_items, // No current message // if (d_msgq->empty_p() && nn > 0){ // no more messages in the queue, return what we've got - break; + break; } - + if (d_eof) - return -1; - + return -1; + d_msg = d_msgq->delete_head(); // block, waiting for a message d_msg_offset = 0; - + if ((d_msg->length() % d_itemsize) != 0) - throw std::runtime_error("msg length is not a multiple of d_itemsize"); + throw std::runtime_error("msg length is not a multiple of d_itemsize"); } } diff --git a/gnuradio-core/src/lib/io/gr_message_source.h b/gnuradio-core/src/lib/io/gr_message_source.h index c510d1775f..1828475987 100644 --- a/gnuradio-core/src/lib/io/gr_message_source.h +++ b/gnuradio-core/src/lib/io/gr_message_source.h @@ -33,6 +33,8 @@ typedef boost::shared_ptr<gr_message_source> gr_message_source_sptr; GR_CORE_API gr_message_source_sptr gr_make_message_source (size_t itemsize, int msgq_limit=0); GR_CORE_API gr_message_source_sptr gr_make_message_source (size_t itemsize, gr_msg_queue_sptr msgq); +GR_CORE_API gr_message_source_sptr gr_make_message_source (size_t itemsize, gr_msg_queue_sptr msgq, + const std::string& lengthtagname); /*! * \brief Turn received messages into a stream @@ -46,15 +48,21 @@ class GR_CORE_API gr_message_source : public gr_sync_block gr_message_sptr d_msg; unsigned d_msg_offset; bool d_eof; + bool d_tags; + // FIXME: Is this adequate tagname length. + std::string d_lengthtagname; friend GR_CORE_API gr_message_source_sptr - gr_make_message_source(size_t itemsize, int msgq_limit); + gr_make_message_source(size_t itemsize, int msgq_limit); friend GR_CORE_API gr_message_source_sptr - gr_make_message_source(size_t itemsize, gr_msg_queue_sptr msgq); + gr_make_message_source(size_t itemsize, gr_msg_queue_sptr msgq); + friend GR_CORE_API gr_message_source_sptr + gr_make_message_source(size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname); protected: gr_message_source (size_t itemsize, int msgq_limit); gr_message_source (size_t itemsize, gr_msg_queue_sptr msgq); + gr_message_source (size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname); public: ~gr_message_source (); diff --git a/gnuradio-core/src/lib/io/gr_message_source.i b/gnuradio-core/src/lib/io/gr_message_source.i index 9ee9157e8c..bb1ddfcc31 100644 --- a/gnuradio-core/src/lib/io/gr_message_source.i +++ b/gnuradio-core/src/lib/io/gr_message_source.i @@ -22,14 +22,18 @@ GR_SWIG_BLOCK_MAGIC(gr,message_source); +#include <string> + gr_message_source_sptr gr_make_message_source (size_t itemsize, int msgq_limit=0); gr_message_source_sptr gr_make_message_source (size_t itemsize, gr_msg_queue_sptr msgq); +gr_message_source_sptr gr_make_message_source (size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname); class gr_message_source : public gr_sync_block { protected: gr_message_source (size_t itemsize, int msgq_limit); gr_message_source (size_t itemsize, gr_msg_queue_sptr msgq); + gr_message_source (size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname); public: ~gr_message_source (); diff --git a/gnuradio-core/src/lib/runtime/CMakeLists.txt b/gnuradio-core/src/lib/runtime/CMakeLists.txt index 6772f7a26d..80db1e7e7e 100644 --- a/gnuradio-core/src/lib/runtime/CMakeLists.txt +++ b/gnuradio-core/src/lib/runtime/CMakeLists.txt @@ -79,6 +79,7 @@ list(APPEND gnuradio_core_sources ${CMAKE_CURRENT_SOURCE_DIR}/gr_sync_decimator.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_sync_interpolator.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_sys_paths.cc + ${CMAKE_CURRENT_SOURCE_DIR}/gr_tagged_stream_block.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_top_block.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_top_block_impl.cc ${CMAKE_CURRENT_SOURCE_DIR}/gr_tpb_detail.cc @@ -146,6 +147,7 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/gr_sync_block.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_sync_decimator.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_sync_interpolator.h + ${CMAKE_CURRENT_SOURCE_DIR}/gr_tagged_stream_block.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_top_block.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_top_block_impl.h ${CMAKE_CURRENT_SOURCE_DIR}/gr_tpb_detail.h @@ -182,6 +184,7 @@ install(FILES ${CMAKE_CURRENT_SOURCE_DIR}/gr_sync_block.i ${CMAKE_CURRENT_SOURCE_DIR}/gr_sync_decimator.i ${CMAKE_CURRENT_SOURCE_DIR}/gr_sync_interpolator.i + ${CMAKE_CURRENT_SOURCE_DIR}/gr_tagged_stream_block.i ${CMAKE_CURRENT_SOURCE_DIR}/gr_tags.i ${CMAKE_CURRENT_SOURCE_DIR}/gr_top_block.i ${CMAKE_CURRENT_SOURCE_DIR}/runtime.i diff --git a/gnuradio-core/src/lib/runtime/gr_tagged_stream_block.cc b/gnuradio-core/src/lib/runtime/gr_tagged_stream_block.cc new file mode 100644 index 0000000000..3c4a75fd37 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_tagged_stream_block.cc @@ -0,0 +1,144 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifdef HAVE_CONFIG_H +#include "config.h" +#endif + +#include <gr_tagged_stream_block.h> + +gr_tagged_stream_block::gr_tagged_stream_block (const std::string &name, + gr_io_signature_sptr input_signature, + gr_io_signature_sptr output_signature, + const std::string &length_tag_key) + : gr_block(name, input_signature, output_signature), + d_length_tag_key(pmt::pmt_string_to_symbol(length_tag_key)), + d_n_input_items_reqd(input_signature->min_streams(), 0), + d_length_tag_key_str(length_tag_key) +{ +} + + +// This is evil hackery: We trick the scheduler into creating the right number of input items +void +gr_tagged_stream_block::forecast (int noutput_items, gr_vector_int &ninput_items_required) +{ + unsigned ninputs = ninput_items_required.size(); + for (unsigned i = 0; i < ninputs; i++) { + if (i < d_n_input_items_reqd.size() && d_n_input_items_reqd[i] != 0) { + ninput_items_required[i] = d_n_input_items_reqd[i]; + } else { + // If there's no item, there's no tag--so there must at least be one! + ninput_items_required[i] = std::max(1, (int) std::floor((double) noutput_items / relative_rate() + 0.5)); + } + } +} + + +void +gr_tagged_stream_block::parse_length_tags( + const std::vector<std::vector<gr_tag_t> > &tags, + gr_vector_int &n_input_items_reqd +){ + for (unsigned i = 0; i < tags.size(); i++) { + for (unsigned k = 0; k < tags[i].size(); k++) { + if (tags[i][k].key == d_length_tag_key) { + n_input_items_reqd[i] = pmt::pmt_to_long(tags[i][k].value); + remove_item_tag(i, tags[i][k]); + } + } + } +} + + +int +gr_tagged_stream_block::calculate_output_stream_length(const gr_vector_int &ninput_items) +{ + int noutput_items = *std::max_element(ninput_items.begin(), ninput_items.end()); + return (int) std::floor(relative_rate() * noutput_items + 0.5); +} + + +void +gr_tagged_stream_block::update_length_tags(int n_produced, int n_ports) +{ + for (int i = 0; i < n_ports; i++) { + add_item_tag(i, nitems_written(i), + d_length_tag_key, + pmt::pmt_from_long(n_produced) + ); + } + return; +} + + +int +gr_tagged_stream_block::general_work (int noutput_items, + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) +{ + if (d_length_tag_key_str.empty()) { + return work(noutput_items, ninput_items, input_items, output_items); + } + + if (d_n_input_items_reqd[0] == 0) { // Otherwise, it's already set from a previous call + std::vector<std::vector<gr_tag_t> > tags(input_items.size(), std::vector<gr_tag_t>()); + for (unsigned i = 0; i < input_items.size(); i++) { + get_tags_in_range(tags[i], i, nitems_read(i), nitems_read(i)+1); + } + d_n_input_items_reqd.assign(input_items.size(), -1); + parse_length_tags(tags, d_n_input_items_reqd); + } + for (unsigned i = 0; i < input_items.size(); i++) { + if (d_n_input_items_reqd[i] == -1) { + throw std::runtime_error("Missing length tag."); + } + if (d_n_input_items_reqd[i] > ninput_items[i]) { + return 0; + } + } + + int min_output_size = calculate_output_stream_length(d_n_input_items_reqd); + if (noutput_items < min_output_size) { + set_min_noutput_items(min_output_size); + return 0; + } + set_min_noutput_items(1); + + // WORK CALLED HERE // + int n_produced = work(noutput_items, d_n_input_items_reqd, input_items, output_items); + ////////////////////// + + if (n_produced == WORK_DONE) { + return n_produced; + } + for (int i = 0; i < (int) d_n_input_items_reqd.size(); i++) { + consume(i, d_n_input_items_reqd[i]); + } + update_length_tags(n_produced, output_items.size()); + + d_n_input_items_reqd.assign(input_items.size(), 0); + + return n_produced; +} + diff --git a/gnuradio-core/src/lib/runtime/gr_tagged_stream_block.h b/gnuradio-core/src/lib/runtime/gr_tagged_stream_block.h new file mode 100644 index 0000000000..a9d396c06f --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_tagged_stream_block.h @@ -0,0 +1,138 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ + +#ifndef INCLUDED_GR_TAGGED_STREAM_BLOCK_H +#define INCLUDED_GR_TAGGED_STREAM_BLOCK_H + +#include <gr_core_api.h> +#include <gr_block.h> + +/*! + * \brief Block that operates on PDUs in form of tagged streams + * \ingroup base_blk + * + * Override work to provide the signal processing implementation. + */ +class GR_CORE_API gr_tagged_stream_block : public gr_block +{ + private: + pmt::pmt_t d_length_tag_key; //! This is the key for the tag that stores the PDU length + gr_vector_int d_n_input_items_reqd; //! How many input items do I need to process the next PDU? + + protected: + std::string d_length_tag_key_str; + gr_tagged_stream_block (void){} //allows pure virtual interface sub-classes + gr_tagged_stream_block (const std::string &name, + gr_io_signature_sptr input_signature, + gr_io_signature_sptr output_signature, + const std::string &length_tag_key); + + /* \brief Parse all tags on the first sample of a PDU, return the number of items per input + * and prune the length tags. + * + * In most cases, you don't need to override this, unless the number of items read + * is not directly coded in one single tag. + * + * Default behaviour: + * - Go through all input ports + * - On every input port, search for the tag with the key specified in \p length_tag_key + * - Copy that value as an int to the corresponding position in \p n_input_items_reqd + * - Remove the length tag. + * + * \param[in] tags All the tags found on the first item of every input port. + * \param[out] n_input_items_reqd Number of items which will be read from every input + */ + virtual void parse_length_tags( + const std::vector<std::vector<gr_tag_t> > &tags, + gr_vector_int &n_input_items_reqd + ); + + /* \brief Calculate the number of output items. + * + * This is basically the inverse function to forecast(): Given a number of input + * items, it returns the maximum number of output items. + * + * You most likely need to override this function, unless your block is a sync + * block or integer interpolator/decimator. + * + */ + virtual int calculate_output_stream_length(const gr_vector_int &ninput_items); + + /* \brief Set the new length tags on the output stream + * + * Default behaviour: Set a tag with key \p length_tag_key and + * the number of produced items on every output port. + * + * For anything else, override this. + * + * \param n_produced Length of the new PDU + * \param n_ports Number of output ports + */ + virtual void update_length_tags(int n_produced, int n_ports); + + public: + + /* \brief Don't override this. + */ + void /* final */ forecast (int noutput_items, gr_vector_int &ninput_items_required); + + /* - Reads the number of input items from the tags using parse_length_tags() + * - Checks there's enough data on the input and output buffers + * - If not, inform the scheduler and do nothing + * - Calls work() with the exact number of items per PDU + * - Updates the tags using update_length_tags() + */ + int general_work (int noutput_items, + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items); + + /*! + * \brief Just like gr_block::general_work, but makes sure the input is valid + * + * The user must override work to define the signal processing code. + * Check the documentation for general_work() to see what happens here. + * + * Like gr_sync_block, this calls consume() for you (it consumes ninput_items[i] + * items from the i-th port). + * + * A note on tag propagation: The PDU length tags are handled by other functions, + * but all other tags are handled just as in any other \p gr_block. So, most likely, + * you either set the tag propagation policy to TPP_DONT and handle the tag + * propagation manually, or you propagate tags through the scheduler and don't + * do anything here. + * + * \param noutput_items The size of the writable output buffer + * \param ninput_items The exact size of the items on every input for this particular PDU. + * These will be consumed if a length tag key is provided! + * \param input_items See gr_block + * \param output_items See gr_block + */ + virtual int work (int noutput_items, + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) = 0; + +}; + +#endif /* INCLUDED_GR_TAGGED_STREAM_BLOCK_H */ + diff --git a/gnuradio-core/src/lib/runtime/gr_tagged_stream_block.i b/gnuradio-core/src/lib/runtime/gr_tagged_stream_block.i new file mode 100644 index 0000000000..9fc803dca1 --- /dev/null +++ b/gnuradio-core/src/lib/runtime/gr_tagged_stream_block.i @@ -0,0 +1,30 @@ +/* -*- c++ -*- */ +/* + * Copyright 2013 Free Software Foundation, Inc. + * + * This file is part of GNU Radio + * + * GNU Radio is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation; either version 3, or (at your option) + * any later version. + * + * GNU Radio is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with GNU Radio; see the file COPYING. If not, write to + * the Free Software Foundation, Inc., 51 Franklin Street, + * Boston, MA 02110-1301, USA. + */ +class gr_tagged_stream_block : public gr_block +{ + protected: + gr_tagged_stream_block (const std::string &name, + gr_io_signature_sptr input_signature, + gr_io_signature_sptr output_signature, + const std::string &length_tag_key); +}; + diff --git a/gnuradio-core/src/lib/runtime/runtime.i b/gnuradio-core/src/lib/runtime/runtime.i index 8902c6103f..dd7b095547 100644 --- a/gnuradio-core/src/lib/runtime/runtime.i +++ b/gnuradio-core/src/lib/runtime/runtime.i @@ -39,6 +39,7 @@ #include <gr_sync_block.h> #include <gr_sync_decimator.h> #include <gr_sync_interpolator.h> +#include <gr_tagged_stream_block.h> #include <gr_top_block.h> #include <gr_logger.h> %} @@ -67,5 +68,6 @@ %include <gr_sync_block.i> %include <gr_sync_decimator.i> %include <gr_sync_interpolator.i> +%include <gr_tagged_stream_block.i> %include <gr_top_block.i> %include <gr_logger.i> diff --git a/gnuradio-core/src/lib/swig/gnuradio.i b/gnuradio-core/src/lib/swig/gnuradio.i index 3a421ad5d6..4378e6aad2 100644 --- a/gnuradio-core/src/lib/swig/gnuradio.i +++ b/gnuradio-core/src/lib/swig/gnuradio.i @@ -38,6 +38,7 @@ #include <gr_types.h> #include <stddef.h> // size_t #include <complex> +#include <string.h> %} %feature("autodoc","1"); @@ -48,6 +49,7 @@ %include <std_vector.i> %include <stl.i> %include <std_except.i> +%include <std_string.i> typedef std::complex<float> gr_complex; typedef std::complex<double> gr_complexd; @@ -64,7 +66,8 @@ namespace std { %template() vector<int>; %template() vector<float>; %template() vector<double>; - // %template() std::complex<float>; + %template() vector<std::string>; + %template() vector<gr_tag_t>; %template() vector< std::complex<float> >; %template() vector< std::vector< unsigned char > >; diff --git a/gnuradio-core/src/lib/swig/gnuradio_swig_bug_workaround.h b/gnuradio-core/src/lib/swig/gnuradio_swig_bug_workaround.h index 1994f06609..bbbabaf07b 100644 --- a/gnuradio-core/src/lib/swig/gnuradio_swig_bug_workaround.h +++ b/gnuradio-core/src/lib/swig/gnuradio_swig_bug_workaround.h @@ -40,6 +40,7 @@ class gr_msg_queue; class gr_sync_block; class gr_sync_decimator; class gr_sync_interpolator; +class gr_tagged_stream_block; class gr_top_block; #endif /* INCLUDED_GNURADIO_SWIG_BUG_WORKAROUND_H */ diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_message_tags.py b/gnuradio-core/src/python/gnuradio/gr/qa_message_tags.py new file mode 100644 index 0000000000..566a09988e --- /dev/null +++ b/gnuradio-core/src/python/gnuradio/gr/qa_message_tags.py @@ -0,0 +1,26 @@ +import time + +from gnuradio import gr, gr_unittest + +class test_message_tags (gr_unittest.TestCase): + + def test_1 (self): + data = ('hello', 'you', 'there') + tx_msgq = gr.msg_queue () + rx_msgq = gr.msg_queue () + for d in data: + tx_msgq.insert_tail(gr.message_from_string(d)) + tb = gr.top_block() + src = gr.message_source(gr.sizeof_char, tx_msgq, "packet_length") + snk = gr.message_sink(gr.sizeof_char, rx_msgq, False, "packet_length") + tb.connect(src, snk) + tb.start() + time.sleep(1) + tb.stop() + for d in data: + msg = rx_msgq.delete_head() + contents = msg.to_string() + self.assertEqual(d, contents) + +if __name__ == '__main__': + gr_unittest.run(test_message_tags, "test_message_tags.xml") |