summaryrefslogtreecommitdiff
path: root/grc/converter/flow_graph.py
blob: d20c67703c7a8b566cab71efc478e4c1ff909d24 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# Copyright 2017 Free Software Foundation, Inc.
# This file is part of GNU Radio
#
# GNU Radio Companion is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; either version 2
# of the License, or (at your option) any later version.
#
# GNU Radio Companion is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301, USA

from __future__ import absolute_import, division

import ast
from collections import OrderedDict

from ..core.io import yaml
from . import xml


def from_xml(filename):
    """Load flow graph from xml file"""
    element, version_info = xml.load(filename, 'flow_graph.dtd')

    data = convert_flow_graph_xml(element)
    try:
        file_format = int(version_info['format'])
    except KeyError:
        file_format = _guess_file_format_1(data)

    data['metadata'] = {'file_format': file_format}

    return data


def dump(data, stream):
    out = yaml.dump(data, indent=2)

    replace = [
        ('blocks:', '\nblocks:'),
        ('connections:', '\nconnections:'),
        ('metadata:', '\nmetadata:'),
    ]
    for r in replace:
        out = out.replace(*r)
    prefix = '# auto-generated by grc.converter\n\n'
    stream.write(prefix + out)


def convert_flow_graph_xml(node):
    blocks = [
        convert_block(block_data)
        for block_data in node.findall('block')
    ]

    options = next(b for b in blocks if b['id'] == 'options')
    blocks.remove(options)
    options.pop('id')

    connections = [
        convert_connection(connection)
        for connection in node.findall('connection')
        ]

    flow_graph = OrderedDict()
    flow_graph['options'] = options
    flow_graph['blocks'] = blocks
    flow_graph['connections'] = connections
    return flow_graph


def convert_block(data):
    block_id = data.findtext('key')

    params = OrderedDict(sorted(
        (param.findtext('key'), param.findtext('value'))
        for param in data.findall('param')
    ))
    states = OrderedDict()
    x, y = ast.literal_eval(params.pop('_coordinate', '(10, 10)'))
    states['coordinate'] = yaml.ListFlowing([x, y])
    states['rotation'] = int(params.pop('_rotation', '0'))
    enabled = params.pop('_enabled', 'True')
    states['state'] = (
        'enabled' if enabled in ('1', 'True') else
        'bypassed' if enabled == '2' else
        'disabled'
    )

    block = OrderedDict()
    if block_id != 'options':
        block['name'] = params.pop('id')
    block['id'] = block_id
    block['parameters'] = params
    block['states'] = states

    return block


def convert_connection(data):
    src_blk_id = data.findtext('source_block_id')
    src_port_id = data.findtext('source_key')
    snk_blk_id = data.findtext('sink_block_id')
    snk_port_id = data.findtext('sink_key')

    if src_port_id.isdigit():
        src_port_id = 'out' + src_port_id
    if snk_port_id.isdigit():
        snk_port_id = 'in' + snk_port_id

    return yaml.ListFlowing([src_blk_id, src_port_id, snk_blk_id, snk_port_id])


def _guess_file_format_1(data):
    """Try to guess the file format for flow-graph files without version tag"""

    def has_numeric_port_ids(src_id, src_port_id, snk_id, snk_port_id):
        return src_port_id.isdigit() and snk_port_id.is_digit()

    try:
        if any(not has_numeric_port_ids(*con) for con in data['connections']):
            return 1
    except:
        pass
    return 0