diff options
Diffstat (limited to 'gr-blocks/lib/message_sink_impl.cc')
-rw-r--r-- | gr-blocks/lib/message_sink_impl.cc | 79 |
1 files changed, 64 insertions, 15 deletions
diff --git a/gr-blocks/lib/message_sink_impl.cc b/gr-blocks/lib/message_sink_impl.cc index a8dbfb4c71..fbc7b27d58 100644 --- a/gr-blocks/lib/message_sink_impl.cc +++ b/gr-blocks/lib/message_sink_impl.cc @@ -44,11 +44,30 @@ namespace gr { (new message_sink_impl(itemsize, msgq, dont_block)); } + message_sink::sptr + message_sink::make(size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block, + const std::string& lengthtagname) + { + return gnuradio::get_initial_sptr + (new message_sink_impl(itemsize, msgq, dont_block, lengthtagname)); + } + message_sink_impl::message_sink_impl(size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block) : gr_sync_block("message_sink", gr_make_io_signature(1, 1, itemsize), gr_make_io_signature(0, 0, 0)), - d_itemsize(itemsize), d_msgq(msgq), d_dont_block(dont_block) + d_itemsize(itemsize), d_msgq(msgq), d_dont_block(dont_block), + d_tags(false), d_items_read(0) + { + } + + message_sink_impl::message_sink_impl(size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block, + const std::string& lengthtagname) + : gr_sync_block("message_sink", + gr_make_io_signature(1, 1, itemsize), + gr_make_io_signature(0, 0, 0)), + d_itemsize(itemsize), d_msgq(msgq), d_dont_block(dont_block), + d_tags(true), d_lengthtagname(lengthtagname), d_items_read(0) { } @@ -61,23 +80,53 @@ namespace gr { gr_vector_const_void_star &input_items, gr_vector_void_star &output_items) { - const char *in = (const char*)input_items[0]; - - // if we'd block, drop the data on the floor and say everything is OK - if(d_dont_block && d_msgq->full_p()) - return noutput_items; - - // build a message to hold whatever we've got - gr_message_sptr msg = gr_make_message(0, // msg type - d_itemsize, // arg1 for other end - noutput_items, // arg2 for other end (redundant) - noutput_items * d_itemsize); // len of msg - memcpy(msg->msg(), in, noutput_items * d_itemsize); + const char *in = (const char *) input_items[0]; - d_msgq->handle(msg); // send it + if (d_tags) { + long packet_length = 0; + std::vector<gr_tag_t> tags; + this->get_tags_in_range(tags, 0, d_items_read, d_items_read+1); + //const size_t ninput_items = noutput_items; //assumption for sync block, this can change + for (unsigned int i = 0; i < tags.size(); i++) { + if (pmt::symbol_to_string(tags[i].key) == d_lengthtagname) { + packet_length = pmt::to_long(tags[i].value); + } + } + assert(packet_length != 0); + + // FIXME run this multiple times if input_items >= N * packet_length + if (noutput_items >= packet_length ) { + // If the message queue is full we drop the packet. + if (!d_msgq->full_p()) { + gr_message_sptr msg = gr_make_message(0, // msg type + d_itemsize, // arg1 for other end + packet_length, // arg2 for other end (redundant) + packet_length * d_itemsize); // len of msg + memcpy(msg->msg(), in, packet_length * d_itemsize); + d_msgq->handle(msg); // send it + } + d_items_read += packet_length; + return packet_length; + } else { + return 0; + } + } else { + // If the queue if full we drop all the data we got. + if (!d_msgq->full_p()) { + // build a message to hold whatever we've got + gr_message_sptr msg = gr_make_message(0, // msg type + d_itemsize, // arg1 for other end + noutput_items, // arg2 for other end (redundant) + noutput_items * d_itemsize); // len of msg + memcpy(msg->msg(), in, noutput_items * d_itemsize); + + d_msgq->handle(msg); // send it + } - return noutput_items; + return noutput_items; + } } } /* namespace blocks */ } /* namespace gr */ + |