summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorNarayan Desai <desai@mcs.anl.gov>2007-07-25 04:00:11 +0000
committerNarayan Desai <desai@mcs.anl.gov>2007-07-25 04:00:11 +0000
commit73a1d2d66ea0ea4eb04ff76e360dc7cf0d7bafc4 (patch)
tree5ebd17953e19e2a54634352aa2450bdd4942eeea
parent31f6577cabe54e6e2a5c733572bfe75a0e5eff39 (diff)
downloadbcfg2-73a1d2d66ea0ea4eb04ff76e360dc7cf0d7bafc4.tar.gz
bcfg2-73a1d2d66ea0ea4eb04ff76e360dc7cf0d7bafc4.tar.bz2
bcfg2-73a1d2d66ea0ea4eb04ff76e360dc7cf0d7bafc4.zip
Implementing selective forking server, which runs read-only requests in child processes. Should dramatically improve scalability
git-svn-id: https://svn.mcs.anl.gov/repos/bcfg/trunk/bcfg2@3561 ce84e21b-d406-0410-9b95-82705330c041
-rw-r--r--src/lib/Component.py49
-rw-r--r--src/lib/Server/Plugins/SSHbase.py57
-rwxr-xr-xsrc/sbin/bcfg2-server1
3 files changed, 74 insertions, 33 deletions
diff --git a/src/lib/Component.py b/src/lib/Component.py
index 7ebdf8f86..5bf61452c 100644
--- a/src/lib/Component.py
+++ b/src/lib/Component.py
@@ -1,7 +1,7 @@
'''Cobalt component base classes'''
__revision__ = '$Revision$'
-import atexit, logging, select, signal, socket, sys, time, urlparse, xmlrpclib, cPickle, ConfigParser
+import atexit, logging, select, signal, socket, sys, time, urlparse, xmlrpclib, cPickle, ConfigParser, os
from base64 import decodestring
import BaseHTTPServer, SimpleXMLRPCServer
import Bcfg2.tlslite.errors
@@ -20,15 +20,17 @@ class ComponentKeyError(Exception):
'''raised in case of key parse fails'''
pass
+class ForkedChild(Exception):
+ '''raised after child has been forked'''
+ pass
+
class CobaltXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
'''CobaltXMLRPCRequestHandler takes care of ssl xmlrpc requests'''
- def finish(self):
- '''Finish HTTPS connections properly'''
- self.request.close()
def do_POST(self):
'''Overload do_POST to pass through client address information'''
try:
+ self.cleanup = True
# get arguments
data = self.rfile.read(int(self.headers["content-length"]))
@@ -48,6 +50,9 @@ class CobaltXMLRPCRequestHandler(SimpleXMLRPCServer.SimpleXMLRPCRequestHandler):
authenticated = True
response = self.server._cobalt_marshalled_dispatch(data, self.client_address, authenticated)
+ except ForkedChild:
+ self.cleanup = False
+ return
except: # This should only happen if the module is buggy
# internal error, report as HTTP server error
log.error("Unexcepted handler failure in do_POST", exc_info=1)
@@ -81,6 +86,7 @@ class TLSServer(Bcfg2.tlslite.api.TLSSocketServerMixIn,
reqCert=False):
self.sc = Bcfg2.tlslite.api.SessionCache()
self.rc = reqCert
+ self.master = os.getpid()
x509 = Bcfg2.tlslite.api.X509()
s = open(keyfile).read()
x509.parse(s)
@@ -92,12 +98,15 @@ class TLSServer(Bcfg2.tlslite.api.TLSSocketServerMixIn,
self.chain = Bcfg2.tlslite.api.X509CertChain([x509])
BaseHTTPServer.HTTPServer.__init__(self, address, handler)
- def finish_request(self, sock, client_address):
+ def finish_request(self, sock, address):
sock.settimeout(90)
tlsConnection = TLSConnection(sock)
if self.handshake(tlsConnection) == True:
- self.RequestHandlerClass(tlsConnection, client_address, self)
- tlsConnection.close()
+ req = self.RequestHandlerClass(tlsConnection, address, self)
+ if req.cleanup:
+ tlsConnection.close()
+ if os.getpid() != self.master:
+ os._exit(0)
def handshake(self, tlsConnection):
try:
@@ -125,6 +134,8 @@ class Component(TLSServer,
__implementation__ = 'Generic'
__statefields__ = []
async_funcs = ['assert_location']
+ fork_funcs = []
+ child_limit = 32
def __init__(self, setup):
# need to get addr
@@ -134,6 +145,7 @@ class Component(TLSServer,
signal.signal(signal.SIGTERM, self.start_shutdown)
self.logger = logging.getLogger('Component')
self.cfile = ConfigParser.ConfigParser()
+ self.children = []
if setup['configfile']:
cfilename = setup['configfile']
else:
@@ -211,6 +223,13 @@ class Component(TLSServer,
params = rawparams[0:]
# generate response
try:
+ # need to add waitpid code here to enforce maxchild
+ if method in self.fork_funcs:
+ self.clean_up_children()
+ pid = os.fork()
+ if pid:
+ self.children.append(pid)
+ raise ForkedChild
# all handlers must take address as the first argument
response = self._dispatch(method, (address, ) + params)
# wrap response in a singleton tuple
@@ -222,6 +241,8 @@ class Component(TLSServer,
self.logger.error("Client %s called function %s with wrong argument count" %
(address[0], method), exc_info=1)
response = xmlrpclib.dumps(xmlrpclib.Fault(4, terror.args[0]))
+ except ForkedChild:
+ raise
except:
self.logger.error("Unexpected handler failure", exc_info=1)
# report exception back to server
@@ -229,6 +250,20 @@ class Component(TLSServer,
"%s:%s" % (sys.exc_type, sys.exc_value)))
return response
+ def clean_up_children(self):
+ while True:
+ try:
+ pid = os.waitpid(0, os.WNOHANG)[0]
+ self.children.remove(pid)
+ self.logger.debug("process %d exited" % pid)
+ except OSError:
+ break
+ if len(self.children) >= self.child_limit:
+ self.logger.info("Reached child_limit; waiting for child exit")
+ pid = os.waitpid(0, 0)[0]
+ self.children.remove(pid)
+ self.logger.debug("process %d exited" % pid)
+
def _authenticate_connection(self, method, user, password, address):
'''Authenticate new connection'''
(user, address, method)
diff --git a/src/lib/Server/Plugins/SSHbase.py b/src/lib/Server/Plugins/SSHbase.py
index 0e473e29b..4d3f18957 100644
--- a/src/lib/Server/Plugins/SSHbase.py
+++ b/src/lib/Server/Plugins/SSHbase.py
@@ -10,7 +10,7 @@ def update_file(path, diff):
print "writing file, %s" % path
open(path, 'w').write(newdata)
-class SSHbase(Bcfg2.Server.Plugin.Plugin):
+class SSHbase(Bcfg2.Server.Plugin.Plugin, Bcfg2.Server.Plugin.DirectoryBacked):
'''The sshbase generator manages ssh host keys (both v1 and v2)
for hosts. It also manages the ssh_known_hosts file. It can
integrate host keys from other management domains and similarly
@@ -42,26 +42,31 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin):
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
try:
- self.repository = Bcfg2.Server.Plugin.DirectoryBacked(self.data, self.core.fam)
+ Bcfg2.Server.Plugin.DirectoryBacked.__init__(self, self.data, self.core.fam)
except OSError, ioerr:
self.logger.error("Failed to load SSHbase repository from %s" % (self.data))
self.logger.error(ioerr)
raise Bcfg2.Server.Plugin.PluginInitError
- try:
- prefix = open("%s/prefix" % (self.data)).read().strip()
- except IOError:
- prefix = ''
self.Entries = {'ConfigFile':
- {prefix + '/etc/ssh/ssh_known_hosts':self.build_skn,
- prefix + '/etc/ssh/ssh_host_dsa_key':self.build_hk,
- prefix + '/etc/ssh/ssh_host_rsa_key':self.build_hk,
- prefix + '/etc/ssh/ssh_host_dsa_key.pub':self.build_hk,
- prefix + '/etc/ssh/ssh_host_rsa_key.pub':self.build_hk,
- prefix + '/etc/ssh/ssh_host_key':self.build_hk,
- prefix + '/etc/ssh/ssh_host_key.pub':self.build_hk}}
+ {'/etc/ssh/ssh_known_hosts':self.build_skn,
+ '/etc/ssh/ssh_host_dsa_key':self.build_hk,
+ '/etc/ssh/ssh_host_rsa_key':self.build_hk,
+ '/etc/ssh/ssh_host_dsa_key.pub':self.build_hk,
+ '/etc/ssh/ssh_host_rsa_key.pub':self.build_hk,
+ '/etc/ssh/ssh_host_key':self.build_hk,
+ '/etc/ssh/ssh_host_key.pub':self.build_hk}}
self.ipcache = {}
self.__rmi__ = ['GetPubKeys']
+ def HandleEvent(self, event=None):
+ '''Local event handler that does skn regen on pubkey change'''
+ Bcfg2.Server.Plugin.DirectoryBacked.HandleEvent(self, event)
+ if (len(self.entries.keys())) > (0.90 * len(os.listdir(self.data))) and \
+ event and '_key.pub.H_' in event.filename:
+ self.cache_skn()
+ elif (len(self.entries.keys())) > (0.90 * len(os.listdir(self.data))) and \
+ not hasattr(self, 'static_skn'):
+ self.cache_skn()
def HandlesEntry(self, entry):
'''Handle key entries dynamically'''
@@ -81,7 +86,10 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin):
def get_ipcache_entry(self, client):
'''build a cache of dns results'''
if self.ipcache.has_key(client):
- return self.ipcache[client]
+ if self.ipcache[client]:
+ return self.ipcache[client]
+ else:
+ raise socket.gaierror
else:
# need to add entry
try:
@@ -93,13 +101,14 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin):
if ipaddr:
self.ipcache[client] = (ipaddr, client)
return (ipaddr, client)
+ self.ipcache[client] = False
self.logger.error("Failed to find IP address for %s" % client)
raise socket.gaierror
def cache_skn(self):
'''build memory cache of the ssh known hosts file'''
self.static_skn = ''
- pubkeys = [pubk for pubk in self.repository.entries.keys() if pubk.find('.pub.H_') != -1]
+ pubkeys = [pubk for pubk in self.entries.keys() if pubk.find('.pub.H_') != -1]
pubkeys.sort()
for pubkey in pubkeys:
hostname = pubkey.split('H_')[1]
@@ -109,20 +118,18 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin):
continue
shortname = hostname.split('.')[0]
self.static_skn += "%s,%s,%s %s" % (shortname, fqdn, ipaddr,
- self.repository.entries[pubkey].data)
+ self.entries[pubkey].data)
def build_skn(self, entry, metadata):
'''This function builds builds a host specific known_hosts file'''
client = metadata.hostname
- if not hasattr(self, 'static_skn'):
- self.cache_skn()
entry.text = self.static_skn
hostkeys = [keytmpl % client for keytmpl in self.pubkeys \
- if self.repository.entries.has_key(keytmpl % client)]
+ if self.entries.has_key(keytmpl % client)]
hostkeys.sort()
for hostkey in hostkeys:
entry.text += "localhost,localhost.localdomain,127.0.0.1 %s" % (
- self.repository.entries[hostkey].data)
+ self.entries[hostkey].data)
permdata = {'owner':'root', 'group':'0', 'perms':'0644'}
[entry.attrib.__setitem__(key, permdata[key]) for key in permdata]
@@ -130,14 +137,12 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin):
'''This binds host key data into entries'''
client = metadata.hostname
filename = "%s.H_%s" % (entry.get('name').split('/')[-1], client)
- if filename not in self.repository.entries.keys():
+ if filename not in self.entries.keys():
self.GenerateHostKeys(client)
- if hasattr(self, 'static_skn'):
- del self.static_skn
- if not self.repository.entries.has_key(filename):
+ if not self.entries.has_key(filename):
self.logger.error("%s still not registered" % filename)
raise Bcfg2.Server.Plugin.PluginExecutionError
- keydata = self.repository.entries[filename].data
+ keydata = self.entries[filename].data
permdata = {'owner':'root', 'group':'0'}
permdata['perms'] = '0600'
if entry.get('name')[-4:] == '.pub':
@@ -160,7 +165,7 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin):
else:
keytype = 'rsa1'
- if hostkey not in self.repository.entries.keys():
+ if hostkey not in self.entries.keys():
fileloc = "%s/%s" % (self.data, hostkey)
publoc = self.data + '/' + ".".join([hostkey.split('.')[0]]+['pub', "H_%s" % client])
temploc = "/tmp/%s" % hostkey
diff --git a/src/sbin/bcfg2-server b/src/sbin/bcfg2-server
index 46d937c39..431732896 100755
--- a/src/sbin/bcfg2-server
+++ b/src/sbin/bcfg2-server
@@ -67,6 +67,7 @@ class Bcfg2Serv(Bcfg2.Component.Component):
"""The Bcfg2 Server component providing XML-RPC access to Bcfg methods"""
__name__ = 'bcfg2'
__implementation__ = 'bcfg2'
+ fork_funcs = ['GetConfig', 'GetProbes']
request_queue_size = 15