author     Chris St. Pierre <chris.a.st.pierre@gmail.com>  2013-08-08 08:10:16 -0400
committer  Chris St. Pierre <chris.a.st.pierre@gmail.com>  2013-08-08 08:10:16 -0400
commit     e1e194a573b3803fa7f45a646bbb36b2f164a3e1 (patch)
tree       e9b689d1be39d38279e0a16f010e8d5e573612ef /src/lib
parent     35851347089db1a092ec715cb183aec19f19e983 (diff)
parent     eef441c1acdf1d3d483647b153f721cbab4a8517 (diff)
download   bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.tar.gz
           bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.tar.bz2
           bcfg2-e1e194a573b3803fa7f45a646bbb36b2f164a3e1.zip
Merge branch 'maint'
Conflicts:
	doc/appendix/files/mysql.txt
	doc/getting_started/index.txt
	doc/server/plugins/structures/bundler/kernel.txt
	src/lib/Bcfg2/Server/MultiprocessingCore.py
	src/lib/Bcfg2/Server/Plugin/interfaces.py
	src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
	src/lib/Bcfg2/Server/Plugins/Probes.py
	src/lib/Bcfg2/Server/Plugins/SSHbase.py
Diffstat (limited to 'src/lib')
-rw-r--r--  src/lib/Bcfg2/Client/Tools/POSIX/base.py                    9
-rw-r--r--  src/lib/Bcfg2/Client/Tools/POSIXUsers.py                    5
-rw-r--r--  src/lib/Bcfg2/Options.py                                    4
-rw-r--r--  src/lib/Bcfg2/Server/Core.py                               51
-rw-r--r--  src/lib/Bcfg2/Server/MultiprocessingCore.py               499
-rw-r--r--  src/lib/Bcfg2/Server/Plugin/base.py                        23
-rw-r--r--  src/lib/Bcfg2/Server/Plugin/interfaces.py                  19
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Guppy.py                       1
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Metadata.py                   35
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py   12
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Packages/Yum.py               25
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Packages/__init__.py          45
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/Probes.py                     39
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/PuppetENC.py                   2
-rw-r--r--  src/lib/Bcfg2/Server/Plugins/SSHbase.py                     6
-rw-r--r--  src/lib/Bcfg2/Utils.py                                      4
16 files changed, 559 insertions, 220 deletions
diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/base.py b/src/lib/Bcfg2/Client/Tools/POSIX/base.py
index 3778569a6..fb5d06e54 100644
--- a/src/lib/Bcfg2/Client/Tools/POSIX/base.py
+++ b/src/lib/Bcfg2/Client/Tools/POSIX/base.py
@@ -686,7 +686,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool):
""" os.makedirs helpfully creates all parent directories for
us, but it sets permissions according to umask, which is
probably wrong. we need to find out which directories were
- created and set permissions on those
+ created and try to set permissions on those
(http://trac.mcs.anl.gov/projects/bcfg2/ticket/1125 and
http://trac.mcs.anl.gov/projects/bcfg2/ticket/1134) """
created = []
@@ -706,8 +706,9 @@ class POSIXTool(Bcfg2.Client.Tools.Tool):
(path, err))
rv = False
- # set auto-created directories to mode 755, if you need
- # something else, you should specify it in your config
+ # set auto-created directories to mode 755 and use best effort for
+ # permissions. If you need something else, you should specify it in
+ # your config.
tmpentry = copy.deepcopy(entry)
tmpentry.set('mode', '0755')
for acl in tmpentry.findall('ACL'):
@@ -715,7 +716,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool):
oct_mode(self._norm_acl_perms(acl.get('perms')) |
ACL_MAP['x']))
for cpath in created:
- rv &= self._set_perms(tmpentry, path=cpath)
+ self._set_perms(tmpentry, path=cpath)
return rv
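The docstring above notes that os.makedirs() creates every missing parent with umask-derived permissions. A minimal sketch of the detection step it alludes to, standard library only (not part of the patch):

import os

def missing_parents(path):
    """Return the directories os.makedirs(path) would create,
    outermost first, so their modes can be fixed afterwards."""
    created = []
    while path and not os.path.exists(path):
        created.append(path)
        path = os.path.dirname(path)
    return list(reversed(created))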
diff --git a/src/lib/Bcfg2/Client/Tools/POSIXUsers.py b/src/lib/Bcfg2/Client/Tools/POSIXUsers.py
index 9b40f9cea..9d7441b5c 100644
--- a/src/lib/Bcfg2/Client/Tools/POSIXUsers.py
+++ b/src/lib/Bcfg2/Client/Tools/POSIXUsers.py
@@ -196,7 +196,10 @@ class POSIXUsers(Bcfg2.Client.Tools.Tool):
# automatically determine one -- i.e., it always
# verifies
continue
- if val != entry.get(attr):
+ entval = entry.get(attr)
+ if not isinstance(entval, str):
+ entval = entval.encode('utf-8')
+ if val != entval:
errors.append("%s for %s %s is incorrect. Current %s is "
"%s, but should be %s" %
(attr.title(), entry.tag, entry.get("name"),
diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py
index 41bf54dfb..a96ea9a3b 100644
--- a/src/lib/Bcfg2/Options.py
+++ b/src/lib/Bcfg2/Options.py
@@ -1198,9 +1198,9 @@ DRIVER_OPTIONS = \
yum_verify_fail_action=CLIENT_YUM_VERIFY_FAIL_ACTION,
yum_verify_flags=CLIENT_YUM_VERIFY_FLAGS,
posix_uid_whitelist=CLIENT_POSIX_UID_WHITELIST,
- posix_gid_whitelist=CLIENT_POSIX_UID_WHITELIST,
+ posix_gid_whitelist=CLIENT_POSIX_GID_WHITELIST,
posix_uid_blacklist=CLIENT_POSIX_UID_BLACKLIST,
- posix_gid_blacklist=CLIENT_POSIX_UID_BLACKLIST)
+ posix_gid_blacklist=CLIENT_POSIX_GID_BLACKLIST)
CLIENT_COMMON_OPTIONS = \
dict(extra=CLIENT_EXTRA_DISPLAY,
diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py
index 63e2c2910..698703457 100644
--- a/src/lib/Bcfg2/Server/Core.py
+++ b/src/lib/Bcfg2/Server/Core.py
@@ -263,6 +263,20 @@ class BaseCore(object):
#: metadata
self.metadata_cache = Cache()
+ def expire_caches_by_type(self, base_cls, key=None):
+ """ Expire caches for all
+ :class:`Bcfg2.Server.Plugin.interfaces.Caching` plugins that
+ are instances of ``base_cls``.
+
+ :param base_cls: The base plugin interface class to match (see
+ :mod:`Bcfg2.Server.Plugin.interfaces`)
+ :type base_cls: type
+ :param key: The cache key to expire
+ """
+ for plugin in self.plugins_by_type(base_cls):
+ if isinstance(plugin, Bcfg2.Server.Plugin.Caching):
+ plugin.expire_cache(key)
+
def plugins_by_type(self, base_cls):
""" Return a list of loaded plugins that match the passed type.
@@ -289,11 +303,12 @@ class BaseCore(object):
self.logger.debug("Performance logging thread starting")
while not self.terminate.isSet():
self.terminate.wait(self.setup['perflog_interval'])
- for name, stats in self.get_statistics(None).items():
- self.logger.info("Performance statistics: "
- "%s min=%.06f, max=%.06f, average=%.06f, "
- "count=%d" % ((name, ) + stats))
- self.logger.debug("Performance logging thread terminated")
+ if not self.terminate.isSet():
+ for name, stats in self.get_statistics(None).items():
+ self.logger.info("Performance statistics: "
+ "%s min=%.06f, max=%.06f, average=%.06f, "
+ "count=%d" % ((name, ) + stats))
+ self.logger.info("Performance logging thread terminated")
def _file_monitor_thread(self):
""" The thread that runs the
@@ -310,11 +325,12 @@ class BaseCore(object):
else:
if not self.fam.pending():
terminate.wait(15)
+ if self.fam.pending():
+ self._update_vcs_revision()
self.fam.handle_event_set(self.lock)
except:
continue
- self._update_vcs_revision()
- self.logger.debug("File monitor thread terminated")
+ self.logger.info("File monitor thread terminated")
@Bcfg2.Server.Statistics.track_statistics()
def _update_vcs_revision(self):
@@ -431,14 +447,14 @@ class BaseCore(object):
def shutdown(self):
""" Perform plugin and FAM shutdown tasks. """
- self.logger.debug("Shutting down core...")
+ self.logger.info("Shutting down core...")
if not self.terminate.isSet():
self.terminate.set()
self.fam.shutdown()
- self.logger.debug("FAM shut down")
+ self.logger.info("FAM shut down")
for plugin in list(self.plugins.values()):
plugin.shutdown()
- self.logger.debug("All plugins shut down")
+ self.logger.info("All plugins shut down")
@property
def metadata_cache_mode(self):
@@ -730,7 +746,7 @@ class BaseCore(object):
if event.code2str() == 'deleted':
return
self.setup.reparse()
- self.metadata_cache.expire()
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
def block_for_fam_events(self, handle_events=False):
""" Block until all fam events have been handleed, optionally
@@ -920,6 +936,9 @@ class BaseCore(object):
imd.query.by_name = self.build_metadata
if self.metadata_cache_mode in ['cautious', 'aggressive']:
self.metadata_cache[client_name] = imd
+ else:
+ self.logger.debug("Using cached metadata object for %s" %
+ client_name)
return imd
def process_statistics(self, client_name, statistics):
@@ -947,6 +966,7 @@ class BaseCore(object):
state.get('state')))
self.client_run_hook("end_statistics", meta)
+ @track_statistics()
def resolve_client(self, address, cleanup_cache=False, metadata=True):
""" Given a client address, get the client hostname and
optionally metadata.
@@ -1002,12 +1022,10 @@ class BaseCore(object):
def _get_rmi(self):
""" Get a list of RMI calls exposed by plugins """
rmi = dict()
- for pname, pinst in list(self.plugins.items()):
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
for mname in pinst.__rmi__:
rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
- famname = self.fam.__class__.__name__
- for mname in self.fam.__rmi__:
- rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname)
return rmi
def _resolve_exposed_method(self, method_name):
@@ -1100,6 +1118,7 @@ class BaseCore(object):
for plugin in self.plugins_by_type(Probing):
for probe in plugin.GetProbes(metadata):
resp.append(probe)
+ self.logger.debug("Sending probe list to %s" % client)
return lxml.etree.tostring(resp,
xml_declaration=False).decode('UTF-8')
except:
@@ -1125,7 +1144,7 @@ class BaseCore(object):
# that's created for RecvProbeData doesn't get cached.
# I.e., the next metadata object that's built, after probe
# data is processed, is cached.
- self.metadata_cache.expire(client)
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
try:
xpdata = lxml.etree.XML(probedata.encode('utf-8'),
parser=Bcfg2.Server.XMLParser)
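For reference, the two call shapes of the new expire_caches_by_type() helper, both of which appear in this patch (the client name is hypothetical):

# expire all Caching plugins implementing the Metadata interface
core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
# expire probe caches for a single client; key semantics are
# plugin-defined, per the Caching interface added in this merge
core.expire_caches_by_type(Bcfg2.Server.Plugin.Probing,
                           key="client.example.com")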
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index b9716619d..e79207291 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -3,66 +3,134 @@
:mod:`multiprocessing` library to offload work to multiple child
processes. As such, it requires Python 2.6+.
-The parent communicates with the children over two constructs:
-
-* A :class:`multiprocessing.Pipe` is used to process render requests.
- The pipe is locked when in use (i.e., between the time that a client
- is submitted to be rendered and the time that its configuration is
- returned) to keep things thread-safe. (This is accomplished through
- the use of
- :attr:`Bcfg2.Server.MultiprocessingCore.available_children.)
-* A :class:`multiprocessing.Queue` is used to submit other commands in
- a thread-safe, non-blocking fashion. (Note that, since it is a
- queue, no results can be returned.) It implements a very simple RPC
- protocol. Each command passed to a child over the Pipe must be a
- tuple with the format::
-
- (<method>, <args>, <kwargs>)
-
- The method must be exposed by the child by decorating it with
- :func:`Bcfg2.Server.Core.exposed`.
+The parent communicates with the children over
+:class:`multiprocessing.Queue` objects via a
+:class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object.
+
+A method being called via the RPCQueue must be exposed by the child by
+decorating it with :func:`Bcfg2.Server.Core.exposed`.
"""
+import time
import threading
import lxml.etree
import multiprocessing
-from Bcfg2.Compat import Queue
-from Bcfg2.Server.Cache import Cache
+import Bcfg2.Server.Plugin
+from itertools import cycle
+from Bcfg2.Cache import Cache
+from Bcfg2.Compat import Queue, Empty, wraps
from Bcfg2.Server.Core import BaseCore, exposed
-from Bcfg2.Server.Plugin import Debuggable
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
+from multiprocessing.connection import Listener, Client
-class DispatchingCache(Cache, Debuggable):
+class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable):
""" Implementation of :class:`Bcfg2.Cache.Cache` that propagates
cache expiration events to child nodes. """
#: The method to send over the pipe to expire the cache
- method = "expire_cache"
+ method = "expire_metadata_cache"
def __init__(self, *args, **kwargs):
- #: A dict of <child name>: :class:`multiprocessing.Queue`
- #: objects that should be given a cache expiration command any
- #: time an item is expired.
- self.command_queues = kwargs.pop("pipes", dict())
-
- Debuggable.__init__(self)
+ self.rpc_q = kwargs.pop("queue")
+ Bcfg2.Server.Plugin.Debuggable.__init__(self)
Cache.__init__(self, *args, **kwargs)
def expire(self, key=None):
- if (key and key in self) or (not key and len(self)):
- # dispatching cache expiration to children can be
- # expensive, so only do it if there's something to expire
- for child, cmd_q in self.command_queues.items():
- if key:
- self.logger.debug("Expiring metadata cache for %s on %s" %
- (key, child))
- else:
- self.logger.debug("Expiring metadata cache on %s" % child)
- cmd_q.put((self.method, [key], dict()))
+ self.rpc_q.publish(self.method, args=[key])
Cache.expire(self, key=key)
+class RPCQueue(Bcfg2.Server.Plugin.Debuggable):
+ """ An implementation of a :class:`multiprocessing.Queue` designed
+ for several additional use patterns:
+
+ * Random-access reads, based on a key that identifies the data;
+ * Publish-subscribe, where a datum is sent to all hosts.
+
+ The subscribers can deal with this as a normal Queue with no
+ special handling.
+ """
+ poll_wait = 3.0
+
+ def __init__(self):
+ Bcfg2.Server.Plugin.Debuggable.__init__(self)
+ self._terminate = threading.Event()
+ self._queues = dict()
+ self._available_listeners = Queue()
+ self._blocking_listeners = []
+
+ def add_subscriber(self, name):
+ """ Add a subscriber to the queue. This returns the
+ :class:`multiprocessing.Queue` object that the subscriber
+ should read from. """
+ self._queues[name] = multiprocessing.Queue()
+ return self._queues[name]
+
+ def publish(self, method, args=None, kwargs=None):
+ """ Publish an RPC call to the queue for consumption by all
+ subscribers. """
+ for queue in self._queues.values():
+ queue.put((None, (method, args or [], kwargs or dict())))
+
+ def rpc(self, dest, method, args=None, kwargs=None):
+ """ Make an RPC call to the named subscriber, expecting a
+ response. This opens a
+ :class:`multiprocessing.connection.Listener` and passes the
+ Listener address to the child as part of the RPC call, so that
+ the child can connect to the Listener to submit its results.
+
+ Listeners are reused when possible to minimize overhead.
+ """
+ try:
+ listener = self._available_listeners.get_nowait()
+ self.logger.debug("Reusing existing RPC listener at %s" %
+ listener.address)
+ except Empty:
+ listener = Listener()
+ self.logger.debug("Created new RPC listener at %s" %
+ listener.address)
+ self._blocking_listeners.append(listener)
+ try:
+ self._queues[dest].put((listener.address,
+ (method, args or [], kwargs or dict())))
+ conn = listener.accept()
+ self._blocking_listeners.remove(listener)
+ try:
+ while not self._terminate.is_set():
+ if conn.poll(self.poll_wait):
+ return conn.recv()
+ finally:
+ conn.close()
+ finally:
+ self._available_listeners.put(listener)
+
+ def close(self):
+ """ Close queues and connections. """
+ self._terminate.set()
+ self.logger.debug("Closing RPC queues")
+ for name, queue in self._queues.items():
+ self.logger.debug("Closing RPC queue to %s" % name)
+ queue.close()
+
+ # close any listeners that are waiting for connections
+ self.logger.debug("Closing RPC connections")
+ for listener in self._blocking_listeners:
+ self.logger.debug("Closing RPC connection at %s" %
+ listener.address)
+ listener.close()
+
+ self.logger.debug("Closing RPC listeners")
+ try:
+ while True:
+ listener = self._available_listeners.get_nowait()
+ self.logger.debug("Closing RPC listener at %s" %
+ listener.address)
+ listener.close()
+ except Empty:
+ pass
+
+
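A usage sketch of RPCQueue as defined above, assuming a child process is already servicing the subscriber Queue (otherwise rpc() blocks):

queue = RPCQueue()
child_q = queue.add_subscriber("Child-0")      # handed to the child
queue.publish("set_debug", args=[None, True])  # no response expected
config = queue.rpc("Child-0", "GetConfig",
                   args=["client.example.com"])  # blocks for a reply
queue.close()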
class DualEvent(object):
""" DualEvent is a clone of :class:`threading.Event` that
internally implements both :class:`threading.Event` and
@@ -111,101 +179,154 @@ class ChildCore(BaseCore):
those, though, if the pipe communication "protocol" were made more
robust. """
- #: How long to wait while polling for new clients to build. This
- #: doesn't affect the speed with which a client is built, but
+ #: How long to wait while polling for new RPC commands. This
+ #: doesn't affect the speed with which a command is processed, but
#: setting it too high will result in longer shutdown times, since
#: we only check for the termination event from the main process
#: every ``poll_wait`` seconds.
- poll_wait = 5.0
+ poll_wait = 3.0
- def __init__(self, setup, render_pipe, command_queue, terminate):
+ def __init__(self, name, setup, rpc_q, terminate):
"""
+ :param name: The name of this child
+ :type name: string
:param setup: A Bcfg2 options dict
:type setup: Bcfg2.Options.OptionParser
- :param pipe: The pipe to which client hostnames are added for
- ChildCore objects to build configurations, and to
- which client configurations are added after
- having been built by ChildCore objects.
- :type pipe: multiprocessing.Pipe
+ :param rpc_q: The queue the child will read RPC commands from
+ and write the results of RPC calls to.
+ :type rpc_q: multiprocessing.Queue
:param terminate: An event that flags ChildCore objects to shut
themselves down.
:type terminate: multiprocessing.Event
"""
BaseCore.__init__(self, setup)
- #: The pipe to which client hostnames are added for ChildCore
- #: objects to build configurations, and to which client
- #: configurations are added after having been built by
- #: ChildCore objects.
- self.render_pipe = render_pipe
-
- #: The queue from which other commands are received
- self.command_queue = command_queue
+ #: The name of this child
+ self.name = name
#: The :class:`multiprocessing.Event` that will be monitored
#: to determine when this child should shut down.
self.terminate = terminate
- #: The :class:`threading.Thread` used to process commands
- #: received via the :class:`multiprocessing.Queue` RPC
- #: interface
- self.command_thread = \
- threading.Thread(name="CommandThread",
- target=self._command_queue_thread)
+ #: The queue used for RPC communication
+ self.rpc_q = rpc_q
- def _daemonize(self):
- return True
+ # override this setting so that the child doesn't try to write
+ # the pidfile
+ self.setup['daemon'] = False
+
+ # ensure that the child doesn't start a perflog thread
+ self.perflog_thread = None
+
+ self._rmi = dict()
def _run(self):
- try:
- self.command_thread.start()
- except:
- self.shutdown()
- raise
return True
- def render(self):
- """ Process client configuration render requests """
- if self.render_pipe.poll(self.poll_wait):
- if not self.metadata.use_database:
- # handle FAM events, in case (for instance) the
- # client has just been added to clients.xml, or a
- # profile has just been asserted. but really, you
- # should be using the metadata database if you're
- # using this core.
- self.fam.handle_events_in_interval(0.1)
- client = self.render_pipe.recv()
- self.logger.debug("Building configuration for %s" % client)
- self.render_pipe.send(
- lxml.etree.tostring(self.BuildConfiguration(client)))
+ def _daemonize(self):
+ return True
+
+ def _dispatch(self, address, data):
+ """ Method dispatcher used for commands received from
+ the RPC queue. """
+ if address is not None:
+ # if the key is None, then no response is expected. we
+ # make the return connection before dispatching the actual
+ # RPC call so that the parent is blocking for a connection
+ # as briefly as possible
+ self.logger.debug("Connecting to parent via %s" % address)
+ client = Client(address)
+ method, args, kwargs = data
+ func = None
+ rv = None
+ if "." in method:
+ if method in self._rmi:
+ func = self._rmi[method]
+ else:
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ elif not hasattr(self, method):
+ self.logger.error("%s: Method %s does not exist" % (self.name,
+ method))
+ else: # method is not a plugin RMI, and exists
+ func = getattr(self, method)
+ if not func.exposed:
+ self.logger.error("%s: Method %s is not exposed" % (self.name,
+ method))
+ func = None
+ if func is not None:
+ self.logger.debug("%s: Calling RPC method %s" % (self.name,
+ method))
+ rv = func(*args, **kwargs)
+ if address is not None:
+ # if the key is None, then no response is expected
+ self.logger.debug("Returning data to parent via %s" % address)
+ client.send(rv)
def _block(self):
- while not self.terminate.isSet():
+ self._rmi = self._get_rmi()
+ while not self.terminate.is_set():
try:
- self.render()
+ address, data = self.rpc_q.get(timeout=self.poll_wait)
+ threadname = "-".join(str(i) for i in data)
+ rpc_thread = threading.Thread(name=threadname,
+ target=self._dispatch,
+ args=[address, data])
+ rpc_thread.start()
+ except Empty:
+ pass
except KeyboardInterrupt:
break
self.shutdown()
- def _command_queue_thread(self):
- """ Process commands received on the command queue thread """
- while not self.terminate.isSet():
- method, args, kwargs = self.command_queue.get()
- if hasattr(self, method):
- func = getattr(self, method)
- if func.exposed:
- self.logger.debug("Child calling RPC method %s" % method)
- func(*args, **kwargs)
+ def shutdown(self):
+ BaseCore.shutdown(self)
+ self.logger.info("%s: Closing RPC command queue" % self.name)
+ self.rpc_q.close()
+
+ while len(threading.enumerate()) > 1:
+ threads = [t for t in threading.enumerate()
+ if t != threading.current_thread()]
+ self.logger.info("%s: Waiting for %d thread(s): %s" %
+ (self.name, len(threads),
+ [t.name for t in threads]))
+ time.sleep(1)
+ self.logger.info("%s: All threads stopped" % self.name)
+
+ def _get_rmi(self):
+ rmi = dict()
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ mname = crmi[1]
else:
- self.logger.error("Method %s is not exposed" % method)
- else:
- self.logger.error("Method %s does not exist" % method)
+ mname = crmi
+ rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname)
+ return rmi
@exposed
- def expire_cache(self, client=None):
+ def expire_metadata_cache(self, client=None):
""" Expire the metadata cache for a client """
self.metadata_cache.expire(client)
+ @exposed
+ def RecvProbeData(self, address, _):
+ """ Expire the probe cache for a client """
+ self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing,
+ key=self.resolve_client(address,
+ metadata=False)[0])
+
+ @exposed
+ def GetConfig(self, client):
+ """ Render the configuration for a client """
+ self.logger.debug("%s: Building configuration for %s" %
+ (self.name, client))
+ return lxml.etree.tostring(self.BuildConfiguration(client))
+
class Core(BuiltinCore):
""" A multiprocessing core that delegates building the actual
@@ -224,84 +345,162 @@ class Core(BuiltinCore):
if setup['children'] is None:
setup['children'] = multiprocessing.cpu_count()
- #: A dict of child name -> one end of the
- #: :class:`multiprocessing.Pipe` object used to submit render
- #: requests to that child. (The child is given the other end
- #: of the Pipe.)
- self.render_pipes = dict()
-
- #: A dict of child name -> :class:`multiprocessing.Queue`
- #: object used to pass commands to that child.
- self.command_queues = dict()
-
- #: A queue that keeps track of which children are available to
- #: render a configuration. A child is popped from the queue
- #: when it starts to render a config, then it's pushed back on
- #: when it's done. This lets us use a blocking call to
- #: :func:`Queue.Queue.get` when waiting for an available
- #: child.
- self.available_children = Queue(maxsize=self.setup['children'])
-
- # sigh. multiprocessing was added in py2.6, which is when the
- # camelCase methods for threading objects were deprecated in
- # favor of the Pythonic under_score methods. So
- # multiprocessing.Event *only* has is_set(), while
- # threading.Event has *both* isSet() and is_set(). In order
- # to make the core work with Python 2.4+, and with both
- # multiprocessing and threading Event objects, we just
- # monkeypatch self.terminate to have isSet().
+ #: The flag that indicates when to stop child threads and
+ #: processes
self.terminate = DualEvent(threading_event=self.terminate)
- self.metadata_cache = DispatchingCache()
+ #: A :class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object
+ #: used to send or publish commands to children.
+ self.rpc_q = RPCQueue()
+
+ self.metadata_cache = DispatchingCache(queue=self.rpc_q)
+
+ #: A list of children that will be cycled through
+ self._all_children = []
+
+ #: An iterator that each child will be taken from in sequence,
+ #: to provide a round-robin distribution of render requests
+ self.children = None
def _run(self):
for cnum in range(self.setup['children']):
name = "Child-%s" % cnum
- # create Pipe for render requests and results
- (mainpipe, childpipe) = multiprocessing.Pipe()
- self.render_pipes[name] = mainpipe
-
- # create Queue for other commands
- cmd_q = multiprocessing.Queue()
- self.command_queues[name] = cmd_q
- self.metadata_cache.command_queues[name] = cmd_q
-
self.logger.debug("Starting child %s" % name)
- childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate)
+ child_q = self.rpc_q.add_subscriber(name)
+ childcore = ChildCore(name, self.setup, child_q, self.terminate)
child = multiprocessing.Process(target=childcore.run, name=name)
child.start()
self.logger.debug("Child %s started with PID %s" % (name,
child.pid))
- self.available_children.put(name)
+ self._all_children.append(name)
+ self.logger.debug("Started %s children: %s" % (len(self._all_children),
+ self._all_children))
+ self.children = cycle(self._all_children)
return BuiltinCore._run(self)
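The round-robin dispatch set up above relies on itertools.cycle, which yields each child name in turn, forever:

from itertools import cycle
children = cycle(["Child-0", "Child-1"])
assert [next(children) for _ in range(4)] == \
    ["Child-0", "Child-1", "Child-0", "Child-1"]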
def shutdown(self):
BuiltinCore.shutdown(self)
- for child in multiprocessing.active_children():
- self.logger.debug("Shutting down child %s" % child.name)
- child.join(self.shutdown_timeout)
- if child.is_alive():
+ self.logger.info("Closing RPC command queues")
+ self.rpc_q.close()
+
+ def term_children():
+ """ Terminate all remaining multiprocessing children. """
+ for child in multiprocessing.active_children():
self.logger.error("Waited %s seconds to shut down %s, "
"terminating" % (self.shutdown_timeout,
child.name))
child.terminate()
- else:
- self.logger.debug("Child %s shut down" % child.name)
- self.logger.debug("All children shut down")
+
+ timer = threading.Timer(self.shutdown_timeout, term_children)
+ timer.start()
+ while len(multiprocessing.active_children()):
+ self.logger.info("Waiting for %s child(ren): %s" %
+ (len(multiprocessing.active_children()),
+ [c.name
+ for c in multiprocessing.active_children()]))
+ time.sleep(1)
+ timer.cancel()
+ self.logger.info("All children shut down")
+
+ while len(threading.enumerate()) > 1:
+ threads = [t for t in threading.enumerate()
+ if t != threading.current_thread()]
+ self.logger.info("Waiting for %s thread(s): %s" %
+ (len(threads), [t.name for t in threads]))
+ time.sleep(1)
+ self.logger.info("Shutdown complete")
+
+ def _get_rmi(self):
+ child_rmi = dict()
+ for pname, pinst in self.plugins.items() + \
+ [(self.fam.__class__.__name__, self.fam)]:
+ for crmi in pinst.__child_rmi__:
+ if isinstance(crmi, tuple):
+ parentname, childname = crmi
+ else:
+ parentname = childname = crmi
+ child_rmi["%s.%s" % (pname, parentname)] = \
+ "%s.%s" % (pname, childname)
+
+ rmi = BuiltinCore._get_rmi(self)
+ for method in rmi.keys():
+ if method in child_rmi:
+ rmi[method] = self._child_rmi_wrapper(method,
+ rmi[method],
+ child_rmi[method])
+ return rmi
+
+ def _child_rmi_wrapper(self, method, parent_rmi, child_rmi):
+ """ Returns a callable that dispatches a call to the given
+ child RMI to child processes, and calls the parent RMI locally
+ (i.e., in the parent process). """
+ @wraps(parent_rmi)
+ def inner(*args, **kwargs):
+ self.logger.debug("Dispatching RMI call to %s to children: %s" %
+ (method, child_rmi))
+ self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs)
+ return parent_rmi(*args, **kwargs)
+
+ return inner
@exposed
def set_debug(self, address, debug):
+ self.rpc_q.set_debug(debug)
+ self.rpc_q.publish("set_debug", args=[address, debug])
self.metadata_cache.set_debug(debug)
return BuiltinCore.set_debug(self, address, debug)
@exposed
+ def RecvProbeData(self, address, probedata):
+ rv = BuiltinCore.RecvProbeData(self, address, probedata)
+ # we don't want the children to actually process probe data,
+ # so we don't send the data, just the fact that we got some.
+ self.rpc_q.publish("RecvProbeData", args=[address, None])
+ return rv
+
+ @exposed
def GetConfig(self, address):
client = self.resolve_client(address)[0]
- childname = self.available_children.get()
- self.logger.debug("Building configuration on child %s" % childname)
- pipe = self.render_pipes[childname]
- pipe.send(client)
- config = pipe.recv()
- self.available_children.put_nowait(childname)
- return config
+ childname = self.children.next()
+ self.logger.debug("Building configuration for %s on %s" % (client,
+ childname))
+ return self.rpc_q.rpc(childname, "GetConfig", args=[client])
+
+ @exposed
+ def get_statistics(self, address):
+ stats = dict()
+
+ def _aggregate_statistics(newstats, prefix=None):
+ """ Aggregate a set of statistics from a child or parent
+ server core. This adds the statistics to the overall
+ statistics dict (optionally prepending a prefix, such as
+ "Child-1", to uniquely identify this set of statistics),
+ and aggregates it with the set of running totals that are
+ kept from all cores. """
+ for statname, vals in newstats.items():
+ if statname.startswith("ChildCore:"):
+ statname = statname[5:]
+ if prefix:
+ prettyname = "%s:%s" % (prefix, statname)
+ else:
+ prettyname = statname
+ stats[prettyname] = vals
+ totalname = "Total:%s" % statname
+ if totalname not in stats:
+ stats[totalname] = vals
+ else:
+ newmin = min(stats[totalname][0], vals[0])
+ newmax = max(stats[totalname][1], vals[1])
+ newcount = stats[totalname][3] + vals[3]
+ newmean = ((stats[totalname][2] * stats[totalname][3]) +
+ (vals[2] * vals[3])) / newcount
+ stats[totalname] = (newmin, newmax, newmean, newcount)
+
+ stats = dict()
+ for childname in self._all_children:
+ _aggregate_statistics(
+ self.rpc_q.rpc(childname, "get_statistics", args=[address]),
+ prefix=childname)
+ _aggregate_statistics(BuiltinCore.get_statistics(self, address))
+ return stats
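A worked example of the (min, max, mean, count) aggregation performed by _aggregate_statistics() above:

parent = (0.10, 0.90, 0.50, 4)   # min, max, mean, count
child = (0.20, 1.50, 1.00, 2)
newcount = parent[3] + child[3]
total = (min(parent[0], child[0]),
         max(parent[1], child[1]),
         (parent[2] * parent[3] + child[2] * child[3]) / newcount,
         newcount)
assert total == (0.10, 1.50, 4.0 / 6, 6)   # mean is ~0.667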
diff --git a/src/lib/Bcfg2/Server/Plugin/base.py b/src/lib/Bcfg2/Server/Plugin/base.py
index c825a57b5..03feceb6f 100644
--- a/src/lib/Bcfg2/Server/Plugin/base.py
+++ b/src/lib/Bcfg2/Server/Plugin/base.py
@@ -12,6 +12,10 @@ class Debuggable(object):
#: List of names of methods to be exposed as XML-RPC functions
__rmi__ = ['toggle_debug', 'set_debug']
+ #: How exposed XML-RPC functions should be dispatched to child
+ #: processes.
+ __child_rmi__ = __rmi__[:]
+
def __init__(self, name=None):
"""
:param name: The name of the logger object to get. If none is
@@ -34,9 +38,6 @@ class Debuggable(object):
:returns: bool - The new value of the debug flag
"""
self.debug_flag = debug
- self.debug_log("%s: debug = %s" % (self.__class__.__name__,
- self.debug_flag),
- flag=True)
return debug
def toggle_debug(self):
@@ -94,6 +95,20 @@ class Plugin(Debuggable):
#: List of names of methods to be exposed as XML-RPC functions
__rmi__ = Debuggable.__rmi__
+ #: How exposed XML-RPC functions should be dispatched to child
+ #: processes, if :mod:`Bcfg2.Server.MultiprocessingCore` is in
+ #: use. Items in ``__child_rmi__`` can either be strings (in which
+ #: case the same function is called on child processes as on the
+ #: parent) or 2-tuples, in which case the first element is the
+ #: name of the RPC function called on the parent process, and the
+ #: second element is the name of the function to call on child
+ #: processes. Functions that are not listed will not be dispatched
+ #: to child processes, i.e., they will only be called on the
+ #: parent. A function must be listed in ``__rmi__`` in order to be
+ #: exposed; functions listed in ``__child_rmi__`` but not
+ #: ``__rmi__`` will be ignored.
+ __child_rmi__ = Debuggable.__child_rmi__
+
def __init__(self, core, datastore):
"""
:param core: The Bcfg2.Server.Core initializing the plugin
@@ -136,6 +151,8 @@ class Plugin(Debuggable):
self.running = False
def set_debug(self, debug):
+ self.debug_log("%s: debug = %s" % (self.name, self.debug_flag),
+ flag=True)
for entry in self.Entries.values():
if isinstance(entry, Debuggable):
entry.set_debug(debug)
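To make the two __child_rmi__ forms documented above concrete, a hypothetical plugin patterned after what the Packages plugin does later in this merge:

class MyPlugin(Bcfg2.Server.Plugin.Plugin):
    __rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Refresh', 'Reload']
    # 'Refresh' runs under the same name on children; calling
    # 'Reload' on the parent dispatches 'expire_cache' to children.
    __child_rmi__ = Bcfg2.Server.Plugin.Plugin.__child_rmi__ + \
        ['Refresh', ('Reload', 'expire_cache')]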
diff --git a/src/lib/Bcfg2/Server/Plugin/interfaces.py b/src/lib/Bcfg2/Server/Plugin/interfaces.py
index 7909eaa03..ed4afb9b2 100644
--- a/src/lib/Bcfg2/Server/Plugin/interfaces.py
+++ b/src/lib/Bcfg2/Server/Plugin/interfaces.py
@@ -630,3 +630,22 @@ class ClientACLs(object):
:returns: bool
"""
return True
+
+
+class Caching(object):
+ """ A plugin that caches more than just the data received from the
+ FAM. This presents a unified interface to clear the cache. """
+
+ def expire_cache(self, key=None):
+ """ Expire the cache associated with the given key.
+
+ :param key: The key to expire the cache for. Because cache
+ implementations vary tremendously between plugins,
+ this could be any number of things, but generally
+ a hostname. It also may or may not be possible to
+ expire the cache for a single host; this interface
+ does not require any guarantee about that.
+ :type key: varies
+ :returns: None
+ """
+ raise NotImplementedError
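A minimal sketch of a plugin implementing the new Caching interface, patterned after the SSHbase change later in this merge (names hypothetical):

class MyCachingPlugin(Bcfg2.Server.Plugin.Plugin,
                      Bcfg2.Server.Plugin.Caching):
    def __init__(self, core, datastore):
        Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
        Bcfg2.Server.Plugin.Caching.__init__(self)
        self._cache = dict()

    def expire_cache(self, key=None):
        # per-key expiry is optional; fall back to clearing everything
        if key is None:
            self._cache.clear()
        else:
            self._cache.pop(key, None)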
diff --git a/src/lib/Bcfg2/Server/Plugins/Guppy.py b/src/lib/Bcfg2/Server/Plugins/Guppy.py
index 6d6df3cc3..c5969f978 100644
--- a/src/lib/Bcfg2/Server/Plugins/Guppy.py
+++ b/src/lib/Bcfg2/Server/Plugins/Guppy.py
@@ -34,6 +34,7 @@ class Guppy(Bcfg2.Server.Plugin.Plugin):
"""Guppy is a debugging plugin to help trace memory leaks"""
__author__ = 'bcfg-dev@mcs.anl.gov'
__rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Enable', 'Disable']
+ __child_rmi__ = __rmi__[:]
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py
index f37e0d97c..f355fd7de 100644
--- a/src/lib/Bcfg2/Server/Plugins/Metadata.py
+++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py
@@ -486,6 +486,7 @@ class MetadataGroup(tuple): # pylint: disable=E0012,R0924
class Metadata(Bcfg2.Server.Plugin.Metadata,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.ClientRunHooks,
Bcfg2.Server.Plugin.DatabaseBacked):
"""This class contains data for bcfg2 server metadata."""
@@ -494,6 +495,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
def __init__(self, core, datastore, watch_clients=True):
Bcfg2.Server.Plugin.Metadata.__init__(self)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.ClientRunHooks.__init__(self)
Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
self.watch_clients = watch_clients
@@ -934,13 +936,16 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.groups[gname]
self.states['groups.xml'] = True
+ def expire_cache(self, key=None):
+ self.core.metadata_cache.expire(key)
+
def HandleEvent(self, event):
"""Handle update events for data files."""
for handles, event_handler in self.handlers.items():
if handles(event):
# clear the entire cache when we get an event for any
# metadata file
- self.core.metadata_cache.expire()
+ self.expire_cache()
event_handler(event)
if False not in list(self.states.values()) and self.debug_flag:
@@ -978,17 +983,21 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.logger.error(msg)
raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
- profiles = [g for g in self.clientgroups[client]
- if g in self.groups and self.groups[g].is_profile]
- self.logger.info("Changing %s profile from %s to %s" %
- (client, profiles, profile))
- self.update_client(client, dict(profile=profile))
- if client in self.clientgroups:
- for prof in profiles:
- self.clientgroups[client].remove(prof)
- self.clientgroups[client].append(profile)
+ metadata = self.core.build_metadata(client)
+ if metadata.profile != profile:
+ self.logger.info("Changing %s profile from %s to %s" %
+ (client, metadata.profile, profile))
+ self.update_client(client, dict(profile=profile))
+ if client in self.clientgroups:
+ if metadata.profile in self.clientgroups[client]:
+ self.clientgroups[client].remove(metadata.profile)
+ self.clientgroups[client].append(profile)
+ else:
+ self.clientgroups[client] = [profile]
else:
- self.clientgroups[client] = [profile]
+ self.logger.debug(
+ "Ignoring %s request to change profile from %s to %s"
+ % (client, metadata.profile, profile))
else:
self.logger.info("Creating new client: %s, profile %s" %
(client, profile))
@@ -1004,8 +1013,8 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
self.add_client(client, dict(profile=profile))
self.clients.append(client)
self.clientgroups[client] = [profile]
- if not self._use_db:
- self.clients_xml.write()
+ if not self._use_db:
+ self.clients_xml.write()
def set_version(self, client, version):
"""Set version for provided client."""
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
index aa6127f57..9ff2d53a0 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py
@@ -85,13 +85,12 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile,
:type event: Bcfg2.Server.FileMonitor.Event
:returns: None
"""
- Bcfg2.Server.Plugin.StructFile.HandleEvent(self, event=event)
if event and event.filename != self.name:
for fpath in self.extras:
if fpath == os.path.abspath(event.filename):
self.parsed.add(fpath)
break
-
+ Bcfg2.Server.Plugin.StructFile.HandleEvent(self, event=event)
if self.loaded:
self.logger.info("Reloading Packages plugin")
self.pkg_obj.Reload()
@@ -108,10 +107,11 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile,
def Index(self):
Bcfg2.Server.Plugin.StructFile.Index(self)
self.entries = []
- for xsource in self.xdata.findall('.//Source'):
- source = self.source_from_xml(xsource)
- if source is not None:
- self.entries.append(source)
+ if self.loaded:
+ for xsource in self.xdata.findall('.//Source'):
+ source = self.source_from_xml(xsource)
+ if source is not None:
+ self.entries.append(source)
Index.__doc__ = Bcfg2.Server.Plugin.StructFile.Index.__doc__ + """
``Index`` is responsible for calling :func:`source_from_xml`
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
index 453198ac8..75dab3f76 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
@@ -265,6 +265,8 @@ class YumCollection(Collection):
.. private-include: _add_gpg_instances, _get_pulp_consumer
"""
+ _helper = None
+
#: Options that are included in the [packages:yum] section of the
#: config but that should not be included in the temporary
#: yum.conf we write out
@@ -282,7 +284,6 @@ class YumCollection(Collection):
#: external commands
self.cmd = Executor()
- self._helper = None
if self.use_yum:
#: Define a unique cache file for this collection to use
#: for cached yum metadata
@@ -320,7 +321,7 @@ class YumCollection(Collection):
self.logger.error("Could not create Pulp consumer "
"cert directory at %s: %s" %
(certdir, err))
- self.pulp_cert_set = PulpCertificateSet(certdir)
+ self.__class__.pulp_cert_set = PulpCertificateSet(certdir)
@property
def disableMetaData(self):
@@ -355,16 +356,19 @@ class YumCollection(Collection):
forking, but apparently not); finally we check in /usr/sbin,
the default location. """
if not self._helper:
+ # pylint: disable=W0212
try:
- self._helper = self.setup.cfp.get("packages:yum", "helper")
+ self.__class__._helper = self.setup.cfp.get("packages:yum",
+ "helper")
except (ConfigParser.NoOptionError, ConfigParser.NoSectionError):
# first see if bcfg2-yum-helper is in PATH
try:
self.debug_log("Checking for bcfg2-yum-helper in $PATH")
self.cmd.run(['bcfg2-yum-helper'])
- self._helper = 'bcfg2-yum-helper'
+ self.__class__._helper = 'bcfg2-yum-helper'
except OSError:
- self._helper = "/usr/sbin/bcfg2-yum-helper"
+ self.__class__._helper = "/usr/sbin/bcfg2-yum-helper"
+ # pylint: enable=W0212
return self._helper
@property
@@ -945,9 +949,14 @@ class YumCollection(Collection):
try:
return json.loads(result.stdout)
except ValueError:
- err = sys.exc_info()[1]
- self.logger.error("Packages: error reading bcfg2-yum-helper "
- "output: %s" % err)
+ if result.stdout:
+ err = sys.exc_info()[1]
+ self.logger.error("Packages: Error reading bcfg2-yum-helper "
+ "output: %s" % err)
+ self.logger.error("Packages: bcfg2-yum-helper output: %s" %
+ result.stdout)
+ else:
+ self.logger.error("Packages: No bcfg2-yum-helper output")
raise
def setup_data(self, force_update=False):
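The reworked error handling above distinguishes empty helper output from malformed JSON. In isolation (payload shape hypothetical):

import json
try:
    json.loads("")                   # no output at all: ValueError
except ValueError:
    pass
data = json.loads('{"packages": ["bash"]}')  # well-formed output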
diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
index 03047b046..20a75c678 100644
--- a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
+++ b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
@@ -71,6 +71,7 @@ class OnDemandDict(MutableMapping):
class Packages(Bcfg2.Server.Plugin.Plugin,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.StructureValidator,
Bcfg2.Server.Plugin.Generator,
Bcfg2.Server.Plugin.Connector,
@@ -93,8 +94,12 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
#: and :func:`Reload`
__rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Refresh', 'Reload']
+ __child_rmi__ = Bcfg2.Server.Plugin.Plugin.__child_rmi__ + \
+ [('Refresh', 'expire_cache'), ('Reload', 'expire_cache')]
+
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.StructureValidator.__init__(self)
Bcfg2.Server.Plugin.Generator.__init__(self)
Bcfg2.Server.Plugin.Connector.__init__(self)
@@ -149,8 +154,21 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
#: object when one is requested, so each entry is very
#: short-lived -- it's purged at the end of each client run.
self.clients = dict()
- # pylint: enable=C0301
+ #: groupcache caches group lookups. It maps Collections (via
+ #: :attr:`Bcfg2.Server.Plugins.Packages.Collection.Collection.cachekey`)
+ #: to sets of package groups, and thence to the packages
+ #: indicated by those groups.
+ self.groupcache = dict()
+
+ #: pkgcache caches complete package sets. It maps Collections
+ #: (via
+ #: :attr:`Bcfg2.Server.Plugins.Packages.Collection.Collection.cachekey`)
+ #: to sets of initial packages, and thence to the final
+ #: (complete) package selections resolved from the initial
+ #: packages
+ self.pkgcache = dict()
+ # pylint: enable=C0301
__init__.__doc__ = Bcfg2.Server.Plugin.Plugin.__init__.__doc__
def set_debug(self, debug):
@@ -388,14 +406,24 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
for el in to_remove:
el.getparent().remove(el)
- gpkgs = collection.get_groups(groups)
- for pkgs in gpkgs.values():
+ groups.sort()
+ # check for this set of groups in the group cache
+ gkey = hash(tuple(groups))
+ if gkey not in self.groupcache[collection.cachekey]:
+ self.groupcache[collection.cachekey][gkey] = \
+ collection.get_groups(groups)
+ for pkgs in self.groupcache[collection.cachekey][gkey].values():
base.update(pkgs)
# essential pkgs are those marked as such by the distribution
base.update(collection.get_essential())
- packages, unknown = collection.complete(base)
+ # check for this set of packages in the package cache
+ pkey = hash(tuple(base))
+ if pkey not in self.pkgcache[collection.cachekey]:
+ self.pkgcache[collection.cachekey][pkey] = \
+ collection.complete(base)
+ packages, unknown = self.pkgcache[collection.cachekey][pkey]
if unknown:
self.logger.info("Packages: Got %d unknown entries" % len(unknown))
self.logger.info("Packages: %s" % list(unknown))
@@ -421,6 +449,9 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
self._load_config()
return True
+ def expire_cache(self, _=None):
+ self.Reload()
+
def _load_config(self, force_update=False):
"""
Load the configuration data and setup sources
@@ -448,9 +479,11 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
if not self.disableMetaData:
collection.setup_data(force_update)
- # clear Collection caches
+ # clear Collection and package caches
self.clients = dict()
self.collections = dict()
+ self.groupcache = dict()
+ self.pkgcache = dict()
for source in self.sources.entries:
cachefiles.add(source.cachefile)
@@ -563,6 +596,8 @@ class Packages(Bcfg2.Server.Plugin.Plugin,
if cclass != Collection:
self.clients[metadata.hostname] = ckey
self.collections[ckey] = collection
+ self.groupcache.setdefault(ckey, dict())
+ self.pkgcache.setdefault(ckey, dict())
return collection
def get_additional_data(self, metadata):
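The group cache above sorts the group list before hashing, so lookups are order-independent:

assert hash(tuple(sorted(["web", "db"]))) == \
    hash(tuple(sorted(["db", "web"])))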
diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py
index c6cf920df..7b85f180d 100644
--- a/src/lib/Bcfg2/Server/Plugins/Probes.py
+++ b/src/lib/Bcfg2/Server/Plugins/Probes.py
@@ -183,14 +183,16 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet):
class Probes(Bcfg2.Server.Plugin.Probing,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.Connector,
Bcfg2.Server.Plugin.DatabaseBacked):
""" A plugin to gather information from a client machine """
__author__ = 'bcfg-dev@mcs.anl.gov'
def __init__(self, core, datastore):
- Bcfg2.Server.Plugin.Connector.__init__(self)
Bcfg2.Server.Plugin.Probing.__init__(self)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
+ Bcfg2.Server.Plugin.Connector.__init__(self)
Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore)
try:
@@ -268,12 +270,17 @@ class Probes(Bcfg2.Server.Plugin.Probing,
hostname=client.hostname).exclude(
group__in=self.cgroups[client.hostname]).delete()
- def load_data(self):
+ def expire_cache(self, key=None):
+ self.load_data(client=key)
+
+ def load_data(self, client=None):
""" Load probe data from the appropriate backend (probed.xml
or the database) """
if self._use_db:
- return self._load_data_db()
+ return self._load_data_db(client=client)
else:
+ # the XML backend doesn't support loading data for single
+ # clients, so it reloads all data
return self._load_data_xml()
def _load_data_xml(self):
@@ -298,20 +305,36 @@ class Probes(Bcfg2.Server.Plugin.Probing,
elif pdata.tag == 'Group':
self.cgroups[client.get('name')].append(pdata.get('name'))
- def _load_data_db(self):
+ if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
+ self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
+
+ def _load_data_db(self, client=None):
""" Load probe data from the database """
- self.probedata = {}
- self.cgroups = {}
- for pdata in ProbesDataModel.objects.all():
+ if client is None:
+ self.probedata = {}
+ self.cgroups = {}
+ probedata = ProbesDataModel.objects.all()
+ groupdata = ProbesGroupsModel.objects.all()
+ else:
+ self.probedata.pop(client, None)
+ self.cgroups.pop(client, None)
+ probedata = ProbesDataModel.objects.filter(hostname=client)
+ groupdata = ProbesGroupsModel.objects.filter(hostname=client)
+
+ for pdata in probedata:
if pdata.hostname not in self.probedata:
self.probedata[pdata.hostname] = ClientProbeDataSet(
timestamp=time.mktime(pdata.timestamp.timetuple()))
self.probedata[pdata.hostname][pdata.probe] = ProbeData(pdata.data)
- for pgroup in ProbesGroupsModel.objects.all():
+ for pgroup in groupdata:
if pgroup.hostname not in self.cgroups:
self.cgroups[pgroup.hostname] = []
self.cgroups[pgroup.hostname].append(pgroup.group)
+ if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
+ self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata,
+ key=client)
+
@track_statistics()
def GetProbes(self, meta):
return self.probes.get_probe_data(meta)
diff --git a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
index 3b367573b..a02f012a0 100644
--- a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
+++ b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
@@ -117,7 +117,7 @@ class PuppetENC(Bcfg2.Server.Plugin.Plugin,
self.logger.warning("PuppetENC is incompatible with aggressive "
"client metadata caching, try 'cautious' or "
"'initial' instead")
- self.core.cache.expire()
+ self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata)
def end_statistics(self, metadata):
self.end_client_run(self, metadata)
diff --git a/src/lib/Bcfg2/Server/Plugins/SSHbase.py b/src/lib/Bcfg2/Server/Plugins/SSHbase.py
index 84dcf2780..f350a7761 100644
--- a/src/lib/Bcfg2/Server/Plugins/SSHbase.py
+++ b/src/lib/Bcfg2/Server/Plugins/SSHbase.py
@@ -90,6 +90,7 @@ class KnownHostsEntrySet(Bcfg2.Server.Plugin.EntrySet):
class SSHbase(Bcfg2.Server.Plugin.Plugin,
+ Bcfg2.Server.Plugin.Caching,
Bcfg2.Server.Plugin.Generator,
Bcfg2.Server.Plugin.PullTarget):
"""
@@ -123,6 +124,7 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin,
def __init__(self, core, datastore):
Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
+ Bcfg2.Server.Plugin.Caching.__init__(self)
Bcfg2.Server.Plugin.Generator.__init__(self)
Bcfg2.Server.Plugin.PullTarget.__init__(self)
self.ipcache = {}
@@ -147,9 +149,11 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin,
self.entries["/etc/ssh/" + keypattern] = \
HostKeyEntrySet(keypattern, self.data)
self.Entries['Path']["/etc/ssh/" + keypattern] = self.build_hk
-
self.cmd = Executor()
+ def expire_cache(self, key=None):
+ self.__skn = False
+
def get_skn(self):
"""Build memory cache of the ssh known hosts file."""
if not self.__skn:
diff --git a/src/lib/Bcfg2/Utils.py b/src/lib/Bcfg2/Utils.py
index 59065479a..993fd9e0f 100644
--- a/src/lib/Bcfg2/Utils.py
+++ b/src/lib/Bcfg2/Utils.py
@@ -235,9 +235,9 @@ class Executor(object):
# py3k fixes
if not isinstance(stdout, str):
- stdout = stdout.decode('utf-8')
+ stdout = stdout.decode('utf-8') # pylint: disable=E1103
if not isinstance(stderr, str):
- stderr = stderr.decode('utf-8')
+ stderr = stderr.decode('utf-8') # pylint: disable=E1103
for line in stdout.splitlines(): # pylint: disable=E1103
self.logger.debug('< %s' % line)
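The decode guard used above, in isolation: on Python 3 subprocess output is bytes, on Python 2 it is already str, so decode only when needed.

out = b"hello\n"
if not isinstance(out, str):
    out = out.decode('utf-8')   # py3 path
assert out == "hello\n"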