summaryrefslogtreecommitdiff
path: root/gnuradio-runtime/python/gnuradio/gr/packet_utils.py
blob: 780e0460d1f1d3066ef97e3132773bc51eee4fb2 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python
#
# Copyright 2013 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#

from __future__ import print_function
from __future__ import division
from __future__ import unicode_literals
from gnuradio import gr
import pmt

def make_lengthtags(lengths, offsets, tagname='length', vlen=1):
    """Build one length tag for every (offset, length) pair.

    Each returned ``gr.tag_t`` has ``tagname`` (as a PMT symbol) for its key,
    ``offset // vlen`` for its offset and ``length // vlen`` (as a PMT long)
    for its value. ``lengths`` and ``offsets`` must contain the same number
    of entries; the tags are returned in input order.
    """
    assert len(offsets) == len(lengths)

    def _new_tag(start, span):
        # Both the position and the length are scaled down by vlen using
        # integer division.
        entry = gr.tag_t()
        entry.offset = start // vlen
        entry.key = pmt.string_to_symbol(tagname)
        entry.value = pmt.from_long(span // vlen)
        return entry

    return [_new_tag(start, span) for start, span in zip(offsets, lengths)]

def string_to_vector(string):
    """Return the list of code points (``ord`` values) of the characters in *string*."""
    return [ord(ch) for ch in string]

def strings_to_vectors(strings, tsb_tag_key):
    """Convert each string to a vector of code points and pack the results.

    Every string is turned into a list of integers with string_to_vector();
    the resulting packets are then handed to packets_to_vectors() together
    with ``tsb_tag_key``, and that function's result is returned unchanged.
    """
    as_codepoints = list(map(string_to_vector, strings))
    return packets_to_vectors(as_codepoints, tsb_tag_key)

def vector_to_string(v):
    """Return the string whose characters are ``chr(d)`` for each ``d`` in *v*."""
    return ''.join(map(chr, v))

def vectors_to_strings(data, tags, tsb_tag_key):
    """Split a tagged data vector into packets and return each one as a string.

    The packet boundaries are recovered with vectors_to_packets(); every
    packet is then converted with vector_to_string(). The result is a list
    with one string per packet.
    """
    split_packets = vectors_to_packets(data, tags, tsb_tag_key)
    return list(map(vector_to_string, split_packets))

def count_bursts(data, tags, tsb_tag_key, vlen=1):
    """Count the bursts in a tagged data vector.

    A burst is a run of packets that follow each other with no untagged
    sample in between. Only the length of ``data`` is used; its contents are
    never inspected.

    Args:
        data: Flat sequence of samples.
        tags: Iterable of tag objects exposing ``key``, ``offset`` and
            ``value``. Tags whose key differs from ``tsb_tag_key`` are
            ignored.
        tsb_tag_key: Name of the length tag, as a plain string.
        vlen: Number of samples per item. Tag offsets and tag values are
            both multiplied by ``vlen`` so that they can be compared with
            positions in the flat ``data`` sequence.

    Returns:
        The number of bursts found.

    Raises:
        ValueError: Two length tags share the same offset.
        Exception: A length tag is found before the previous packet ended.
    """
    lengthtags = [t for t in tags
                  if pmt.symbol_to_string(t.key) == tsb_tag_key]
    # Map "first sample of the packet" -> "packet length in samples".
    lengths = {}
    for tag in lengthtags:
        # The scan below walks over flat samples, so the tag offset has to be
        # converted to samples exactly like the tag value is. Keying by the
        # raw offset only works when vlen == 1.
        start = tag.offset * vlen
        if start in lengths:
            raise ValueError(
                "More than one tags with key {0} with the same offset={1}."
                .format(tsb_tag_key, tag.offset))
        lengths[start] = pmt.to_long(tag.value)*vlen
    in_burst = False
    in_packet = False
    packet_length = None
    packet_pos = None
    burst_count = 0
    for pos in range(len(data)):
        if pos in lengths:
            if in_packet:
                print("Got tag at pos {0} current packet_pos is {1}".format(pos, packet_pos))
                raise Exception("Received packet tag while in packet.")
            # Start at -1 so that the increment below makes the first sample
            # of the packet index 0.
            packet_pos = -1
            packet_length = lengths[pos]
            in_packet = True
            # A packet that starts right after another one extends the
            # current burst; otherwise it opens a new one.
            if not in_burst:
                burst_count += 1
            in_burst = True
        elif not in_packet:
            # Untagged sample outside of any packet: the burst is over.
            in_burst = False
        if in_packet:
            packet_pos += 1
            # NOTE: a tag announcing a length of 0 never satisfies this
            # test, so such a packet is only ended by the end of the data or
            # by the exception above.
            if packet_pos == packet_length-1:
                in_packet = False
                packet_pos = None
    return burst_count

def vectors_to_packets(data, tags, tsb_tag_key, vlen=1):
    """Split a tagged data vector into a list of packets.

    Args:
        data: Flat sequence of samples; it must support ``len()`` and
            slicing.
        tags: Iterable of tag objects exposing ``key``, ``offset`` and
            ``value``. Tags whose key differs from ``tsb_tag_key`` are
            ignored.
        tsb_tag_key: Name of the length tag, as a plain string.
        vlen: Number of samples per item. Tag offsets and tag values are
            both multiplied by ``vlen`` so that they can be compared with
            positions in the flat ``data`` sequence.

    Returns:
        A list of slices of ``data``, one per packet, in stream order.

    Raises:
        ValueError: Two length tags share an offset, there is no tag at
            offset 0, a packet is not immediately followed by the next tag,
            a tag announces a zero length, or the last packet runs past the
            end of ``data``.
    """
    lengthtags = [t for t in tags
                  if pmt.symbol_to_string(t.key) == tsb_tag_key]
    # Map "first sample of the packet" -> "packet length in samples".
    lengths = {}
    for tag in lengthtags:
        # ``pos`` below indexes flat samples, so the tag offset has to be
        # converted to samples exactly like the tag value is. Keying by the
        # raw offset only works when vlen == 1.
        start = tag.offset * vlen
        if start in lengths:
            raise ValueError(
                "More than one tags with key {0} with the same offset={1}."
                .format(tsb_tag_key, tag.offset))
        lengths[start] = pmt.to_long(tag.value)*vlen
    if 0 not in lengths:
        raise ValueError("There is no tag with key {0} and an offset of 0"
                         .format(tsb_tag_key))
    pos = 0
    packets = []
    total = len(data)
    while pos < total:
        if pos not in lengths:
            # Report the position in the same unit as the tag offsets.
            raise ValueError("There is no tag with key {0} and an offset of {1}. "
                             "We were expecting one."
                             .format(tsb_tag_key, pos // vlen))
        length = lengths[pos]
        if length == 0:
            # A zero-length packet would never advance ``pos``.
            raise ValueError("Packets cannot have zero length.")
        if pos+length > total:
            raise ValueError("The final packet is incomplete.")
        packets.append(data[pos: pos+length])
        pos += length
    return packets

def packets_to_vectors(packets, tsb_tag_key, vlen=1):
    """ Flattens packets into a single data vector and a list of tags.

    Returns a ``(data, tags)`` tuple. ``data`` is the concatenation of all
    packets. ``tags`` holds one ``gr.tag_t`` per packet, keyed by
    ``tsb_tag_key``, whose offset is the packet's start position divided by
    ``vlen`` and whose value is the packet length divided by ``vlen``.
    If used with blocks.vector_source_X, this set of data
    and tags will produce a correct tagged stream. """
    flat = []
    length_tags = []
    start = 0
    for pkt in packets:
        flat.extend(pkt)
        marker = gr.tag_t()
        marker.offset = start // vlen
        marker.key = pmt.string_to_symbol(tsb_tag_key)
        size = len(pkt)
        marker.value = pmt.from_long(size // vlen)
        length_tags.append(marker)
        # The next packet begins right after this one.
        start += size
    return flat, length_tags