From b9f5348a6ed0f1a9ffc11fc65f2ecb54dac7e600 Mon Sep 17 00:00:00 2001 From: Narayan Desai Date: Thu, 13 Oct 2005 18:55:00 +0000 Subject: (Logical change 1.336) git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@1374 ce84e21b-d406-0410-9b95-82705330c041 --- src/lib/Client/XMLRPCComm.py | 57 +++++++++ src/sbin/Bcfg2ServerX | 270 +++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 327 insertions(+) (limited to 'src') diff --git a/src/lib/Client/XMLRPCComm.py b/src/lib/Client/XMLRPCComm.py index e69de29bb..83e99795b 100644 --- a/src/lib/Client/XMLRPCComm.py +++ b/src/lib/Client/XMLRPCComm.py @@ -0,0 +1,57 @@ +'''XMLRPC/SSL Communication Library (following the ssslib API)''' +__revision__ = '$Revision:$' + +from elementtree.ElementTree import XML, tostring +from ConfigParser import ConfigParser +from M2Crypto.m2xmlrpclib import ServerProxy, SSL_Transport +from xmlrpclib import Fault + +class CommunicationError(Exception): + '''Duplicate the sss.ssslib error API''' + pass + +class comm_lib(object): + '''This sets up the communication for XMLRPC Bcfg2''' + + def __init__(self): + self.cf = ConfigParser() + self.cf.read('/etc/bcfg2.conf') + location = self.cf.get("communication", "url") + self.proxy = ServerProxy(location, SSL_Transport()) + self.user = 'root' + self.password = self.cf.get("communication", "password") + + def ClientInit(self, component): + '''Return a single dummy handle''' + return "handle" + + def SendMessage(self, handle, msg): + '''Encode the XML msg as an XML-RPC request''' + data = XML(msg) + args = (self.user, self.password) + if data.tag == 'get-probes': + funcname = "GetProbes" + elif data.tag == 'probe-data': + funcname = "RecvProbeData" + args = (self.user, self.password, data.getchildren()) + elif data.tag == 'get-config': + funcname = 'GetConfig' + elif data.tag == 'upload-statistics': + funcname = "RecvStats" + args = (self.user, self.password, msg) + else: + print "unsupported function call" + raise CommunicationError, "foo" + func = getattr(self.proxy, funcname) + try: + self.response = apply(func, args) + except Fault, msg: + raise CommunicationError, msg + + def RecvMessage(self, handle): + '''Return cached response''' + return self.response + + def ClientClose(self, handle): + '''This is a noop for xmlrpc''' + pass diff --git a/src/sbin/Bcfg2ServerX b/src/sbin/Bcfg2ServerX index e69de29bb..5d9ae7630 100644 --- a/src/sbin/Bcfg2ServerX +++ b/src/sbin/Bcfg2ServerX @@ -0,0 +1,270 @@ +#!/usr/bin/env python + +'''The XML-RPC Bcfg2 Server''' +__revision__ = '$Revision:$' + +# M2Crypto +from M2Crypto import SSL +# /F's xmlrpcserver.py. +from SimpleXMLRPCServer import SimpleXMLRPCRequestHandler, SimpleXMLRPCDispatcher +from ConfigParser import ConfigParser +from getopt import getopt, GetoptError +from sys import argv +from syslog import openlog, LOG_LOCAL0, syslog, LOG_INFO, LOG_ERR +from Bcfg2.Server.Core import Core, CoreInitError +from Bcfg2.Server.Metadata import MetadataConsistencyError +from threading import Lock +from urlparse import urlparse +from select import select, error as selecterror +from xmlrpclib import Fault, dumps, loads +import sys +from time import time +from socket import gethostbyaddr, herror +from elementtree.ElementTree import XML, Element, tostring + +def dgetopt(arglist, opt, vopt): + '''parse options into a dictionary''' + ret = {} + for optname in opt.values() + vopt.values(): + ret[optname] = False + gstr = "".join(opt.keys()) + "".join([field+':' for field in vopt.keys()]) + try: + opts = getopt(arglist, gstr)[0] + except GetoptError, gerr: + print gerr + print "bcfg2 Usage:" + for arg in opt.iteritems(): + print " -%s %s" % arg + for arg in vopt.iteritems(): + print " -%s <%s>" % arg + raise SystemExit, 1 + for (gopt, garg) in opts: + option = gopt[1:] + if opt.has_key(option): + ret[opt[option]] = True + else: + ret[vopt[option]] = garg + return ret + +class Bcfg2XMLRPCRequestHandler(SimpleXMLRPCRequestHandler): + '''Bcfg2XMLRPCRequestHandler takes care of ssl xmlrpc requests''' + def finish(self): + '''Finish HTTPS connections properly''' + self.request.set_shutdown(SSL.SSL_RECEIVED_SHUTDOWN | SSL.SSL_SENT_SHUTDOWN) + self.request.close() + + def do_POST(self): + '''Overload do_POST to pass through client address information''' + try: + # get arguments + data = self.rfile.read(int(self.headers["content-length"])) + response = self.server._bcfg_marshaled_dispatch(data, self.client_address) + except: # This should only happen if the module is buggy + # internal error, report as HTTP server error + self.send_response(500) + self.end_headers() + else: + # got a valid XML RPC response + stamp1 = time() + self.send_response(200) + self.send_header("Content-type", "text/xml") + self.send_header("Content-length", str(len(response))) + self.end_headers() + self.wfile.write(response) + + # shut down the connection + self.wfile.flush() + stamp2 = time() + self.connection.shutdown(1) + print "write time was %f seconds" % (stamp2-stamp1) + +class Bcfg2XMLRPCServer(SSL.SSLServer, + SimpleXMLRPCDispatcher): + """The Bcfg2 Server component providing XML-RPC access to Bcfg methods""" + + def __init__(self, setup): + # need to get addr + self.setup = setup + self.cfile = ConfigParser() + if setup['configfile']: + cfilename = setup['configfile'] + else: + cfilename = '/etc/bcfg2.conf' + self.cfile.read([cfilename]) + if not self.cfile.has_section('communication'): + print "Configfile missing communication section" + raise SystemExit, 1 + if self.cfile.get('communication', 'protocol') != 'xmlrpc/ssl': + print "Unsupported protocol, exiting." + raise SystemExit, 1 + location = urlparse(self.cfile.get('communication', 'url'))[1].split(':') + location = (location[0], int(location[1])) + self.password = self.cfile.get('communication', 'password') + sslctx = SSL.Context('sslv23') + sslctx.load_cert_chain('server.pem') + sslctx.load_verify_locations('ca.pem') + sslctx.set_client_CA_list_from_file('ca.pem') + sslctx.set_verify(SSL.verify_none, 15) + #sslctx.set_allow_unknown_ca(1) + sslctx.set_session_id_ctx('Bcfg2') + sslctx.set_info_callback(self.handle_sslinfo) + sslctx.set_tmp_dh('dh1024.pem') + self.logRequests = 0 + SimpleXMLRPCDispatcher.__init__(self) + SSL.SSLServer.__init__(self, location, Bcfg2XMLRPCRequestHandler, sslctx) + syslog(LOG_INFO, "Bound to port %s" % location[1]) + try: + self.Core = Core(setup, cfilename) + self.CoreLock = Lock() + except CoreInitError, msg: + print msg + raise SystemExit, 1 + self.register_introspection_functions() + self.register_function(self.func2, "func2") + self.register_function(self.Bcfg2GetConfig, "GetConfig") + self.register_function(self.Bcfg2GetProbes, "GetProbes") + self.register_function(self.Bcfg2RecvProbeData, "RecvProbeData") + self.register_function(self.Bcfg2RecvStats, "RecvStats") + + def func2(self, address, arg): + '''dummy test function''' + print address, arg + return 30 * "foo" + + def handle_sslinfo(self, where, ret, ssl_ptr): + '''This is where we need to handle all ssl negotiation issues''' + pass + + def get_request(self): + '''We need to do work between requests, so we select with timeout instead of blocking in accept''' + rsockinfo = [] + famfd = self.Core.fam.fm.fileno() + while self.socket not in rsockinfo: + try: + rsockinfo = select([self.socket, famfd], [], [])[0] + except selecterror: + continue + if famfd in rsockinfo: + print "starting fs sync...", + while self.Core.fam.fm.pending(): + self.Core.fam.HandleEvent() + print "done" + if self.socket in rsockinfo: + return self.socket.accept() + + def _bcfg_marshaled_dispatch(self, data, address): + """Decode and dispatch XMLRPC requests. Overloaded to pass through + client address information + """ + print "in _bcfg_marshalled_dispatch" + + rawparams, method = loads(data) + + if len(rawparams) < 2: + return dumps(Fault(2, "No Authentication Info")) + user = rawparams[0] + password = rawparams[1] + params = rawparams[2:] + # check authentication + print "in _bcfg_marshalled_dispatch 1: user=%s, password=%s" % (user, password) + if not self._authenticate_connection(method, user, password, address): + return dumps(Fault(3, "Authentication Failure")) + # generate response + print "in _bcfg_marshalled_dispatch 2" + try: + response = self._dispatch(method, (address, ) + params) + # wrap response in a singleton tuple + response = (response,) + response = dumps(response, methodresponse=1) + except Fault, fault: + print "hit error path" + response = dumps(fault) + except: + # report exception back to server + response = dumps(Fault(1, + "%s:%s" % (sys.exc_type, sys.exc_value))) + print "in _bcfg_marshalled_dispatch 3" + + return response + + def _authenticate_connection(self, method, user, password, address): + '''Authenticate new connection''' + (user, address, method) + return password == self.password + + def Bcfg2GetProbes(self, address): + '''Fetch probes for a particular client''' + peer = address[0] + resp = Element('probes') + try: + client = gethostbyaddr(peer)[0] + except herror: + raise Fault, "host resolution error" + try: + meta = self.Core.metadata.FetchMetadata(client) + except MetadataConsistencyError: + raise Fault, 'metadata resolution error' + for generator in self.Core.generators: + for probe in generator.GetProbes(meta): + resp.append(probe) + return tostring(resp) + + def Bcfg2RecvProbeData(self, address, probedata): + '''Receive probe data from clients''' + print "in RecvProbeData" + peer = address[0] + try: + client = gethostbyaddr(peer)[0] + except herror: + raise Fault, "host resolution error" + for data in probedata: + try: + [generator] = [gen for gen in self.Core.generators if gen.__name__ == data.get('source')] + generator.ReceiveData(client, data) + except IndexError: + syslog(LOG_ERR, "Failed to locate plugin %s" % (data.get('source'))) + except: + syslog(LOG_ERR, "Unexpected failure in probe data receipt") + return True + + def Bcfg2GetConfig(self, address, image=False, profile=False): + '''Build config for a client''' + peer = address[0] + try: + client = gethostbyaddr(peer)[0] + except herror: + raise Fault, "host resolution error" + if image and profile: + try: + # if metadata is provided, call FetchMetadata with settings + # it is screwey. i know. + self.Core.metadata.FetchMetadata(client, image=image, profile=profile) + except MetadataConsistencyError: + syslog(LOG_ERR, "Metadata consistency error for client %s" % client) + return Fault, 'metadata error' + return tostring(self.Core.BuildConfiguration(client)) + + def Bcfg2RecvStats(self, address, stats): + '''Act on statistics upload''' + sdata = XML(stats) + e = sdata.find(".//Statistics") + # Versioned stats to prevent tied client/server upgrade + if e.get('version') >= '2.0': + try: + client = gethostbyaddr(address[0])[0] + except herror: + return Fault, 'host resolution error' + + # Update statistics + self.Core.stats.updateStats(sdata, client) + + syslog(LOG_INFO, "Client %s reported state %s"%(client, e.attrib['state'])) + return "" + +if __name__ == '__main__': + openlog("Bcfg2", 0, LOG_LOCAL0) + options = {'v':'verbose', 'd':'debug'} + doptions = {'D':'daemon', 'c':'configfile', 'C':'client'} + ssetup = dgetopt(argv[1:], options, doptions) + s = Bcfg2XMLRPCServer(ssetup) + s.serve_forever() -- cgit v1.2.3-1-g7c22