summaryrefslogtreecommitdiff
path: root/gr-utils/python/blocktool/core/parseheader.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-utils/python/blocktool/core/parseheader.py')
-rw-r--r--gr-utils/python/blocktool/core/parseheader.py277
1 files changed, 277 insertions, 0 deletions
diff --git a/gr-utils/python/blocktool/core/parseheader.py b/gr-utils/python/blocktool/core/parseheader.py
new file mode 100644
index 0000000000..7b1e743554
--- /dev/null
+++ b/gr-utils/python/blocktool/core/parseheader.py
@@ -0,0 +1,277 @@
+#
+# Copyright 2019 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio
+#
+# GNU Radio is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# GNU Radio is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with GNU Radio; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+""" Module to generate AST for the headers and parse it """
+
+from __future__ import print_function
+from __future__ import absolute_import
+from __future__ import unicode_literals
+
+import os
+import re
+import codecs
+import logging
+
+from pygccxml import parser, declarations, utils
+
+from ..core.base import BlockToolException, BlockTool
+from ..core.iosignature import io_signature, message_port
+from ..core.comments import read_comments, add_comments, exist_comments
+from ..core import Constants
+
+LOGGER = logging.getLogger(__name__)
+
+
+class BlockHeaderParser(BlockTool):
+ """
+ : Single argument required: file_path
+ file_path: enter path for the header block in any of GNU Radio module
+ : returns the parsed header data in python dict
+ : return dict keys: namespace, class, io_signature, make,
+ properties, methods
+ : Can be used as an CLI command or an extenal API
+ """
+ name = 'Block Parse Header'
+ description = 'Create a parsed output from a block header file'
+
+ def __init__(self, file_path=None, blocktool_comments=False, **kwargs):
+ """ __init__ """
+ BlockTool.__init__(self, **kwargs)
+ self.parsed_data = {}
+ self.addcomments = blocktool_comments
+ if not os.path.isfile(file_path):
+ raise BlockToolException('file does not exist')
+ file_path = os.path.abspath(file_path)
+ self.target_file = file_path
+ self.initialize()
+ self.validate()
+
+ def initialize(self):
+ """
+ initialize all the required API variables
+ """
+ self.module = self.target_file
+ for dirs in self.module:
+ if not os.path.basename(self.module).startswith(Constants.GR):
+ self.module = os.path.abspath(
+ os.path.join(self.module, os.pardir))
+ self.modname = os.path.basename(self.module)
+ self.filename = os.path.basename(self.target_file)
+ self.targetdir = os.path.dirname(self.target_file)
+ for dirs in os.scandir(self.module):
+ if dirs.is_dir():
+ if dirs.path.endswith('lib'):
+ self.impldir = dirs.path
+ self.impl_file = os.path.join(self.impldir,
+ self.filename.split('.')[0]+'_impl.cc')
+
+ def validate(self):
+ """ Override the Blocktool validate function """
+ BlockTool._validate(self)
+ if not self.filename.endswith('.h'):
+ raise BlockToolException(
+ 'Cannot parse a non-header file')
+
+ def get_header_info(self):
+ """
+ PyGCCXML header code parser
+ magic happens here!
+ : returns the parsed header data in python dict
+ : return dict keys: namespace, class, io_signature, make,
+ properties, methods
+ : Can be used as an CLI command or an extenal API
+ """
+ gr = self.modname.split('-')[0]
+ module = self.modname.split('-')[-1]
+ generator_path, generator_name = utils.find_xml_generator()
+ xml_generator_config = parser.xml_generator_configuration_t(
+ xml_generator_path=generator_path,
+ xml_generator=generator_name,
+ compiler='gcc')
+ decls = parser.parse(
+ [self.target_file], xml_generator_config)
+ global_namespace = declarations.get_global_namespace(decls)
+
+ # namespace
+ try:
+ self.parsed_data['namespace'] = []
+ ns = global_namespace.namespace(gr)
+ if ns is None:
+ raise BlockToolException
+ main_namespace = ns.namespace(module)
+ if main_namespace is None:
+ raise BlockToolException('namespace cannot be none')
+ self.parsed_data['namespace'] = [gr, module]
+ if main_namespace.declarations:
+ for _namespace in main_namespace.declarations:
+ if isinstance(_namespace, declarations.namespace_t):
+ if Constants.KERNEL not in str(_namespace):
+ main_namespace = _namespace
+ self.parsed_data['namespace'].append(
+ str(_namespace).split('::')[-1].split(' ')[0])
+ except RuntimeError:
+ raise BlockToolException(
+ 'Invalid namespace format in the block header file')
+
+ # class
+ try:
+ self.parsed_data['class'] = ''
+ for _class in main_namespace.declarations:
+ if isinstance(_class, declarations.class_t):
+ main_class = _class
+ self.parsed_data['class'] = str(_class).split('::')[
+ 2].split(' ')[0]
+ except RuntimeError:
+ raise BlockToolException(
+ 'Block header namespace {} must consist of a valid class instance'.format(module))
+
+ # io_signature, message_ports
+ self.parsed_data['io_signature'] = {}
+ self.parsed_data['message_port'] = {}
+ if os.path.isfile(self.impl_file) and exist_comments(self):
+ self.parsed_data['io_signature'] = io_signature(
+ self.impl_file)
+ self.parsed_data['message_port'] = message_port(
+ self.impl_file)
+ read_comments(self)
+ elif os.path.isfile(self.impl_file) and not exist_comments(self):
+ self.parsed_data['io_signature'] = io_signature(
+ self.impl_file)
+ self.parsed_data['message_port'] = message_port(
+ self.impl_file)
+ if self.addcomments:
+ add_comments(self)
+ elif not os.path.isfile(self.impl_file) and exist_comments(self):
+ read_comments(self)
+ else:
+ self.parsed_data['io_signature'] = {
+ "input": [],
+ "output": []
+ }
+ self.parsed_data['message_port'] = self.parsed_data['io_signature']
+
+ # make
+ try:
+ self.parsed_data['make'] = {}
+ self.parsed_data['make']['arguments'] = []
+ query_m = declarations.custom_matcher_t(
+ lambda mem_fun: mem_fun.name.startswith('make'))
+ query_make = query_m & declarations.access_type_matcher_t('public')
+ make_func = main_class.member_functions(function=query_make,
+ allow_empty=True,
+ header_file=self.target_file)
+ criteria = declarations.calldef_matcher(name='make')
+ _make_fun = declarations.matcher.get_single(criteria, main_class)
+ _make_fun = str(_make_fun).split(
+ 'make')[-1].split(')')[0].split('(')[1].lstrip().rstrip().split(',')
+ if make_func:
+ for arg in make_func[0].arguments:
+ for _arg in _make_fun:
+ if str(arg.name) in _arg:
+ make_arguments = {
+ "name": str(arg.name),
+ "dtype": str(arg.decl_type),
+ "default": ""
+ }
+ if re.findall(r'[-+]?\d*\.\d+|\d+', _arg):
+ make_arguments['default'] = re.findall(
+ r'[-+]?\d*\.\d+|\d+', _arg)[0]
+ elif re.findall(r'\"(.+?)\"', _arg):
+ make_arguments['default'] = re.findall(
+ r'\"(.+?)\"', _arg)[0]
+ elif "true" in _arg:
+ make_arguments['default'] = "True"
+ elif "false" in _arg:
+ make_arguments['default'] = "False"
+ self.parsed_data['make']['arguments'].append(
+ make_arguments.copy())
+ except RuntimeError:
+ self.parsed_data['make'] = {}
+ self.parsed_data['make']['arguments'] = []
+
+ # setters
+ try:
+ self.parsed_data['methods'] = []
+ query_methods = declarations.access_type_matcher_t('public')
+ setters = main_class.member_functions(function=query_methods,
+ allow_empty=True,
+ header_file=self.target_file)
+ getter_arguments = []
+ if setters:
+ for setter in setters:
+ if str(setter.name).startswith('set_') and setter.arguments:
+ setter_args = {
+ "name": str(setter.name),
+ "arguments_type": []
+ }
+ for argument in setter.arguments:
+ args = {
+ "name": str(argument.name),
+ "dtype": str(argument.decl_type)
+ }
+ getter_arguments.append(args['name'])
+ setter_args['arguments_type'].append(args.copy())
+ self.parsed_data['methods'].append(setter_args.copy())
+ except RuntimeError:
+ self.parsed_data['methods'] = []
+
+ # getters
+ try:
+ self.parsed_data['properties'] = []
+ query_properties = declarations.access_type_matcher_t('public')
+ getters = main_class.member_functions(function=query_properties,
+ allow_empty=True,
+ header_file=self.target_file)
+ if getters:
+ for getter in getters:
+ if not getter.arguments or getter.has_const:
+ getter_args = {
+ "name": str(getter.name),
+ "dtype": str(getter.return_type),
+ "read_only": True
+ }
+ if getter_args['name'] in getter_arguments:
+ getter_args["read_only"] = False
+ self.parsed_data['properties'].append(
+ getter_args.copy())
+ except RuntimeError:
+ self.parsed_data['properties'] = []
+
+ # documentation
+ try:
+ _index = None
+ header_file = codecs.open(self.target_file, 'r', 'cp932')
+ self.parsed_data['docstring'] = re.compile(
+ r'//.*?$|/\*.*?\*/', re.DOTALL | re.MULTILINE).findall(
+ header_file.read())[2:]
+ header_file.close()
+ for doc in self.parsed_data['docstring']:
+ if Constants.BLOCKTOOL in doc:
+ _index = self.parsed_data['docstring'].index(doc)
+ if _index is not None:
+ self.parsed_data['docstring'] = self.parsed_data['docstring'][: _index]
+ except:
+ self.parsed_data['docstring'] = []
+
+ return self.parsed_data
+
+ def run_blocktool(self):
+ """ Run, run, run. """
+ self.get_header_info()