diff options
-rw-r--r-- | gr-blocks/examples/msg_passing/strobe.grc | 182 | ||||
-rw-r--r-- | gr-blocks/include/gnuradio/blocks/pdu_to_tagged_stream.h | 4 | ||||
-rw-r--r-- | gr-blocks/lib/pdu_to_tagged_stream_impl.cc | 119 | ||||
-rw-r--r-- | gr-blocks/lib/pdu_to_tagged_stream_impl.h | 15 | ||||
-rwxr-xr-x | gr-blocks/python/blocks/qa_pdu.py | 2 |
5 files changed, 198 insertions, 124 deletions
diff --git a/gr-blocks/examples/msg_passing/strobe.grc b/gr-blocks/examples/msg_passing/strobe.grc index e5001a4262..f24887ab76 100644 --- a/gr-blocks/examples/msg_passing/strobe.grc +++ b/gr-blocks/examples/msg_passing/strobe.grc @@ -1,4 +1,5 @@ <?xml version='1.0' encoding='ASCII'?> +<?grc format='1' created='3.7.6'?> <flow_graph> <timestamp>Sun Mar 17 20:42:59 2013</timestamp> <block> @@ -52,8 +53,12 @@ <value></value> </param> <param> + <key>alias</key> + <value></value> + </param> + <param> <key>_coordinate</key> - <value>(10, 10)</value> + <value>(-8, -12)</value> </param> <param> <key>_rotation</key> @@ -61,41 +66,42 @@ </param> </block> <block> - <key>variable</key> + <key>blocks_pdu_to_tagged_stream</key> <param> <key>id</key> - <value>samp_rate</value> + <value>blocks_pdu_to_tagged_stream_0</value> </param> <param> <key>_enabled</key> <value>True</value> </param> <param> - <key>value</key> - <value>32000</value> + <key>type</key> + <value>byte</value> </param> <param> - <key>_coordinate</key> - <value>(10, 170)</value> + <key>tag</key> + <value>packet_len</value> </param> <param> - <key>_rotation</key> - <value>0</value> + <key>alias</key> + <value></value> </param> - </block> - <block> - <key>blocks_message_debug</key> <param> - <key>id</key> - <value>blocks_message_debug_0</value> + <key>affinity</key> + <value></value> </param> <param> - <key>_enabled</key> - <value>True</value> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> </param> <param> <key>_coordinate</key> - <value>(1049, 176)</value> + <value>(264, 179)</value> </param> <param> <key>_rotation</key> @@ -103,22 +109,42 @@ </param> </block> <block> - <key>blocks_pdu_to_tagged_stream</key> + <key>blocks_message_strobe</key> <param> <key>id</key> - <value>blocks_pdu_to_tagged_stream_0</value> + <value>blocks_message_strobe_0_0</value> </param> <param> <key>_enabled</key> <value>True</value> </param> <param> - <key>type</key> - <value>byte</value> + <key>msg</key> + <value>pmt.cons( pmt.PMT_NIL, pmt.make_u8vector(512,0) )</value> + </param> + <param> + <key>period</key> + <value>750</value> + </param> + <param> + <key>alias</key> + <value></value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> </param> <param> <key>_coordinate</key> - <value>(388, 85)</value> + <value>(48, 172)</value> </param> <param> <key>_rotation</key> @@ -129,7 +155,7 @@ <key>blocks_message_strobe</key> <param> <key>id</key> - <value>blocks_message_strobe_0_0</value> + <value>blocks_message_strobe_0</value> </param> <param> <key>_enabled</key> @@ -137,15 +163,31 @@ </param> <param> <key>msg</key> - <value>pmt.cons( pmt.PMT_NIL, pmt.make_u8vector(512,0) )</value> + <value>pmt.intern("TEST")</value> </param> <param> <key>period</key> - <value>750</value> + <value>1000</value> + </param> + <param> + <key>alias</key> + <value></value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> </param> <param> <key>_coordinate</key> - <value>(95, 95)</value> + <value>(56, 108)</value> </param> <param> <key>_rotation</key> @@ -175,8 +217,28 @@ <value>1</value> </param> <param> + <key>showports</key> + <value>True</value> + </param> + <param> + <key>alias</key> + <value></value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> <key>_coordinate</key> - <value>(629, 81)</value> + <value>(496, 179)</value> </param> <param> <key>_rotation</key> @@ -198,8 +260,28 @@ <value>byte</value> </param> <param> + <key>tag</key> + <value>packet_len</value> + </param> + <param> + <key>alias</key> + <value></value> + </param> + <param> + <key>affinity</key> + <value></value> + </param> + <param> + <key>minoutbuf</key> + <value>0</value> + </param> + <param> + <key>maxoutbuf</key> + <value>0</value> + </param> + <param> <key>_coordinate</key> - <value>(837, 47)</value> + <value>(664, 179)</value> </param> <param> <key>_rotation</key> @@ -207,26 +289,26 @@ </param> </block> <block> - <key>blocks_message_strobe</key> + <key>blocks_message_debug</key> <param> <key>id</key> - <value>blocks_message_strobe_0</value> + <value>blocks_message_debug_0</value> </param> <param> <key>_enabled</key> <value>True</value> </param> <param> - <key>msg</key> - <value>pmt.intern("TEST")</value> + <key>alias</key> + <value></value> </param> <param> - <key>period</key> - <value>1000</value> + <key>affinity</key> + <value></value> </param> <param> <key>_coordinate</key> - <value>(423, 164)</value> + <value>(944, 120)</value> </param> <param> <key>_rotation</key> @@ -234,22 +316,16 @@ </param> </block> <connection> - <source_block_id>blocks_message_strobe_0</source_block_id> - <sink_block_id>blocks_message_debug_0</sink_block_id> - <source_key>0</source_key> - <sink_key>0</sink_key> - </connection> - <connection> <source_block_id>blocks_message_strobe_0_0</source_block_id> <sink_block_id>blocks_pdu_to_tagged_stream_0</sink_block_id> - <source_key>0</source_key> - <sink_key>0</sink_key> + <source_key>strobe</source_key> + <sink_key>pdus</sink_key> </connection> <connection> - <source_block_id>blocks_tagged_stream_to_pdu_0</source_block_id> - <sink_block_id>blocks_message_debug_0</sink_block_id> + <source_block_id>blocks_pdu_to_tagged_stream_0</source_block_id> + <sink_block_id>blocks_copy_0</sink_block_id> <source_key>0</source_key> - <sink_key>2</sink_key> + <sink_key>0</sink_key> </connection> <connection> <source_block_id>blocks_copy_0</source_block_id> @@ -258,9 +334,15 @@ <sink_key>0</sink_key> </connection> <connection> - <source_block_id>blocks_pdu_to_tagged_stream_0</source_block_id> - <sink_block_id>blocks_copy_0</sink_block_id> - <source_key>0</source_key> - <sink_key>0</sink_key> + <source_block_id>blocks_message_strobe_0</source_block_id> + <sink_block_id>blocks_message_debug_0</sink_block_id> + <source_key>strobe</source_key> + <sink_key>print</sink_key> + </connection> + <connection> + <source_block_id>blocks_tagged_stream_to_pdu_0</source_block_id> + <sink_block_id>blocks_message_debug_0</sink_block_id> + <source_key>pdus</source_key> + <sink_key>print_pdu</sink_key> </connection> </flow_graph> diff --git a/gr-blocks/include/gnuradio/blocks/pdu_to_tagged_stream.h b/gr-blocks/include/gnuradio/blocks/pdu_to_tagged_stream.h index 9ae2ecd297..479e367e72 100644 --- a/gr-blocks/include/gnuradio/blocks/pdu_to_tagged_stream.h +++ b/gr-blocks/include/gnuradio/blocks/pdu_to_tagged_stream.h @@ -25,7 +25,7 @@ #include <gnuradio/blocks/api.h> #include <gnuradio/blocks/pdu.h> -#include <gnuradio/sync_block.h> +#include <gnuradio/tagged_stream_block.h> namespace gr { namespace blocks { @@ -34,7 +34,7 @@ namespace gr { * \brief Turns received PDUs into a tagged stream of items * \ingroup message_tools_blk */ - class BLOCKS_API pdu_to_tagged_stream : virtual public sync_block + class BLOCKS_API pdu_to_tagged_stream : virtual public tagged_stream_block { public: // gr::blocks::pdu_to_tagged_stream::sptr diff --git a/gr-blocks/lib/pdu_to_tagged_stream_impl.cc b/gr-blocks/lib/pdu_to_tagged_stream_impl.cc index 8baa9773d6..a8ba5d22b9 100644 --- a/gr-blocks/lib/pdu_to_tagged_stream_impl.cc +++ b/gr-blocks/lib/pdu_to_tagged_stream_impl.cc @@ -32,90 +32,79 @@ namespace gr { namespace blocks { pdu_to_tagged_stream::sptr - pdu_to_tagged_stream::make(pdu::vector_type type, const std::string& lengthtagname) + pdu_to_tagged_stream::make(pdu::vector_type type, const std::string& tsb_tag_key) { - return gnuradio::get_initial_sptr(new pdu_to_tagged_stream_impl(type, lengthtagname)); + return gnuradio::get_initial_sptr(new pdu_to_tagged_stream_impl(type, tsb_tag_key)); } - pdu_to_tagged_stream_impl::pdu_to_tagged_stream_impl(pdu::vector_type type, const std::string& lengthtagname) - : sync_block("pdu_to_tagged_stream", - io_signature::make(0, 0, 0), - io_signature::make(1, 1, pdu::itemsize(type))), - d_itemsize(pdu::itemsize(type)), - d_type(type), - d_tag(pmt::mp(lengthtagname)) + pdu_to_tagged_stream_impl::pdu_to_tagged_stream_impl(pdu::vector_type type, const std::string& tsb_tag_key) + : tagged_stream_block("pdu_to_tagged_stream", + io_signature::make(0, 0, 0), + io_signature::make(1, 1, pdu::itemsize(type)), + tsb_tag_key), + d_itemsize(pdu::itemsize(type)), + d_type(type), + d_curr_len(0) { message_port_register_in(PDU_PORT_ID); } - int - pdu_to_tagged_stream_impl::work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items) + int pdu_to_tagged_stream_impl::calculate_output_stream_length(const gr_vector_int &) { - char *out = (char *)output_items[0]; - int nout = 0; - - // if we have remaining output, send it - if (d_remain.size() > 0) { - nout = std::min((size_t)d_remain.size()/d_itemsize, (size_t)noutput_items); - memcpy(out, &d_remain[0], nout*d_itemsize); - d_remain.erase(d_remain.begin(), d_remain.begin()+nout*d_itemsize); - noutput_items -= nout; - out += nout*d_itemsize; - } - - // if we have space for at least one item output as much as we can - if (noutput_items > 0) { - - // grab a message if one exists - pmt::pmt_t msg(delete_head_nowait(PDU_PORT_ID)); - if (msg.get() == NULL) - return nout; - - // make sure type is valid - if (!pmt::is_pair(msg)) // TODO: implement pdu::is_valid() - throw std::runtime_error("received a malformed pdu message"); - - // grab the components of the pdu message - pmt::pmt_t meta(pmt::car(msg)); - pmt::pmt_t vect(pmt::cdr(msg)); + if (d_curr_len == 0) { + pmt::pmt_t msg(delete_head_nowait(PDU_PORT_ID)); + if (msg.get() == NULL) { + return 0; + } - // compute offset for output tag - uint64_t offset = nitems_written(0) + nout; + if (!pmt::is_pair(msg)) + throw std::runtime_error("received a malformed pdu message"); - // add a tag for pdu length - add_item_tag(0, offset, d_tag, pmt::from_long(pmt::length(vect)), pmt::mp(alias())); + d_curr_meta = pmt::car(msg); + d_curr_vect = pmt::cdr(msg); + d_curr_len = pmt::length(d_curr_vect); + } - // if we recieved metadata add it as tags - if (!pmt::eq(meta, pmt::PMT_NIL) ) { - pmt::pmt_t klist(pmt::dict_keys(meta)); - for(size_t i=0; i<pmt::length(klist); i++){ - pmt::pmt_t k(pmt::nth(i, klist)); - pmt::pmt_t v(pmt::dict_ref(meta, k, pmt::PMT_NIL)); - add_item_tag(0, offset, k, v, pmt::mp(alias())); - } + return d_curr_len; } - // copy vector output - size_t ncopy = std::min((size_t)noutput_items, (size_t)pmt::length(vect)); - size_t nsave = pmt::length(vect) - ncopy; + int + pdu_to_tagged_stream_impl::work (int noutput_items, + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items) + { + uint8_t *out = (uint8_t*) output_items[0]; - // copy output - size_t io(0); - nout += ncopy; - const uint8_t* ptr = (uint8_t*) uniform_vector_elements(vect, io); - memcpy(out, ptr, ncopy*d_itemsize); + if (d_curr_len == 0) { + return 0; + } - // save leftover items if needed for next work call - if (nsave > 0) { - d_remain.resize(nsave*d_itemsize, 0); - memcpy(&d_remain[0], ptr + ncopy*d_itemsize, nsave*d_itemsize); + // work() should only be called if the current PDU fits entirely + // into the output buffer. + assert(noutput_items >= d_curr_len); + + // Copy vector output + size_t nout = d_curr_len; + size_t io(0); + const uint8_t* ptr = (uint8_t*) uniform_vector_elements(d_curr_vect, io); + memcpy(out, ptr, d_curr_len*d_itemsize); + + // Copy tags + if (!pmt::eq(d_curr_meta, pmt::PMT_NIL) ) { + pmt::pmt_t klist(pmt::dict_keys(d_curr_meta)); + for (size_t i = 0; i < pmt::length(klist); i++) { + pmt::pmt_t k(pmt::nth(i, klist)); + pmt::pmt_t v(pmt::dict_ref(d_curr_meta, k, pmt::PMT_NIL)); + add_item_tag(0, nitems_written(0), k, v, pmt::mp(alias())); } } + // Reset state + d_curr_len = 0; + return nout; - } + } /* work() */ } /* namespace blocks */ } /* namespace gr */ diff --git a/gr-blocks/lib/pdu_to_tagged_stream_impl.h b/gr-blocks/lib/pdu_to_tagged_stream_impl.h index 4bfd9cfd4e..99f68147c8 100644 --- a/gr-blocks/lib/pdu_to_tagged_stream_impl.h +++ b/gr-blocks/lib/pdu_to_tagged_stream_impl.h @@ -32,18 +32,23 @@ namespace gr { { size_t d_itemsize; pdu::vector_type d_type; - std::vector<uint8_t> d_remain; - pmt::pmt_t d_tag; + pmt::pmt_t d_curr_meta; + pmt::pmt_t d_curr_vect; + size_t d_curr_len; public: pdu_to_tagged_stream_impl(pdu::vector_type type, const std::string& lengthtagname="packet_len"); + int calculate_output_stream_length(const gr_vector_int &ninput_items); + int work(int noutput_items, - gr_vector_const_void_star &input_items, - gr_vector_void_star &output_items); + gr_vector_int &ninput_items, + gr_vector_const_void_star &input_items, + gr_vector_void_star &output_items + ); }; } /* namespace blocks */ } /* namespace gr */ -#endif +#endif /* INCLUDED_PDU_TO_TAGGED_STREAM_IMPL_H */ diff --git a/gr-blocks/python/blocks/qa_pdu.py b/gr-blocks/python/blocks/qa_pdu.py index 5a29e04735..bbee3605ba 100755 --- a/gr-blocks/python/blocks/qa_pdu.py +++ b/gr-blocks/python/blocks/qa_pdu.py @@ -99,8 +99,6 @@ class test_pdu(gr_unittest.TestCase): msg = pmt.cons( pmt.PMT_NIL, pmt.init_f32vector(10, src_data)) src.to_basic_block()._post(port, msg) - src.set_max_noutput_items(5) - self.tb.start() #ideally, would wait until we get ten samples time.sleep(0.2) |