From 6f1cac5d60a19f7f127c35d83eb369f0a99189cd Mon Sep 17 00:00:00 2001
From: Martin Braun <martin.braun@ettus.com>
Date: Wed, 3 Mar 2021 16:51:04 +0100
Subject: runtime: gr_unittest: Add waitFor() API call

This is a helper for adding conditional waits into test cases. Useful
for tests with a non-deterministic execution time.

Signed-off-by: Martin Braun <martin.braun@ettus.com>
---
 gnuradio-runtime/python/gnuradio/gr_unittest.py | 41 +++++++++++++++++++++++++
 1 file changed, 41 insertions(+)

(limited to 'gnuradio-runtime/python/gnuradio/gr_unittest.py')

diff --git a/gnuradio-runtime/python/gnuradio/gr_unittest.py b/gnuradio-runtime/python/gnuradio/gr_unittest.py
index f82355338e..6177f0f32f 100644
--- a/gnuradio-runtime/python/gnuradio/gr_unittest.py
+++ b/gnuradio-runtime/python/gnuradio/gr_unittest.py
@@ -11,8 +11,12 @@
 GNU radio specific extension of unittest.
 """
 
+import time
 import unittest
 
+# We allow snakeCase here for consistency with unittest
+# pylint: disable=invalid-name
+
 class TestCase(unittest.TestCase):
     """A subclass of unittest.TestCase that adds additional assertions
 
@@ -107,6 +111,43 @@ class TestCase(unittest.TestCase):
             for (x, y) in zip(a, b)
         ])
 
+    def waitFor(
+            self,
+            condition,
+            timeout=5.0,
+            poll_interval=0.2,
+            fail_on_timeout=True,
+            fail_msg=None):
+        """
+        Helper function: Wait for a callable to return True within a given
+        timeout.
+
+        This is useful for running tests where an exact wait time is not known.
+
+        Arguments:
+        - condition: A callable. Must return True when a 'good' condition is met.
+        - timeout: Timeout in seconds. `condition` must return True within this
+                   timeout.
+        - poll_interval: Time between calls to condition() in seconds
+        - fail_on_timeout: If True, the test case will fail when the timeout
+                           occurs. If False, this function will return False in
+                           that case.
+        - fail_msg: The message that is printed when a timeout occurs and
+                    fail_on_timeout is true.
+        """
+        if not callable(condition):
+            self.fail("Invalid condition provided to waitFor()!")
+        stop_time = time.monotonic() + timeout
+        while time.monotonic() <= stop_time:
+            if condition():
+                return True
+            time.sleep(poll_interval)
+        if fail_on_timeout:
+            fail_msg = fail_msg or "Timeout exceeded during call to waitFor()!"
+            self.fail(fail_msg)
+        return False
+
+
 TestResult = unittest.TestResult
 TestSuite = unittest.TestSuite
 FunctionTestCase = unittest.FunctionTestCase
-- 
cgit v1.2.3