diff options
Diffstat (limited to 'src/lib')
-rw-r--r-- | src/lib/Bcfg2/Options.py | 12 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/Admin/Init.py | 11 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/BuiltinCore.py | 3 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/MultiprocessingCore.py | 155 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py | 32 | ||||
-rw-r--r-- | src/lib/Bcfg2/Server/Plugins/Packages/Yum.py | 35 | ||||
-rw-r--r-- | src/lib/Bcfg2/settings.py | 7 |
7 files changed, 202 insertions, 53 deletions
diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py index a1fd07b86..41bf54dfb 100644 --- a/src/lib/Bcfg2/Options.py +++ b/src/lib/Bcfg2/Options.py @@ -666,13 +666,17 @@ DB_HOST = \ DB_PORT = \ Option('Database port', default='', - cf=('database', 'port')) - + cf=('database', 'port'), + deprecated_cf=('statistics', 'database_port')) DB_OPTIONS = \ Option('Database options', default=dict(), cf=('database', 'options'), cook=dict_split) +DB_SCHEMA = \ + Option('Database schema', + default='', + cf=('database', 'schema')) # Django options WEB_CFILE = \ @@ -1154,7 +1158,8 @@ SERVER_COMMON_OPTIONS = dict(repo=SERVER_REPOSITORY, authentication=SERVER_AUTHENTICATION, perflog=LOG_PERFORMANCE, perflog_interval=PERFLOG_INTERVAL, - children=SERVER_CHILDREN) + children=SERVER_CHILDREN, + client_timeout=CLIENT_TIMEOUT) CRYPT_OPTIONS = dict(encrypt=ENCRYPT, decrypt=DECRYPT, @@ -1246,6 +1251,7 @@ DATABASE_COMMON_OPTIONS = dict(web_configfile=WEB_CFILE, db_host=DB_HOST, db_port=DB_PORT, db_options=DB_OPTIONS, + db_schema=DB_SCHEMA, time_zone=DJANGO_TIME_ZONE, django_debug=DJANGO_DEBUG, web_prefix=DJANGO_WEB_PREFIX) diff --git a/src/lib/Bcfg2/Server/Admin/Init.py b/src/lib/Bcfg2/Server/Admin/Init.py index 870a31480..ba553c7ef 100644 --- a/src/lib/Bcfg2/Server/Admin/Init.py +++ b/src/lib/Bcfg2/Server/Admin/Init.py @@ -19,6 +19,8 @@ from Bcfg2.Compat import input # pylint: disable=W0622 CONFIG = '''[server] repository = %s plugins = %s +# Uncomment the following to listen on all interfaces +#listen_all = true [statistics] sendmailpath = %s @@ -30,7 +32,7 @@ sendmailpath = %s # 'postgresql', 'mysql', 'mysql_old', 'sqlite3' or 'ado_mssql'. #name = # Or path to database file if using sqlite3. -#<repository>/bcfg2.sqlite is default path if left empty +#<repository>/etc/bcfg2.sqlite is default path if left empty #user = # Not used with sqlite3. #password = @@ -77,7 +79,7 @@ CLIENTS = '''<Clients version="3.0"> ''' # Mapping of operating system names to groups -OS_LIST = [('Red Hat/Fedora/RHEL/RHAS/Centos', 'redhat'), +OS_LIST = [('Red Hat/Fedora/RHEL/RHAS/CentOS', 'redhat'), ('SUSE/SLES', 'suse'), ('Mandrake', 'mandrake'), ('Debian', 'debian'), @@ -238,8 +240,9 @@ class Init(Bcfg2.Server.Admin.Mode): def _prompt_server(self): """Ask for the server name.""" - newserver = safe_input("Input the server location [%s]: " % - self.data['server_uri']) + newserver = safe_input( + "Input the server location (the server listens on a single " + "interface by default) [%s]: " % self.data['server_uri']) if newserver != '': self.data['server_uri'] = newserver diff --git a/src/lib/Bcfg2/Server/BuiltinCore.py b/src/lib/Bcfg2/Server/BuiltinCore.py index b05ad9d41..ea1d97e83 100644 --- a/src/lib/Bcfg2/Server/BuiltinCore.py +++ b/src/lib/Bcfg2/Server/BuiltinCore.py @@ -31,7 +31,8 @@ class Core(BaseCore): daemon_args = dict(uid=self.setup['daemon_uid'], gid=self.setup['daemon_gid'], - umask=int(self.setup['umask'], 8)) + umask=int(self.setup['umask'], 8), + detach_process=True) if self.setup['daemon']: daemon_args['pidfile'] = TimeoutPIDLockFile(self.setup['daemon'], acquire_timeout=5) diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 81fba7092..02710ab99 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -2,16 +2,67 @@ :mod:`Bcfg2.Server.BuiltinCore` that uses the Python :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. + +The parent communicates with the children over two constructs: + +* A :class:`multiprocessing.Pipe` is used to process render requests. + The pipe is locked when in use (i.e., between the time that a client + is submitted to be rendered and the time that its configuration is + returned) to keep things thread-safe. (This is accomplished through + the use of + :attr:`Bcfg2.Server.MultiprocessingCore.available_children.) +* A :class:`multiprocessing.Queue` is used to submit other commands in + a thread-safe, non-blocking fashion. (Note that, since it is a + queue, no results can be returned.) It implements a very simple RPC + protocol. Each command passed to a child over the Pipe must be a + tuple with the format:: + + (<method>, <args>, <kwargs>) + + The method must be exposed by the child by decorating it with + :func:`Bcfg2.Server.Core.exposed`. """ import threading import lxml.etree import multiprocessing +from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue from Bcfg2.Server.Core import BaseCore, exposed +from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +class DispatchingCache(Cache, Debuggable): + """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates + cache expiration events to child nodes. """ + + #: The method to send over the pipe to expire the cache + method = "expire_cache" + + def __init__(self, *args, **kwargs): + #: A dict of <child name>: :class:`multiprocessing.Queue` + #: objects that should be given a cache expiration command any + #: time an item is expired. + self.command_queues = kwargs.pop("pipes", dict()) + + Debuggable.__init__(self) + Cache.__init__(self, *args, **kwargs) + + def expire(self, key=None): + if (key and key in self) or (not key and len(self)): + # dispatching cache expiration to children can be + # expensive, so only do it if there's something to expire + for child, cmd_q in self.command_queues.items(): + if key: + self.logger.debug("Expiring metadata cache for %s on %s" % + (key, child)) + else: + self.logger.debug("Expiring metadata cache on %s" % child) + cmd_q.put((self.method, [key], dict())) + Cache.expire(self, key=key) + + class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -67,7 +118,7 @@ class ChildCore(BaseCore): #: every ``poll_wait`` seconds. poll_wait = 5.0 - def __init__(self, setup, pipe, terminate): + def __init__(self, setup, render_pipe, command_queue, terminate): """ :param setup: A Bcfg2 options dict :type setup: Bcfg2.Options.OptionParser @@ -86,42 +137,75 @@ class ChildCore(BaseCore): #: objects to build configurations, and to which client #: configurations are added after having been built by #: ChildCore objects. - self.pipe = pipe + self.render_pipe = render_pipe + + #: The queue from which other commands are received + self.command_queue = command_queue #: The :class:`multiprocessing.Event` that will be monitored #: to determine when this child should shut down. self.terminate = terminate + #: The :class:`threading.Thread` used to process commands + #: received via the :class:`multiprocessing.Queue` RPC + #: interface + self.command_thread = \ + threading.Thread(name="CommandThread", + target=self._command_queue_thread) + def _daemonize(self): return True def _run(self): + try: + self.command_thread.start() + except: + self.shutdown() + raise return True + def render(self): + """ Process client configuration render requests """ + if self.render_pipe.poll(self.poll_wait): + if not self.metadata.use_database: + # handle FAM events, in case (for instance) the + # client has just been added to clients.xml, or a + # profile has just been asserted. but really, you + # should be using the metadata database if you're + # using this core. + self.fam.handle_events_in_interval(0.1) + client = self.render_pipe.recv() + self.logger.debug("Building configuration for %s" % client) + self.render_pipe.send( + lxml.etree.tostring(self.BuildConfiguration(client))) + def _block(self): while not self.terminate.isSet(): try: - if self.pipe.poll(self.poll_wait): - if not self.metadata.use_database: - # handle FAM events, in case (for instance) the - # client has just been added to clients.xml, or a - # profile has just been asserted. but really, you - # should be using the metadata database if you're - # using this core. - self.fam.handle_events_in_interval(0.1) - client = self.pipe.recv() - self.logger.debug("Building configuration for %s" % client) - config = \ - lxml.etree.tostring(self.BuildConfiguration(client)) - self.logger.debug("Returning configuration for %s to main " - "process" % client) - self.pipe.send(config) - self.logger.debug("Returned configuration for %s to main " - "process" % client) + self.render() except KeyboardInterrupt: break self.shutdown() + def _command_queue_thread(self): + """ Process commands received on the command queue thread """ + while not self.terminate.isSet(): + method, args, kwargs = self.command_queue.get() + if hasattr(self, method): + func = getattr(self, method) + if func.exposed: + self.logger.debug("Child calling RPC method %s" % method) + func(*args, **kwargs) + else: + self.logger.error("Method %s is not exposed" % method) + else: + self.logger.error("Method %s does not exist" % method) + + @exposed + def expire_cache(self, client=None): + """ Expire the metadata cache for a client """ + self.metadata_cache.expire(client) + class Core(BuiltinCore): """ A multiprocessing core that delegates building the actual @@ -141,10 +225,14 @@ class Core(BuiltinCore): setup['children'] = multiprocessing.cpu_count() #: A dict of child name -> one end of the - #: :class:`multiprocessing.Pipe` object used to communicate - #: with that child. (The child is given the other end of the - #: Pipe.) - self.pipes = dict() + #: :class:`multiprocessing.Pipe` object used to submit render + #: requests to that child. (The child is given the other end + #: of the Pipe.) + self.render_pipes = dict() + + #: A dict of child name -> :class:`multiprocessing.Queue` + #: object used to pass commands to that child. + self.command_queues = dict() #: A queue that keeps track of which children are available to #: render a configuration. A child is popped from the queue @@ -164,13 +252,23 @@ class Core(BuiltinCore): # monkeypatch self.terminate to have isSet(). self.terminate = DualEvent(threading_event=self.terminate) + self.metadata_cache = DispatchingCache() + def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum + + # create Pipe for render requests and results (mainpipe, childpipe) = multiprocessing.Pipe() - self.pipes[name] = mainpipe + self.render_pipes[name] = mainpipe + + # create Queue for other commands + cmd_q = multiprocessing.Queue() + self.command_queues[name] = cmd_q + self.metadata_cache.command_queues[name] = cmd_q + self.logger.debug("Starting child %s" % name) - childcore = ChildCore(self.setup, childpipe, self.terminate) + childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, @@ -193,11 +291,16 @@ class Core(BuiltinCore): self.logger.debug("All children shut down") @exposed + def set_debug(self, address, debug): + self.metadata_cache.set_debug(debug) + return BuiltinCore.set_debug(self, address, debug) + + @exposed def GetConfig(self, address): client = self.resolve_client(address)[0] childname = self.available_children.get() self.logger.debug("Building configuration on child %s" % childname) - pipe = self.pipes[childname] + pipe = self.render_pipes[childname] pipe.send(client) config = pipe.recv() self.available_children.put_nowait(childname) diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py index 8a787751c..fc3de3d68 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py @@ -10,6 +10,7 @@ import lxml.etree import Bcfg2.Options import Bcfg2.Server.Plugin import Bcfg2.Server.Lint +from fnmatch import fnmatch from Bcfg2.Server.Plugin import PluginExecutionError # pylint: disable=W0622 from Bcfg2.Compat import u_str, unicode, b64encode, walk_packages, \ @@ -876,22 +877,41 @@ class CfgLint(Bcfg2.Server.Lint.ServerPlugin): "%s has no corresponding pubkey.xml at %s" % (basename, pubkey)) + def _list_path_components(self, path): + """ Get a list of all components of a path. E.g., + ``self._list_path_components("/foo/bar/foobaz")`` would return + ``["foo", "bar", "foo", "baz"]``. The list is not guaranteed + to be in order.""" + rv = [] + remaining, component = os.path.split(path) + while component != '': + rv.append(component) + remaining, component = os.path.split(remaining) + return rv + def check_missing_files(self): """ check that all files on the filesystem are known to Cfg """ cfg = self.core.plugins['Cfg'] # first, collect ignore patterns from handlers - ignore = [] + ignore = set() for hdlr in handlers(): - ignore.extend(hdlr.__ignore__) + ignore.update(hdlr.__ignore__) # next, get a list of all non-ignored files on the filesystem all_files = set() for root, _, files in os.walk(cfg.data): - all_files.update(os.path.join(root, fname) - for fname in files - if not any(fname.endswith("." + i) - for i in ignore)) + for fname in files: + fpath = os.path.join(root, fname) + # check against the handler ignore patterns and the + # global FAM ignore list + if (not any(fname.endswith("." + i) for i in ignore) and + not any(fnmatch(fpath, p) + for p in self.config['ignore']) and + not any(fnmatch(c, p) + for p in self.config['ignore'] + for c in self._list_path_components(fpath))): + all_files.add(fpath) # next, get a list of all files known to Cfg cfg_files = set() diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py index 6692a1735..aee16eee1 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py @@ -53,11 +53,14 @@ The Yum Backend import os import re import sys +import time import copy import errno import socket import logging import lxml.etree +from lockfile import FileLock + import Bcfg2.Server.FileMonitor import Bcfg2.Server.Plugin from Bcfg2.Utils import Executor @@ -275,6 +278,10 @@ class YumCollection(Collection): debug=debug) self.keypath = os.path.join(self.cachepath, "keys") + #: A :class:`Bcfg2.Utils.Executor` object to use to run + #: external commands + self.cmd = Executor() + self._helper = None if self.use_yum: #: Define a unique cache file for this collection to use @@ -843,6 +850,17 @@ class YumCollection(Collection): if not self.use_yum: return Collection.complete(self, packagelist) + lock = FileLock(os.path.join(self.cachefile, "lock")) + slept = 0 + while lock.is_locked(): + if slept > 30: + self.logger.warning("Packages: Timeout waiting for yum cache " + "to release its lock") + return set(), set() + self.logger.debug("Packages: Yum cache is locked, waiting...") + time.sleep(3) + slept += 3 + if packagelist: try: result = self.call_helper( @@ -891,22 +909,19 @@ class YumCollection(Collection): cmd.append("-v") cmd.append(command) self.debug_log("Packages: running %s" % " ".join(cmd)) + if inputdata: - result = self.cmd.run(cmd, inputdata=json.dumps(inputdata)) + result = self.cmd.run(cmd, timeout=self.setup['client_timeout'], + inputdata=json.dumps(inputdata)) else: - result = self.cmd.run(cmd) + result = self.cmd.run(cmd, timeout=self.setup['client_timeout']) if not result.success: - errlines = result.error.splitlines() self.logger.error("Packages: error running bcfg2-yum-helper: %s" % - errlines[0]) - for line in errlines[1:]: - self.logger.error("Packages: %s" % line) + result.error) elif result.stderr: - errlines = result.stderr.splitlines() self.debug_log("Packages: debug info from bcfg2-yum-helper: %s" % - errlines[0]) - for line in errlines[1:]: - self.debug_log("Packages: %s" % line) + result.stderr) + try: return json.loads(result.stdout) except ValueError: diff --git a/src/lib/Bcfg2/settings.py b/src/lib/Bcfg2/settings.py index d73ab7c56..13512ff58 100644 --- a/src/lib/Bcfg2/settings.py +++ b/src/lib/Bcfg2/settings.py @@ -50,8 +50,8 @@ def read_config(cfile=DEFAULT_CONFIG, repo=None): """ read the config file and set django settings based on it """ # pylint: disable=W0602,W0603 global DATABASE_ENGINE, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, \ - DATABASE_HOST, DATABASE_PORT, DATABASE_OPTIONS, DEBUG, \ - TEMPLATE_DEBUG, TIME_ZONE, MEDIA_URL + DATABASE_HOST, DATABASE_PORT, DATABASE_OPTIONS, DATABASE_SCHEMA, \ + DEBUG, TEMPLATE_DEBUG, TIME_ZONE, MEDIA_URL # pylint: enable=W0602,W0603 if not os.path.exists(cfile) and os.path.exists(DEFAULT_CONFIG): @@ -79,7 +79,8 @@ def read_config(cfile=DEFAULT_CONFIG, repo=None): PASSWORD=setup['db_password'], HOST=setup['db_host'], PORT=setup['db_port'], - OPTIONS=setup['db_options']) + OPTIONS=setup['db_options'], + SCHEMA=setup['db_schema']) # dropping the version check. This was added in 1.1.2 TIME_ZONE = setup['time_zone'] |