summaryrefslogtreecommitdiff
path: root/gr-blocks/lib/message_source_impl.cc
diff options
context:
space:
mode:
authorJohnathan Corgan <johnathan@corganlabs.com>2013-03-15 09:57:31 -0700
committerJohnathan Corgan <johnathan@corganlabs.com>2013-03-15 09:57:31 -0700
commite965a5bb209ad46a509ccd21f393667fd69d95f9 (patch)
tree40d683ef8f80980f12eac6cfe7f3423c49b45f87 /gr-blocks/lib/message_source_impl.cc
parent2bf9c4cb4b0b426690f353fc1662a13e70c0d5e0 (diff)
parent27990ca9e236931e39a830e48f0a1efe13ec085f (diff)
Merge branch 'ofdm-master' into ofdm-next
Added fixups for next branch changes Conflicts: CMakeLists.txt gnuradio-core/src/lib/io/gr_message_sink.cc gnuradio-core/src/lib/io/gr_message_sink.h gnuradio-core/src/lib/io/gr_message_sink.i gnuradio-core/src/lib/io/gr_message_source.cc gnuradio-core/src/lib/io/gr_message_source.h gnuradio-core/src/lib/io/gr_message_source.i gr-blocks/CMakeLists.txt gr-digital/CMakeLists.txt gr-digital/grc/digital_block_tree.xml gr-digital/include/digital/CMakeLists.txt gr-digital/include/digital_ofdm_cyclic_prefixer.h gr-digital/lib/CMakeLists.txt gr-digital/lib/digital_ofdm_cyclic_prefixer.cc gr-digital/lib/ofdm_cyclic_prefixer_impl.h gr-digital/python/CMakeLists.txt gr-digital/swig/CMakeLists.txt gr-digital/swig/digital_swig.i
Diffstat (limited to 'gr-blocks/lib/message_source_impl.cc')
-rw-r--r--gr-blocks/lib/message_source_impl.cc100
1 files changed, 62 insertions, 38 deletions
diff --git a/gr-blocks/lib/message_source_impl.cc b/gr-blocks/lib/message_source_impl.cc
index cda4fc16c0..818cd336f1 100644
--- a/gr-blocks/lib/message_source_impl.cc
+++ b/gr-blocks/lib/message_source_impl.cc
@@ -51,12 +51,20 @@ namespace gr {
(new message_source_impl(itemsize, msgq));
}
+ message_source::sptr
+ message_source::make(size_t itemsize, gr_msg_queue_sptr msgq,
+ const std::string& lengthtagname)
+ {
+ return gnuradio::get_initial_sptr
+ (new message_source_impl(itemsize, msgq, lengthtagname));
+ }
+
message_source_impl::message_source_impl(size_t itemsize, int msgq_limit)
: gr_sync_block("message_source",
gr_make_io_signature(0, 0, 0),
gr_make_io_signature(1, 1, itemsize)),
d_itemsize(itemsize), d_msgq(gr_make_msg_queue(msgq_limit)),
- d_msg_offset(0), d_eof(false)
+ d_msg_offset(0), d_eof(false), d_tags(false)
{
}
@@ -65,7 +73,17 @@ namespace gr {
gr_make_io_signature(0, 0, 0),
gr_make_io_signature(1, 1, itemsize)),
d_itemsize(itemsize), d_msgq(msgq),
- d_msg_offset(0), d_eof(false)
+ d_msg_offset(0), d_eof(false), d_tags(false)
+ {
+ }
+
+ message_source_impl::message_source_impl(size_t itemsize, gr_msg_queue_sptr msgq,
+ const std::string& lengthtagname)
+ : gr_sync_block("message_source",
+ gr_make_io_signature(0, 0, 0),
+ gr_make_io_signature(1, 1, itemsize)),
+ d_itemsize(itemsize), d_msgq(msgq), d_msg_offset(0), d_eof(false),
+ d_tags(true), d_lengthtagname(lengthtagname)
{
}
@@ -82,42 +100,48 @@ namespace gr {
int nn = 0;
while(nn < noutput_items) {
- if(d_msg) {
- //
- // Consume whatever we can from the current message
- //
- int mm = std::min(noutput_items - nn,
- (int)((d_msg->length() - d_msg_offset) / d_itemsize));
- memcpy(out, &(d_msg->msg()[d_msg_offset]), mm * d_itemsize);
-
- nn += mm;
- out += mm * d_itemsize;
- d_msg_offset += mm * d_itemsize;
- assert(d_msg_offset <= d_msg->length());
-
- if(d_msg_offset == d_msg->length()) {
- if(d_msg->type() == 1) // type == 1 sets EOF
- d_eof = true;
- d_msg.reset();
- }
- }
- else {
- //
- // No current message
- //
- if(d_msgq->empty_p() && nn > 0) { // no more messages in the queue, return what we've got
- break;
- }
-
- if(d_eof)
- return -1;
-
- d_msg = d_msgq->delete_head(); // block, waiting for a message
- d_msg_offset = 0;
-
- if((d_msg->length() % d_itemsize) != 0)
- throw std::runtime_error("msg length is not a multiple of d_itemsize");
- }
+ if (d_msg){
+ //
+ // Consume whatever we can from the current message
+ //
+ int mm = std::min(noutput_items - nn, (int)((d_msg->length() - d_msg_offset) / d_itemsize));
+ memcpy (out, &(d_msg->msg()[d_msg_offset]), mm * d_itemsize);
+
+ if (d_tags && (d_msg_offset == 0)) {
+ const uint64_t offset = this->nitems_written(0) + nn;
+ pmt::pmt_t key = pmt::string_to_symbol(d_lengthtagname);
+ pmt::pmt_t value = pmt::from_long(d_msg->length());
+ this->add_item_tag(0, offset, key, value);
+ }
+ nn += mm;
+ out += mm * d_itemsize;
+ d_msg_offset += mm * d_itemsize;
+ assert(d_msg_offset <= d_msg->length());
+
+ if (d_msg_offset == d_msg->length()){
+ if (d_msg->type() == 1) // type == 1 sets EOF
+ d_eof = true;
+ d_msg.reset();
+ }
+ }
+ else {
+ //
+ // No current message
+ //
+ if (d_msgq->empty_p() && nn > 0){ // no more messages in the queue, return what we've got
+ break;
+ }
+
+ if (d_eof)
+ return -1;
+
+ d_msg = d_msgq->delete_head(); // block, waiting for a message
+ d_msg_offset = 0;
+
+ if ((d_msg->length() % d_itemsize) != 0)
+ throw std::runtime_error("msg length is not a multiple of d_itemsize");
+ }
+
}
return nn;