diff options
Diffstat (limited to 'gr-zeromq/python/zeromq/rpc_manager.py')
-rw-r--r-- | gr-zeromq/python/zeromq/rpc_manager.py | 101 |
1 files changed, 101 insertions, 0 deletions
diff --git a/gr-zeromq/python/zeromq/rpc_manager.py b/gr-zeromq/python/zeromq/rpc_manager.py new file mode 100644 index 0000000000..ac8ebfa7cf --- /dev/null +++ b/gr-zeromq/python/zeromq/rpc_manager.py @@ -0,0 +1,101 @@ +# +# Copyright 2013 Free Software Foundation, Inc. +# +# This file is part of GNU Radio. +# +# This is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 3, or (at your option) +# any later version. +# +# This software is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this software; see the file COPYING. If not, write to +# the Free Software Foundation, Inc., 51 Franklin Street, +# Boston, MA 02110-1301, USA. +# + +import zmq +import pmt +import threading + + +class rpc_manager(): + def __init__(self): + self.zmq_context = zmq.Context() + self.poller_rep = zmq.Poller() + self.poller_req_out = zmq.Poller() + self.poller_req_in = zmq.Poller() + self.interfaces = dict() + + def __del__(self): + self.stop_watcher() + self.watcher_thread.join() + + def set_reply_socket(self, address): + self.rep_socket = self.zmq_context.socket(zmq.REP) + self.rep_socket.bind(address) + print "[RPC] reply socket bound to: ", address + self.poller_rep.register(self.rep_socket, zmq.POLLIN) + + def set_request_socket(self, address): + self.req_socket = self.zmq_context.socket(zmq.REQ) + self.req_socket.connect(address) + print "[RPC] request socket connected to: ", address + self.poller_req_out.register(self.req_socket, zmq.POLLOUT) + self.poller_req_in.register(self.req_socket, zmq.POLLIN) + + def add_interface(self, id_str, callback_func): + if not self.interfaces.has_key(id_str): + self.interfaces[id_str] = callback_func + print "[RPC] added reply interface:", id_str + else: + print "ERROR: duplicate id_str" + + def watcher(self): + self.keep_running = True + while self.keep_running: + # poll for calls + socks = dict(self.poller_rep.poll(10)) + if socks.get(self.rep_socket) == zmq.POLLIN: + # receive call + msg = self.rep_socket.recv() + (id_str, args) = pmt.to_python(pmt.deserialize_str(msg)) + print "[RPC] request:", id_str, ", args:", args + reply = self.callback(id_str, args) + self.rep_socket.send(pmt.serialize_str(pmt.to_pmt(reply))) + + def start_watcher(self): + self.watcher_thread = threading.Thread(target=self.watcher,args=()) + self.watcher_thread.daemon = True + self.watcher_thread.start() + + def stop_watcher(self): + self.keep_running = False + self.watcher_thread.join() + + def request(self, id_str, args=None): + socks = dict(self.poller_req_out.poll(10)) + if socks.get(self.req_socket) == zmq.POLLOUT: + self.req_socket.send(pmt.serialize_str(pmt.to_pmt((id_str,args)))) + socks = dict(self.poller_req_in.poll(10)) + if socks.get(self.req_socket) == zmq.POLLIN: + reply = pmt.to_python(pmt.deserialize_str(self.req_socket.recv())) + print "[RPC] reply:", reply + return reply + + def callback(self, id_str, args): + if self.interfaces.has_key(id_str): + callback_func = self.interfaces.get(id_str) + if not args == None: + # use unpacking or splat operator * to unpack argument list + return(callback_func(*args)) + else: + return(callback_func()) + else: + print "[RPC] ERROR: id_str not found" + return None |