From 827d0a83b8c9148598c23cb550862c0cf50b5a23 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 18 Jul 2013 14:22:11 -0400 Subject: Packages: added lock to yum cache update --- src/lib/Bcfg2/Server/Plugins/Packages/Yum.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py index 7c950a435..7a90f4f2e 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py @@ -53,11 +53,13 @@ The Yum Backend import os import re import sys +import time import copy import errno import socket import logging import lxml.etree +from lockfile import FileLock from subprocess import Popen, PIPE import Bcfg2.Server.Plugin # pylint: disable=W0622 @@ -864,6 +866,17 @@ class YumCollection(Collection): if not self.use_yum: return Collection.complete(self, packagelist) + lock = FileLock(os.path.join(self.cachefile, "lock")) + slept = 0 + while lock.is_locked(): + if slept > 30: + self.logger.warning("Packages: Timeout waiting for yum cache " + "to release its lock") + return set(), set() + self.logger.debug("Packages: Yum cache is locked, waiting...") + time.sleep(3) + slept += 3 + if packagelist: try: result = self.call_helper( -- cgit v1.2.3-1-g7c22 From 1450bd76eec59d409bdf93e5e9982ca3ec852f2b Mon Sep 17 00:00:00 2001 From: Sol Jerome Date: Fri, 19 Jul 2013 17:23:41 -0500 Subject: Admin/Init: Add listen_all option Some of these changes were suggested in github issue #29. Signed-off-by: Sol Jerome --- src/lib/Bcfg2/Server/Admin/Init.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/Admin/Init.py b/src/lib/Bcfg2/Server/Admin/Init.py index 6175d8ed0..c76e54569 100644 --- a/src/lib/Bcfg2/Server/Admin/Init.py +++ b/src/lib/Bcfg2/Server/Admin/Init.py @@ -20,6 +20,8 @@ from Bcfg2.Compat import input # pylint: disable=W0622 CONFIG = '''[server] repository = %s plugins = %s +# Uncomment the following to listen on all interfaces +#listen_all = true [statistics] sendmailpath = %s @@ -78,7 +80,7 @@ CLIENTS = ''' ''' # Mapping of operating system names to groups -OS_LIST = [('Red Hat/Fedora/RHEL/RHAS/Centos', 'redhat'), +OS_LIST = [('Red Hat/Fedora/RHEL/RHAS/CentOS', 'redhat'), ('SUSE/SLES', 'suse'), ('Mandrake', 'mandrake'), ('Debian', 'debian'), @@ -234,8 +236,9 @@ class Init(Bcfg2.Server.Admin.Mode): def _prompt_server(self): """Ask for the server name.""" - newserver = safe_input("Input the server location [%s]: " % - self.data['server_uri']) + newserver = safe_input( + "Input the server location (the server listens on a single " + "interface by default) [%s]: " % self.data['server_uri']) if newserver != '': self.data['server_uri'] = newserver -- cgit v1.2.3-1-g7c22 From 2f45bf09380595da88ebb1757042b7e799d41f57 Mon Sep 17 00:00:00 2001 From: Sol Jerome Date: Fri, 19 Jul 2013 17:50:07 -0500 Subject: Admin/Init: Fix default db location Signed-off-by: Sol Jerome --- src/lib/Bcfg2/Server/Admin/Init.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/Admin/Init.py b/src/lib/Bcfg2/Server/Admin/Init.py index c76e54569..153d7bea6 100644 --- a/src/lib/Bcfg2/Server/Admin/Init.py +++ b/src/lib/Bcfg2/Server/Admin/Init.py @@ -33,7 +33,7 @@ sendmailpath = %s # 'postgresql', 'mysql', 'mysql_old', 'sqlite3' or 'ado_mssql'. #name = # Or path to database file if using sqlite3. -#/bcfg2.sqlite is default path if left empty +#/etc/bcfg2.sqlite is default path if left empty #user = # Not used with sqlite3. #password = -- cgit v1.2.3-1-g7c22 From 360ba2e77865d2a292568ede99d8896ef7742056 Mon Sep 17 00:00:00 2001 From: Arto Jantunen Date: Sun, 21 Jul 2013 10:35:50 +0300 Subject: Make the server process always detach, even if started by init Otherwise startup with systemd in type=forking fails since the daemon will not fork. --- src/lib/Bcfg2/Server/BuiltinCore.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/BuiltinCore.py b/src/lib/Bcfg2/Server/BuiltinCore.py index e69a92b64..2dd83289d 100644 --- a/src/lib/Bcfg2/Server/BuiltinCore.py +++ b/src/lib/Bcfg2/Server/BuiltinCore.py @@ -31,7 +31,8 @@ class Core(BaseCore): daemon_args = dict(uid=self.setup['daemon_uid'], gid=self.setup['daemon_gid'], - umask=int(self.setup['umask'], 8)) + umask=int(self.setup['umask'], 8), + detach_process=True) if self.setup['daemon']: daemon_args['pidfile'] = TimeoutPIDLockFile(self.setup['daemon'], acquire_timeout=5) -- cgit v1.2.3-1-g7c22 From 7d544a289852c761fc209772064f794fb0472198 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 22 Jul 2013 09:08:32 -0400 Subject: Packages: Added timeout to bcfg2-yum-helper calls This involved making the Yum backend use Bcfg2.Utils.Executor to call bcfg2-yum-helper instead of subprocess.Popen directly. This was cherry-picked (kinda) from 3d06f311274d6b942ee89d8cdb13b2ecc99af1b0, so will likely break the maint -> master merge in spectacular ways. --- src/lib/Bcfg2/Options.py | 3 +- src/lib/Bcfg2/Server/Plugins/Packages/Yum.py | 54 +++++++++++----------------- 2 files changed, 22 insertions(+), 35 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py index dba3e96ef..3d105bf30 100644 --- a/src/lib/Bcfg2/Options.py +++ b/src/lib/Bcfg2/Options.py @@ -1221,7 +1221,8 @@ SERVER_COMMON_OPTIONS = dict(repo=SERVER_REPOSITORY, authentication=SERVER_AUTHENTICATION, perflog=LOG_PERFORMANCE, perflog_interval=PERFLOG_INTERVAL, - children=SERVER_CHILDREN) + children=SERVER_CHILDREN, + client_timeout=CLIENT_TIMEOUT) CRYPT_OPTIONS = dict(encrypt=ENCRYPT, decrypt=DECRYPT, diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py index 7a90f4f2e..e0002ef34 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py @@ -59,9 +59,9 @@ import errno import socket import logging import lxml.etree -from lockfile import FileLock -from subprocess import Popen, PIPE import Bcfg2.Server.Plugin +from lockfile import FileLock +from Bcfg2.Utils import Executor # pylint: disable=W0622 from Bcfg2.Compat import StringIO, cPickle, HTTPError, URLError, \ ConfigParser, any @@ -279,6 +279,7 @@ class YumCollection(Collection): debug=debug) self.keypath = os.path.join(self.cachepath, "keys") + self._helper = None if self.use_yum: #: Define a unique cache file for this collection to use #: for cached yum metadata @@ -294,8 +295,10 @@ class YumCollection(Collection): os.mkdir(self.cachefile) if not self.disableMetaData: self.setup_data() + self.cmd = Executor() else: self.cachefile = None + self.cmd = None if HAS_PULP and self.has_pulp_sources: _setup_pulp(self.setup) @@ -348,20 +351,18 @@ class YumCollection(Collection): a call to it; I wish there was a way to do this without forking, but apparently not); finally we check in /usr/sbin, the default location. """ - global HELPER - if not HELPER: + if not self._helper: try: - HELPER = self.setup.cfp.get("packages:yum", "helper") + self._helper = self.setup.cfp.get("packages:yum", "helper") except (ConfigParser.NoOptionError, ConfigParser.NoSectionError): # first see if bcfg2-yum-helper is in PATH try: self.debug_log("Checking for bcfg2-yum-helper in $PATH") - Popen(['bcfg2-yum-helper'], - stdin=PIPE, stdout=PIPE, stderr=PIPE).wait() - HELPER = 'bcfg2-yum-helper' + self.cmd.run(['bcfg2-yum-helper']) + self._helper = 'bcfg2-yum-helper' except OSError: - HELPER = "/usr/sbin/bcfg2-yum-helper" - return HELPER + self._helper = "/usr/sbin/bcfg2-yum-helper" + return self._helper @property def use_yum(self): @@ -925,36 +926,21 @@ class YumCollection(Collection): cmd.append("-v") cmd.append(command) self.debug_log("Packages: running %s" % " ".join(cmd)) - try: - helper = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) - except OSError: - err = sys.exc_info()[1] - self.logger.error("Packages: Failed to execute %s: %s" % - (" ".join(cmd), err)) - return None if inputdata: - idata = json.dumps(inputdata) - (stdout, stderr) = helper.communicate(idata) + result = self.cmd.run(cmd, timeout=self.setup['client_timeout'], + inputdata=json.dumps(inputdata)) else: - (stdout, stderr) = helper.communicate() - rv = helper.wait() - errlines = stderr.splitlines() - if rv: - if not errlines: - errlines.append("No error output") - self.logger.error("Packages: error running bcfg2-yum-helper " - "(returned %d): %s" % (rv, errlines[0])) - for line in errlines[1:]: - self.logger.error("Packages: %s" % line) - elif errlines: + result = self.cmd.run(cmd, timeout=self.setup['client_timeout']) + if not result.success: + self.logger.error("Packages: error running bcfg2-yum-helper: %s" % + result.error) + elif result.stderr: self.debug_log("Packages: debug info from bcfg2-yum-helper: %s" % - errlines[0]) - for line in errlines[1:]: - self.debug_log("Packages: %s" % line) + result.stderr) try: - return json.loads(stdout) + return json.loads(result.stdout) except ValueError: err = sys.exc_info()[1] self.logger.error("Packages: error reading bcfg2-yum-helper " -- cgit v1.2.3-1-g7c22 From 752da22a2247892f647c0a9c46e7b0faf9351ea6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 22 Jul 2013 11:44:23 -0400 Subject: Packages: instantiate Executor before determining path to helper --- src/lib/Bcfg2/Server/Plugins/Packages/Yum.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py index e0002ef34..48c5b1f65 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py @@ -104,9 +104,6 @@ FL = '{http://linux.duke.edu/metadata/filelists}' PULPSERVER = None PULPCONFIG = None -#: The path to bcfg2-yum-helper -HELPER = None - def _setup_pulp(setup): """ Connect to a Pulp server and pass authentication credentials. @@ -279,6 +276,10 @@ class YumCollection(Collection): debug=debug) self.keypath = os.path.join(self.cachepath, "keys") + #: A :class:`Bcfg2.Utils.Executor` object to use to run + #: external commands + self.cmd = Executor() + self._helper = None if self.use_yum: #: Define a unique cache file for this collection to use @@ -295,10 +296,8 @@ class YumCollection(Collection): os.mkdir(self.cachefile) if not self.disableMetaData: self.setup_data() - self.cmd = Executor() else: self.cachefile = None - self.cmd = None if HAS_PULP and self.has_pulp_sources: _setup_pulp(self.setup) -- cgit v1.2.3-1-g7c22 From 7d17d1c283f2718ae86a4b2db03726f4ae802889 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 22 Jul 2013 14:26:49 -0400 Subject: MultiprocessingCore: Dispatch metadata cache expiration to children When the broker in a multiprocessing configuration expires its metadata cache (e.g., when probe data is received), it must dispatch that expiration call to its children. This also makes the protocol for communication between the broker and its children into a real RPC protocol, so we can do even more stuff in the future. --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 102 +++++++++++++++++++++++++--- 1 file changed, 92 insertions(+), 10 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 81fba7092..066519774 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -2,16 +2,71 @@ :mod:`Bcfg2.Server.BuiltinCore` that uses the Python :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. + +The parent communicates with the children over a +:class:`multiprocessing.Pipe` that implements a very simple RPC +protocol. Each command passed to a child over the Pipe must be a +tuple with the format:: + + (, , ) + +The method must be exposed by the child by decorating it with +:func:`Bcfg2.Server.Core.exposed`. + +The RPC call always returns a value via the pipe, so the caller *must* +read the return value in order to keep the pipe consistent. """ +import logging import threading import lxml.etree import multiprocessing +from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue from Bcfg2.Server.Core import BaseCore, exposed +from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +class DispatchingCache(Cache, Debuggable): + """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates + cache expiration events to child nodes. """ + + #: The method to send over the pipe to expire the cache + method = "expire_cache" + + def __init__(self, *args, **kwargs): + #: A dict of : :class:`multiprocessing.Pipe` + #: objects that should be given a cache expiration command any + #: time an item is expired. + self.pipes = kwargs.pop("pipes", dict()) + + #: A :class:`logging.Logger` object this cache object can use + self.logger = logging.getLogger(self.__class__.__name__) + Cache.__init__(self, *args, **kwargs) + + def expire(self, key=None): + if (key and key in self) or (not key and len(self)): + # dispatching cache expiration to children can be + # expensive, so only do it if there's something to expire + for child, pipe in self.pipes.items(): + if key: + self.logger.debug("Expiring metadata cache for %s on %s" % + (key, child)) + else: + self.logger.debug("Expiring metadata cache on %s" % child) + pipe.send((self.method, [key], dict())) + pipe.recv() + Cache.expire(self, key=key) + + +class NoSuchMethod(Exception): + """ Exception raised by a child process if it's asked to execute a + method that doesn't exist or that isn't exposed via the + :class:`multiprocessing.Pipe` RPC interface. """ + pass + + class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -98,6 +153,33 @@ class ChildCore(BaseCore): def _run(self): return True + def rpc_dispatch(self): + """ Dispatch a method received via the + :class:`multiprocessing.Pipe` RPC interface. + + :param data: The tuple of ``(, , )`` + :type data: tuple + """ + method, args, kwargs = self.pipe.recv() + if hasattr(self, method): + func = getattr(self, method) + if func.exposed: + self.pipe.send(func(*args, **kwargs)) + else: + raise NoSuchMethod(method) + else: + raise NoSuchMethod(method) + + @exposed + def GetConfig(self, client): + self.logger.debug("Building configuration for %s" % client) + return lxml.etree.tostring(self.BuildConfiguration(client)) + + @exposed + def expire_cache(self, client=None): + """ Expire the metadata cache for a client """ + self.metadata_cache.expire(client) + def _block(self): while not self.terminate.isSet(): try: @@ -109,15 +191,7 @@ class ChildCore(BaseCore): # should be using the metadata database if you're # using this core. self.fam.handle_events_in_interval(0.1) - client = self.pipe.recv() - self.logger.debug("Building configuration for %s" % client) - config = \ - lxml.etree.tostring(self.BuildConfiguration(client)) - self.logger.debug("Returning configuration for %s to main " - "process" % client) - self.pipe.send(config) - self.logger.debug("Returned configuration for %s to main " - "process" % client) + self.rpc_dispatch() except KeyboardInterrupt: break self.shutdown() @@ -164,11 +238,14 @@ class Core(BuiltinCore): # monkeypatch self.terminate to have isSet(). self.terminate = DualEvent(threading_event=self.terminate) + self.metadata_cache = DispatchingCache() + def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum (mainpipe, childpipe) = multiprocessing.Pipe() self.pipes[name] = mainpipe + self.metadata_cache.pipes[name] = mainpipe self.logger.debug("Starting child %s" % name) childcore = ChildCore(self.setup, childpipe, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) @@ -192,13 +269,18 @@ class Core(BuiltinCore): self.logger.debug("Child %s shut down" % child.name) self.logger.debug("All children shut down") + @exposed + def set_debug(self, address, debug): + BuiltinCore.set_debug(self, address, debug) + self.metadata_cache.set_debug(debug) + @exposed def GetConfig(self, address): client = self.resolve_client(address)[0] childname = self.available_children.get() self.logger.debug("Building configuration on child %s" % childname) pipe = self.pipes[childname] - pipe.send(client) + pipe.send(("GetConfig", [client], dict())) config = pipe.recv() self.available_children.put_nowait(childname) return config -- cgit v1.2.3-1-g7c22 From 01693c1905aab3cc11978a193003537f6ca35444 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 22 Jul 2013 14:49:43 -0400 Subject: MultiprocessingCore: Call Debuggable.__init__ on DispatchingCache --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 066519774..76a80f5fb 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -41,8 +41,7 @@ class DispatchingCache(Cache, Debuggable): #: time an item is expired. self.pipes = kwargs.pop("pipes", dict()) - #: A :class:`logging.Logger` object this cache object can use - self.logger = logging.getLogger(self.__class__.__name__) + Debuggable.__init__(self) Cache.__init__(self, *args, **kwargs) def expire(self, key=None): -- cgit v1.2.3-1-g7c22 From 2ba89f93a9146eabad02828eb1a41b0a97dd2038 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 22 Jul 2013 15:20:42 -0400 Subject: MultiprocessingCore: removed unused import --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 1 - 1 file changed, 1 deletion(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 76a80f5fb..af5eb0f3a 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -17,7 +17,6 @@ The RPC call always returns a value via the pipe, so the caller *must* read the return value in order to keep the pipe consistent. """ -import logging import threading import lxml.etree import multiprocessing -- cgit v1.2.3-1-g7c22 From a9ea92aa595c5df63eff25ff545927078f7651e6 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 22 Jul 2013 15:43:34 -0400 Subject: MultiprocessingCore: Fixed return value from set_debug --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index af5eb0f3a..c9d7fc8c0 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -269,8 +269,8 @@ class Core(BuiltinCore): @exposed def set_debug(self, address, debug): - BuiltinCore.set_debug(self, address, debug) self.metadata_cache.set_debug(debug) + return BuiltinCore.set_debug(self, address, debug) @exposed def GetConfig(self, address): -- cgit v1.2.3-1-g7c22 From 9084b0e889407956227ae8d65bceff5148f7ee1f Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 23 Jul 2013 08:23:59 -0400 Subject: MultiprocessingCore: rewrote parent-child RPC to be thread-safe (and less powerful) --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 157 ++++++++++++++++------------ 1 file changed, 90 insertions(+), 67 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index c9d7fc8c0..02710ab99 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -3,18 +3,24 @@ :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. -The parent communicates with the children over a -:class:`multiprocessing.Pipe` that implements a very simple RPC -protocol. Each command passed to a child over the Pipe must be a -tuple with the format:: +The parent communicates with the children over two constructs: + +* A :class:`multiprocessing.Pipe` is used to process render requests. + The pipe is locked when in use (i.e., between the time that a client + is submitted to be rendered and the time that its configuration is + returned) to keep things thread-safe. (This is accomplished through + the use of + :attr:`Bcfg2.Server.MultiprocessingCore.available_children.) +* A :class:`multiprocessing.Queue` is used to submit other commands in + a thread-safe, non-blocking fashion. (Note that, since it is a + queue, no results can be returned.) It implements a very simple RPC + protocol. Each command passed to a child over the Pipe must be a + tuple with the format:: (, , ) -The method must be exposed by the child by decorating it with -:func:`Bcfg2.Server.Core.exposed`. - -The RPC call always returns a value via the pipe, so the caller *must* -read the return value in order to keep the pipe consistent. + The method must be exposed by the child by decorating it with + :func:`Bcfg2.Server.Core.exposed`. """ import threading @@ -35,10 +41,10 @@ class DispatchingCache(Cache, Debuggable): method = "expire_cache" def __init__(self, *args, **kwargs): - #: A dict of : :class:`multiprocessing.Pipe` + #: A dict of : :class:`multiprocessing.Queue` #: objects that should be given a cache expiration command any #: time an item is expired. - self.pipes = kwargs.pop("pipes", dict()) + self.command_queues = kwargs.pop("pipes", dict()) Debuggable.__init__(self) Cache.__init__(self, *args, **kwargs) @@ -47,24 +53,16 @@ class DispatchingCache(Cache, Debuggable): if (key and key in self) or (not key and len(self)): # dispatching cache expiration to children can be # expensive, so only do it if there's something to expire - for child, pipe in self.pipes.items(): + for child, cmd_q in self.command_queues.items(): if key: self.logger.debug("Expiring metadata cache for %s on %s" % (key, child)) else: self.logger.debug("Expiring metadata cache on %s" % child) - pipe.send((self.method, [key], dict())) - pipe.recv() + cmd_q.put((self.method, [key], dict())) Cache.expire(self, key=key) -class NoSuchMethod(Exception): - """ Exception raised by a child process if it's asked to execute a - method that doesn't exist or that isn't exposed via the - :class:`multiprocessing.Pipe` RPC interface. """ - pass - - class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -120,7 +118,7 @@ class ChildCore(BaseCore): #: every ``poll_wait`` seconds. poll_wait = 5.0 - def __init__(self, setup, pipe, terminate): + def __init__(self, setup, render_pipe, command_queue, terminate): """ :param setup: A Bcfg2 options dict :type setup: Bcfg2.Options.OptionParser @@ -139,61 +137,75 @@ class ChildCore(BaseCore): #: objects to build configurations, and to which client #: configurations are added after having been built by #: ChildCore objects. - self.pipe = pipe + self.render_pipe = render_pipe + + #: The queue from which other commands are received + self.command_queue = command_queue #: The :class:`multiprocessing.Event` that will be monitored #: to determine when this child should shut down. self.terminate = terminate + #: The :class:`threading.Thread` used to process commands + #: received via the :class:`multiprocessing.Queue` RPC + #: interface + self.command_thread = \ + threading.Thread(name="CommandThread", + target=self._command_queue_thread) + def _daemonize(self): return True def _run(self): + try: + self.command_thread.start() + except: + self.shutdown() + raise return True - def rpc_dispatch(self): - """ Dispatch a method received via the - :class:`multiprocessing.Pipe` RPC interface. - - :param data: The tuple of ``(, , )`` - :type data: tuple - """ - method, args, kwargs = self.pipe.recv() - if hasattr(self, method): - func = getattr(self, method) - if func.exposed: - self.pipe.send(func(*args, **kwargs)) - else: - raise NoSuchMethod(method) - else: - raise NoSuchMethod(method) - - @exposed - def GetConfig(self, client): - self.logger.debug("Building configuration for %s" % client) - return lxml.etree.tostring(self.BuildConfiguration(client)) - - @exposed - def expire_cache(self, client=None): - """ Expire the metadata cache for a client """ - self.metadata_cache.expire(client) + def render(self): + """ Process client configuration render requests """ + if self.render_pipe.poll(self.poll_wait): + if not self.metadata.use_database: + # handle FAM events, in case (for instance) the + # client has just been added to clients.xml, or a + # profile has just been asserted. but really, you + # should be using the metadata database if you're + # using this core. + self.fam.handle_events_in_interval(0.1) + client = self.render_pipe.recv() + self.logger.debug("Building configuration for %s" % client) + self.render_pipe.send( + lxml.etree.tostring(self.BuildConfiguration(client))) def _block(self): while not self.terminate.isSet(): try: - if self.pipe.poll(self.poll_wait): - if not self.metadata.use_database: - # handle FAM events, in case (for instance) the - # client has just been added to clients.xml, or a - # profile has just been asserted. but really, you - # should be using the metadata database if you're - # using this core. - self.fam.handle_events_in_interval(0.1) - self.rpc_dispatch() + self.render() except KeyboardInterrupt: break self.shutdown() + def _command_queue_thread(self): + """ Process commands received on the command queue thread """ + while not self.terminate.isSet(): + method, args, kwargs = self.command_queue.get() + if hasattr(self, method): + func = getattr(self, method) + if func.exposed: + self.logger.debug("Child calling RPC method %s" % method) + func(*args, **kwargs) + else: + self.logger.error("Method %s is not exposed" % method) + else: + self.logger.error("Method %s does not exist" % method) + + @exposed + def expire_cache(self, client=None): + """ Expire the metadata cache for a client """ + self.metadata_cache.expire(client) + class Core(BuiltinCore): """ A multiprocessing core that delegates building the actual @@ -213,10 +225,14 @@ class Core(BuiltinCore): setup['children'] = multiprocessing.cpu_count() #: A dict of child name -> one end of the - #: :class:`multiprocessing.Pipe` object used to communicate - #: with that child. (The child is given the other end of the - #: Pipe.) - self.pipes = dict() + #: :class:`multiprocessing.Pipe` object used to submit render + #: requests to that child. (The child is given the other end + #: of the Pipe.) + self.render_pipes = dict() + + #: A dict of child name -> :class:`multiprocessing.Queue` + #: object used to pass commands to that child. + self.command_queues = dict() #: A queue that keeps track of which children are available to #: render a configuration. A child is popped from the queue @@ -241,11 +257,18 @@ class Core(BuiltinCore): def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum + + # create Pipe for render requests and results (mainpipe, childpipe) = multiprocessing.Pipe() - self.pipes[name] = mainpipe - self.metadata_cache.pipes[name] = mainpipe + self.render_pipes[name] = mainpipe + + # create Queue for other commands + cmd_q = multiprocessing.Queue() + self.command_queues[name] = cmd_q + self.metadata_cache.command_queues[name] = cmd_q + self.logger.debug("Starting child %s" % name) - childcore = ChildCore(self.setup, childpipe, self.terminate) + childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, @@ -277,8 +300,8 @@ class Core(BuiltinCore): client = self.resolve_client(address)[0] childname = self.available_children.get() self.logger.debug("Building configuration on child %s" % childname) - pipe = self.pipes[childname] - pipe.send(("GetConfig", [client], dict())) + pipe = self.render_pipes[childname] + pipe.send(client) config = pipe.recv() self.available_children.put_nowait(childname) return config -- cgit v1.2.3-1-g7c22 From ca9be467a0a4460a2dacadebb2ba9e45433297a4 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 23 Jul 2013 14:26:27 -0400 Subject: Cfg: unknown-cfg-files lint check honors FAM ignore list --- src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py index 154cd5e63..44455ff13 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py @@ -10,6 +10,7 @@ import lxml.etree import Bcfg2.Options import Bcfg2.Server.Plugin import Bcfg2.Server.Lint +from fnmatch import fnmatch from Bcfg2.Server.Plugin import PluginExecutionError # pylint: disable=W0622 from Bcfg2.Compat import u_str, unicode, b64encode, walk_packages, \ @@ -940,22 +941,37 @@ class CfgLint(Bcfg2.Server.Lint.ServerPlugin): "%s has no corresponding pubkey.xml at %s" % (basename, pubkey)) + def _list_path_components(self, path): + rv = [] + remaining, component = os.path.split(path) + while component != '': + rv.append(component) + remaining, component = os.path.split(remaining) + return rv + def check_missing_files(self): """ check that all files on the filesystem are known to Cfg """ cfg = self.core.plugins['Cfg'] # first, collect ignore patterns from handlers - ignore = [] + ignore = set() for hdlr in handlers(): - ignore.extend(hdlr.__ignore__) + ignore.update(hdlr.__ignore__) # next, get a list of all non-ignored files on the filesystem all_files = set() for root, _, files in os.walk(cfg.data): - all_files.update(os.path.join(root, fname) - for fname in files - if not any(fname.endswith("." + i) - for i in ignore)) + for fname in files: + fpath = os.path.join(root, fname) + # check against the handler ignore patterns and the + # global FAM ignore list + if (not any(fname.endswith("." + i) for i in ignore) and + not any(fnmatch(fpath, p) + for p in self.config['ignore']) and + not any(fnmatch(c, p) + for p in self.config['ignore'] + for c in self._list_path_components(fpath))): + all_files.add(fpath) # next, get a list of all files known to Cfg cfg_files = set() -- cgit v1.2.3-1-g7c22 From 084c9293050eb78a6da9f6dac41f71507b8098a2 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 23 Jul 2013 14:52:37 -0400 Subject: Cfg: added missing docstring --- src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py index 44455ff13..7f271fc7f 100644 --- a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py @@ -942,6 +942,10 @@ class CfgLint(Bcfg2.Server.Lint.ServerPlugin): (basename, pubkey)) def _list_path_components(self, path): + """ Get a list of all components of a path. E.g., + ``self._list_path_components("/foo/bar/foobaz")`` would return + ``["foo", "bar", "foo", "baz"]``. The list is not guaranteed + to be in order.""" rv = [] remaining, component = os.path.split(path) while component != '': -- cgit v1.2.3-1-g7c22 From df87f88840841ef4ec6b14eaef92cd11b6ad8710 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 24 Jul 2013 15:52:46 -0400 Subject: settings: allow setting database schema --- src/lib/Bcfg2/Options.py | 5 ++++- src/lib/Bcfg2/settings.py | 9 ++++++--- 2 files changed, 10 insertions(+), 4 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py index 3d105bf30..2b9a8706f 100644 --- a/src/lib/Bcfg2/Options.py +++ b/src/lib/Bcfg2/Options.py @@ -673,12 +673,15 @@ DB_PORT = \ default='', cf=('database', 'port'), deprecated_cf=('statistics', 'database_port')) - DB_OPTIONS = \ Option('Database options', default=dict(), cf=('database', 'options'), cook=dict_split) +DB_PORT = \ + Option('Database schema', + default='', + cf=('database', 'schema')) # Django options WEB_CFILE = \ diff --git a/src/lib/Bcfg2/settings.py b/src/lib/Bcfg2/settings.py index 6e718a079..82a3bdb2a 100644 --- a/src/lib/Bcfg2/settings.py +++ b/src/lib/Bcfg2/settings.py @@ -27,6 +27,7 @@ DATABASE_PASSWORD = None DATABASE_HOST = None DATABASE_PORT = None DATABASE_OPTIONS = None +DATABASE_SCHEMA = None TIME_ZONE = None @@ -59,8 +60,8 @@ def read_config(cfile=DEFAULT_CONFIG, repo=None, quiet=False): """ read the config file and set django settings based on it """ # pylint: disable=W0602,W0603 global DATABASE_ENGINE, DATABASE_NAME, DATABASE_USER, DATABASE_PASSWORD, \ - DATABASE_HOST, DATABASE_PORT, DATABASE_OPTIONS, DEBUG, \ - TEMPLATE_DEBUG, TIME_ZONE, MEDIA_URL + DATABASE_HOST, DATABASE_PORT, DATABASE_OPTIONS, DATABASE_SCHEMA, \ + DEBUG, TEMPLATE_DEBUG, TIME_ZONE, MEDIA_URL # pylint: enable=W0602,W0603 if not os.path.exists(cfile) and os.path.exists(DEFAULT_CONFIG): @@ -88,7 +89,8 @@ def read_config(cfile=DEFAULT_CONFIG, repo=None, quiet=False): PASSWORD=setup['db_password'], HOST=setup['db_host'], PORT=setup['db_port'], - OPTIONS=setup['db_options']) + OPTIONS=setup['db_options'], + SCHEMA=setup['db_schema']) if HAS_DJANGO and django.VERSION[0] == 1 and django.VERSION[1] < 2: DATABASE_ENGINE = setup['db_engine'] @@ -98,6 +100,7 @@ def read_config(cfile=DEFAULT_CONFIG, repo=None, quiet=False): DATABASE_HOST = DATABASES['default']['HOST'] DATABASE_PORT = DATABASES['default']['PORT'] DATABASE_OPTIONS = DATABASES['default']['OPTIONS'] + DATABASE_SCHEMA = DATABASES['default']['SCHEMA'] # dropping the version check. This was added in 1.1.2 TIME_ZONE = setup['time_zone'] -- cgit v1.2.3-1-g7c22 From 1c86a3668d017bcaeda602cbc5c5bee84d701647 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 24 Jul 2013 16:06:25 -0400 Subject: settings: fixed db schema option --- src/lib/Bcfg2/Options.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py index 2b9a8706f..51af84cb1 100644 --- a/src/lib/Bcfg2/Options.py +++ b/src/lib/Bcfg2/Options.py @@ -678,7 +678,7 @@ DB_OPTIONS = \ default=dict(), cf=('database', 'options'), cook=dict_split) -DB_PORT = \ +DB_SCHEMA = \ Option('Database schema', default='', cf=('database', 'schema')) @@ -1318,6 +1318,7 @@ DATABASE_COMMON_OPTIONS = dict(web_configfile=WEB_CFILE, db_host=DB_HOST, db_port=DB_PORT, db_options=DB_OPTIONS, + db_schema=DB_SCHEMA, time_zone=DJANGO_TIME_ZONE, django_debug=DJANGO_DEBUG, web_prefix=DJANGO_WEB_PREFIX) -- cgit v1.2.3-1-g7c22