summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/lib/io
diff options
context:
space:
mode:
Diffstat (limited to 'gnuradio-core/src/lib/io')
-rw-r--r--gnuradio-core/src/lib/io/gr_message_sink.cc70
-rw-r--r--gnuradio-core/src/lib/io/gr_message_sink.h27
-rw-r--r--gnuradio-core/src/lib/io/gr_message_sink.i7
-rw-r--r--gnuradio-core/src/lib/io/gr_message_source.cc45
-rw-r--r--gnuradio-core/src/lib/io/gr_message_source.h12
-rw-r--r--gnuradio-core/src/lib/io/gr_message_source.i4
-rw-r--r--gnuradio-core/src/lib/io/gr_pdu.h13
-rw-r--r--gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc8
-rw-r--r--gnuradio-core/src/lib/io/gr_tagged_stream_to_pdu.cc8
9 files changed, 147 insertions, 47 deletions
diff --git a/gnuradio-core/src/lib/io/gr_message_sink.cc b/gnuradio-core/src/lib/io/gr_message_sink.cc
index ae0b5c7649..3816f5da48 100644
--- a/gnuradio-core/src/lib/io/gr_message_sink.cc
+++ b/gnuradio-core/src/lib/io/gr_message_sink.cc
@@ -34,7 +34,6 @@
#include <stdexcept>
#include <string.h>
-
// public constructor that returns a shared_ptr
gr_message_sink_sptr
@@ -42,12 +41,25 @@ gr_make_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block)
{
return gnuradio::get_initial_sptr(new gr_message_sink(itemsize, msgq, dont_block));
}
+gr_message_sink_sptr
+gr_make_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block, const std::string& lengthtagname)
+{
+ return gnuradio::get_initial_sptr(new gr_message_sink(itemsize, msgq, dont_block, lengthtagname));
+}
gr_message_sink::gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block)
: gr_sync_block("message_sink",
gr_make_io_signature(1, 1, itemsize),
gr_make_io_signature(0, 0, 0)),
- d_itemsize(itemsize), d_msgq(msgq), d_dont_block(dont_block)
+ d_itemsize(itemsize), d_msgq(msgq), d_dont_block(dont_block), d_tags(false), d_items_read(0)
+{
+}
+
+gr_message_sink::gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block, const std::string& lengthtagname)
+ : gr_sync_block("message_sink",
+ gr_make_io_signature(1, 1, itemsize),
+ gr_make_io_signature(0, 0, 0)),
+ d_itemsize(itemsize), d_msgq(msgq), d_dont_block(dont_block), d_tags(true), d_lengthtagname(lengthtagname), d_items_read(0)
{
}
@@ -62,18 +74,46 @@ gr_message_sink::work(int noutput_items,
{
const char *in = (const char *) input_items[0];
- // if we'd block, drop the data on the floor and say everything is OK
- if (d_dont_block && d_msgq->full_p())
- return noutput_items;
+ if (d_tags) {
+ long packet_length = 0;
+ std::vector<gr_tag_t> tags;
+ this->get_tags_in_range(tags, 0, d_items_read, d_items_read+1);
+ //const size_t ninput_items = noutput_items; //assumption for sync block, this can change
+ for (unsigned int i = 0; i < tags.size(); i++) {
+ if (pmt::pmt_symbol_to_string(tags[i].key) == d_lengthtagname) {
+ packet_length = pmt::pmt_to_long(tags[i].value);
+ }
+ }
+ assert(packet_length != 0);
- // build a message to hold whatever we've got
- gr_message_sptr msg = gr_make_message(0, // msg type
- d_itemsize, // arg1 for other end
- noutput_items, // arg2 for other end (redundant)
- noutput_items * d_itemsize); // len of msg
- memcpy(msg->msg(), in, noutput_items * d_itemsize);
-
- d_msgq->handle(msg); // send it
-
- return noutput_items;
+ // FIXME run this multiple times if input_items >= N * packet_length
+ if (noutput_items >= packet_length ) {
+ // If the message queue is full we drop the packet.
+ if (!d_msgq->full_p()) {
+ gr_message_sptr msg = gr_make_message(0, // msg type
+ d_itemsize, // arg1 for other end
+ packet_length, // arg2 for other end (redundant)
+ packet_length * d_itemsize); // len of msg
+ memcpy(msg->msg(), in, packet_length * d_itemsize);
+ d_msgq->handle(msg); // send it
+ }
+ d_items_read += packet_length;
+ return packet_length;
+ } else {
+ return 0;
+ }
+ } else {
+ // If the queue if full we drop all the data we got.
+ if (!d_msgq->full_p()) {
+ // build a message to hold whatever we've got
+ gr_message_sptr msg = gr_make_message(0, // msg type
+ d_itemsize, // arg1 for other end
+ noutput_items, // arg2 for other end (redundant)
+ noutput_items * d_itemsize); // len of msg
+ memcpy(msg->msg(), in, noutput_items * d_itemsize);
+
+ d_msgq->handle(msg); // send it
+ }
+ return noutput_items;
+ }
}
diff --git a/gnuradio-core/src/lib/io/gr_message_sink.h b/gnuradio-core/src/lib/io/gr_message_sink.h
index 84005694a1..2db84cff44 100644
--- a/gnuradio-core/src/lib/io/gr_message_sink.h
+++ b/gnuradio-core/src/lib/io/gr_message_sink.h
@@ -27,13 +27,23 @@
#include <gr_sync_block.h>
#include <gr_message.h>
#include <gr_msg_queue.h>
+#include <string>
class gr_message_sink;
typedef boost::shared_ptr<gr_message_sink> gr_message_sink_sptr;
-GR_CORE_API gr_message_sink_sptr gr_make_message_sink (size_t itemsize,
- gr_msg_queue_sptr msgq,
- bool dont_block);
+GR_CORE_API gr_message_sink_sptr gr_make_message_sink (
+ size_t itemsize,
+ gr_msg_queue_sptr msgq,
+ bool dont_block
+);
+
+GR_CORE_API gr_message_sink_sptr gr_make_message_sink (
+ size_t itemsize,
+ gr_msg_queue_sptr msgq,
+ bool dont_block,
+ const std::string& lengthtagname
+);
/*!
* \brief Gather received items into messages and insert into msgq
@@ -45,12 +55,21 @@ class GR_CORE_API gr_message_sink : public gr_sync_block
size_t d_itemsize;
gr_msg_queue_sptr d_msgq;
bool d_dont_block;
+ bool d_tags;
+ std::string d_lengthtagname;
+ uint64_t d_items_read;
friend GR_CORE_API gr_message_sink_sptr
- gr_make_message_sink(size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block);
+ gr_make_message_sink(size_t itemsize, gr_msg_queue_sptr msgq,
+ bool dont_block);
+ friend GR_CORE_API gr_message_sink_sptr
+ gr_make_message_sink(size_t itemsize, gr_msg_queue_sptr msgq,
+ bool dont_block, const std::string& lengthtagname);
protected:
gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block);
+ gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block,
+ const std::string& lengthtagname);
public:
~gr_message_sink ();
diff --git a/gnuradio-core/src/lib/io/gr_message_sink.i b/gnuradio-core/src/lib/io/gr_message_sink.i
index 8415cbd66d..b22eff18c0 100644
--- a/gnuradio-core/src/lib/io/gr_message_sink.i
+++ b/gnuradio-core/src/lib/io/gr_message_sink.i
@@ -22,14 +22,21 @@
GR_SWIG_BLOCK_MAGIC(gr,message_sink);
+#include <string>
+
gr_message_sink_sptr gr_make_message_sink (size_t itemsize,
gr_msg_queue_sptr msgq,
bool dont_block);
+gr_message_sink_sptr gr_make_message_sink (size_t itemsize,
+ gr_msg_queue_sptr msgq,
+ bool dont_block,
+ const std::string& lengthtagname);
class gr_message_sink : public gr_sync_block
{
protected:
gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block);
+ gr_message_sink (size_t itemsize, gr_msg_queue_sptr msgq, bool dont_block, const std::string& lengthtagname);
public:
~gr_message_sink ();
diff --git a/gnuradio-core/src/lib/io/gr_message_source.cc b/gnuradio-core/src/lib/io/gr_message_source.cc
index fb3da89a8b..0b79bb526b 100644
--- a/gnuradio-core/src/lib/io/gr_message_source.cc
+++ b/gnuradio-core/src/lib/io/gr_message_source.cc
@@ -36,7 +36,6 @@
// public constructor that returns a shared_ptr
-
gr_message_source_sptr
gr_make_message_source(size_t itemsize, int msgq_limit)
{
@@ -50,11 +49,19 @@ gr_make_message_source(size_t itemsize, gr_msg_queue_sptr msgq)
return gnuradio::get_initial_sptr(new gr_message_source(itemsize, msgq));
}
+// public constructor that takes existing message queue and adds messages length tags.
+gr_message_source_sptr
+gr_make_message_source(size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname)
+{
+ return gnuradio::get_initial_sptr(new gr_message_source(itemsize, msgq, lengthtagname));
+}
+
gr_message_source::gr_message_source (size_t itemsize, int msgq_limit)
: gr_sync_block("message_source",
gr_make_io_signature(0, 0, 0),
gr_make_io_signature(1, 1, itemsize)),
- d_itemsize(itemsize), d_msgq(gr_make_msg_queue(msgq_limit)), d_msg_offset(0), d_eof(false)
+ d_itemsize(itemsize), d_msgq(gr_make_msg_queue(msgq_limit)), d_msg_offset(0), d_eof(false),
+ d_tags(false)
{
}
@@ -62,11 +69,19 @@ gr_message_source::gr_message_source (size_t itemsize, gr_msg_queue_sptr msgq)
: gr_sync_block("message_source",
gr_make_io_signature(0, 0, 0),
gr_make_io_signature(1, 1, itemsize)),
- d_itemsize(itemsize), d_msgq(msgq), d_msg_offset(0), d_eof(false)
+ d_itemsize(itemsize), d_msgq(msgq), d_msg_offset(0), d_eof(false), d_tags(false)
{
}
gr_message_source::~gr_message_source()
+{}
+
+gr_message_source::gr_message_source(size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname)
+ : gr_sync_block("message_source",
+ gr_make_io_signature(0, 0, 0),
+ gr_make_io_signature(1, 1, itemsize)),
+ d_itemsize(itemsize), d_msgq(msgq), d_msg_offset(0), d_eof(false),
+ d_tags(true), d_lengthtagname(lengthtagname)
{
}
@@ -86,15 +101,21 @@ gr_message_source::work(int noutput_items,
int mm = std::min(noutput_items - nn, (int)((d_msg->length() - d_msg_offset) / d_itemsize));
memcpy (out, &(d_msg->msg()[d_msg_offset]), mm * d_itemsize);
+ if (d_tags && (d_msg_offset == 0)) {
+ const uint64_t offset = this->nitems_written(0) + nn;
+ pmt::pmt_t key = pmt::pmt_string_to_symbol(d_lengthtagname);
+ pmt::pmt_t value = pmt::pmt_from_long(d_msg->length());
+ this->add_item_tag(0, offset, key, value);
+ }
nn += mm;
out += mm * d_itemsize;
d_msg_offset += mm * d_itemsize;
assert(d_msg_offset <= d_msg->length());
if (d_msg_offset == d_msg->length()){
- if (d_msg->type() == 1) // type == 1 sets EOF
- d_eof = true;
- d_msg.reset();
+ if (d_msg->type() == 1) // type == 1 sets EOF
+ d_eof = true;
+ d_msg.reset();
}
}
else {
@@ -102,17 +123,17 @@ gr_message_source::work(int noutput_items,
// No current message
//
if (d_msgq->empty_p() && nn > 0){ // no more messages in the queue, return what we've got
- break;
+ break;
}
-
+
if (d_eof)
- return -1;
-
+ return -1;
+
d_msg = d_msgq->delete_head(); // block, waiting for a message
d_msg_offset = 0;
-
+
if ((d_msg->length() % d_itemsize) != 0)
- throw std::runtime_error("msg length is not a multiple of d_itemsize");
+ throw std::runtime_error("msg length is not a multiple of d_itemsize");
}
}
diff --git a/gnuradio-core/src/lib/io/gr_message_source.h b/gnuradio-core/src/lib/io/gr_message_source.h
index c510d1775f..1828475987 100644
--- a/gnuradio-core/src/lib/io/gr_message_source.h
+++ b/gnuradio-core/src/lib/io/gr_message_source.h
@@ -33,6 +33,8 @@ typedef boost::shared_ptr<gr_message_source> gr_message_source_sptr;
GR_CORE_API gr_message_source_sptr gr_make_message_source (size_t itemsize, int msgq_limit=0);
GR_CORE_API gr_message_source_sptr gr_make_message_source (size_t itemsize, gr_msg_queue_sptr msgq);
+GR_CORE_API gr_message_source_sptr gr_make_message_source (size_t itemsize, gr_msg_queue_sptr msgq,
+ const std::string& lengthtagname);
/*!
* \brief Turn received messages into a stream
@@ -46,15 +48,21 @@ class GR_CORE_API gr_message_source : public gr_sync_block
gr_message_sptr d_msg;
unsigned d_msg_offset;
bool d_eof;
+ bool d_tags;
+ // FIXME: Is this adequate tagname length.
+ std::string d_lengthtagname;
friend GR_CORE_API gr_message_source_sptr
- gr_make_message_source(size_t itemsize, int msgq_limit);
+ gr_make_message_source(size_t itemsize, int msgq_limit);
friend GR_CORE_API gr_message_source_sptr
- gr_make_message_source(size_t itemsize, gr_msg_queue_sptr msgq);
+ gr_make_message_source(size_t itemsize, gr_msg_queue_sptr msgq);
+ friend GR_CORE_API gr_message_source_sptr
+ gr_make_message_source(size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname);
protected:
gr_message_source (size_t itemsize, int msgq_limit);
gr_message_source (size_t itemsize, gr_msg_queue_sptr msgq);
+ gr_message_source (size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname);
public:
~gr_message_source ();
diff --git a/gnuradio-core/src/lib/io/gr_message_source.i b/gnuradio-core/src/lib/io/gr_message_source.i
index 9ee9157e8c..bb1ddfcc31 100644
--- a/gnuradio-core/src/lib/io/gr_message_source.i
+++ b/gnuradio-core/src/lib/io/gr_message_source.i
@@ -22,14 +22,18 @@
GR_SWIG_BLOCK_MAGIC(gr,message_source);
+#include <string>
+
gr_message_source_sptr gr_make_message_source (size_t itemsize, int msgq_limit=0);
gr_message_source_sptr gr_make_message_source (size_t itemsize, gr_msg_queue_sptr msgq);
+gr_message_source_sptr gr_make_message_source (size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname);
class gr_message_source : public gr_sync_block
{
protected:
gr_message_source (size_t itemsize, int msgq_limit);
gr_message_source (size_t itemsize, gr_msg_queue_sptr msgq);
+ gr_message_source (size_t itemsize, gr_msg_queue_sptr msgq, const std::string& lengthtagname);
public:
~gr_message_source ();
diff --git a/gnuradio-core/src/lib/io/gr_pdu.h b/gnuradio-core/src/lib/io/gr_pdu.h
index a5ae87db7f..53058ccb6c 100644
--- a/gnuradio-core/src/lib/io/gr_pdu.h
+++ b/gnuradio-core/src/lib/io/gr_pdu.h
@@ -23,17 +23,18 @@
#ifndef GR_PDU_H
#define GR_PDU_H
+#include <gr_core_api.h>
#include <gr_complex.h>
#include <gruel/pmt.h>
-#define pdu_port_id pmt::mp("pdus")
-#define pdu_length_tag pmt::mp("pdu_length")
+#define PDU_PORT_ID pmt::mp("pdus")
+#define PDU_LENGTH_TAG pmt::mp("pdu_length")
enum gr_pdu_vector_type { pdu_byte, pdu_float, pdu_complex };
-size_t gr_pdu_itemsize(gr_pdu_vector_type type);
-bool gr_pdu_type_matches(gr_pdu_vector_type type, pmt::pmt_t v);
-pmt::pmt_t gr_pdu_make_vector(gr_pdu_vector_type type, const uint8_t* buf, size_t items);
-gr_pdu_vector_type type_from_pmt(pmt::pmt_t vector);
+GR_CORE_API size_t gr_pdu_itemsize(gr_pdu_vector_type type);
+GR_CORE_API bool gr_pdu_type_matches(gr_pdu_vector_type type, pmt::pmt_t v);
+GR_CORE_API pmt::pmt_t gr_pdu_make_vector(gr_pdu_vector_type type, const uint8_t* buf, size_t items);
+GR_CORE_API gr_pdu_vector_type type_from_pmt(pmt::pmt_t vector);
#endif
diff --git a/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc b/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc
index 5c319dc39d..a702b66a2b 100644
--- a/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc
+++ b/gnuradio-core/src/lib/io/gr_pdu_to_tagged_stream.cc
@@ -49,7 +49,7 @@ gr_pdu_to_tagged_stream::gr_pdu_to_tagged_stream (gr_pdu_vector_type t)
gr_make_io_signature(1, 1, gr_pdu_itemsize(t))),
d_vectortype(t), d_itemsize(gr_pdu_itemsize(t))
{
- message_port_register_in(pdu_port_id);
+ message_port_register_in(PDU_PORT_ID);
}
gr_pdu_to_tagged_stream::~gr_pdu_to_tagged_stream()
@@ -77,8 +77,8 @@ gr_pdu_to_tagged_stream::work(int noutput_items,
if(noutput_items > 0){
// grab a message if one exists
- //pmt::pmt_t msg( delete_head_nowait( pdu_port_id ) );
- pmt::pmt_t msg( delete_head_blocking( pdu_port_id ) );
+ //pmt::pmt_t msg( delete_head_nowait( PDU_PORT_ID ) );
+ pmt::pmt_t msg( delete_head_blocking( PDU_PORT_ID ) );
if(msg.get() == NULL ){
return nout;
}
@@ -99,7 +99,7 @@ gr_pdu_to_tagged_stream::work(int noutput_items,
uint64_t offset = nitems_written(0) + nout;
// add a tag for pdu length
- add_item_tag(0, offset, pdu_length_tag, pmt::pmt_from_long( pmt::pmt_length(vect) ), pmt::mp(alias()));
+ add_item_tag(0, offset, PDU_LENGTH_TAG, pmt::pmt_from_long( pmt::pmt_length(vect) ), pmt::mp(alias()));
// if we recieved metadata add it as tags
if( !pmt_eq(meta, pmt::PMT_NIL) ){
diff --git a/gnuradio-core/src/lib/io/gr_tagged_stream_to_pdu.cc b/gnuradio-core/src/lib/io/gr_tagged_stream_to_pdu.cc
index 8211b7672d..1b869edfab 100644
--- a/gnuradio-core/src/lib/io/gr_tagged_stream_to_pdu.cc
+++ b/gnuradio-core/src/lib/io/gr_tagged_stream_to_pdu.cc
@@ -49,7 +49,7 @@ gr_tagged_stream_to_pdu::gr_tagged_stream_to_pdu (gr_pdu_vector_type t)
d_vectortype(t), d_itemsize(gr_pdu_itemsize(t)), d_inpdu(false),
d_pdu_meta(pmt::PMT_NIL), d_pdu_vector(pmt::PMT_NIL)
{
- message_port_register_out(pdu_port_id);
+ message_port_register_out(PDU_PORT_ID);
}
gr_tagged_stream_to_pdu::~gr_tagged_stream_to_pdu()
@@ -70,7 +70,7 @@ gr_tagged_stream_to_pdu::work(int noutput_items,
get_tags_in_range(d_tags, 0, abs_N, abs_N+1);
bool found_length_tag(false);
for(d_tags_itr = d_tags.begin(); (d_tags_itr != d_tags.end()) && (!found_length_tag); d_tags_itr++){
- if( pmt::pmt_equal( (*d_tags_itr).key, pdu_length_tag ) ){
+ if( pmt::pmt_equal( (*d_tags_itr).key, PDU_LENGTH_TAG ) ){
if( (*d_tags_itr).offset != abs_N ){
throw std::runtime_error("expected next pdu length tag on a different item...");
}
@@ -91,7 +91,7 @@ gr_tagged_stream_to_pdu::work(int noutput_items,
// copy any tags in this range into our meta object
get_tags_in_range(d_tags, 0, abs_N, abs_N+ncopy);
for(d_tags_itr = d_tags.begin(); d_tags_itr != d_tags.end(); d_tags_itr++){
- if( ! pmt_equal( (*d_tags_itr).key, pdu_length_tag ) ){
+ if( ! pmt_equal( (*d_tags_itr).key, PDU_LENGTH_TAG ) ){
d_pdu_meta = pmt_dict_add(d_pdu_meta, (*d_tags_itr).key, (*d_tags_itr).value);
}
}
@@ -127,7 +127,7 @@ void gr_tagged_stream_to_pdu::send_message(){
}
pmt::pmt_t msg = pmt::pmt_cons( d_pdu_meta, d_pdu_vector );
- message_port_pub( pdu_port_id, msg );
+ message_port_pub( PDU_PORT_ID, msg );
d_pdu_meta = pmt::PMT_NIL;
d_pdu_vector = pmt::PMT_NIL;