summaryrefslogtreecommitdiff
path: root/gr-digital/python/digital/qa_packet_format.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-digital/python/digital/qa_packet_format.py')
-rw-r--r--gr-digital/python/digital/qa_packet_format.py47
1 files changed, 46 insertions, 1 deletions
diff --git a/gr-digital/python/digital/qa_packet_format.py b/gr-digital/python/digital/qa_packet_format.py
index 6440b80a5e..ae1a79f1f4 100644
--- a/gr-digital/python/digital/qa_packet_format.py
+++ b/gr-digital/python/digital/qa_packet_format.py
@@ -20,7 +20,7 @@
# Boston, MA 02110-1301, USA.
#
-import random, time, struct
+import time, struct
import pmt
from gnuradio import gr, gr_unittest, digital, blocks
from gnuradio.digital import packet_utils
@@ -80,6 +80,51 @@ class test_packet_format_fb(gr_unittest.TestCase):
self.assertEqual(send_str, payload[0:length])
+ def test_packet_parse_default(self):
+ ac = packet_utils.default_access_code
+ length = '0000000000000001'
+
+ hdr_format_1bps = digital.header_format_default(ac, 0)
+ hdr_format_4bps = digital.header_format_default(ac, 0, 4)
+
+ ac_bits = [int(x) & 1 for x in ac]
+ length_bits = [int(x) & 1 for x in length]
+ header_bits = ac_bits + length_bits + length_bits
+
+ src_hdr = blocks.vector_source_b(header_bits)
+
+ parser_1bps = digital.protocol_parser_b(hdr_format_1bps)
+ parser_4bps = digital.protocol_parser_b(hdr_format_4bps)
+
+ snk_hdr_1bps = blocks.message_debug()
+ snk_hdr_4bps = blocks.message_debug()
+
+ self.tb.connect(src_hdr, parser_1bps)
+ self.tb.connect(src_hdr, parser_4bps)
+
+ self.tb.msg_connect(parser_1bps, 'info', snk_hdr_1bps, 'store')
+ self.tb.msg_connect(parser_4bps, 'info', snk_hdr_4bps, 'store')
+
+ self.tb.start()
+ while (snk_hdr_1bps.num_messages() < 1) and (snk_hdr_4bps.num_messages() < 1):
+ time.sleep(0.1)
+ self.tb.stop()
+ self.tb.wait()
+
+ result_1bps = snk_hdr_1bps.get_message(0)
+ result_4bps = snk_hdr_4bps.get_message(0)
+
+ self.assertTrue(pmt.dict_has_key(
+ result_1bps, pmt.intern('payload symbols')))
+ self.assertEqual(pmt.to_long(pmt.dict_ref(
+ result_1bps, pmt.intern('payload symbols'), pmt.PMT_F)), 8)
+
+ self.assertTrue(pmt.dict_has_key(
+ result_4bps, pmt.intern('payload symbols')))
+ self.assertEqual(pmt.to_long(pmt.dict_ref(
+ result_4bps, pmt.intern('payload symbols'), pmt.PMT_F)), 2)
+
+
def test_packet_format_async_counter(self):
bps = 2
ac = packet_utils.default_access_code