From ad980e36f52bc8313d02e2fd39224bee36c667e1 Mon Sep 17 00:00:00 2001 From: Narayan Desai Date: Tue, 14 Feb 2006 18:28:26 +0000 Subject: resync with cobalt libs git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@1730 ce84e21b-d406-0410-9b95-82705330c041 --- src/lib/Logging.py | 85 ++++++++++++++++++++-------- src/lib/Server/Component.py | 135 ++++++++++++++++++++++++++++++-------------- src/sbin/bcfg2-server | 23 ++------ 3 files changed, 161 insertions(+), 82 deletions(-) (limited to 'src') diff --git a/src/lib/Logging.py b/src/lib/Logging.py index 992989ec2..70270ff23 100644 --- a/src/lib/Logging.py +++ b/src/lib/Logging.py @@ -1,31 +1,61 @@ '''Bcfg2 logging support''' -__revision__ = '$Revision: $' +__revision__ = '$Revision$' -import copy, fcntl, logging, logging.handlers, math, struct, termios, types +import copy, fcntl, logging, logging.handlers, lxml.etree, math, struct, sys, termios, types + +def print_attributes(attrib): + ''' Add the attributes for an element''' + return ' '.join(['%s="%s"' % data for data in attrib.iteritems()]) + +def print_text(text): + ''' Add text to the output (which will need normalising ''' + charmap = {'<':'<', '>':'>', '&':'&'} + return ''.join([charmap.get(char, char) for char in text]) + '\n' + +def xml_print(element, running_indent=0, indent=4): + ''' Add an element and its children to the return string ''' + if (len(element.getchildren()) == 0) and (not element.text): + ret = (' ' * running_indent) + ret += '<%s %s/>\n' % (element.tag, print_attributes(element.attrib)) + else: + child_indent = running_indent + indent + ret = (' ' * running_indent) + ret += '<%s%s>\n' % (element.tag, print_attributes(element)) + if element.text: + ret += (' '* child_indent) + print_text(element.text) + for child in element.getchildren(): + ret += xml_print(child, child_indent, indent) + ret += (' ' * running_indent) + '\n' % (element.tag) + if element.tail: + ret += (' ' * child_indent) + print_text(element.tail) + return ret class TermiosFormatter(logging.Formatter): '''The termios formatter displays output in a terminal-sensitive fashion''' def __init__(self, fmt=None, datefmt=None): logging.Formatter.__init__(self, fmt, datefmt) - # now get termios info - try: - self.height, self.width = struct.unpack('hhhh', - fcntl.ioctl(0, termios.TIOCGWINSZ, - "\000"*8))[0:2] - if self.height == 0 or self.width == 0: - self.height, self.width = (25, 80) - except: - self.height, self.width = (25, 80) + if sys.stdout.isatty(): + # now get termios info + try: + self.width = struct.unpack('hhhh', fcntl.ioctl(0, termios.TIOCGWINSZ, + "\000"*8))[1] + if self.width == 0: + self.width = 80 + except: + self.width = 80 + else: + # output to a pipe + self.width = sys.maxint def format(self, record): '''format a record for display''' returns = [] - line_len = self.width - len(record.name) - 2 + line_len = self.width if type(record.msg) in types.StringTypes: for line in record.msg.split('\n'): if len(line) <= line_len: - returns.append("%s: %s" % (record.name, line)) + returns.append(line) else: inner_lines = int(math.floor(float(len(line)) / line_len))+1 for i in xrange(inner_lines): @@ -41,8 +71,10 @@ class TermiosFormatter(logging.Formatter): for colNum in range(columns)] if idx < len(record.msg)] format = record.name + ':' + (len(indices) * (" %%-%ds " % columnWidth)) returns.append(format % tuple([record.msg[idx] for idx in indices])) + elif type(record.msg) == lxml.etree._Element: + returns.append(str(xml_print(record.msg))) else: - # got unsupported type + returns.append("Got unsupported type %s" % (str(type(record.msg)))) returns.append(record.name + ':' + str(record.msg)) if record.exc_info: returns.append(self.formatException(record.exc_info)) @@ -51,28 +83,40 @@ class TermiosFormatter(logging.Formatter): class FragmentingSysLogHandler(logging.handlers.SysLogHandler): '''This handler fragments messages into chunks smaller than 250 characters''' + def __init__(self, procname, path, facility): + self.procname = procname + logging.handlers.SysLogHandler.__init__(self, path, facility) + def emit(self, record): '''chunk and deliver records''' + record.name = self.procname if str(record.msg) > 250: start = 0 + error = None + if record.exc_info: + error = record.exc_info + record.exc_info = None msgdata = str(record.msg) while start < len(msgdata): newrec = copy.deepcopy(record) newrec.msg = msgdata[start:start+250] + newrec.exc_info = error logging.handlers.SysLogHandler.emit(self, newrec) + # only send the traceback once + error = None start += 250 else: logging.handlers.SysLogHandler.emit(self, newrec) -def setup_logging(to_console=True, to_syslog=True, level=0): +def setup_logging(procname, to_console=True, to_syslog=True, syslog_facility='local0', level=0): '''setup logging for bcfg2 software''' - if hasattr(logging, 'enabled'): + if hasattr(logging, 'already_setup'): return console = logging.StreamHandler() console.setLevel(logging.DEBUG) # tell the handler to use this format console.setFormatter(TermiosFormatter()) - syslog = FragmentingSysLogHandler('/dev/log', 'local0') + syslog = FragmentingSysLogHandler(procname, '/dev/log', syslog_facility) syslog.setLevel(logging.DEBUG) syslog.setFormatter(logging.Formatter('%(name)s[%(process)d]: %(message)s')) # add the handler to the root logger @@ -80,8 +124,5 @@ def setup_logging(to_console=True, to_syslog=True, level=0): logging.root.addHandler(console) if to_syslog: logging.root.addHandler(syslog) - logging.root.level = level - logging.enabled = True - - - + logging.root.setLevel(level) + logging.already_setup = True diff --git a/src/lib/Server/Component.py b/src/lib/Server/Component.py index de41a2277..4a4101e6e 100644 --- a/src/lib/Server/Component.py +++ b/src/lib/Server/Component.py @@ -1,20 +1,15 @@ '''Cobalt component base classes''' __revision__ = '$Revision$' -from M2Crypto import SSL +import atexit, logging, select, signal, socket, sys, time, urlparse, xmlrpclib, cPickle, ConfigParser +import BaseHTTPServer, Cobalt.Proxy, OpenSSL.SSL, SimpleXMLRPCServer, SocketServer -import cPickle, logging, socket, urlparse, xmlrpclib, ConfigParser, SimpleXMLRPCServer +log = logging.getLogger('Component') class CobaltXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): '''CobaltXMLRPCRequestHandler takes care of ssl xmlrpc requests''' - def __init__(self, request, client_address, server): - SimpleXMLRPCServer.SimpleXMLRPCRequestHandler.__init__(self, - request, client_address, server) - self.logger = logging.getLogger('Bcfg2.Server.Handler') - def finish(self): '''Finish HTTPS connections properly''' - self.request.set_shutdown(SSL.SSL_RECEIVED_SHUTDOWN | SSL.SSL_SENT_SHUTDOWN) self.request.close() def do_POST(self): @@ -25,7 +20,7 @@ class CobaltXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): response = self.server._cobalt_marshalled_dispatch(data, self.client_address) except: # This should only happen if the module is buggy # internal error, report as HTTP server error - self.logger.error("Unexpected failure in handler", exc_info=1) + log.error("Unexcepted handler failure in do_POST", exc_info=1) self.send_response(500) self.end_headers() else: @@ -38,24 +33,45 @@ class CobaltXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler): # shut down the connection self.wfile.flush() - self.connection.shutdown(1) + self.connection.shutdown() -class Component(SSL.SSLServer, + def setup(self): + self.connection = self.request + self.rfile = socket._fileobject(self.request, "rb", self.rbufsize) + self.wfile = socket._fileobject(self.request, "wb", self.wbufsize) + +class SSLServer(BaseHTTPServer.HTTPServer): + '''This class encapsulates all of the ssl server stuff''' + def __init__(self, address, keyfile, handler): + SocketServer.BaseServer.__init__(self, address, handler) + ctxt = OpenSSL.SSL.Context(OpenSSL.SSL.SSLv23_METHOD) + ctxt.use_privatekey_file (keyfile) + ctxt.use_certificate_file(keyfile) + self.socket = OpenSSL.SSL.Connection(ctxt, + socket.socket(self.address_family, self.socket_type)) + self.server_bind() + self.server_activate() + +class Component(SSLServer, SimpleXMLRPCServer.SimpleXMLRPCDispatcher): """Cobalt component providing XML-RPC access""" __name__ = 'Component' __implementation__ = 'Generic' __statefields__ = [] + async_funcs = ['assert_location'] def __init__(self, setup): # need to get addr self.setup = setup + self.shut = False + signal.signal(signal.SIGINT, self.start_shutdown) + signal.signal(signal.SIGTERM, self.start_shutdown) + self.logger = logging.getLogger('Component') self.cfile = ConfigParser.ConfigParser() - self.logger = logging.getLogger('Bcfg2.Server') if setup['configfile']: cfilename = setup['configfile'] else: - cfilename = '/etc/cobalt.conf' + cfilename = '/etc/bcfg2.conf' self.cfile.read([cfilename]) if not self.cfile.has_section('communication'): print "Configfile missing communication section" @@ -64,49 +80,36 @@ class Component(SSL.SSLServer, if not self.cfile.has_section('components'): print "Configfile missing components section" raise SystemExit, 1 - if self.cfile._sections['components'].has_key(self.__name__): self.static = True location = urlparse.urlparse(self.cfile.get('components', self.__name__))[1].split(':') location = (location[0], int(location[1])) else: location = (socket.gethostname(), 0) - - self.password = self.cfile.get('communication', 'password') - sslctx = SSL.Context('sslv23') try: keyfile = self.cfile.get('communication', 'key') except ConfigParser.NoOptionError: print "No key specified in cobalt.conf" raise SystemExit, 1 - sslctx.load_cert_chain(keyfile) - #sslctx.load_verify_locations('ca.pem') - #sslctx.set_client_CA_list_from_file('ca.pem') - sslctx.set_verify(SSL.verify_none, 15) - #sslctx.set_allow_unknown_ca(1) - sslctx.set_session_id_ctx(self.__name__) - sslctx.set_info_callback(self.handle_sslinfo) - #sslctx.set_tmp_dh('dh1024.pem') - self.logRequests = 0 - # setup unhandled request syslog handling + + self.password = self.cfile.get('communication', 'password') + + SSLServer.__init__(self, location, keyfile, CobaltXMLRPCRequestHandler) SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self) - try: - SSL.SSLServer.__init__(self, location, CobaltXMLRPCRequestHandler, sslctx) - except socket.error, serr: - self.logger.error("Failed to bind to location %s" % (location,), exc_info=1) - self.port = self.socket.socket.getsockname()[1] + self.logRequests = 1 + self.port = self.socket.getsockname()[1] + self.url = "https://%s:%s" % (socket.gethostname(), self.port) self.logger.info("Bound to port %s" % self.port) self.funcs.update({'HandleEvents':self.HandleEvents, - 'system.listMethods':self.system_listMethods}) + 'system.listMethods':self.addr_system_listMethods}) + self.atime = 0 + self.assert_location() + atexit.register(self.deassert_location) def HandleEvents(self, address, event_list): '''Default event handler''' return True - def handle_sslinfo(self, where, ret, ssl_ptr): - '''This is where we need to handle all ssl negotiation issues''' - pass - def _cobalt_marshalled_dispatch(self, data, address): """Decode and dispatch XMLRPC requests. Overloaded to pass through client address information @@ -133,12 +136,13 @@ class Component(SSL.SSLServer, response = xmlrpclib.dumps(fault) except TypeError, terror: self.logger.error("Client %s called function %s with wrong argument count" % - (address[0], method)) + (address[0], method), exc_info=1) response = xmlrpclib.dumps(xmlrpclib.Fault(4, terror.args[0])) except: - self.logger.error("Unexpected failure in handler", exc_info=1) + self.logger.error("Unexpected handler failure", exc_info=1) # report exception back to server - response = xmlrpclib.dumps(xmlrpclib.Fault(1, "handler failure")) + response = xmlrpclib.dumps(xmlrpclib.Fault(1, + "%s:%s" % (sys.exc_type, sys.exc_value))) return response def _authenticate_connection(self, method, user, password, address): @@ -169,6 +173,55 @@ class Component(SSL.SSLServer, for field in self.__statefields__: setattr(self, field, loaddata[self.__statefields__.index(field)]) - def system_listMethods(self, address): + def addr_system_listMethods(self, address): """get rid of the address argument and call the underlying dispatcher method""" return SimpleXMLRPCServer.SimpleXMLRPCDispatcher.system_listMethods(self) + + def get_request(self): + '''We need to do work between requests, so select with timeout instead of blocking in accept''' + rsockinfo = [] + while self.socket not in rsockinfo: + if self.shut: + raise socket.error + for funcname in self.async_funcs: + func = getattr(self, funcname, False) + if callable(func): + func() + else: + self.logger.error("Cannot call uncallable method %s" % (funcname)) + try: + rsockinfo = select.select([self.socket], [], [], 10)[0] + except select.error: + continue + if self.socket in rsockinfo: + return self.socket.accept() + + def assert_location(self): + '''Assert component location with slp''' + if self.__name__ == 'service-location' or self.static: + return + if (time.time() - self.atime) > 240: + slp = Cobalt.Proxy.service_location() + slp.AssertService({'tag':'location', 'name':self.__name__, 'url':self.url}) + self.atime = time.time() + + def deassert_location(self): + '''remove registration from slp''' + if self.__name__ == 'service-location' or self.static: + return + slp = Cobalt.Proxy.service_location() + try: + slp.DeassertService([{'tag':'location', 'name':self.__name__, 'url':self.url}]) + except xmlrpclib.Fault, fault: + if fault.faultCode == 11: + self.logger.error("Failed to deregister self; no matching component") + + def serve_forever(self): + """Handle one request at a time until doomsday.""" + while not self.shut: + self.handle_request() + + def start_shutdown(self, signum, frame): + '''Shutdown on unexpected signals''' + self.shut = True + diff --git a/src/sbin/bcfg2-server b/src/sbin/bcfg2-server index a528f1864..dafc8be1e 100755 --- a/src/sbin/bcfg2-server +++ b/src/sbin/bcfg2-server @@ -9,7 +9,7 @@ from xmlrpclib import Fault from lxml.etree import XML, Element, tostring import getopt, logging, os, select, signal, socket, sys -import Bcfg2.Logging, Bcfg2.Server.Component, M2Crypto.SSL +import Bcfg2.Logging, Bcfg2.Server.Component logger = logging.getLogger('bcfg2-server') @@ -110,32 +110,17 @@ class Bcfg2Serv(Bcfg2.Server.Component.Component): famfd = self.Core.fam.fileno() while self.socket not in rsockinfo: if self.shut: - raise M2Crypto.SSL.SSLError + raise socket.error try: rsockinfo = select.select([self.socket, famfd], [], [], 15)[0] except select.error: - raise M2Crypto.SSL.SSLError + continue if famfd in rsockinfo: self.Core.fam.Service() if self.socket in rsockinfo: - # workaround for m2crypto 0.15 bug - self.socket.postConnectionCheck = None return self.socket.accept() - def serve_forever(self): - """Handle one request at a time until doomsday.""" - while not self.shut: - self.handle_request() - - def start_shutdown(self, signum, frame): - '''Shutdown on unexpected signals''' - self.shut = True - - def handle_error(self): - '''Catch error path for clean exit''' - return False - def resolve_client(self, client): if self.setup['client']: return self.setup['client'] @@ -208,7 +193,7 @@ class Bcfg2Serv(Bcfg2.Server.Component.Component): return "" if __name__ == '__main__': - Bcfg2.Logging.setup_logging() + Bcfg2.Logging.setup_logging('bcfg2-server') options = { 'v':'verbose', 'd':'debug', -- cgit v1.2.3-1-g7c22