diff options
author | Martin Braun <martin.braun@ettus.com> | 2014-02-17 09:19:13 +0100 |
---|---|---|
committer | Martin Braun <martin.braun@ettus.com> | 2014-02-17 09:19:13 +0100 |
commit | 6b579fa2ee60fd820ba0fe356c1a281fd7ee59e7 (patch) | |
tree | 0f6a9560ee5e29e13509cc621605a7760f0c53d5 | |
parent | cf8997ccc1e9c543aa234e561e7f002a176e13dc (diff) |
blocks: tagged_stream_to_pdu now uses logic from tagged_stream_block, also more QA
-rw-r--r-- | gr-blocks/include/gnuradio/blocks/tagged_stream_to_pdu.h | 12 | ||||
-rw-r--r-- | gr-blocks/lib/tagged_stream_to_pdu_impl.cc | 99 | ||||
-rw-r--r-- | gr-blocks/lib/tagged_stream_to_pdu_impl.h | 13 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_pdu.py | 43 |
4 files changed, 68 insertions, 99 deletions
diff --git a/gr-blocks/include/gnuradio/blocks/tagged_stream_to_pdu.h b/gr-blocks/include/gnuradio/blocks/tagged_stream_to_pdu.h index 9d5f509952..15ed50f252 100644 --- a/gr-blocks/include/gnuradio/blocks/tagged_stream_to_pdu.h +++ b/gr-blocks/include/gnuradio/blocks/tagged_stream_to_pdu.h @@ -1,6 +1,6 @@ /* -*- c++ -*- */ /* - * Copyright 2013 Free Software Foundation, Inc. + * Copyright 2013,2014 Free Software Foundation, Inc. * * This file is part of GNU Radio * @@ -25,16 +25,19 @@ #include <gnuradio/blocks/api.h> #include <gnuradio/blocks/pdu.h> -#include <gnuradio/sync_block.h> +#include <gnuradio/tagged_stream_block.h> namespace gr { namespace blocks { /*! - * \brief Turns received stream data and tags into PDUs + * \brief Turns received stream data and tags into PDUs and sends them through a message port. * \ingroup message_tools_blk + * + * The sent message is a PMT-pair (created by pmt::cons()). The first element is a dictionary + * containing all the tags. The second is a vector containing the actual data. */ - class BLOCKS_API tagged_stream_to_pdu : virtual public sync_block + class BLOCKS_API tagged_stream_to_pdu : virtual public tagged_stream_block { public: // gr::blocks::tagged_stream_to_pdu::sptr @@ -44,7 +47,6 @@ namespace gr { * \brief Construct a tagged_stream_to_pdu block * \param type PDU type of pdu::vector_type * \param lengthtagname The name of the tag that specifies how long the packet is. - * Defaults to 'packet_len'. */ static sptr make(pdu::vector_type type, const std::string& lengthtagname="packet_len"); diff --git a/gr-blocks/lib/tagged_stream_to_pdu_impl.cc b/gr-blocks/lib/tagged_stream_to_pdu_impl.cc index da8940359a..04871aef8f 100644 --- a/gr-blocks/lib/tagged_stream_to_pdu_impl.cc +++ b/gr-blocks/lib/tagged_stream_to_pdu_impl.cc @@ -38,100 +38,43 @@ namespace gr { } tagged_stream_to_pdu_impl::tagged_stream_to_pdu_impl(pdu::vector_type type, const std::string& lengthtagname) - : sync_block("tagged_stream_to_pdu", + : tagged_stream_block("tagged_stream_to_pdu", io_signature::make(1, 1, pdu::itemsize(type)), - io_signature::make(0, 0, 0)), - d_itemsize(pdu::itemsize(type)), - d_inpdu(false), + io_signature::make(0, 0, 0), lengthtagname), d_type(type), d_pdu_meta(pmt::PMT_NIL), - d_pdu_vector(pmt::PMT_NIL), - d_tag(pmt::mp(lengthtagname)) + d_pdu_vector(pmt::PMT_NIL) { message_port_register_out(PDU_PORT_ID); } int - tagged_stream_to_pdu_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) + tagged_stream_to_pdu_impl::work (int noutput_items, + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) { const uint8_t *in = (const uint8_t*) input_items[0]; - uint64_t abs_N = nitems_read(0); - // if we are not in a pdu already, start a new one - if (!d_inpdu) { - bool found_length_tag(false); - - get_tags_in_range(d_tags, 0, abs_N, abs_N+1); - - for (d_tags_itr = d_tags.begin(); (d_tags_itr != d_tags.end()) && (!found_length_tag); d_tags_itr++) { - if (pmt::eq((*d_tags_itr).key, d_tag)) { - - if ((*d_tags_itr).offset != abs_N ) - throw std::runtime_error("expected next pdu length tag on a different item..."); - - found_length_tag = true; - d_pdu_length = pmt::to_long((*d_tags_itr).value); - d_pdu_remain = d_pdu_length; - d_pdu_meta = pmt::make_dict(); - break; - } // if have length tag - } // iter over tags - - if (!found_length_tag) - throw std::runtime_error("tagged stream does not contain a pdu_length tag"); + // Grab tags, throw them into dict + get_tags_in_range(d_tags, 0, + nitems_read(0), + nitems_read(0) + ninput_items[0] + ); + d_pdu_meta = pmt::make_dict(); + for (d_tags_itr = d_tags.begin(); d_tags_itr != d_tags.end(); d_tags_itr++) { + d_pdu_meta = dict_add(d_pdu_meta, (*d_tags_itr).key, (*d_tags_itr).value); } - size_t ncopy = std::min((size_t)noutput_items, d_pdu_remain); - - // copy any tags in this range into our meta object - get_tags_in_range(d_tags, 0, abs_N, abs_N+ncopy); - for (d_tags_itr = d_tags.begin(); d_tags_itr != d_tags.end(); d_tags_itr++) - if(!pmt::eq((*d_tags_itr).key, d_tag )) - d_pdu_meta = dict_add(d_pdu_meta, (*d_tags_itr).key, (*d_tags_itr).value); + // Grab data, throw into vector + d_pdu_vector = pdu::make_pdu_vector(d_type, in, ninput_items[0]); - // copy samples for this vector into either a pmt or our save buffer - if (ncopy == d_pdu_remain) { // we will send this pdu - if (d_save.size() == 0) { - d_pdu_vector = pdu::make_pdu_vector(d_type, in, ncopy); - send_message(); - } - else { - size_t oldsize = d_save.size()/d_itemsize; - d_save.resize((oldsize + ncopy)*d_itemsize, 0); - memcpy(&d_save[oldsize*d_itemsize], in, ncopy*d_itemsize); - d_pdu_vector = pdu::make_pdu_vector(d_type, &d_save[0], d_pdu_length); - send_message(); - d_save.clear(); - } - } - else { - size_t oldsize = d_save.size()/d_itemsize; - d_inpdu = true; - d_save.resize((oldsize+ncopy)*d_itemsize); - memcpy(&d_save[oldsize*d_itemsize], in, ncopy*d_itemsize); - d_pdu_remain -= ncopy; - } - - return ncopy; - } - - void - tagged_stream_to_pdu_impl::send_message() - { - if (pmt::length(d_pdu_vector) != d_pdu_length) - throw std::runtime_error("msg length not correct"); - + // Send msg pmt::pmt_t msg = pmt::cons(d_pdu_meta, d_pdu_vector); message_port_pub(PDU_PORT_ID, msg); - - d_pdu_meta = pmt::PMT_NIL; - d_pdu_vector = pmt::PMT_NIL; - d_pdu_length = 0; - d_pdu_remain = 0; - d_inpdu = false; + + return ninput_items[0]; } - + } /* namespace blocks */ } /* namespace gr */ diff --git a/gr-blocks/lib/tagged_stream_to_pdu_impl.h b/gr-blocks/lib/tagged_stream_to_pdu_impl.h index 9620778a91..0a83a87beb 100644 --- a/gr-blocks/lib/tagged_stream_to_pdu_impl.h +++ b/gr-blocks/lib/tagged_stream_to_pdu_impl.h @@ -30,27 +30,20 @@ namespace gr { class BLOCKS_API tagged_stream_to_pdu_impl : public tagged_stream_to_pdu { - size_t d_itemsize; - size_t d_pdu_length; - size_t d_pdu_remain; - bool d_inpdu; pdu::vector_type d_type; - std::vector<uint8_t> d_save; pmt::pmt_t d_pdu_meta; pmt::pmt_t d_pdu_vector; - - pmt::pmt_t d_tag; std::vector<tag_t>::iterator d_tags_itr; - std::vector<tag_t> d_tags; - + std::vector<tag_t> d_tags; + public: tagged_stream_to_pdu_impl(pdu::vector_type type, const std::string& lengthtagname); int work(int noutput_items, + gr_vector_int &ninput_items, gr_vector_const_void_star &input_items, gr_vector_void_star &output_items); - void send_message(); }; } /* namespace blocks */ diff --git a/gr-blocks/python/blocks/qa_pdu.py b/gr-blocks/python/blocks/qa_pdu.py index 0bd838b9c0..81cd8cf71c 100755 --- a/gr-blocks/python/blocks/qa_pdu.py +++ b/gr-blocks/python/blocks/qa_pdu.py @@ -86,27 +86,58 @@ class test_pdu(gr_unittest.TestCase): actual_data = 16*[0xFF,] self.assertEqual(actual_data, list(result_data)) self.assertEqual(actual_data, msg_data) - + def test_001(self): #Test the overflow buffer in pdu_to_tagged_stream src_data = [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0] src = blocks.pdu_to_tagged_stream(blocks.float_t) snk = blocks.vector_sink_f() - + self.tb.connect(src, snk) port = pmt.intern("pdus") - + msg = pmt.cons( pmt.PMT_NIL, pmt.init_f32vector(10, src_data)) src.to_basic_block()._post(port, msg) - + src.set_max_noutput_items(5) - + self.tb.start() #ideally, would wait until we get ten samples time.sleep(0.2) self.tb.stop() - + self.assertEqual(src_data, list(snk.data()) ) + + def test_002_tags_plus_data(self): + packet_len = 16 + src_data = range(packet_len) + tag1 = gr.tag_t() + tag1.offset = 0 + tag1.key = pmt.string_to_symbol('spam') + tag1.value = pmt.from_long(23) + tag2 = gr.tag_t() + tag2.offset = 10 # Must be < packet_len + tag2.key = pmt.string_to_symbol('eggs') + tag2.value = pmt.from_long(42) + src = blocks.vector_source_f(src_data, tags=(tag1, tag2)) + s2ts = blocks.stream_to_tagged_stream(gr.sizeof_float, vlen=1, packet_len=packet_len, len_tag_key="packet_len") + ts2pdu = blocks.tagged_stream_to_pdu(blocks.float_t, "packet_len") + dbg = blocks.message_debug() + self.tb.connect(src, s2ts, ts2pdu) + self.tb.msg_connect(ts2pdu, "pdus", dbg, "store") + self.tb.start() + while dbg.num_messages() < 1: + time.sleep(0.1) + self.tb.stop() + self.tb.wait() + result_msg = dbg.get_message(0) + metadata = pmt.to_python(pmt.car(result_msg)) + vector = pmt.f32vector_elements(pmt.cdr(result_msg)) + self.assertEqual(metadata, {'eggs': 42, 'spam': 23}) + self.assertFloatTuplesAlmostEqual(tuple(vector), src_data) + + if __name__ == '__main__': gr_unittest.run(test_pdu, "test_pdu.xml") + |