summaryrefslogtreecommitdiffstats
path: root/src/lib/Component.py
diff options
context:
space:
mode:
authorNarayan Desai <desai@mcs.anl.gov>2009-05-06 01:26:53 +0000
committerNarayan Desai <desai@mcs.anl.gov>2009-05-06 01:26:53 +0000
commit9590d0bb421cb7fdf7dd04d4b1d0d77e3f06f13b (patch)
tree768763aa48be1a5a2c8dae7cba81859510f1146e /src/lib/Component.py
parent13f6d1554dd24d08d44662906fa9f3f008a23058 (diff)
downloadbcfg2-9590d0bb421cb7fdf7dd04d4b1d0d77e3f06f13b.tar.gz
bcfg2-9590d0bb421cb7fdf7dd04d4b1d0d77e3f06f13b.tar.bz2
bcfg2-9590d0bb421cb7fdf7dd04d4b1d0d77e3f06f13b.zip
more to python 2.6 ssl
git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@5187 ce84e21b-d406-0410-9b95-82705330c041
Diffstat (limited to 'src/lib/Component.py')
-rw-r--r--src/lib/Component.py584
1 files changed, 280 insertions, 304 deletions
diff --git a/src/lib/Component.py b/src/lib/Component.py
index 7cfd3b8c8..d35759603 100644
--- a/src/lib/Component.py
+++ b/src/lib/Component.py
@@ -1,330 +1,306 @@
-'''Cobalt component base classes'''
-__revision__ = '$Revision$'
+"""Cobalt component base."""
-import logging, select, signal, socket, sys, urlparse, xmlrpclib, cPickle, os, traceback
-from base64 import decodestring
+__revision__ = '$Revision$'
-import BaseHTTPServer, SimpleXMLRPCServer
-import Bcfg2.tlslite.errors
-import Bcfg2.tlslite.api
-from Bcfg2.tlslite.TLSConnection import TLSConnection
+__all__ = ["Component", "exposed", "automatic", "run_component"]
-log = logging.getLogger('Component')
+import inspect
+import os
+import cPickle
+import pydoc
+import sys
+import getopt
+import logging
+import time
+import threading
+import xmlrpclib
-class ComponentInitError(Exception):
- '''Raised in case of component initialization failure'''
- pass
+import Bcfg2.Logger
+from Bcfg2.SSLServer import XMLRPCServer
-class ComponentKeyError(Exception):
- '''raised in case of key parse fails'''
- pass
+def run_component (component_cls, argv=None, register=True, state_name=False,
+ cls_kwargs={}, extra_getopt='', time_out=10):
+ if argv is None:
+ argv = sys.argv
+ try:
+ (opts, arg) = getopt.getopt(argv[1:], 'C:D:d' + extra_getopt)
+ except getopt.GetoptError, e:
+ print >> sys.stderr, e
+ print >> sys.stderr, "Usage:"
+ print >> sys.stderr, "%s [-d] [-D pidfile] [-C config file]" % (os.path.basename(argv[0]))
+ sys.exit(1)
+
+ # default settings
+ daemon = False
+ pidfile = ""
+ level = logging.INFO
+ # get user input
+ for item in opts:
+ if item[0] == '-C':
+ #FIXME
+ cf = (item[1], )
+ elif item[0] == '-D':
+ daemon = True
+ pidfile_name = item[1]
+ elif item[0] == '-d':
+ level = logging.DEBUG
+
+ logging.getLogger().setLevel(level)
+ Bcfg2.Logger.log_to_stderr(logging.getLogger())
+ Bcfg2.Logger.setup_logging(component_cls.implementation, True, True)
-class ForkedChild(Exception):
- '''raised after child has been forked'''
- pass
+ if daemon:
+ child_pid = os.fork()
+ if child_pid != 0:
+ return
+
+ os.setsid()
+
+ child_pid = os.fork()
+ if child_pid != 0:
+ os._exit(0)
+
+ redirect_file = open("/dev/null", "w+")
+ os.dup2(redirect_file.fileno(), sys.__stdin__.fileno())
+ os.dup2(redirect_file.fileno(), sys.__stdout__.fileno())
+ os.dup2(redirect_file.fileno(), sys.__stderr__.fileno())
+
+ os.chdir(os.sep)
+ os.umask(0)
+
+ pidfile = open(pidfile_name or "/dev/null", "w")
+ print >> pidfile, os.getpid()
+ pidfile.close()
-class CobaltXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
- '''CobaltXMLRPCRequestHandler takes care of ssl xmlrpc requests'''
- masterpid = os.getpid()
+ component = component_cls(**cls_kwargs)
+
+ location = ('', 6789)
+ keypath = '/etc/bcfg2.key'
+ certfile = '/etc/bcfg2.key'
- def __init__(self, request, client_address, server):
- self.cleanup = True
- SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.__init__(self, request,
- client_address,
- server)
+ server = XMLRPCServer(location, keyfile=keypath, certfile=keypath,
+ register=register, timeout=time_out)
+ server.register_instance(component)
+
+ try:
+ server.serve_forever()
+ finally:
+ server.server_close()
- def do_POST(self):
- '''Overload do_POST to pass through client address information'''
- try:
- # get arguments
- data = self.rfile.read(int(self.headers["content-length"]))
+def exposed (func):
+ """Mark a method to be exposed publically.
+
+ Examples:
+ class MyComponent (Component):
+ @expose
+ def my_method (self, param1, param2):
+ do_stuff()
+
+ class MyComponent (Component):
+ def my_method (self, param1, param2):
+ do_stuff()
+ my_method = expose(my_method)
+ """
+ func.exposed = True
+ return func
- authenticated = False
- #try x509 cert auth (will have been completed, just checking status)
- authenticated = self.request.authenticated
- #TLSConnection can be accessed by self.request?
-
- #try httpauth
- if not authenticated and "Authorization" in self.headers:
- binauth = self.headers['Authorization'].replace("Basic ", "")
- namepass = decodestring(binauth).split(':')
- if self.server._authenticate_connection("bogus-method",
- namepass[0],
- namepass[1],
- self.client_address):
- authenticated = True
+def automatic (func, period=10):
+ """Mark a method to be run periodically."""
+ func.automatic = True
+ func.automatic_period = period
+ func.automatic_ts = -1
+ return func
- response = self.server._cobalt_marshalled_dispatch(data, self.client_address, authenticated)
- except ForkedChild:
- self.cleanup = False
- return
- except: # This should only happen if the module is buggy
- # internal error, report as HTTP server error
- log.error("Unexcepted handler failure in do_POST", exc_info=1)
- self.send_response(500)
- self.end_headers()
- else:
- # got a valid XML RPC response
- if os.getpid() != self.masterpid:
- pid = os.fork()
- if pid:
- self.cleanup = False
- return
- try:
- self.send_response(200)
- self.send_header("Content-type", "text/xml")
- self.send_header("Content-length", str(len(response)))
- self.end_headers()
- self.wfile.write(response)
-
- # shut down the connection
- self.wfile.flush()
- self.rfile.close()
- self.wfile.close()
- self.connection.sock.shutdown(2)
- #self.wfile.close()
- except socket.error:
- pass
+def locking (func):
+ """Mark a function as being internally thread safe"""
+ func.locking = True
+ return func
- def setup(self):
- '''Setup a working connection'''
- self.cleanup = True
- self.connection = self.request
- self.rfile = socket._fileobject(self.request, "rb", self.rbufsize)
- self.wfile = socket._fileobject(self.request, "wb", self.wbufsize)
+def readonly (func):
+ """Mark a function as read-only -- no data effects in component inst"""
+ func.readonly = True
+ return func
-class TLSServer(Bcfg2.tlslite.api.TLSSocketServerMixIn,
- BaseHTTPServer.HTTPServer):
- '''This class is an tlslite-using SSLServer'''
- def __init__(self, address, keyfile, certfile, handler, checker=None,
- reqCert=False):
- print keyfile, certfile
- self.sc = Bcfg2.tlslite.api.SessionCache()
- self.rc = reqCert
- self.master = os.getpid()
- x509 = Bcfg2.tlslite.api.X509()
- cdata = open(certfile).read()
- x509.parse(cdata)
- self.checker = checker
- kdata = open(keyfile).read()
- try:
- self.key = Bcfg2.tlslite.api.parsePEMKey(kdata, private=True)
- except:
- raise ComponentKeyError
- self.chain = Bcfg2.tlslite.api.X509CertChain([x509])
- BaseHTTPServer.HTTPServer.__init__(self, address, handler)
+def query (func=None, **kwargs):
+ """Mark a method to be marshalled as a query."""
+ def _query (func):
+ if kwargs.get("all_fields", True):
+ func.query_all_fields = True
+ func.query = True
+ return func
+ if func is not None:
+ return _query(func)
+ return _query
- def finish_request(self, sock, address):
- sock.settimeout(90)
- tlsConnection = TLSConnection(sock)
- if self.handshake(tlsConnection) == True:
- req = self.RequestHandlerClass(tlsConnection, address, self)
- if req.cleanup:
- tlsConnection.close()
- if os.getpid() != self.master:
- os._exit(0)
- else:
- log.error("Handshake failed")
+def marshal_query_result (items, specs=None):
+ if specs is not None:
+ fields = get_spec_fields(specs)
+ else:
+ fields = None
+ return [item.to_rx(fields) for item in items]
- def handshake(self, tlsConnection):
- try:
- tlsConnection.handshakeServer(certChain=self.chain,
- privateKey=self.key,
- sessionCache=self.sc,
- checker=self.checker,
- reqCert=self.rc)
- tlsConnection.ignoreAbruptClose = True
- #Connection authenticated during TLS handshake, no need for passwords
- if not self.checker == None:
- tlsConnection.authenticated = True
+class Component (object):
+
+ """Base component.
+
+ Intended to be served as an instance by Cobalt.Component.XMLRPCServer
+ >>> server = Cobalt.Component.XMLRPCServer(location, keyfile)
+ >>> component = Cobalt.Component.Component()
+ >>> server.serve_instance(component)
+
+ Class attributes:
+ name -- logical component name (e.g., "queue-manager", "process-manager")
+ implementation -- implementation identifier (e.g., "BlueGene/L", "BlueGene/P")
+
+ Methods:
+ save -- pickle the component to a file
+ do_tasks -- perform automatic tasks for the component
+ """
+
+ name = "component"
+ implementation = "generic"
+
+ def __init__ (self, **kwargs):
+ """Initialize a new component.
+
+ Keyword arguments:
+ statefile -- file in which to save state automatically
+ """
+ self.statefile = kwargs.get("statefile", None)
+ self.logger = logging.getLogger("%s %s" % (self.implementation, self.name))
+ self.lock = threading.Lock()
+
+ def save (self, statefile=None):
+ """Pickle the component.
+
+ Arguments:
+ statefile -- use this file, rather than component.statefile
+ """
+ statefile = statefile or self.statefile
+ if statefile:
+ temp_statefile = statefile + ".temp"
+ data = cPickle.dumps(self)
+ try:
+ fd = file(temp_statefile, "wb")
+ fd.write(data)
+ fd.close()
+ except IOError, e:
+ self.logger.error("statefile failure : %s" % e)
+ return str(e)
else:
- tlsConnection.authenticated = False
- return True
- except Bcfg2.tlslite.errors.TLSError, error:
- return False
- except socket.error:
- return False
-
-class Component(TLSServer,
- SimpleXMLRPCServer.SimpleXMLRPCDispatcher):
- """Cobalt component providing XML-RPC access"""
- __name__ = 'Component'
- __implementation__ = 'Generic'
- __statefields__ = []
- async_funcs = []
- fork_funcs = []
- child_limit = 32
-
- def __init__(self, keyfile, certfile, password, location):
- # need to get addr
- self.shut = False
- signal.signal(signal.SIGINT, self.start_shutdown)
- signal.signal(signal.SIGTERM, self.start_shutdown)
- self.logger = logging.getLogger('Component')
- self.children = []
- self.static = True
- uparsed = urlparse.urlparse(location)[1].split(':')
- sock_loc = (uparsed[0], int(uparsed[1]))
+ os.rename(temp_statefile, statefile)
+ return "state saved to file: %s" % statefile
+ save = exposed(save)
+
+ def do_tasks (self):
+ """Perform automatic tasks for the component.
+
+ Automatic tasks are member callables with an attribute
+ automatic == True.
+ """
+ for name, func in inspect.getmembers(self, callable):
+ if getattr(func, "automatic", False):
+ need_to_lock = not getattr(func, 'locking', False)
+ if (time.time() - func.automatic_ts) > \
+ func.automatic_period:
+ if need_to_lock:
+ self.lock.acquire()
+ try:
+ mt1 = time.time()
+ func()
+ except:
+ self.logger.error("Automatic method %s failed" \
+ % (name), exc_info=1)
+ finally:
+ mt2 = time.time()
- self.password = password
+ if need_to_lock:
+ self.lock.release()
+ func.__dict__['automatic_ts'] = time.time()
+ def _resolve_exposed_method (self, method_name):
+ """Resolve an exposed method.
+
+ Arguments:
+ method_name -- name of the method to resolve
+ """
try:
- TLSServer.__init__(self, sock_loc, keyfile, certfile,
- CobaltXMLRPCRequestHandler)
- except socket.error:
- self.logger.error("Failed to bind to socket")
- raise ComponentInitError
- except ComponentKeyError:
- self.logger.error("Failed to parse key" % (keyfile))
- raise ComponentInitError
- except:
- self.logger.error("Failed to load ssl key '%s'" % (keyfile), exc_info=1)
- raise ComponentInitError
- try:
- SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
- except TypeError:
- SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, False, None)
- self.logRequests = 0
- self.port = self.socket.getsockname()[1]
- self.url = "https://%s:%s" % (socket.gethostname(), self.port)
- self.logger.info("Bound to port %s" % self.port)
- self.funcs.update({'system.listMethods':self.addr_system_listMethods})
- self.atime = 0
+ func = getattr(self, method_name)
+ except AttributeError:
+ raise NoExposedMethod(method_name)
+ if not getattr(func, "exposed", False):
+ raise NoExposedMethod(method_name)
+ return func
- def _cobalt_marshalled_dispatch(self, data, address, authenticated=False):
- """Decode and dispatch XMLRPC requests. Overloaded to pass through
- client address information
+ def _dispatch (self, method, args, dispatch_dict):
+ """Custom XML-RPC dispatcher for components.
+
+ method -- XML-RPC method name
+ args -- tuple of paramaters to method
"""
- try:
- rawparams, method = xmlrpclib.loads(data)
- except:
- self.logger.error("Failed to parse request from %s" \
- % (address[0]))
- #open('/tmp/badreq', 'w').write(data)
- return xmlrpclib.dumps(xmlrpclib.Fault(4, "Bad Request"))
- if not authenticated:
- if len(rawparams) < 2:
- self.logger.error("No authentication included with request from %s" % address[0])
- return xmlrpclib.dumps(xmlrpclib.Fault(2, "No Authentication Info"))
- user = rawparams[0]
- password = rawparams[1]
- params = rawparams[2:]
- # check authentication
- if not self._authenticate_connection(method, user, password, address):
- return xmlrpclib.dumps(xmlrpclib.Fault(3, "Authentication Failure"))
+ need_to_lock = True
+ if method in dispatch_dict:
+ method_func = dispatch_dict[method]
else:
- #there is no prefixed auth info in this case
- params = rawparams[0:]
- # generate response
- try:
- # need to add waitpid code here to enforce maxchild
- if method in self.fork_funcs:
- self.clean_up_children()
- self.check_for_free_slot()
- pid = os.fork()
- if pid:
- self.children.append(pid)
- raise ForkedChild
- # all handlers must take address as the first argument
- response = self._dispatch(method, (address, ) + params)
- # wrap response in a singleton tuple
- response = (response,)
- response = xmlrpclib.dumps(response, methodresponse=1)
- except xmlrpclib.Fault, fault:
- response = xmlrpclib.dumps(fault)
- except TypeError, terror:
- self.logger.error("Client %s called function %s with wrong argument count" %
- (address[0], method), exc_info=1)
- response = xmlrpclib.dumps(xmlrpclib.Fault(4, terror.args[0]))
- except ForkedChild:
- raise
- except:
- self.logger.error("Unexpected handler failure")
- trace = sys.exc_info()
- self.logger.error("%s : %s" % (str(trace[0]), str(trace[1])))
- for line in traceback.format_exc().splitlines():
- self.logger.error(line)
- del trace
- # report exception back to server
- response = xmlrpclib.dumps(xmlrpclib.Fault(1,
- "%s:%s" % (sys.exc_type, sys.exc_value)))
- return response
-
- def clean_up_children(self):
- while True:
try:
- pid = os.waitpid(0, os.WNOHANG)[0]
- if pid:
- if pid in self.children:
- self.children.remove(pid)
- else:
- break
- except OSError:
- break
-
- def check_for_free_slot(self):
- if len(self.children) >= self.child_limit:
- self.logger.info("Reached child_limit; waiting for child exit")
- pid = os.waitpid(0, 0)[0]
- self.children.remove(pid)
-
- def _authenticate_connection(self, method, user, password, address):
- '''Authenticate new connection'''
- (user, address, method)
- return password == self.password
-
- def save_state(self):
- '''Save fields defined in __statefields__ in /var/spool/cobalt/__implementation__'''
- if self.__statefields__:
- savedata = tuple([getattr(self, field) for field in self.__statefields__])
+ method_func = self._resolve_exposed_method(method)
+ except Exception, e:
+ if getattr(e, "log", True):
+ self.logger.error(e, exc_info=True)
+ raise xmlrpclib.Fault(getattr(e, "fault_code", 1), str(e))
+
+ if getattr(method_func, 'locking', False):
+ need_to_lock = False
+ if need_to_lock:
+ lock_start = time.time()
+ self.lock.acquire()
+ lock_done = time.time()
try:
- statefile = open("/var/spool/cobalt/%s" % self.__implementation__, 'w')
- # need to flock here
- statefile.write(cPickle.dumps(savedata))
- except:
- self.logger.info("Statefile save failed; data persistence disabled")
- self.__statefields__ = []
-
- def load_state(self):
- '''Load fields defined in __statefields__ from /var/spool/cobalt/__implementation__'''
- if self.__statefields__:
- try:
- loaddata = cPickle.loads(open("/var/spool/cobalt/%s" % self.__implementation__).read())
- except:
- self.logger.info("Statefile load failed")
- return
- for field in self.__statefields__:
- setattr(self, field, loaddata[self.__statefields__.index(field)])
-
- def addr_system_listMethods(self, address):
- """get rid of the address argument and call the underlying dispatcher method"""
- return SimpleXMLRPCServer.SimpleXMLRPCDispatcher.system_listMethods(self)
-
- def get_request(self):
- '''We need to do work between requests, so select with timeout instead of blocking in accept'''
- rsockinfo = []
- while self.socket not in rsockinfo:
- if self.shut:
- raise socket.error
- for funcname in self.async_funcs:
- func = getattr(self, funcname, False)
- if callable(func):
- func()
- else:
- self.logger.error("Cannot call uncallable method %s" % (funcname))
- try:
- rsockinfo = select.select([self.socket], [], [], 10)[0]
- except select.error:
- continue
- if self.socket in rsockinfo:
- return self.socket.accept()
-
- def serve_forever(self):
- """Handle one request at a time until doomsday."""
- while not self.shut:
- self.handle_request()
+ method_start = time.time()
+ result = method_func(*args)
+ method_done = time.time()
+ except Exception, e:
+ if getattr(e, "log", True):
+ self.logger.error(e, exc_info=True)
+ raise xmlrpclib.Fault(getattr(e, "fault_code", 1), str(e))
+ finally:
+ if need_to_lock:
+ self.lock.release()
+ if getattr(method_func, "query", False):
+ if not getattr(method_func, "query_all_methods", False):
+ margs = args[:1]
+ else:
+ margs = []
+ result = marshal_query_result(result, *margs)
+ return result
- def start_shutdown(self, signum, frame):
- '''Shutdown on unexpected signals'''
- self.shut = True
+ @exposed
+ def listMethods (self):
+ """Custom XML-RPC introspective method list."""
+ return [
+ name for name, func in inspect.getmembers(self, callable)
+ if getattr(func, "exposed", False)
+ ]
+ @exposed
+ def methodHelp (self, method_name):
+ """Custom XML-RPC introspective method help.
+
+ Arguments:
+ method_name -- name of method to get help on
+ """
+ try:
+ func = self._resolve_exposed_method(method_name)
+ except NoExposedMethod:
+ return ""
+ return pydoc.getdoc(func)
+
+ def get_name (self):
+ """The name of the component."""
+ return self.name
+ get_name = exposed(get_name)
+
+ def get_implementation (self):
+ """The implementation of the component."""
+ return self.implementation
+ get_implementation = exposed(get_implementation)