From cd7b0b3d40a5a340d5b47819f94a21c9faf23120 Mon Sep 17 00:00:00 2001
From: "Chris St. Pierre"
Date: Wed, 5 Sep 2012 14:36:15 -0400
Subject: added server-side client metadata object caching

---
 src/lib/Bcfg2/Cache.py                        | 12 ++++++
 src/lib/Bcfg2/Compat.py                       |  7 ++++
 src/lib/Bcfg2/Options.py                      | 12 ------
 src/lib/Bcfg2/Server/Core.py                  | 60 ++++++++++++++++++++++-----
 src/lib/Bcfg2/Server/FileMonitor/Inotify.py   |  7 ++--
 src/lib/Bcfg2/Server/FileMonitor/__init__.py  | 11 ++++-
 src/lib/Bcfg2/Server/Plugins/GroupPatterns.py | 12 +++++-
 src/lib/Bcfg2/Server/Plugins/Metadata.py      | 17 ++++++--
 src/lib/Bcfg2/Server/Plugins/Probes.py        | 16 ++++++-
 src/lib/Bcfg2/Server/Plugins/PuppetENC.py     | 10 +++++
 10 files changed, 129 insertions(+), 35 deletions(-)
 create mode 100644 src/lib/Bcfg2/Cache.py

diff --git a/src/lib/Bcfg2/Cache.py b/src/lib/Bcfg2/Cache.py
new file mode 100644
index 000000000..9a828e2c9
--- /dev/null
+++ b/src/lib/Bcfg2/Cache.py
@@ -0,0 +1,12 @@
+""" An implementation of a simple memory-backed cache. Right now this
+doesn't provide many features, but more (time-based expiration, etc.)
+can be added as necessary. """
+
+class Cache(dict):
+    """ an implementation of a simple memory-backed cache """
+    def expire(self, key=None):
+        if key is None:
+            self.clear()
+        elif key in self:
+            del self[key]
+
diff --git a/src/lib/Bcfg2/Compat.py b/src/lib/Bcfg2/Compat.py
index 74f660216..a045929bb 100644
--- a/src/lib/Bcfg2/Compat.py
+++ b/src/lib/Bcfg2/Compat.py
@@ -222,9 +222,16 @@ except ImportError:
 
 try:
     all = all
+    any = any
 except NameError:
     def all(iterable):
         for element in iterable:
             if not element:
                 return False
         return True
+
+    def any(iterable):
+        for element in iterable:
+            if element:
+                return True
+        return False
diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py
index a69300178..e617e3e38 100644
--- a/src/lib/Bcfg2/Options.py
+++ b/src/lib/Bcfg2/Options.py
@@ -1064,18 +1064,6 @@ class OptionParser(OptionSet):
                           quiet=quiet)
         self.optinfo = copy.copy(args)
 
-    def HandleEvent(self, event):
-        if 'configfile' not in self or not isinstance(self['configfile'], str):
-            # we haven't parsed options yet, or CFILE wasn't included
-            # in the options
-            return
-        if event.filename != self['configfile']:
-            print("Got event for unknown file: %s" % event.filename)
-            return
-        if event.code2str() == 'deleted':
-            return
-        self.reparse()
-
     def reparse(self):
         for key, opt in self.optinfo.items():
             self[key] = opt
diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py
index 13be70731..cc6cf13a8 100644
--- a/src/lib/Bcfg2/Server/Core.py
+++ b/src/lib/Bcfg2/Server/Core.py
@@ -14,6 +14,7 @@ import Bcfg2.settings
 import Bcfg2.Server
 import Bcfg2.Logger
 import Bcfg2.Server.FileMonitor
+from Bcfg2.Cache import Cache
 from Bcfg2.Statistics import Statistics
 from Bcfg2.Compat import xmlrpclib, reduce
 from Bcfg2.Server.Plugin import PluginInitError, PluginExecutionError
@@ -194,6 +195,7 @@ class BaseCore(object):
 
         self.lock = threading.Lock()
         self.stats = Statistics()
+        self.metadata_cache = Cache()
 
     def plugins_by_type(self, base_cls):
         """Return a list of loaded plugins that match the passed type.
@@ -264,8 +266,19 @@ class BaseCore(object):
         for plugin in list(self.plugins.values()):
             plugin.shutdown()
 
+    @property
+    def metadata_cache_mode(self):
+        """ get the client metadata cache mode. options are off,
+        initial, cautious, aggressive, on (synonym for cautious) """
+        mode = self.setup.cfp.get("caching", "client_metadata",
+                                  default="off").lower()
+        if mode == "on":
+            return "cautious"
+        else:
+            return mode
+
     def client_run_hook(self, hook, metadata):
-        """Checks the data structure."""
+        """invoke client run hooks for a given stage."""
         start = time.time()
         try:
             for plugin in \
@@ -449,6 +462,16 @@ class BaseCore(object):
                              (client, time.time() - start))
         return config
 
+    def HandleEvent(self, event):
+        """ handle a change in the config file """
+        if event.filename != self.cfile:
+            print("Got event for unknown file: %s" % event.filename)
+            return
+        if event.code2str() == 'deleted':
+            return
+        self.setup.reparse()
+        self.metadata_cache.expire()
+
     def run(self):
         """ run the server core. note that it is the responsibility of
         the server core implementation to call shutdown() """
@@ -460,7 +483,7 @@ class BaseCore(object):
         self.fam.start()
         self.fam_thread.start()
 
-        self.fam.AddMonitor(self.cfile, self.setup)
+        self.fam.AddMonitor(self.cfile, self)
 
         self._block()
 
@@ -498,14 +521,23 @@ class BaseCore(object):
         if not hasattr(self, 'metadata'):
             # some threads start before metadata is even loaded
             raise Bcfg2.Server.Plugin.MetadataRuntimeError
-        imd = self.metadata.get_initial_metadata(client_name)
-        for conn in self.connectors:
-            grps = conn.get_additional_groups(imd)
-            self.metadata.merge_additional_groups(imd, grps)
-        for conn in self.connectors:
-            data = conn.get_additional_data(imd)
-            self.metadata.merge_additional_data(imd, conn.name, data)
-        imd.query.by_name = self.build_metadata
+        if self.metadata_cache_mode == 'initial':
+            # the Metadata plugin handles loading the cached data if
+            # we're only caching the initial metadata object
+            imd = None
+        else:
+            imd = self.metadata_cache.get(client_name, None)
+        if not imd:
+            imd = self.metadata.get_initial_metadata(client_name)
+            for conn in self.connectors:
+                grps = conn.get_additional_groups(imd)
+                self.metadata.merge_additional_groups(imd, grps)
+            for conn in self.connectors:
+                data = conn.get_additional_data(imd)
+                self.metadata.merge_additional_data(imd, conn.name, data)
+            imd.query.by_name = self.build_metadata
+            if self.metadata_cache_mode in ['cautious', 'aggressive']:
+                self.metadata_cache[client_name] = imd
         return imd
 
     def process_statistics(self, client_name, statistics):
@@ -603,6 +635,14 @@ class BaseCore(object):
     def RecvProbeData(self, address, probedata):
         """Receive probe data from clients."""
         client, metadata = self.resolve_client(address)
+        if self.metadata_cache_mode == 'cautious':
+            # clear the metadata cache right after building the
+            # metadata object; that way the cache is cleared for any
+            # new probe data that's received, but the metadata object
+            # that's created for RecvProbeData doesn't get cached.
+            # I.e., the next metadata object that's built, after probe
+            # data is processed, is cached.
+            self.metadata_cache.expire(client)
         try:
             xpdata = lxml.etree.XML(probedata.encode('utf-8'),
                                     parser=Bcfg2.Server.XMLParser)
diff --git a/src/lib/Bcfg2/Server/FileMonitor/Inotify.py b/src/lib/Bcfg2/Server/FileMonitor/Inotify.py
index 097fc0b42..a20dc4ad5 100644
--- a/src/lib/Bcfg2/Server/FileMonitor/Inotify.py
+++ b/src/lib/Bcfg2/Server/FileMonitor/Inotify.py
@@ -27,14 +27,13 @@ class Inotify(Pseudo, pyinotify.ProcessEvent):
         # these are created in start() after the server is done forking
         self.notifier = None
         self.wm = None
-        self.started = False
         self.add_q = []
 
     def start(self):
+        Pseudo.start(self)
         self.wm = pyinotify.WatchManager()
         self.notifier = pyinotify.ThreadedNotifier(self.wm, self)
         self.notifier.start()
-        self.started = True
         for monitor in self.add_q:
             self.AddMonitor(*monitor)
         self.add_q = []
@@ -142,5 +141,7 @@ class Inotify(Pseudo, pyinotify.ProcessEvent):
         return path
 
     def shutdown(self):
-        if self.started:
+        Pseudo.shutdown(self)
+        if self.notifier:
             self.notifier.stop()
+
diff --git a/src/lib/Bcfg2/Server/FileMonitor/__init__.py b/src/lib/Bcfg2/Server/FileMonitor/__init__.py
index 251e04e4f..fd0cb66f1 100644
--- a/src/lib/Bcfg2/Server/FileMonitor/__init__.py
+++ b/src/lib/Bcfg2/Server/FileMonitor/__init__.py
@@ -37,6 +37,7 @@ class FileMonitor(object):
         if ignore is None:
             ignore = []
         self.ignore = ignore
+        self.started = False
 
     def __str__(self):
         return "%s: %s" % (__name__, self.__class__.__name__)
@@ -49,7 +50,7 @@ class FileMonitor(object):
     def start(self):
         """ start threads or anything else that needs to be done after
         the server forks and daemonizes """
-        pass
+        self.started = True
 
     def debug_log(self, msg):
         if self.debug:
@@ -73,6 +74,8 @@ class FileMonitor(object):
         return 0
 
     def handle_one_event(self, event):
+        if not self.started:
+            self.start()
         if self.should_ignore(event):
             return
         if event.requestID not in self.handles:
@@ -90,6 +93,8 @@ class FileMonitor(object):
                          (event.code2str(), event.filename, err))
 
     def handle_event_set(self, lock=None):
+        if not self.started:
+            self.start()
         count = 1
         event = self.get_event()
         start = time()
@@ -108,6 +113,8 @@ class FileMonitor(object):
         logger.info("Handled %d events in %.03fs" % (count, (end - start)))
 
     def handle_events_in_interval(self, interval):
+        if not self.started:
+            self.start()
         end = time() + interval
         while time() < end:
             if self.pending():
@@ -117,7 +124,7 @@ class FileMonitor(object):
                 sleep(0.5)
 
     def shutdown(self):
-        pass
+        self.started = False
 
 
 available = dict()
diff --git a/src/lib/Bcfg2/Server/Plugins/GroupPatterns.py b/src/lib/Bcfg2/Server/Plugins/GroupPatterns.py
index 837f47279..955a46c6c 100644
--- a/src/lib/Bcfg2/Server/Plugins/GroupPatterns.py
+++ b/src/lib/Bcfg2/Server/Plugins/GroupPatterns.py
@@ -81,14 +81,22 @@ class PatternMap(object):
 class PatternFile(Bcfg2.Server.Plugin.XMLFileBacked):
     __identifier__ = None
 
-    def __init__(self, filename, fam=None):
+    def __init__(self, filename, core=None):
+        try:
+            fam = core.fam
+        except AttributeError:
+            fam = None
         Bcfg2.Server.Plugin.XMLFileBacked.__init__(self, filename, fam=fam,
                                                    should_monitor=True)
+        self.core = core
         self.patterns = []
         self.logger = logging.getLogger(self.__class__.__name__)
 
     def Index(self):
         Bcfg2.Server.Plugin.XMLFileBacked.Index(self)
+        if (self.core and
+            self.core.metadata_cache_mode in ['cautious', 'aggressive']):
+            self.core.metadata_cache.expire()
         self.patterns = []
         for entry in self.xdata.xpath('//GroupPattern'):
             try:
@@ -125,7 +133,7 @@ class GroupPatterns(Bcfg2.Server.Plugin.Plugin,
         Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore)
         Bcfg2.Server.Plugin.Connector.__init__(self)
         self.config = PatternFile(os.path.join(self.data, 'config.xml'),
-                                  fam=core.fam)
+                                  core=core)
 
     def get_additional_groups(self, metadata):
         return self.config.process_patterns(metadata.hostname)
diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py
index 774c6b1ef..5d0b35835 100644
--- a/src/lib/Bcfg2/Server/Plugins/Metadata.py
+++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py
@@ -729,6 +729,9 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
         for hdlr in self.handlers:
             aname = re.sub(r'[^A-z0-9_]', '_', os.path.basename(event.filename))
             if hdlr(event):
+                # clear the entire cache when we get an event for any
+                # metadata file
+                self.core.metadata_cache.expire()
                 try:
                     proc = getattr(self, "_handle_%s_event" % aname)
                 except AttributeError:
@@ -750,7 +753,6 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
                     self.debug_log("Group %s set as nonexistent group %s" %
                                    (gname, group))
 
-
     def set_profile(self, client, profile, addresspair, force=False):
         """Set group parameter for provided client."""
         self.logger.info("Asserting client %s profile to %s" %
@@ -888,6 +890,10 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
         if False in list(self.states.values()):
             raise Bcfg2.Server.Plugin.MetadataRuntimeError("Metadata has not been read yet")
         client = client.lower()
+
+        if client in self.core.metadata_cache:
+            return self.core.metadata_cache[client]
+
         if client in self.aliases:
             client = self.aliases[client]
 
@@ -967,9 +973,12 @@ class Metadata(Bcfg2.Server.Plugin.Metadata,
         if len(profiles) >= 1:
             profile = profiles[0]
 
-        return ClientMetadata(client, profile, groups, bundles, aliases,
-                              addresses, categories, uuid, password, version,
-                              self.query)
+        rv = ClientMetadata(client, profile, groups, bundles, aliases,
+                            addresses, categories, uuid, password, version,
+                            self.query)
+        if self.core.metadata_cache_mode == 'initial':
+            self.core.metadata_cache[client] = rv
+        return rv
 
     def get_all_group_names(self):
         all_groups = set()
diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py
index 7f300ebe0..056521ce7 100644
--- a/src/lib/Bcfg2/Server/Plugins/Probes.py
+++ b/src/lib/Bcfg2/Server/Plugins/Probes.py
@@ -2,10 +2,12 @@ import re
 import os
 import sys
 import time
+import copy
 import operator
 import lxml.etree
 import Bcfg2.Server
 import Bcfg2.Server.Plugin
+from Bcfg2.Compat import any
 
 try:
     from django.db import models
@@ -35,8 +37,6 @@ except ImportError:
     except ImportError:
         has_yaml = False
 
-import Bcfg2.Server.Plugin
-
 if has_django:
     class ProbesDataModel(models.Model,
                           Bcfg2.Server.Plugin.PluginDatabaseModel):
@@ -266,10 +266,22 @@ class Probes(Bcfg2.Server.Plugin.Probing,
         return self.probes.get_probe_data(meta)
 
     def ReceiveData(self, client, datalist):
+        if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
+            if client.hostname in self.probedata:
+                olddata = copy.copy(self.probedata[client.hostname])
+            else:
+                olddata = ClientProbeDataSet()
+
         self.cgroups[client.hostname] = []
         self.probedata[client.hostname] = ClientProbeDataSet()
         for data in datalist:
             self.ReceiveDataItem(client, data)
+
+        if (self.core.metadata_cache_mode in ['cautious', 'aggressive'] and
+            (olddata.keys() != self.probedata[client.hostname].keys() or
+             any(olddata[p] != self.probedata[client.hostname][p]
+                 for p in olddata.keys()))):
+            self.core.metadata_cache.expire(client.hostname)
         self.write_data(client)
 
     def ReceiveDataItem(self, client, data):
diff --git a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
index 46182e9a2..341d63118 100644
--- a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
+++ b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py
@@ -112,6 +112,16 @@ class PuppetENC(Bcfg2.Server.Plugin.Plugin,
         separately; and b) when a single client's metadata is
         generated multiple times by separate templates """
         self.cache = dict()
+        if self.core.metadata_cache_mode == 'aggressive':
+            # clear the metadata client cache if we're in aggressive
+            # mode, and produce a warning.  PuppetENC really isn't
+            # compatible with aggressive mode, since we don't know
+            # when the output from a given ENC has changed, and thus
+            # can't invalidate the cache sanely.
+            self.logger.warning("PuppetENC is incompatible with aggressive "
+                                "client metadata caching, try 'cautious' or "
+                                "'initial' instead")
+            self.core.metadata_cache.expire()
 
     def end_statistics(self, metadata):
         self.end_client_run(self, metadata)
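
A note on enabling the new behavior: the metadata_cache_mode property added to
BaseCore above reads its setting from the server configuration via
self.setup.cfp.get("caching", "client_metadata", default="off"), so caching
stays disabled until that option is set. A minimal bcfg2.conf stanza would
therefore look like the sketch below; the section and option names come
straight from that call, and the comment is only illustrative.

    [caching]
    # one of: off, initial, cautious, aggressive, on ("on" is a synonym for "cautious")
    client_metadata = cautious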
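
The new Bcfg2.Cache.Cache class is simply a dict with an expire() helper,
which makes its behavior easy to see in isolation. A minimal sketch, assuming
Bcfg2 is importable; the hostnames and values are made up:

    from Bcfg2.Cache import Cache

    cache = Cache()
    cache['client1.example.com'] = 'built metadata object'
    cache['client2.example.com'] = 'another metadata object'

    # expire a single client's entry...
    cache.expire('client1.example.com')
    assert 'client1.example.com' not in cache

    # ...or, with no key, flush the whole cache
    cache.expire()
    assert len(cache) == 0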
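
The GroupPatterns and Probes hunks above also establish the pattern for
plugins whose data feeds into client metadata: when that data changes, expire
the affected cache entries, but only in the modes that cache fully-built
metadata objects. A sketch of that pattern for a hypothetical connector
plugin follows; MyConnector and data_changed() are not part of this patch,
only the metadata_cache and metadata_cache_mode attributes they use are.

    import Bcfg2.Server.Plugin

    class MyConnector(Bcfg2.Server.Plugin.Plugin,
                      Bcfg2.Server.Plugin.Connector):
        """ hypothetical connector shown only to illustrate cache expiry """

        def data_changed(self, hostname=None):
            # in 'initial' mode only the bare Metadata object is cached,
            # so connector data changes don't affect it; only the modes
            # that cache complete metadata objects need handling here
            if self.core.metadata_cache_mode in ['cautious', 'aggressive']:
                # expire() with a key drops one client's entry; with no
                # key it flushes the whole cache
                self.core.metadata_cache.expire(hostname)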
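
Finally, the Cache docstring explicitly leaves room for "time-based
expiration, etc." Purely as an illustration of how that could be layered on
without touching the callers, here is one possible sketch; TTLCache is not
part of this patch, and an eventual implementation may look quite different.

    import time

    from Bcfg2.Cache import Cache

    class TTLCache(Cache):
        """ hypothetical Cache subclass whose entries go stale after
        ``ttl`` seconds """

        def __init__(self, ttl=300):
            Cache.__init__(self)
            self.ttl = ttl
            self._stamps = dict()

        def __setitem__(self, key, value):
            self._stamps[key] = time.time()
            Cache.__setitem__(self, key, value)

        def __getitem__(self, key):
            # treat stale entries exactly like missing ones
            if time.time() - self._stamps.get(key, 0) > self.ttl:
                self.expire(key)
            return Cache.__getitem__(self, key)

        def expire(self, key=None):
            if key is None:
                self._stamps.clear()
            else:
                self._stamps.pop(key, None)
            Cache.expire(self, key)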