diff options
author | Tom Rondeau <trondeau@vt.edu> | 2012-12-14 16:10:30 -0500 |
---|---|---|
committer | Tom Rondeau <trondeau@vt.edu> | 2012-12-14 17:11:40 -0500 |
commit | 461ece56b36a44b2405282630157739c7f9a26ba (patch) | |
tree | c5b1a484a43a35324e1ea1beae7652aa80da5d83 /gnuradio-core/src/python | |
parent | 8e8ed231cd2469e1a39c5ae6af23ac9b29264af7 (diff) |
blocks: moving file metadata sink/source to gr-blocks.
Diffstat (limited to 'gnuradio-core/src/python')
-rw-r--r-- | gnuradio-core/src/python/gnuradio/CMakeLists.txt | 1 | ||||
-rw-r--r-- | gnuradio-core/src/python/gnuradio/gr/qa_file_metadata.py | 196 | ||||
-rw-r--r-- | gnuradio-core/src/python/gnuradio/parse_file_metadata.py | 183 |
3 files changed, 0 insertions, 380 deletions
diff --git a/gnuradio-core/src/python/gnuradio/CMakeLists.txt b/gnuradio-core/src/python/gnuradio/CMakeLists.txt index 31cde6921c..bf696e0d34 100644 --- a/gnuradio-core/src/python/gnuradio/CMakeLists.txt +++ b/gnuradio-core/src/python/gnuradio/CMakeLists.txt @@ -32,7 +32,6 @@ GR_PYTHON_INSTALL(FILES gr_unittest.py gr_xmlrunner.py optfir.py - parse_file_metadata.py window.py DESTINATION ${GR_PYTHON_DIR}/gnuradio COMPONENT "core_python" diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_file_metadata.py b/gnuradio-core/src/python/gnuradio/gr/qa_file_metadata.py deleted file mode 100644 index 849f322991..0000000000 --- a/gnuradio-core/src/python/gnuradio/gr/qa_file_metadata.py +++ /dev/null @@ -1,196 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2012 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# - -from gnuradio import gr, gr_unittest -from gnuradio import parse_file_metadata -import pmt -import os, time - -class test_file_metadata(gr_unittest.TestCase): - - def setUp(self): - self.tb = gr.top_block() - - def tearDown(self): - self.tb = None - - def test_001(self): - outfile = "test_out.dat" - - detached = False - samp_rate = 200000 - key = pmt.pmt_intern("samp_rate") - val = pmt.pmt_from_double(samp_rate) - extras = pmt.pmt_make_dict() - extras = pmt.pmt_dict_add(extras, key, val) - extras_str = pmt.pmt_serialize_str(extras) - - src = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0) - head = gr.head(gr.sizeof_gr_complex, 1000) - fsnk = gr.file_meta_sink(gr.sizeof_gr_complex, outfile, - samp_rate, 1, - gr.GR_FILE_FLOAT, True, - 1000000, extras_str, detached) - fsnk.set_unbuffered(True) - - self.tb.connect(src, head, fsnk) - self.tb.run() - fsnk.close() - - handle = open(outfile, "rb") - header_str = handle.read(parse_file_metadata.HEADER_LENGTH) - if(len(header_str) == 0): - self.assertFalse() - - try: - header = pmt.pmt_deserialize_str(header_str) - except RuntimeError: - self.assertFalse() - - info = parse_file_metadata.parse_header(header, False) - - extra_str = handle.read(info["extra_len"]) - self.assertGreater(len(extra_str), 0) - handle.close() - - try: - extra = pmt.pmt_deserialize_str(extra_str) - except RuntimeError: - self.assertFalse() - - extra_info = parse_file_metadata.parse_extra_dict(extra, info, False) - - self.assertEqual(info['rx_rate'], samp_rate) - self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate) - - - # Test file metadata source - # Create a new sig source to start from the beginning - src2 = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0) - fsrc = gr.file_meta_source(outfile, False) - vsnk = gr.vector_sink_c() - tsnk = gr.tag_debug(gr.sizeof_gr_complex, "QA") - ssnk = gr.vector_sink_c() - head.reset() - self.tb.disconnect(src, head, fsnk) - self.tb.connect(fsrc, vsnk) - self.tb.connect(fsrc, tsnk) - self.tb.connect(src2, head, ssnk) - self.tb.run() - - # Test to make sure tags with 'samp_rate' and 'rx_rate' keys - # were generated and received correctly. - tags = tsnk.current_tags() - for t in tags: - if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))): - self.assertEqual(pmt.pmt_to_double(t.value), samp_rate) - elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))): - self.assertEqual(pmt.pmt_to_double(t.value), samp_rate) - - # Test that the data portion was extracted and received correctly. - self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5) - - os.remove(outfile) - - def test_002(self): - outfile = "test_out.dat" - outfile_hdr = "test_out.dat.hdr" - - detached = True - samp_rate = 200000 - key = pmt.pmt_intern("samp_rate") - val = pmt.pmt_from_double(samp_rate) - extras = pmt.pmt_make_dict() - extras = pmt.pmt_dict_add(extras, key, val) - extras_str = pmt.pmt_serialize_str(extras) - - src = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0) - head = gr.head(gr.sizeof_gr_complex, 1000) - fsnk = gr.file_meta_sink(gr.sizeof_gr_complex, outfile, - samp_rate, 1, - gr.GR_FILE_FLOAT, True, - 1000000, extras_str, detached) - fsnk.set_unbuffered(True) - - self.tb.connect(src, head, fsnk) - self.tb.run() - fsnk.close() - - # Open detached header for reading - handle = open(outfile_hdr, "rb") - header_str = handle.read(parse_file_metadata.HEADER_LENGTH) - if(len(header_str) == 0): - self.assertFalse() - - try: - header = pmt.pmt_deserialize_str(header_str) - except RuntimeError: - self.assertFalse() - - info = parse_file_metadata.parse_header(header, False) - - extra_str = handle.read(info["extra_len"]) - self.assertGreater(len(extra_str), 0) - handle.close() - - try: - extra = pmt.pmt_deserialize_str(extra_str) - except RuntimeError: - self.assertFalse() - - extra_info = parse_file_metadata.parse_extra_dict(extra, info, False) - - self.assertEqual(info['rx_rate'], samp_rate) - self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate) - - - # Test file metadata source - # Create a new sig source to start from the beginning - src2 = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0) - fsrc = gr.file_meta_source(outfile, False, detached, outfile_hdr) - vsnk = gr.vector_sink_c() - tsnk = gr.tag_debug(gr.sizeof_gr_complex, "QA") - ssnk = gr.vector_sink_c() - head.reset() - self.tb.disconnect(src, head, fsnk) - self.tb.connect(fsrc, vsnk) - self.tb.connect(fsrc, tsnk) - self.tb.connect(src2, head, ssnk) - self.tb.run() - - # Test to make sure tags with 'samp_rate' and 'rx_rate' keys - # were generated and received correctly. - tags = tsnk.current_tags() - for t in tags: - if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))): - self.assertEqual(pmt.pmt_to_double(t.value), samp_rate) - elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))): - self.assertEqual(pmt.pmt_to_double(t.value), samp_rate) - - # Test that the data portion was extracted and received correctly. - self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5) - - os.remove(outfile) - os.remove(outfile_hdr) - -if __name__ == '__main__': - gr_unittest.run(test_file_metadata, "test_file_metadata.xml") diff --git a/gnuradio-core/src/python/gnuradio/parse_file_metadata.py b/gnuradio-core/src/python/gnuradio/parse_file_metadata.py deleted file mode 100644 index 0cee5a02cc..0000000000 --- a/gnuradio-core/src/python/gnuradio/parse_file_metadata.py +++ /dev/null @@ -1,183 +0,0 @@ -#!/usr/bin/env python -# -# Copyright 2012 Free Software Foundation, Inc. -# -# This file is part of GNU Radio -# -# GNU Radio is free software; you can redistribute it and/or modify -# it under the terms of the GNU General Public License as published by -# the Free Software Foundation; either version 3, or (at your option) -# any later version. -# -# GNU Radio is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU General Public License for more details. -# -# You should have received a copy of the GNU General Public License -# along with GNU Radio; see the file COPYING. If not, write to -# the Free Software Foundation, Inc., 51 Franklin Street, -# Boston, MA 02110-1301, USA. -# - -import sys -from gnuradio import gr -from gruel import pmt - -''' -sr Sample rate (samples/second) -time Time as uint64(secs), double(fractional secs) -type Type of data (see gr_file_types enum) -cplx is complex? (True or False) -strt Start of data (or size of header) in bytes -size Size of data in bytes -''' - -HEADER_LENGTH = gr.METADATA_HEADER_SIZE - -ftype_to_string = {gr.GR_FILE_BYTE: "bytes", - gr.GR_FILE_SHORT: "short", - gr.GR_FILE_INT: "int", - gr.GR_FILE_LONG: "long", - gr.GR_FILE_LONG_LONG: "long long", - gr.GR_FILE_FLOAT: "float", - gr.GR_FILE_DOUBLE: "double" } - -ftype_to_size = {gr.GR_FILE_BYTE: gr.sizeof_char, - gr.GR_FILE_SHORT: gr.sizeof_short, - gr.GR_FILE_INT: gr.sizeof_int, - gr.GR_FILE_LONG: gr.sizeof_int, - gr.GR_FILE_LONG_LONG: 2*gr.sizeof_int, - gr.GR_FILE_FLOAT: gr.sizeof_float, - gr.GR_FILE_DOUBLE: gr.sizeof_double} - -def parse_header(p, VERBOSE=False): - dump = pmt.PMT_NIL - - info = dict() - - if(pmt.pmt_is_dict(p) is False): - sys.stderr.write("Header is not a PMT dictionary: invalid or corrupt data file.\n") - sys.exit(1) - - # GET FILE FORMAT VERSION NUMBER - if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("version"))): - r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("version"), dump) - version = pmt.pmt_to_long(r) - if(VERBOSE): - print "Version Number: {0}".format(version) - else: - sys.stderr.write("Could not find key 'sr': invalid or corrupt data file.\n") - sys.exit(1) - - # EXTRACT SAMPLE RATE - if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_rate"))): - r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_rate"), dump) - samp_rate = pmt.pmt_to_double(r) - info["rx_rate"] = samp_rate - if(VERBOSE): - print "Sample Rate: {0:.2f} sps".format(samp_rate) - else: - sys.stderr.write("Could not find key 'sr': invalid or corrupt data file.\n") - sys.exit(1) - - # EXTRACT TIME STAMP - if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_time"))): - r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_time"), dump) - pmt_secs = pmt.pmt_tuple_ref(r, 0) - pmt_fracs = pmt.pmt_tuple_ref(r, 1) - secs = float(pmt.pmt_to_uint64(pmt_secs)) - fracs = pmt.pmt_to_double(pmt_fracs) - t = secs + fracs - info["rx_time"] = t - if(VERBOSE): - print "Seconds: {0:.6f}".format(t) - else: - sys.stderr.write("Could not find key 'time': invalid or corrupt data file.\n") - sys.exit(1) - - # EXTRACT ITEM SIZE - if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("size"))): - r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("size"), dump) - dsize = pmt.pmt_to_long(r) - info["size"] = dsize - if(VERBOSE): - print "Item size: {0}".format(dsize) - else: - sys.stderr.write("Could not find key 'size': invalid or corrupt data file.\n") - sys.exit(1) - - # EXTRACT DATA TYPE - if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("type"))): - r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("type"), dump) - dtype = pmt.pmt_to_long(r) - stype = ftype_to_string[dtype] - info["type"] = stype - if(VERBOSE): - print "Data Type: {0} ({1})".format(stype, dtype) - else: - sys.stderr.write("Could not find key 'type': invalid or corrupt data file.\n") - sys.exit(1) - - # EXTRACT COMPLEX - if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("cplx"))): - r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("cplx"), dump) - cplx = pmt.pmt_to_bool(r) - info["cplx"] = cplx - if(VERBOSE): - print "Complex? {0}".format(cplx) - else: - sys.stderr.write("Could not find key 'cplx': invalid or corrupt data file.\n") - sys.exit(1) - - # EXTRACT WHERE CURRENT SEGMENT STARTS - if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("strt"))): - r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("strt"), dump) - seg_start = pmt.pmt_to_uint64(r) - info["hdr_len"] = seg_start - info["extra_len"] = seg_start - HEADER_LENGTH - info["has_extra"] = info["extra_len"] > 0 - if(VERBOSE): - print "Header Length: {0} bytes".format(info["hdr_len"]) - print "Extra Length: {0}".format((info["extra_len"])) - print "Extra Header? {0}".format(info["has_extra"]) - else: - sys.stderr.write("Could not find key 'strt': invalid or corrupt data file.\n") - sys.exit(1) - - # EXTRACT SIZE OF DATA - if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("bytes"))): - r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("bytes"), dump) - nbytes = pmt.pmt_to_uint64(r) - - nitems = nbytes/dsize - info["nitems"] = nitems - info["nbytes"] = nbytes - - if(VERBOSE): - print "Size of Data: {0} bytes".format(nbytes) - print " {0} items".format(nitems) - else: - sys.stderr.write("Could not find key 'size': invalid or corrupt data file.\n") - sys.exit(1) - - return info - -# IF THERE IS EXTRA DATA, PULL OUT THE DICTIONARY AND PARSE IT -def parse_extra_dict(p, info, VERBOSE=False): - if(pmt.pmt_is_dict(p) is False): - sys.stderr.write("Extra header is not a PMT dictionary: invalid or corrupt data file.\n") - sys.exit(1) - - items = pmt.pmt_dict_items(p) - nitems = pmt.pmt_length(items) - for i in xrange(nitems): - item = pmt.pmt_nth(i, items) - key = pmt.pmt_symbol_to_string(pmt.pmt_car(item)) - val = pmt.pmt_cdr(item) - info[key] = val - if(VERBOSE): - print "{0}: ".format(key) - pmt.pmt_print(val) - - return info |