summaryrefslogtreecommitdiff
path: root/gnuradio-core/src/python
diff options
context:
space:
mode:
authorTom Rondeau <trondeau@vt.edu>2012-12-14 16:10:30 -0500
committerTom Rondeau <trondeau@vt.edu>2012-12-14 17:11:40 -0500
commit461ece56b36a44b2405282630157739c7f9a26ba (patch)
treec5b1a484a43a35324e1ea1beae7652aa80da5d83 /gnuradio-core/src/python
parent8e8ed231cd2469e1a39c5ae6af23ac9b29264af7 (diff)
blocks: moving file metadata sink/source to gr-blocks.
Diffstat (limited to 'gnuradio-core/src/python')
-rw-r--r--gnuradio-core/src/python/gnuradio/CMakeLists.txt1
-rw-r--r--gnuradio-core/src/python/gnuradio/gr/qa_file_metadata.py196
-rw-r--r--gnuradio-core/src/python/gnuradio/parse_file_metadata.py183
3 files changed, 0 insertions, 380 deletions
diff --git a/gnuradio-core/src/python/gnuradio/CMakeLists.txt b/gnuradio-core/src/python/gnuradio/CMakeLists.txt
index 31cde6921c..bf696e0d34 100644
--- a/gnuradio-core/src/python/gnuradio/CMakeLists.txt
+++ b/gnuradio-core/src/python/gnuradio/CMakeLists.txt
@@ -32,7 +32,6 @@ GR_PYTHON_INSTALL(FILES
gr_unittest.py
gr_xmlrunner.py
optfir.py
- parse_file_metadata.py
window.py
DESTINATION ${GR_PYTHON_DIR}/gnuradio
COMPONENT "core_python"
diff --git a/gnuradio-core/src/python/gnuradio/gr/qa_file_metadata.py b/gnuradio-core/src/python/gnuradio/gr/qa_file_metadata.py
deleted file mode 100644
index 849f322991..0000000000
--- a/gnuradio-core/src/python/gnuradio/gr/qa_file_metadata.py
+++ /dev/null
@@ -1,196 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2012 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# GNU Radio is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3, or (at your option)
-# any later version.
-#
-# GNU Radio is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with GNU Radio; see the file COPYING. If not, write to
-# the Free Software Foundation, Inc., 51 Franklin Street,
-# Boston, MA 02110-1301, USA.
-#
-
-from gnuradio import gr, gr_unittest
-from gnuradio import parse_file_metadata
-import pmt
-import os, time
-
-class test_file_metadata(gr_unittest.TestCase):
-
- def setUp(self):
- self.tb = gr.top_block()
-
- def tearDown(self):
- self.tb = None
-
- def test_001(self):
- outfile = "test_out.dat"
-
- detached = False
- samp_rate = 200000
- key = pmt.pmt_intern("samp_rate")
- val = pmt.pmt_from_double(samp_rate)
- extras = pmt.pmt_make_dict()
- extras = pmt.pmt_dict_add(extras, key, val)
- extras_str = pmt.pmt_serialize_str(extras)
-
- src = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0)
- head = gr.head(gr.sizeof_gr_complex, 1000)
- fsnk = gr.file_meta_sink(gr.sizeof_gr_complex, outfile,
- samp_rate, 1,
- gr.GR_FILE_FLOAT, True,
- 1000000, extras_str, detached)
- fsnk.set_unbuffered(True)
-
- self.tb.connect(src, head, fsnk)
- self.tb.run()
- fsnk.close()
-
- handle = open(outfile, "rb")
- header_str = handle.read(parse_file_metadata.HEADER_LENGTH)
- if(len(header_str) == 0):
- self.assertFalse()
-
- try:
- header = pmt.pmt_deserialize_str(header_str)
- except RuntimeError:
- self.assertFalse()
-
- info = parse_file_metadata.parse_header(header, False)
-
- extra_str = handle.read(info["extra_len"])
- self.assertGreater(len(extra_str), 0)
- handle.close()
-
- try:
- extra = pmt.pmt_deserialize_str(extra_str)
- except RuntimeError:
- self.assertFalse()
-
- extra_info = parse_file_metadata.parse_extra_dict(extra, info, False)
-
- self.assertEqual(info['rx_rate'], samp_rate)
- self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate)
-
-
- # Test file metadata source
- # Create a new sig source to start from the beginning
- src2 = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0)
- fsrc = gr.file_meta_source(outfile, False)
- vsnk = gr.vector_sink_c()
- tsnk = gr.tag_debug(gr.sizeof_gr_complex, "QA")
- ssnk = gr.vector_sink_c()
- head.reset()
- self.tb.disconnect(src, head, fsnk)
- self.tb.connect(fsrc, vsnk)
- self.tb.connect(fsrc, tsnk)
- self.tb.connect(src2, head, ssnk)
- self.tb.run()
-
- # Test to make sure tags with 'samp_rate' and 'rx_rate' keys
- # were generated and received correctly.
- tags = tsnk.current_tags()
- for t in tags:
- if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))):
- self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
- elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))):
- self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
-
- # Test that the data portion was extracted and received correctly.
- self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5)
-
- os.remove(outfile)
-
- def test_002(self):
- outfile = "test_out.dat"
- outfile_hdr = "test_out.dat.hdr"
-
- detached = True
- samp_rate = 200000
- key = pmt.pmt_intern("samp_rate")
- val = pmt.pmt_from_double(samp_rate)
- extras = pmt.pmt_make_dict()
- extras = pmt.pmt_dict_add(extras, key, val)
- extras_str = pmt.pmt_serialize_str(extras)
-
- src = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0)
- head = gr.head(gr.sizeof_gr_complex, 1000)
- fsnk = gr.file_meta_sink(gr.sizeof_gr_complex, outfile,
- samp_rate, 1,
- gr.GR_FILE_FLOAT, True,
- 1000000, extras_str, detached)
- fsnk.set_unbuffered(True)
-
- self.tb.connect(src, head, fsnk)
- self.tb.run()
- fsnk.close()
-
- # Open detached header for reading
- handle = open(outfile_hdr, "rb")
- header_str = handle.read(parse_file_metadata.HEADER_LENGTH)
- if(len(header_str) == 0):
- self.assertFalse()
-
- try:
- header = pmt.pmt_deserialize_str(header_str)
- except RuntimeError:
- self.assertFalse()
-
- info = parse_file_metadata.parse_header(header, False)
-
- extra_str = handle.read(info["extra_len"])
- self.assertGreater(len(extra_str), 0)
- handle.close()
-
- try:
- extra = pmt.pmt_deserialize_str(extra_str)
- except RuntimeError:
- self.assertFalse()
-
- extra_info = parse_file_metadata.parse_extra_dict(extra, info, False)
-
- self.assertEqual(info['rx_rate'], samp_rate)
- self.assertEqual(pmt.pmt_to_double(extra_info['samp_rate']), samp_rate)
-
-
- # Test file metadata source
- # Create a new sig source to start from the beginning
- src2 = gr.sig_source_c(samp_rate, gr.GR_COS_WAVE, 1000, 1, 0)
- fsrc = gr.file_meta_source(outfile, False, detached, outfile_hdr)
- vsnk = gr.vector_sink_c()
- tsnk = gr.tag_debug(gr.sizeof_gr_complex, "QA")
- ssnk = gr.vector_sink_c()
- head.reset()
- self.tb.disconnect(src, head, fsnk)
- self.tb.connect(fsrc, vsnk)
- self.tb.connect(fsrc, tsnk)
- self.tb.connect(src2, head, ssnk)
- self.tb.run()
-
- # Test to make sure tags with 'samp_rate' and 'rx_rate' keys
- # were generated and received correctly.
- tags = tsnk.current_tags()
- for t in tags:
- if(pmt.pmt_eq(t.key, pmt.pmt_intern("samp_rate"))):
- self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
- elif(pmt.pmt_eq(t.key, pmt.pmt_intern("rx_rate"))):
- self.assertEqual(pmt.pmt_to_double(t.value), samp_rate)
-
- # Test that the data portion was extracted and received correctly.
- self.assertComplexTuplesAlmostEqual(vsnk.data(), ssnk.data(), 5)
-
- os.remove(outfile)
- os.remove(outfile_hdr)
-
-if __name__ == '__main__':
- gr_unittest.run(test_file_metadata, "test_file_metadata.xml")
diff --git a/gnuradio-core/src/python/gnuradio/parse_file_metadata.py b/gnuradio-core/src/python/gnuradio/parse_file_metadata.py
deleted file mode 100644
index 0cee5a02cc..0000000000
--- a/gnuradio-core/src/python/gnuradio/parse_file_metadata.py
+++ /dev/null
@@ -1,183 +0,0 @@
-#!/usr/bin/env python
-#
-# Copyright 2012 Free Software Foundation, Inc.
-#
-# This file is part of GNU Radio
-#
-# GNU Radio is free software; you can redistribute it and/or modify
-# it under the terms of the GNU General Public License as published by
-# the Free Software Foundation; either version 3, or (at your option)
-# any later version.
-#
-# GNU Radio is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with GNU Radio; see the file COPYING. If not, write to
-# the Free Software Foundation, Inc., 51 Franklin Street,
-# Boston, MA 02110-1301, USA.
-#
-
-import sys
-from gnuradio import gr
-from gruel import pmt
-
-'''
-sr Sample rate (samples/second)
-time Time as uint64(secs), double(fractional secs)
-type Type of data (see gr_file_types enum)
-cplx is complex? (True or False)
-strt Start of data (or size of header) in bytes
-size Size of data in bytes
-'''
-
-HEADER_LENGTH = gr.METADATA_HEADER_SIZE
-
-ftype_to_string = {gr.GR_FILE_BYTE: "bytes",
- gr.GR_FILE_SHORT: "short",
- gr.GR_FILE_INT: "int",
- gr.GR_FILE_LONG: "long",
- gr.GR_FILE_LONG_LONG: "long long",
- gr.GR_FILE_FLOAT: "float",
- gr.GR_FILE_DOUBLE: "double" }
-
-ftype_to_size = {gr.GR_FILE_BYTE: gr.sizeof_char,
- gr.GR_FILE_SHORT: gr.sizeof_short,
- gr.GR_FILE_INT: gr.sizeof_int,
- gr.GR_FILE_LONG: gr.sizeof_int,
- gr.GR_FILE_LONG_LONG: 2*gr.sizeof_int,
- gr.GR_FILE_FLOAT: gr.sizeof_float,
- gr.GR_FILE_DOUBLE: gr.sizeof_double}
-
-def parse_header(p, VERBOSE=False):
- dump = pmt.PMT_NIL
-
- info = dict()
-
- if(pmt.pmt_is_dict(p) is False):
- sys.stderr.write("Header is not a PMT dictionary: invalid or corrupt data file.\n")
- sys.exit(1)
-
- # GET FILE FORMAT VERSION NUMBER
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("version"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("version"), dump)
- version = pmt.pmt_to_long(r)
- if(VERBOSE):
- print "Version Number: {0}".format(version)
- else:
- sys.stderr.write("Could not find key 'sr': invalid or corrupt data file.\n")
- sys.exit(1)
-
- # EXTRACT SAMPLE RATE
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_rate"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_rate"), dump)
- samp_rate = pmt.pmt_to_double(r)
- info["rx_rate"] = samp_rate
- if(VERBOSE):
- print "Sample Rate: {0:.2f} sps".format(samp_rate)
- else:
- sys.stderr.write("Could not find key 'sr': invalid or corrupt data file.\n")
- sys.exit(1)
-
- # EXTRACT TIME STAMP
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("rx_time"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("rx_time"), dump)
- pmt_secs = pmt.pmt_tuple_ref(r, 0)
- pmt_fracs = pmt.pmt_tuple_ref(r, 1)
- secs = float(pmt.pmt_to_uint64(pmt_secs))
- fracs = pmt.pmt_to_double(pmt_fracs)
- t = secs + fracs
- info["rx_time"] = t
- if(VERBOSE):
- print "Seconds: {0:.6f}".format(t)
- else:
- sys.stderr.write("Could not find key 'time': invalid or corrupt data file.\n")
- sys.exit(1)
-
- # EXTRACT ITEM SIZE
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("size"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("size"), dump)
- dsize = pmt.pmt_to_long(r)
- info["size"] = dsize
- if(VERBOSE):
- print "Item size: {0}".format(dsize)
- else:
- sys.stderr.write("Could not find key 'size': invalid or corrupt data file.\n")
- sys.exit(1)
-
- # EXTRACT DATA TYPE
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("type"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("type"), dump)
- dtype = pmt.pmt_to_long(r)
- stype = ftype_to_string[dtype]
- info["type"] = stype
- if(VERBOSE):
- print "Data Type: {0} ({1})".format(stype, dtype)
- else:
- sys.stderr.write("Could not find key 'type': invalid or corrupt data file.\n")
- sys.exit(1)
-
- # EXTRACT COMPLEX
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("cplx"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("cplx"), dump)
- cplx = pmt.pmt_to_bool(r)
- info["cplx"] = cplx
- if(VERBOSE):
- print "Complex? {0}".format(cplx)
- else:
- sys.stderr.write("Could not find key 'cplx': invalid or corrupt data file.\n")
- sys.exit(1)
-
- # EXTRACT WHERE CURRENT SEGMENT STARTS
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("strt"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("strt"), dump)
- seg_start = pmt.pmt_to_uint64(r)
- info["hdr_len"] = seg_start
- info["extra_len"] = seg_start - HEADER_LENGTH
- info["has_extra"] = info["extra_len"] > 0
- if(VERBOSE):
- print "Header Length: {0} bytes".format(info["hdr_len"])
- print "Extra Length: {0}".format((info["extra_len"]))
- print "Extra Header? {0}".format(info["has_extra"])
- else:
- sys.stderr.write("Could not find key 'strt': invalid or corrupt data file.\n")
- sys.exit(1)
-
- # EXTRACT SIZE OF DATA
- if(pmt.pmt_dict_has_key(p, pmt.pmt_string_to_symbol("bytes"))):
- r = pmt.pmt_dict_ref(p, pmt.pmt_string_to_symbol("bytes"), dump)
- nbytes = pmt.pmt_to_uint64(r)
-
- nitems = nbytes/dsize
- info["nitems"] = nitems
- info["nbytes"] = nbytes
-
- if(VERBOSE):
- print "Size of Data: {0} bytes".format(nbytes)
- print " {0} items".format(nitems)
- else:
- sys.stderr.write("Could not find key 'size': invalid or corrupt data file.\n")
- sys.exit(1)
-
- return info
-
-# IF THERE IS EXTRA DATA, PULL OUT THE DICTIONARY AND PARSE IT
-def parse_extra_dict(p, info, VERBOSE=False):
- if(pmt.pmt_is_dict(p) is False):
- sys.stderr.write("Extra header is not a PMT dictionary: invalid or corrupt data file.\n")
- sys.exit(1)
-
- items = pmt.pmt_dict_items(p)
- nitems = pmt.pmt_length(items)
- for i in xrange(nitems):
- item = pmt.pmt_nth(i, items)
- key = pmt.pmt_symbol_to_string(pmt.pmt_car(item))
- val = pmt.pmt_cdr(item)
- info[key] = val
- if(VERBOSE):
- print "{0}: ".format(key)
- pmt.pmt_print(val)
-
- return info