diff options
Diffstat (limited to 'gr-blocks/python/blocks/msg_meta_to_pair.py')
-rw-r--r-- | gr-blocks/python/blocks/msg_meta_to_pair.py | 67 |
1 files changed, 67 insertions, 0 deletions
diff --git a/gr-blocks/python/blocks/msg_meta_to_pair.py b/gr-blocks/python/blocks/msg_meta_to_pair.py new file mode 100644 index 0000000000..1c6dde2f61 --- /dev/null +++ b/gr-blocks/python/blocks/msg_meta_to_pair.py @@ -0,0 +1,67 @@ +#!/usr/bin/env python +# +# Copyright 2020 Free Software Foundation, Inc. +# +# This file is part of GNU Radio +# +# SPDX-License-Identifier: GPL-3.0-or-later +# +# + +from gnuradio import gr +import pmt + +class meta_to_pair(gr.sync_block): + """ + This block converts a metadata dictionary item to a pmt pair that is + compatible with other blocks expecting a pair in. You can specify + which item in the incoming metadata to output as a pair and what + the pair name is. + """ + def __init__(self, incomingKeyName, outgoingPairName): + gr.sync_block.__init__(self, name="meta_to_pair", in_sig=None, out_sig=None) + + self.incomingKeyName = str(incomingKeyName) + self.outgoingPairName = str(outgoingPairName) + + self.message_port_register_in(pmt.intern("inmeta")) + self.set_msg_handler(pmt.intern("inmeta"), self.msg_handler) + self.message_port_register_out(pmt.intern("outpair")) + + def msg_handler(self, msg): + if not pmt.is_pair(msg): + gr.log.warn("Incoming message is not a pair. Only pairs are supported. " + "No message generated.") + return + + meta = pmt.to_python(pmt.car(msg)) + + if not type(meta) is dict: + gr.log.warn("Incoming message does not contain a dictionary. " + "No message generated.") + return + + if not self.incomingKeyName in meta: + gr.log.warn("Incoming message dictionary does not contain key %s. " + "No message generated." % self.incomingKeyName) + return + + incomingVal = meta[self.incomingKeyName] + + new_pair = None + + try: + new_pair = pmt.cons(pmt.intern(self.outgoingPairName), pmt.to_pmt(incomingVal)) + except Exception as e: + gr.log.error("Cannot construct new message: %s" % str(e)) + return + + try: + self.message_port_pub(pmt.intern("outpair"), new_pair) + except Exception as e: + gr.log.error("Cannot send message: %s" % str(e)) + gr.log.error("Incoming dictionary (%s):" % str(type(meta))) + gr.log.error(str(meta)) + + def stop(self): + return True |