summaryrefslogtreecommitdiff
path: root/gr-zeromq/python/zeromq/rpc_manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'gr-zeromq/python/zeromq/rpc_manager.py')
-rw-r--r--gr-zeromq/python/zeromq/rpc_manager.py101
1 files changed, 101 insertions, 0 deletions
diff --git a/gr-zeromq/python/zeromq/rpc_manager.py b/gr-zeromq/python/zeromq/rpc_manager.py
new file mode 100644
index 0000000000..ac8ebfa7cf
--- /dev/null
+++ b/gr-zeromq/python/zeromq/rpc_manager.py
@@ -0,0 +1,101 @@
+#
+# Copyright 2013 Free Software Foundation, Inc.
+#
+# This file is part of GNU Radio.
+#
+# This is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3, or (at your option)
+# any later version.
+#
+# This software is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this software; see the file COPYING. If not, write to
+# the Free Software Foundation, Inc., 51 Franklin Street,
+# Boston, MA 02110-1301, USA.
+#
+
+import zmq
+import pmt
+import threading
+
+
+class rpc_manager():
+ def __init__(self):
+ self.zmq_context = zmq.Context()
+ self.poller_rep = zmq.Poller()
+ self.poller_req_out = zmq.Poller()
+ self.poller_req_in = zmq.Poller()
+ self.interfaces = dict()
+
+ def __del__(self):
+ self.stop_watcher()
+ self.watcher_thread.join()
+
+ def set_reply_socket(self, address):
+ self.rep_socket = self.zmq_context.socket(zmq.REP)
+ self.rep_socket.bind(address)
+ print "[RPC] reply socket bound to: ", address
+ self.poller_rep.register(self.rep_socket, zmq.POLLIN)
+
+ def set_request_socket(self, address):
+ self.req_socket = self.zmq_context.socket(zmq.REQ)
+ self.req_socket.connect(address)
+ print "[RPC] request socket connected to: ", address
+ self.poller_req_out.register(self.req_socket, zmq.POLLOUT)
+ self.poller_req_in.register(self.req_socket, zmq.POLLIN)
+
+ def add_interface(self, id_str, callback_func):
+ if not self.interfaces.has_key(id_str):
+ self.interfaces[id_str] = callback_func
+ print "[RPC] added reply interface:", id_str
+ else:
+ print "ERROR: duplicate id_str"
+
+ def watcher(self):
+ self.keep_running = True
+ while self.keep_running:
+ # poll for calls
+ socks = dict(self.poller_rep.poll(10))
+ if socks.get(self.rep_socket) == zmq.POLLIN:
+ # receive call
+ msg = self.rep_socket.recv()
+ (id_str, args) = pmt.to_python(pmt.deserialize_str(msg))
+ print "[RPC] request:", id_str, ", args:", args
+ reply = self.callback(id_str, args)
+ self.rep_socket.send(pmt.serialize_str(pmt.to_pmt(reply)))
+
+ def start_watcher(self):
+ self.watcher_thread = threading.Thread(target=self.watcher,args=())
+ self.watcher_thread.daemon = True
+ self.watcher_thread.start()
+
+ def stop_watcher(self):
+ self.keep_running = False
+ self.watcher_thread.join()
+
+ def request(self, id_str, args=None):
+ socks = dict(self.poller_req_out.poll(10))
+ if socks.get(self.req_socket) == zmq.POLLOUT:
+ self.req_socket.send(pmt.serialize_str(pmt.to_pmt((id_str,args))))
+ socks = dict(self.poller_req_in.poll(10))
+ if socks.get(self.req_socket) == zmq.POLLIN:
+ reply = pmt.to_python(pmt.deserialize_str(self.req_socket.recv()))
+ print "[RPC] reply:", reply
+ return reply
+
+ def callback(self, id_str, args):
+ if self.interfaces.has_key(id_str):
+ callback_func = self.interfaces.get(id_str)
+ if not args == None:
+ # use unpacking or splat operator * to unpack argument list
+ return(callback_func(*args))
+ else:
+ return(callback_func())
+ else:
+ print "[RPC] ERROR: id_str not found"
+ return None