From 9590d0bb421cb7fdf7dd04d4b1d0d77e3f06f13b Mon Sep 17 00:00:00 2001 From: Narayan Desai Date: Wed, 6 May 2009 01:26:53 +0000 Subject: more to python 2.6 ssl git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@5187 ce84e21b-d406-0410-9b95-82705330c041 --- src/lib/Component.py | 584 ++++++++++++++++++++++++--------------------------- 1 file changed, 280 insertions(+), 304 deletions(-) (limited to 'src/lib/Component.py') diff --git a/src/lib/Component.py b/src/lib/Component.py index 7cfd3b8c8..d35759603 100644 --- a/src/lib/Component.py +++ b/src/lib/Component.py @@ -1,330 +1,306 @@ -'''Cobalt component base classes''' -__revision__ = '$Revision$' +"""Cobalt component base.""" -import logging, select, signal, socket, sys, urlparse, xmlrpclib, cPickle, os, traceback -from base64 import decodestring +__revision__ = '$Revision$' -import BaseHTTPServer, SimpleXMLRPCServer -import Bcfg2.tlslite.errors -import Bcfg2.tlslite.api -from Bcfg2.tlslite.TLSConnection import TLSConnection +__all__ = ["Component", "exposed", "automatic", "run_component"] -log = logging.getLogger('Component') +import inspect +import os +import cPickle +import pydoc +import sys +import getopt +import logging +import time +import threading +import xmlrpclib -class ComponentInitError(Exception): - '''Raised in case of component initialization failure''' - pass +import Bcfg2.Logger +from Bcfg2.SSLServer import XMLRPCServer -class ComponentKeyError(Exception): - '''raised in case of key parse fails''' - pass +def run_component (component_cls, argv=None, register=True, state_name=False, + cls_kwargs={}, extra_getopt='', time_out=10): + if argv is None: + argv = sys.argv + try: + (opts, arg) = getopt.getopt(argv[1:], 'C:D:d' + extra_getopt) + except getopt.GetoptError, e: + print >> sys.stderr, e + print >> sys.stderr, "Usage:" + print >> sys.stderr, "%s [-d] [-D pidfile] [-C config file]" % (os.path.basename(argv[0])) + sys.exit(1) + + # default settings + daemon = False + pidfile = "" + level = logging.INFO + # get user input + for item in opts: + if item[0] == '-C': + #FIXME + cf = (item[1], ) + elif item[0] == '-D': + daemon = True + pidfile_name = item[1] + elif item[0] == '-d': + level = logging.DEBUG + + logging.getLogger().setLevel(level) + Bcfg2.Logger.log_to_stderr(logging.getLogger()) + Bcfg2.Logger.setup_logging(component_cls.implementation, True, True) -class ForkedChild(Exception): - '''raised after child has been forked''' - pass + if daemon: + child_pid = os.fork() + if child_pid != 0: + return + + os.setsid() + + child_pid = os.fork() + if child_pid != 0: + os._exit(0) + + redirect_file = open("/dev/null", "w+") + os.dup2(redirect_file.fileno(), sys.__stdin__.fileno()) + os.dup2(redirect_file.fileno(), sys.__stdout__.fileno()) + os.dup2(redirect_file.fileno(), sys.__stderr__.fileno()) + + os.chdir(os.sep) + os.umask(0) + + pidfile = open(pidfile_name or "/dev/null", "w") + print >> pidfile, os.getpid() + pidfile.close() -class CobaltXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): - '''CobaltXMLRPCRequestHandler takes care of ssl xmlrpc requests''' - masterpid = os.getpid() + component = component_cls(**cls_kwargs) + + location = ('', 6789) + keypath = '/etc/bcfg2.key' + certfile = '/etc/bcfg2.key' - def __init__(self, request, client_address, server): - self.cleanup = True - SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.__init__(self, request, - client_address, - server) + server = XMLRPCServer(location, keyfile=keypath, certfile=keypath, + register=register, timeout=time_out) + server.register_instance(component) + + try: + server.serve_forever() + finally: + server.server_close() - def do_POST(self): - '''Overload do_POST to pass through client address information''' - try: - # get arguments - data = self.rfile.read(int(self.headers["content-length"])) +def exposed (func): + """Mark a method to be exposed publically. + + Examples: + class MyComponent (Component): + @expose + def my_method (self, param1, param2): + do_stuff() + + class MyComponent (Component): + def my_method (self, param1, param2): + do_stuff() + my_method = expose(my_method) + """ + func.exposed = True + return func - authenticated = False - #try x509 cert auth (will have been completed, just checking status) - authenticated = self.request.authenticated - #TLSConnection can be accessed by self.request? - - #try httpauth - if not authenticated and "Authorization" in self.headers: - binauth = self.headers['Authorization'].replace("Basic ", "") - namepass = decodestring(binauth).split(':') - if self.server._authenticate_connection("bogus-method", - namepass[0], - namepass[1], - self.client_address): - authenticated = True +def automatic (func, period=10): + """Mark a method to be run periodically.""" + func.automatic = True + func.automatic_period = period + func.automatic_ts = -1 + return func - response = self.server._cobalt_marshalled_dispatch(data, self.client_address, authenticated) - except ForkedChild: - self.cleanup = False - return - except: # This should only happen if the module is buggy - # internal error, report as HTTP server error - log.error("Unexcepted handler failure in do_POST", exc_info=1) - self.send_response(500) - self.end_headers() - else: - # got a valid XML RPC response - if os.getpid() != self.masterpid: - pid = os.fork() - if pid: - self.cleanup = False - return - try: - self.send_response(200) - self.send_header("Content-type", "text/xml") - self.send_header("Content-length", str(len(response))) - self.end_headers() - self.wfile.write(response) - - # shut down the connection - self.wfile.flush() - self.rfile.close() - self.wfile.close() - self.connection.sock.shutdown(2) - #self.wfile.close() - except socket.error: - pass +def locking (func): + """Mark a function as being internally thread safe""" + func.locking = True + return func - def setup(self): - '''Setup a working connection''' - self.cleanup = True - self.connection = self.request - self.rfile = socket._fileobject(self.request, "rb", self.rbufsize) - self.wfile = socket._fileobject(self.request, "wb", self.wbufsize) +def readonly (func): + """Mark a function as read-only -- no data effects in component inst""" + func.readonly = True + return func -class TLSServer(Bcfg2.tlslite.api.TLSSocketServerMixIn, - BaseHTTPServer.HTTPServer): - '''This class is an tlslite-using SSLServer''' - def __init__(self, address, keyfile, certfile, handler, checker=None, - reqCert=False): - print keyfile, certfile - self.sc = Bcfg2.tlslite.api.SessionCache() - self.rc = reqCert - self.master = os.getpid() - x509 = Bcfg2.tlslite.api.X509() - cdata = open(certfile).read() - x509.parse(cdata) - self.checker = checker - kdata = open(keyfile).read() - try: - self.key = Bcfg2.tlslite.api.parsePEMKey(kdata, private=True) - except: - raise ComponentKeyError - self.chain = Bcfg2.tlslite.api.X509CertChain([x509]) - BaseHTTPServer.HTTPServer.__init__(self, address, handler) +def query (func=None, **kwargs): + """Mark a method to be marshalled as a query.""" + def _query (func): + if kwargs.get("all_fields", True): + func.query_all_fields = True + func.query = True + return func + if func is not None: + return _query(func) + return _query - def finish_request(self, sock, address): - sock.settimeout(90) - tlsConnection = TLSConnection(sock) - if self.handshake(tlsConnection) == True: - req = self.RequestHandlerClass(tlsConnection, address, self) - if req.cleanup: - tlsConnection.close() - if os.getpid() != self.master: - os._exit(0) - else: - log.error("Handshake failed") +def marshal_query_result (items, specs=None): + if specs is not None: + fields = get_spec_fields(specs) + else: + fields = None + return [item.to_rx(fields) for item in items] - def handshake(self, tlsConnection): - try: - tlsConnection.handshakeServer(certChain=self.chain, - privateKey=self.key, - sessionCache=self.sc, - checker=self.checker, - reqCert=self.rc) - tlsConnection.ignoreAbruptClose = True - #Connection authenticated during TLS handshake, no need for passwords - if not self.checker == None: - tlsConnection.authenticated = True +class Component (object): + + """Base component. + + Intended to be served as an instance by Cobalt.Component.XMLRPCServer + >>> server = Cobalt.Component.XMLRPCServer(location, keyfile) + >>> component = Cobalt.Component.Component() + >>> server.serve_instance(component) + + Class attributes: + name -- logical component name (e.g., "queue-manager", "process-manager") + implementation -- implementation identifier (e.g., "BlueGene/L", "BlueGene/P") + + Methods: + save -- pickle the component to a file + do_tasks -- perform automatic tasks for the component + """ + + name = "component" + implementation = "generic" + + def __init__ (self, **kwargs): + """Initialize a new component. + + Keyword arguments: + statefile -- file in which to save state automatically + """ + self.statefile = kwargs.get("statefile", None) + self.logger = logging.getLogger("%s %s" % (self.implementation, self.name)) + self.lock = threading.Lock() + + def save (self, statefile=None): + """Pickle the component. + + Arguments: + statefile -- use this file, rather than component.statefile + """ + statefile = statefile or self.statefile + if statefile: + temp_statefile = statefile + ".temp" + data = cPickle.dumps(self) + try: + fd = file(temp_statefile, "wb") + fd.write(data) + fd.close() + except IOError, e: + self.logger.error("statefile failure : %s" % e) + return str(e) else: - tlsConnection.authenticated = False - return True - except Bcfg2.tlslite.errors.TLSError, error: - return False - except socket.error: - return False - -class Component(TLSServer, - SimpleXMLRPCServer.SimpleXMLRPCDispatcher): - """Cobalt component providing XML-RPC access""" - __name__ = 'Component' - __implementation__ = 'Generic' - __statefields__ = [] - async_funcs = [] - fork_funcs = [] - child_limit = 32 - - def __init__(self, keyfile, certfile, password, location): - # need to get addr - self.shut = False - signal.signal(signal.SIGINT, self.start_shutdown) - signal.signal(signal.SIGTERM, self.start_shutdown) - self.logger = logging.getLogger('Component') - self.children = [] - self.static = True - uparsed = urlparse.urlparse(location)[1].split(':') - sock_loc = (uparsed[0], int(uparsed[1])) + os.rename(temp_statefile, statefile) + return "state saved to file: %s" % statefile + save = exposed(save) + + def do_tasks (self): + """Perform automatic tasks for the component. + + Automatic tasks are member callables with an attribute + automatic == True. + """ + for name, func in inspect.getmembers(self, callable): + if getattr(func, "automatic", False): + need_to_lock = not getattr(func, 'locking', False) + if (time.time() - func.automatic_ts) > \ + func.automatic_period: + if need_to_lock: + self.lock.acquire() + try: + mt1 = time.time() + func() + except: + self.logger.error("Automatic method %s failed" \ + % (name), exc_info=1) + finally: + mt2 = time.time() - self.password = password + if need_to_lock: + self.lock.release() + func.__dict__['automatic_ts'] = time.time() + def _resolve_exposed_method (self, method_name): + """Resolve an exposed method. + + Arguments: + method_name -- name of the method to resolve + """ try: - TLSServer.__init__(self, sock_loc, keyfile, certfile, - CobaltXMLRPCRequestHandler) - except socket.error: - self.logger.error("Failed to bind to socket") - raise ComponentInitError - except ComponentKeyError: - self.logger.error("Failed to parse key" % (keyfile)) - raise ComponentInitError - except: - self.logger.error("Failed to load ssl key '%s'" % (keyfile), exc_info=1) - raise ComponentInitError - try: - SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self) - except TypeError: - SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, False, None) - self.logRequests = 0 - self.port = self.socket.getsockname()[1] - self.url = "https://%s:%s" % (socket.gethostname(), self.port) - self.logger.info("Bound to port %s" % self.port) - self.funcs.update({'system.listMethods':self.addr_system_listMethods}) - self.atime = 0 + func = getattr(self, method_name) + except AttributeError: + raise NoExposedMethod(method_name) + if not getattr(func, "exposed", False): + raise NoExposedMethod(method_name) + return func - def _cobalt_marshalled_dispatch(self, data, address, authenticated=False): - """Decode and dispatch XMLRPC requests. Overloaded to pass through - client address information + def _dispatch (self, method, args, dispatch_dict): + """Custom XML-RPC dispatcher for components. + + method -- XML-RPC method name + args -- tuple of paramaters to method """ - try: - rawparams, method = xmlrpclib.loads(data) - except: - self.logger.error("Failed to parse request from %s" \ - % (address[0])) - #open('/tmp/badreq', 'w').write(data) - return xmlrpclib.dumps(xmlrpclib.Fault(4, "Bad Request")) - if not authenticated: - if len(rawparams) < 2: - self.logger.error("No authentication included with request from %s" % address[0]) - return xmlrpclib.dumps(xmlrpclib.Fault(2, "No Authentication Info")) - user = rawparams[0] - password = rawparams[1] - params = rawparams[2:] - # check authentication - if not self._authenticate_connection(method, user, password, address): - return xmlrpclib.dumps(xmlrpclib.Fault(3, "Authentication Failure")) + need_to_lock = True + if method in dispatch_dict: + method_func = dispatch_dict[method] else: - #there is no prefixed auth info in this case - params = rawparams[0:] - # generate response - try: - # need to add waitpid code here to enforce maxchild - if method in self.fork_funcs: - self.clean_up_children() - self.check_for_free_slot() - pid = os.fork() - if pid: - self.children.append(pid) - raise ForkedChild - # all handlers must take address as the first argument - response = self._dispatch(method, (address, ) + params) - # wrap response in a singleton tuple - response = (response,) - response = xmlrpclib.dumps(response, methodresponse=1) - except xmlrpclib.Fault, fault: - response = xmlrpclib.dumps(fault) - except TypeError, terror: - self.logger.error("Client %s called function %s with wrong argument count" % - (address[0], method), exc_info=1) - response = xmlrpclib.dumps(xmlrpclib.Fault(4, terror.args[0])) - except ForkedChild: - raise - except: - self.logger.error("Unexpected handler failure") - trace = sys.exc_info() - self.logger.error("%s : %s" % (str(trace[0]), str(trace[1]))) - for line in traceback.format_exc().splitlines(): - self.logger.error(line) - del trace - # report exception back to server - response = xmlrpclib.dumps(xmlrpclib.Fault(1, - "%s:%s" % (sys.exc_type, sys.exc_value))) - return response - - def clean_up_children(self): - while True: try: - pid = os.waitpid(0, os.WNOHANG)[0] - if pid: - if pid in self.children: - self.children.remove(pid) - else: - break - except OSError: - break - - def check_for_free_slot(self): - if len(self.children) >= self.child_limit: - self.logger.info("Reached child_limit; waiting for child exit") - pid = os.waitpid(0, 0)[0] - self.children.remove(pid) - - def _authenticate_connection(self, method, user, password, address): - '''Authenticate new connection''' - (user, address, method) - return password == self.password - - def save_state(self): - '''Save fields defined in __statefields__ in /var/spool/cobalt/__implementation__''' - if self.__statefields__: - savedata = tuple([getattr(self, field) for field in self.__statefields__]) + method_func = self._resolve_exposed_method(method) + except Exception, e: + if getattr(e, "log", True): + self.logger.error(e, exc_info=True) + raise xmlrpclib.Fault(getattr(e, "fault_code", 1), str(e)) + + if getattr(method_func, 'locking', False): + need_to_lock = False + if need_to_lock: + lock_start = time.time() + self.lock.acquire() + lock_done = time.time() try: - statefile = open("/var/spool/cobalt/%s" % self.__implementation__, 'w') - # need to flock here - statefile.write(cPickle.dumps(savedata)) - except: - self.logger.info("Statefile save failed; data persistence disabled") - self.__statefields__ = [] - - def load_state(self): - '''Load fields defined in __statefields__ from /var/spool/cobalt/__implementation__''' - if self.__statefields__: - try: - loaddata = cPickle.loads(open("/var/spool/cobalt/%s" % self.__implementation__).read()) - except: - self.logger.info("Statefile load failed") - return - for field in self.__statefields__: - setattr(self, field, loaddata[self.__statefields__.index(field)]) - - def addr_system_listMethods(self, address): - """get rid of the address argument and call the underlying dispatcher method""" - return SimpleXMLRPCServer.SimpleXMLRPCDispatcher.system_listMethods(self) - - def get_request(self): - '''We need to do work between requests, so select with timeout instead of blocking in accept''' - rsockinfo = [] - while self.socket not in rsockinfo: - if self.shut: - raise socket.error - for funcname in self.async_funcs: - func = getattr(self, funcname, False) - if callable(func): - func() - else: - self.logger.error("Cannot call uncallable method %s" % (funcname)) - try: - rsockinfo = select.select([self.socket], [], [], 10)[0] - except select.error: - continue - if self.socket in rsockinfo: - return self.socket.accept() - - def serve_forever(self): - """Handle one request at a time until doomsday.""" - while not self.shut: - self.handle_request() + method_start = time.time() + result = method_func(*args) + method_done = time.time() + except Exception, e: + if getattr(e, "log", True): + self.logger.error(e, exc_info=True) + raise xmlrpclib.Fault(getattr(e, "fault_code", 1), str(e)) + finally: + if need_to_lock: + self.lock.release() + if getattr(method_func, "query", False): + if not getattr(method_func, "query_all_methods", False): + margs = args[:1] + else: + margs = [] + result = marshal_query_result(result, *margs) + return result - def start_shutdown(self, signum, frame): - '''Shutdown on unexpected signals''' - self.shut = True + @exposed + def listMethods (self): + """Custom XML-RPC introspective method list.""" + return [ + name for name, func in inspect.getmembers(self, callable) + if getattr(func, "exposed", False) + ] + @exposed + def methodHelp (self, method_name): + """Custom XML-RPC introspective method help. + + Arguments: + method_name -- name of method to get help on + """ + try: + func = self._resolve_exposed_method(method_name) + except NoExposedMethod: + return "" + return pydoc.getdoc(func) + + def get_name (self): + """The name of the component.""" + return self.name + get_name = exposed(get_name) + + def get_implementation (self): + """The implementation of the component.""" + return self.implementation + get_implementation = exposed(get_implementation) -- cgit v1.2.3-1-g7c22