summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server
diff options
context:
space:
mode:
authorSol Jerome <sol.jerome@gmail.com>2013-07-27 17:19:47 -0500
committerSol Jerome <sol.jerome@gmail.com>2013-07-27 17:19:47 -0500
commitaa230853296cd3b69f0296d646daf37b4b2cd764 (patch)
tree8ad1a2e8ee5f05ef775c2536984440a0cc74ec78 /src/lib/Bcfg2/Server
parent08f5ad7e1b470b79ce81130b2f299426b132db80 (diff)
parent3435963a7c715bd3e6e912c6224fc8b893b1abe4 (diff)
downloadbcfg2-aa230853296cd3b69f0296d646daf37b4b2cd764.tar.gz
bcfg2-aa230853296cd3b69f0296d646daf37b4b2cd764.tar.bz2
bcfg2-aa230853296cd3b69f0296d646daf37b4b2cd764.zip
Merge branch 'maint'
Signed-off-by: Sol Jerome <sol.jerome@gmail.com> Conflicts: doc/appendix/guides/ubuntu.txt src/lib/Bcfg2/Options.py src/lib/Bcfg2/Server/Plugins/Packages/Yum.py src/lib/Bcfg2/settings.py
Diffstat (limited to 'src/lib/Bcfg2/Server')
-rw-r--r--src/lib/Bcfg2/Server/Admin/Init.py11
-rw-r--r--src/lib/Bcfg2/Server/BuiltinCore.py3
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py155
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py32
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Packages/Yum.py35
5 files changed, 189 insertions, 47 deletions
diff --git a/src/lib/Bcfg2/Server/Admin/Init.py b/src/lib/Bcfg2/Server/Admin/Init.py
index 870a31480..ba553c7ef 100644
--- a/src/lib/Bcfg2/Server/Admin/Init.py
+++ b/src/lib/Bcfg2/Server/Admin/Init.py
@@ -19,6 +19,8 @@ from Bcfg2.Compat import input # pylint: disable=W0622
CONFIG = '''[server]
repository = %s
plugins = %s
+# Uncomment the following to listen on all interfaces
+#listen_all = true
[statistics]
sendmailpath = %s
@@ -30,7 +32,7 @@ sendmailpath = %s
# 'postgresql', 'mysql', 'mysql_old', 'sqlite3' or 'ado_mssql'.
#name =
# Or path to database file if using sqlite3.
-#<repository>/bcfg2.sqlite is default path if left empty
+#<repository>/etc/bcfg2.sqlite is default path if left empty
#user =
# Not used with sqlite3.
#password =
@@ -77,7 +79,7 @@ CLIENTS = '''<Clients version="3.0">
'''
# Mapping of operating system names to groups
-OS_LIST = [('Red Hat/Fedora/RHEL/RHAS/Centos', 'redhat'),
+OS_LIST = [('Red Hat/Fedora/RHEL/RHAS/CentOS', 'redhat'),
('SUSE/SLES', 'suse'),
('Mandrake', 'mandrake'),
('Debian', 'debian'),
@@ -238,8 +240,9 @@ class Init(Bcfg2.Server.Admin.Mode):
def _prompt_server(self):
"""Ask for the server name."""
- newserver = safe_input("Input the server location [%s]: " %
- self.data['server_uri'])
+ newserver = safe_input(
+ "Input the server location (the server listens on a single "
+ "interface by default) [%s]: " % self.data['server_uri'])
if newserver != '':
self.data['server_uri'] = newserver
diff --git a/src/lib/Bcfg2/Server/BuiltinCore.py b/src/lib/Bcfg2/Server/BuiltinCore.py
index b05ad9d41..ea1d97e83 100644
--- a/src/lib/Bcfg2/Server/BuiltinCore.py
+++ b/src/lib/Bcfg2/Server/BuiltinCore.py
@@ -31,7 +31,8 @@ class Core(BaseCore):
daemon_args = dict(uid=self.setup['daemon_uid'],
gid=self.setup['daemon_gid'],
- umask=int(self.setup['umask'], 8))
+ umask=int(self.setup['umask'], 8),
+ detach_process=True)
if self.setup['daemon']:
daemon_args['pidfile'] = TimeoutPIDLockFile(self.setup['daemon'],
acquire_timeout=5)
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 81fba7092..02710ab99 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -2,16 +2,67 @@
:mod:`Bcfg2.Server.BuiltinCore` that uses the Python
:mod:`multiprocessing` library to offload work to multiple child
processes. As such, it requires Python 2.6+.
+
+The parent communicates with the children over two constructs:
+
+* A :class:`multiprocessing.Pipe` is used to process render requests.
+ The pipe is locked when in use (i.e., between the time that a client
+ is submitted to be rendered and the time that its configuration is
+ returned) to keep things thread-safe. (This is accomplished through
+ the use of
+ :attr:`Bcfg2.Server.MultiprocessingCore.available_children.)
+* A :class:`multiprocessing.Queue` is used to submit other commands in
+ a thread-safe, non-blocking fashion. (Note that, since it is a
+ queue, no results can be returned.) It implements a very simple RPC
+ protocol. Each command passed to a child over the Pipe must be a
+ tuple with the format::
+
+ (<method>, <args>, <kwargs>)
+
+ The method must be exposed by the child by decorating it with
+ :func:`Bcfg2.Server.Core.exposed`.
"""
import threading
import lxml.etree
import multiprocessing
+from Bcfg2.Cache import Cache
from Bcfg2.Compat import Queue
from Bcfg2.Server.Core import BaseCore, exposed
+from Bcfg2.Server.Plugin import Debuggable
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
+class DispatchingCache(Cache, Debuggable):
+ """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates
+ cache expiration events to child nodes. """
+
+ #: The method to send over the pipe to expire the cache
+ method = "expire_cache"
+
+ def __init__(self, *args, **kwargs):
+ #: A dict of <child name>: :class:`multiprocessing.Queue`
+ #: objects that should be given a cache expiration command any
+ #: time an item is expired.
+ self.command_queues = kwargs.pop("pipes", dict())
+
+ Debuggable.__init__(self)
+ Cache.__init__(self, *args, **kwargs)
+
+ def expire(self, key=None):
+ if (key and key in self) or (not key and len(self)):
+ # dispatching cache expiration to children can be
+ # expensive, so only do it if there's something to expire
+ for child, cmd_q in self.command_queues.items():
+ if key:
+ self.logger.debug("Expiring metadata cache for %s on %s" %
+ (key, child))
+ else:
+ self.logger.debug("Expiring metadata cache on %s" % child)
+ cmd_q.put((self.method, [key], dict()))
+ Cache.expire(self, key=key)
+
+
class DualEvent(object):
""" DualEvent is a clone of :class:`threading.Event` that
internally implements both :class:`threading.Event` and
@@ -67,7 +118,7 @@ class ChildCore(BaseCore):
#: every ``poll_wait`` seconds.
poll_wait = 5.0
- def __init__(self, setup, pipe, terminate):
+ def __init__(self, setup, render_pipe, command_queue, terminate):
"""
:param setup: A Bcfg2 options dict
:type setup: Bcfg2.Options.OptionParser
@@ -86,42 +137,75 @@ class ChildCore(BaseCore):
#: objects to build configurations, and to which client
#: configurations are added after having been built by
#: ChildCore objects.
- self.pipe = pipe
+ self.render_pipe = render_pipe
+
+ #: The queue from which other commands are received
+ self.command_queue = command_queue
#: The :class:`multiprocessing.Event` that will be monitored
#: to determine when this child should shut down.
self.terminate = terminate
+ #: The :class:`threading.Thread` used to process commands
+ #: received via the :class:`multiprocessing.Queue` RPC
+ #: interface
+ self.command_thread = \
+ threading.Thread(name="CommandThread",
+ target=self._command_queue_thread)
+
def _daemonize(self):
return True
def _run(self):
+ try:
+ self.command_thread.start()
+ except:
+ self.shutdown()
+ raise
return True
+ def render(self):
+ """ Process client configuration render requests """
+ if self.render_pipe.poll(self.poll_wait):
+ if not self.metadata.use_database:
+ # handle FAM events, in case (for instance) the
+ # client has just been added to clients.xml, or a
+ # profile has just been asserted. but really, you
+ # should be using the metadata database if you're
+ # using this core.
+ self.fam.handle_events_in_interval(0.1)
+ client = self.render_pipe.recv()
+ self.logger.debug("Building configuration for %s" % client)
+ self.render_pipe.send(
+ lxml.etree.tostring(self.BuildConfiguration(client)))
+
def _block(self):
while not self.terminate.isSet():
try:
- if self.pipe.poll(self.poll_wait):
- if not self.metadata.use_database:
- # handle FAM events, in case (for instance) the
- # client has just been added to clients.xml, or a
- # profile has just been asserted. but really, you
- # should be using the metadata database if you're
- # using this core.
- self.fam.handle_events_in_interval(0.1)
- client = self.pipe.recv()
- self.logger.debug("Building configuration for %s" % client)
- config = \
- lxml.etree.tostring(self.BuildConfiguration(client))
- self.logger.debug("Returning configuration for %s to main "
- "process" % client)
- self.pipe.send(config)
- self.logger.debug("Returned configuration for %s to main "
- "process" % client)
+ self.render()
except KeyboardInterrupt:
break
self.shutdown()
+ def _command_queue_thread(self):
+ """ Process commands received on the command queue thread """
+ while not self.terminate.isSet():
+ method, args, kwargs = self.command_queue.get()
+ if hasattr(self, method):
+ func = getattr(self, method)
+ if func.exposed:
+ self.logger.debug("Child calling RPC method %s" % method)
+ func(*args, **kwargs)
+ else:
+ self.logger.error("Method %s is not exposed" % method)
+ else:
+ self.logger.error("Method %s does not exist" % method)
+
+ @exposed
+ def expire_cache(self, client=None):
+ """ Expire the metadata cache for a client """
+ self.metadata_cache.expire(client)
+
class Core(BuiltinCore):
""" A multiprocessing core that delegates building the actual
@@ -141,10 +225,14 @@ class Core(BuiltinCore):
setup['children'] = multiprocessing.cpu_count()
#: A dict of child name -> one end of the
- #: :class:`multiprocessing.Pipe` object used to communicate
- #: with that child. (The child is given the other end of the
- #: Pipe.)
- self.pipes = dict()
+ #: :class:`multiprocessing.Pipe` object used to submit render
+ #: requests to that child. (The child is given the other end
+ #: of the Pipe.)
+ self.render_pipes = dict()
+
+ #: A dict of child name -> :class:`multiprocessing.Queue`
+ #: object used to pass commands to that child.
+ self.command_queues = dict()
#: A queue that keeps track of which children are available to
#: render a configuration. A child is popped from the queue
@@ -164,13 +252,23 @@ class Core(BuiltinCore):
# monkeypatch self.terminate to have isSet().
self.terminate = DualEvent(threading_event=self.terminate)
+ self.metadata_cache = DispatchingCache()
+
def _run(self):
for cnum in range(self.setup['children']):
name = "Child-%s" % cnum
+
+ # create Pipe for render requests and results
(mainpipe, childpipe) = multiprocessing.Pipe()
- self.pipes[name] = mainpipe
+ self.render_pipes[name] = mainpipe
+
+ # create Queue for other commands
+ cmd_q = multiprocessing.Queue()
+ self.command_queues[name] = cmd_q
+ self.metadata_cache.command_queues[name] = cmd_q
+
self.logger.debug("Starting child %s" % name)
- childcore = ChildCore(self.setup, childpipe, self.terminate)
+ childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
child.start()
self.logger.debug("Child %s started with PID %s" % (name,
@@ -193,11 +291,16 @@ class Core(BuiltinCore):
self.logger.debug("All children shut down")
@exposed
+ def set_debug(self, address, debug):
+ self.metadata_cache.set_debug(debug)
+ return BuiltinCore.set_debug(self, address, debug)
+
+ @exposed
def GetConfig(self, address):
client = self.resolve_client(address)[0]
childname = self.available_children.get()
self.logger.debug("Building configuration on child %s" % childname)
- pipe = self.pipes[childname]
+ pipe = self.render_pipes[childname]
pipe.send(client)
config = pipe.recv()
self.available_children.put_nowait(childname)
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
index 8a787751c..fc3de3d68 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
@@ -10,6 +10,7 @@ import lxml.etree
import Bcfg2.Options
import Bcfg2.Server.Plugin
import Bcfg2.Server.Lint
+from fnmatch import fnmatch
from Bcfg2.Server.Plugin import PluginExecutionError
# pylint: disable=W0622
from Bcfg2.Compat import u_str, unicode, b64encode, walk_packages, \
@@ -876,22 +877,41 @@ class CfgLint(Bcfg2.Server.Lint.ServerPlugin):
"%s has no corresponding pubkey.xml at %s" %
(basename, pubkey))
+ def _list_path_components(self, path):
+ """ Get a list of all components of a path. E.g.,
+ ``self._list_path_components("/foo/bar/foobaz")`` would return
+ ``["foo", "bar", "foo", "baz"]``. The list is not guaranteed
+ to be in order."""
+ rv = []
+ remaining, component = os.path.split(path)
+ while component != '':
+ rv.append(component)
+ remaining, component = os.path.split(remaining)
+ return rv
+
def check_missing_files(self):
""" check that all files on the filesystem are known to Cfg """
cfg = self.core.plugins['Cfg']
# first, collect ignore patterns from handlers
- ignore = []
+ ignore = set()
for hdlr in handlers():
- ignore.extend(hdlr.__ignore__)
+ ignore.update(hdlr.__ignore__)
# next, get a list of all non-ignored files on the filesystem
all_files = set()
for root, _, files in os.walk(cfg.data):
- all_files.update(os.path.join(root, fname)
- for fname in files
- if not any(fname.endswith("." + i)
- for i in ignore))
+ for fname in files:
+ fpath = os.path.join(root, fname)
+ # check against the handler ignore patterns and the
+ # global FAM ignore list
+ if (not any(fname.endswith("." + i) for i in ignore) and
+ not any(fnmatch(fpath, p)
+ for p in self.config['ignore']) and
+ not any(fnmatch(c, p)
+ for p in self.config['ignore']
+ for c in self._list_path_components(fpath))):
+ all_files.add(fpath)
# next, get a list of all files known to Cfg
cfg_files = set()
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
index 6692a1735..aee16eee1 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
@@ -53,11 +53,14 @@ The Yum Backend
import os
import re
import sys
+import time
import copy
import errno
import socket
import logging
import lxml.etree
+from lockfile import FileLock
+
import Bcfg2.Server.FileMonitor
import Bcfg2.Server.Plugin
from Bcfg2.Utils import Executor
@@ -275,6 +278,10 @@ class YumCollection(Collection):
debug=debug)
self.keypath = os.path.join(self.cachepath, "keys")
+ #: A :class:`Bcfg2.Utils.Executor` object to use to run
+ #: external commands
+ self.cmd = Executor()
+
self._helper = None
if self.use_yum:
#: Define a unique cache file for this collection to use
@@ -843,6 +850,17 @@ class YumCollection(Collection):
if not self.use_yum:
return Collection.complete(self, packagelist)
+ lock = FileLock(os.path.join(self.cachefile, "lock"))
+ slept = 0
+ while lock.is_locked():
+ if slept > 30:
+ self.logger.warning("Packages: Timeout waiting for yum cache "
+ "to release its lock")
+ return set(), set()
+ self.logger.debug("Packages: Yum cache is locked, waiting...")
+ time.sleep(3)
+ slept += 3
+
if packagelist:
try:
result = self.call_helper(
@@ -891,22 +909,19 @@ class YumCollection(Collection):
cmd.append("-v")
cmd.append(command)
self.debug_log("Packages: running %s" % " ".join(cmd))
+
if inputdata:
- result = self.cmd.run(cmd, inputdata=json.dumps(inputdata))
+ result = self.cmd.run(cmd, timeout=self.setup['client_timeout'],
+ inputdata=json.dumps(inputdata))
else:
- result = self.cmd.run(cmd)
+ result = self.cmd.run(cmd, timeout=self.setup['client_timeout'])
if not result.success:
- errlines = result.error.splitlines()
self.logger.error("Packages: error running bcfg2-yum-helper: %s" %
- errlines[0])
- for line in errlines[1:]:
- self.logger.error("Packages: %s" % line)
+ result.error)
elif result.stderr:
- errlines = result.stderr.splitlines()
self.debug_log("Packages: debug info from bcfg2-yum-helper: %s" %
- errlines[0])
- for line in errlines[1:]:
- self.debug_log("Packages: %s" % line)
+ result.stderr)
+
try:
return json.loads(result.stdout)
except ValueError: