#!/usr/bin/env python
#
# Copyright 2013 Free Software Foundation, Inc.
#
# This file is part of GNU Radio
#
# SPDX-License-Identifier: GPL-3.0-or-later
#
#
from gnuradio import gr
import pmt
def make_lengthtags(lengths, offsets, tagname='length', vlen=1):
    """Build one length tag for every (offset, length) pair.

    Args:
        lengths: Sequence of packet lengths. Each one is floor-divided by
            ``vlen`` before being stored as the tag value.
        offsets: Sequence of packet start positions, same size as
            ``lengths``. Each one is floor-divided by ``vlen`` before
            being stored as the tag offset.
        tagname: Name used (as a PMT symbol) for the key of every tag.
        vlen: Divisor applied to offsets and lengths; presumably the
            number of scalar elements per stream item.

    Returns:
        A list of ``gr.tag_t`` objects, in the order of the inputs.

    Raises:
        AssertionError: If ``lengths`` and ``offsets`` differ in size.
    """
    # Checked explicitly instead of with an ``assert`` statement: asserts
    # are stripped under ``python -O`` and zip() would then silently drop
    # the surplus entries. The exception type is kept for existing callers.
    if len(offsets) != len(lengths):
        raise AssertionError(
            "offsets and lengths must have the same number of entries")
    tags = []
    for offset, length in zip(offsets, lengths):
        tag = gr.tag_t()
        tag.offset = offset // vlen
        tag.key = pmt.string_to_symbol(tagname)
        tag.value = pmt.from_long(length // vlen)
        tags.append(tag)
    return tags
def string_to_vector(string):
    """Return the code points of the characters of ``string`` as a list.

    Args:
        string: Iterable of single-character strings (typically a ``str``).

    Returns:
        A new list holding ``ord(c)`` for every character ``c``, in order.
    """
    return [ord(char) for char in string]
def strings_to_vectors(strings, tsb_tag_key):
    """Turn a list of strings into one data vector plus length tags.

    Every string is converted with ``string_to_vector`` and the resulting
    packets are handed to ``packets_to_vectors``.

    Args:
        strings: Iterable of strings, one per packet.
        tsb_tag_key: Key name passed on to ``packets_to_vectors``.

    Returns:
        Whatever ``packets_to_vectors`` returns for the converted packets.
    """
    packets = [string_to_vector(string) for string in strings]
    return packets_to_vectors(packets, tsb_tag_key)
def vector_to_string(v):
    """Return the string whose characters have the code points in ``v``.

    Args:
        v: Iterable of integers accepted by ``chr``.

    Returns:
        The concatenation of ``chr(d)`` for every ``d`` in ``v``; an empty
        string when ``v`` is empty.
    """
    return ''.join(chr(code) for code in v)
def vectors_to_strings(data, tags, tsb_tag_key):
    """Split a tagged data vector into packets and return them as strings.

    The packets are extracted with ``vectors_to_packets`` (default
    ``vlen``) and each one is converted with ``vector_to_string``.

    Args:
        data: Data vector passed on to ``vectors_to_packets``.
        tags: Tags passed on to ``vectors_to_packets``.
        tsb_tag_key: Key name passed on to ``vectors_to_packets``.

    Returns:
        A list with one string per packet.
    """
    packets = vectors_to_packets(data, tags, tsb_tag_key)
    return [vector_to_string(packet) for packet in packets]
def count_bursts(data, tags, tsb_tag_key, vlen=1):
    """Count the bursts contained in a tagged data vector.

    Every tag whose key equals ``tsb_tag_key`` starts a packet at flat
    position ``tag.offset * vlen`` spanning ``to_long(tag.value) * vlen``
    elements of ``data``. A burst is a run of packets with no untagged
    element between them: the counter is incremented each time a packet
    starts while no burst is in progress.

    Args:
        data: Flat data sequence; only its length is used.
        tags: Iterable of tag objects exposing ``key``, ``offset`` and
            ``value`` (e.g. ``gr.tag_t``).
        tsb_tag_key: Name of the length tag key to look for.
        vlen: Scale factor applied to tag offsets and tag values to map
            them onto positions in ``data``.

    Returns:
        The number of bursts found.

    Raises:
        ValueError: If two matching tags share the same offset.
        Exception: If a matching tag is found before the previous packet
            has ended.
    """
    lengthtags = [t for t in tags
                  if pmt.symbol_to_string(t.key) == tsb_tag_key]
    lengths = {}
    for tag in lengthtags:
        # The tag value is scaled by vlen to index the flat data vector,
        # so the offset has to be scaled the same way; otherwise tags are
        # looked up at the wrong positions whenever vlen > 1.
        start = tag.offset * vlen
        if start in lengths:
            raise ValueError(
                "More than one tags with key {0} with the same offset={1}."
                .format(tsb_tag_key, tag.offset))
        lengths[start] = pmt.to_long(tag.value) * vlen
    in_burst = False
    in_packet = False
    packet_length = None
    packet_pos = None
    burst_count = 0
    for pos in range(len(data)):
        if pos in lengths:
            if in_packet:
                print("Got tag at pos {0} current packet_pos is {1}".format(
                    pos, packet_pos))
                raise Exception("Received packet tag while in packet.")
            # -1 so that the increment below makes the first element 0.
            packet_pos = -1
            packet_length = lengths[pos]
            in_packet = True
            if not in_burst:
                burst_count += 1
            in_burst = True
        elif not in_packet:
            # An element that belongs to no packet ends the current burst.
            in_burst = False
        if in_packet:
            packet_pos += 1
            # NOTE: a packet of length 0 never satisfies this test, so it
            # stays open until the data ends or the next tag raises.
            if packet_pos == packet_length - 1:
                in_packet = False
                packet_pos = None
    return burst_count
def vectors_to_packets(data, tags, tsb_tag_key, vlen=1):
    """Split a tagged data vector into its packets.

    Every tag whose key equals ``tsb_tag_key`` announces a packet that
    starts at flat position ``tag.offset * vlen`` and spans
    ``to_long(tag.value) * vlen`` elements of ``data``. The packets must
    cover ``data`` back to back, starting at position 0.

    Args:
        data: Flat, sliceable data sequence.
        tags: Iterable of tag objects exposing ``key``, ``offset`` and
            ``value`` (e.g. ``gr.tag_t``).
        tsb_tag_key: Name of the length tag key to look for.
        vlen: Scale factor applied to tag offsets and tag values to map
            them onto positions in ``data``.

    Returns:
        A list of slices of ``data``, one per packet, in stream order.

    Raises:
        ValueError: If two matching tags share an offset, if no matching
            tag sits at offset 0 (even when ``data`` is empty), if a
            packet does not start on a tag, if a packet has zero length,
            or if the last packet runs past the end of ``data``.
    """
    lengthtags = [t for t in tags
                  if pmt.symbol_to_string(t.key) == tsb_tag_key]
    lengths = {}
    for tag in lengthtags:
        # The tag value is scaled by vlen to index the flat data vector,
        # so the offset has to be scaled the same way; otherwise the
        # second packet can never be found whenever vlen > 1.
        start = tag.offset * vlen
        if start in lengths:
            raise ValueError(
                "More than one tags with key {0} with the same offset={1}."
                .format(tsb_tag_key, tag.offset))
        lengths[start] = pmt.to_long(tag.value) * vlen
    if 0 not in lengths:
        raise ValueError("There is no tag with key {0} and an offset of 0"
                         .format(tsb_tag_key))
    pos = 0
    packets = []
    while pos < len(data):
        if pos not in lengths:
            raise ValueError("There is no tag with key {0} and an offset of {1}."
                             " We were expecting one."
                             .format(tsb_tag_key, pos))
        length = lengths[pos]
        if length == 0:
            # A zero-length packet would never advance ``pos``.
            raise ValueError("Packets cannot have zero length.")
        if pos + length > len(data):
            raise ValueError("The final packet is incomplete.")
        packets.append(data[pos: pos + length])
        pos += length
    return packets
def packets_to_vectors(packets, tsb_tag_key, vlen=1):
    """Concatenate packets into one data vector and build its length tags.

    If used with blocks.vector_source_X, this set of data and tags will
    produce a correct tagged stream.

    Args:
        packets: Iterable of packets, each a sized iterable of elements.
        tsb_tag_key: Name used (as a PMT symbol) for the key of every tag.
        vlen: Divisor applied (floor division) to each packet's start
            position and length before they are stored on its tag.

    Returns:
        A ``(data, tags)`` tuple: ``data`` is the flat list of all packet
        elements in order, ``tags`` holds one ``gr.tag_t`` per packet.
    """
    tags = []
    data = []
    offset = 0
    for packet in packets:
        data.extend(packet)
        tag = gr.tag_t()
        tag.offset = offset // vlen
        tag.key = pmt.string_to_symbol(tsb_tag_key)
        tag.value = pmt.from_long(len(packet) // vlen)
        tags.append(tag)
        # ``offset`` tracks the flat position at which the next packet starts.
        offset = offset + len(packet)
    return data, tags