From c3ff6611b640e11ab0d4986b64015355b516fa88 Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Wed, 24 Jul 2013 20:26:53 -0400 Subject: POSIX: Ignore permissions error on auto-created dirs If the POSIX client tool is run as a non-root user, it is very likely that the _set_perms() call in _makedirs() will fail because it cannot set the owner of the newly-created directories. This causes _makedirs() to return False, which in turn causes POSIXFile.install() to bail out early. Accordingly, the freebie directories created by _makedirs should have their mode and ownership set on a best-effort basis. If a user needs parent directories to have a specific ownership and mode, then they should specify that directory in their configuration. --- src/lib/Bcfg2/Client/Tools/POSIX/base.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/base.py b/src/lib/Bcfg2/Client/Tools/POSIX/base.py index 3778569a6..fb5d06e54 100644 --- a/src/lib/Bcfg2/Client/Tools/POSIX/base.py +++ b/src/lib/Bcfg2/Client/Tools/POSIX/base.py @@ -686,7 +686,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): """ os.makedirs helpfully creates all parent directories for us, but it sets permissions according to umask, which is probably wrong. we need to find out which directories were - created and set permissions on those + created and try to set permissions on those (http://trac.mcs.anl.gov/projects/bcfg2/ticket/1125 and http://trac.mcs.anl.gov/projects/bcfg2/ticket/1134) """ created = [] @@ -706,8 +706,9 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): (path, err)) rv = False - # set auto-created directories to mode 755, if you need - # something else, you should specify it in your config + # set auto-created directories to mode 755 and use best effort for + # permissions. If you need something else, you should specify it in + # your config.
tmpentry = copy.deepcopy(entry) tmpentry.set('mode', '0755') for acl in tmpentry.findall('ACL'): @@ -715,7 +716,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): oct_mode(self._norm_acl_perms(acl.get('perms')) | ACL_MAP['x'])) for cpath in created: - rv &= self._set_perms(tmpentry, path=cpath) + self._set_perms(tmpentry, path=cpath) return rv -- cgit v1.2.3-1-g7c22 From 762baa81c1be1829ddc341410d36685aca46d095 Mon Sep 17 00:00:00 2001 From: Sol Jerome Date: Sat, 27 Jul 2013 18:23:42 -0500 Subject: bcfg2-yum-helper: Fix python3 syntax error Signed-off-by: Sol Jerome --- src/sbin/bcfg2-yum-helper | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/sbin/bcfg2-yum-helper b/src/sbin/bcfg2-yum-helper index 161aa3e50..643d0ccd9 100755 --- a/src/sbin/bcfg2-yum-helper +++ b/src/sbin/bcfg2-yum-helper @@ -304,7 +304,7 @@ def main(): try: # this code copied from yumcommands.py cachemgr.populate_cache() - print json.dumps(True) + print(json.dumps(True)) except yum.Errors.YumBaseError: logger.error("Unexpected error creating cache: %s" % sys.exc_info()[1], exc_info=1) -- cgit v1.2.3-1-g7c22 From 736d98688db0ad853a14d58a5fa395a9844dc590 Mon Sep 17 00:00:00 2001 From: Sol Jerome Date: Sat, 27 Jul 2013 18:27:25 -0500 Subject: bcfg2-yum-helper: Fix another python3 syntax error Signed-off-by: Sol Jerome --- src/sbin/bcfg2-yum-helper | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/sbin/bcfg2-yum-helper b/src/sbin/bcfg2-yum-helper index 643d0ccd9..59b9f5e78 100755 --- a/src/sbin/bcfg2-yum-helper +++ b/src/sbin/bcfg2-yum-helper @@ -308,7 +308,7 @@ def main(): except yum.Errors.YumBaseError: logger.error("Unexpected error creating cache: %s" % sys.exc_info()[1], exc_info=1) - print json.dumps(False) + print(json.dumps(False)) elif cmd == "complete": depsolver = DepSolver(options.config, options.verbose) try: -- cgit v1.2.3-1-g7c22 From 50b7407fd8c29bfede3091fa9e76b8e2a78de3ec Mon Sep 17 00:00:00 2001 From: "Chris St. 
Pierre" Date: Tue, 30 Jul 2013 13:54:37 -0400 Subject: MultiprocessingCore: make multiprocessing children threaded for higher performance --- src/lib/Bcfg2/Server/Core.py | 20 +- src/lib/Bcfg2/Server/MultiprocessingCore.py | 480 ++++++++++++++++++++-------- 2 files changed, 352 insertions(+), 148 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index 0cd4bea3e..e37c0b4e3 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -289,11 +289,12 @@ class BaseCore(object): self.logger.debug("Performance logging thread starting") while not self.terminate.isSet(): self.terminate.wait(self.setup['perflog_interval']) - for name, stats in self.get_statistics(None).items(): - self.logger.info("Performance statistics: " - "%s min=%.06f, max=%.06f, average=%.06f, " - "count=%d" % ((name, ) + stats)) - self.logger.debug("Performance logging thread terminated") + if not self.terminate.isSet(): + for name, stats in self.get_statistics(None).items(): + self.logger.info("Performance statistics: " + "%s min=%.06f, max=%.06f, average=%.06f, " + "count=%d" % ((name, ) + stats)) + self.logger.info("Performance logging thread terminated") def _file_monitor_thread(self): """ The thread that runs the @@ -314,7 +315,7 @@ class BaseCore(object): except: continue self._update_vcs_revision() - self.logger.debug("File monitor thread terminated") + self.logger.info("File monitor thread terminated") @track_statistics() def _update_vcs_revision(self): @@ -430,14 +431,14 @@ class BaseCore(object): def shutdown(self): """ Perform plugin and FAM shutdown tasks. """ - self.logger.debug("Shutting down core...") + self.logger.info("Shutting down core...") if not self.terminate.isSet(): self.terminate.set() self.fam.shutdown() - self.logger.debug("FAM shut down") + self.logger.info("FAM shut down") for plugin in list(self.plugins.values()): plugin.shutdown() - self.logger.debug("All plugins shut down") + self.logger.info("All plugins shut down") @property def metadata_cache_mode(self): @@ -1052,6 +1053,7 @@ class BaseCore(object): for plugin in self.plugins_by_type(Probing): for probe in plugin.GetProbes(metadata): resp.append(probe) + self.logger.debug("Sending probe list to %s" % client) return lxml.etree.tostring(resp, xml_declaration=False).decode('UTF-8') except: diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 02710ab99..af8f6a56e 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -3,31 +3,31 @@ :mod:`multiprocessing` library to offload work to multiple child processes. As such, it requires Python 2.6+. -The parent communicates with the children over two constructs: - -* A :class:`multiprocessing.Pipe` is used to process render requests. - The pipe is locked when in use (i.e., between the time that a client - is submitted to be rendered and the time that its configuration is - returned) to keep things thread-safe. (This is accomplished through - the use of - :attr:`Bcfg2.Server.MultiprocessingCore.available_children.) -* A :class:`multiprocessing.Queue` is used to submit other commands in - a thread-safe, non-blocking fashion. (Note that, since it is a - queue, no results can be returned.) It implements a very simple RPC - protocol. 
Each command passed to a child over the Pipe must be a - tuple with the format:: +The parent communicates with the children over +:class:`multiprocessing.Pipe` objects that are wrapped in a +:class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` to +make them thread-safe. Each command passed over the Pipe should be in +the following format:: (<method>, <args>, <kwargs>) - The method must be exposed by the child by decorating it with - :func:`Bcfg2.Server.Core.exposed`. +The parent can also communicate with children over a one-way +:class:`multiprocessing.Queue` object that is used for +publish-subscribe communications, i.e., most XML-RPC commands. +(Setting debug, e.g., doesn't require a response from the children.) + +The method must be exposed by the child by decorating it with +:func:`Bcfg2.Server.Core.exposed`. """ +import time import threading import lxml.etree import multiprocessing +from uuid import uuid4 +from itertools import cycle from Bcfg2.Cache import Cache -from Bcfg2.Compat import Queue +from Bcfg2.Compat import Queue, Empty from Bcfg2.Server.Core import BaseCore, exposed from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore @@ -41,28 +41,160 @@ class DispatchingCache(Cache, Debuggable): method = "expire_cache" def __init__(self, *args, **kwargs): - #: A dict of <child name>: :class:`multiprocessing.Queue` - #: objects that should be given a cache expiration command any - #: time an item is expired. - self.command_queues = kwargs.pop("pipes", dict()) - + self.cmd_q = kwargs.pop("queue") Debuggable.__init__(self) Cache.__init__(self, *args, **kwargs) def expire(self, key=None): - if (key and key in self) or (not key and len(self)): - # dispatching cache expiration to children can be - # expensive, so only do it if there's something to expire - for child, cmd_q in self.command_queues.items(): - if key: - self.logger.debug("Expiring metadata cache for %s on %s" % - (key, child)) - else: - self.logger.debug("Expiring metadata cache on %s" % child) - cmd_q.put((self.method, [key], dict())) + self.cmd_q.put((self.method, [key], dict())) Cache.expire(self, key=key) +class PublishSubscribeQueue(object): + """ An implementation of a :class:`multiprocessing.Queue` designed + for publish-subscribe use patterns. I.e., a single node adds items + to the queue, and every other node retrieves the item. This is + the 'publish' end; the subscribers can deal with this as a normal + Queue with no special handling. + + Note that, since this is the publishing end, there's no support + for getting. + """ + def __init__(self): + self._queues = [] + + def add_subscriber(self): + new_q = multiprocessing.Queue() + self._queues.append(new_q) + return new_q + + def put(self, data, block=True, timeout=None): + for queue in self._queues: + queue.put(data, block=block, timeout=timeout) + + def put_nowait(self, data): + self.put(data, block=False) + + def close(self): + for queue in self._queues: + queue.close() + + +class ThreadSafePipeDispatcher(Debuggable): + """ This is a wrapper around :class:`multiprocessing.Pipe` objects + that allows them to be used in multithreaded applications. When + performing a ``send()``, a key is included that will be used to + identify the response. As responses are received from the Pipe, + they are added to a dict that is used to get the appropriate + response for a given thread. + + The remote end of the Pipe must deal with the key being sent with + the data in a tuple of ``(key, data)``, and it must include the + key with its response.
+ + It is the responsibility of the user to ensure that the key is + unique. + + Note that this adds a bottleneck -- all communication over the + actual Pipe happens in a single thread. But for our purposes, + Pipe communication is fairly minimal and that's an acceptable + bottleneck.""" + + #: How long to wait while polling for new data to send. This + #: doesn't affect the speed with which data is sent, but + #: setting it too high will result in longer shutdown times, since + #: we only check for the termination event from the main process + #: every ``poll_wait`` seconds. + poll_wait = 2.0 + + _sentinel = object() + + def __init__(self, terminate): + Debuggable.__init__(self) + + #: The threading flag that is used to determine when the + #: threads should stop. + self.terminate = terminate + + #: The :class:`multiprocessing.Pipe` tuple used by this object + self.pipe = multiprocessing.Pipe() + + self._mainpipe = self.pipe[0] + self._recv_dict = dict() + self._send_queue = Queue() + + self.send_thread = threading.Thread(name="PipeSendThread", + target=self._send_thread) + self.send_thread.start() + self.recv_thread = threading.Thread(name="PipeRecvThread", + target=self._recv_thread) + self.recv_thread.start() + + def _send_thread(self): + """ Run the single thread through which send requests are passed """ + self.logger.debug("Starting interprocess RPC send thread") + while not self.terminate.isSet(): + try: + data = self._send_queue.get(True, self.poll_wait) + self._mainpipe.send(data) + except Empty: + pass + self.logger.info("Interprocess RPC send thread stopped") + + def send(self, key, data): + """ Send data with the given unique key """ + self._send_queue.put((key, data)) + + def _recv_thread(self): + """ Run the single thread through which recv requests are passed """ + self.logger.debug("Starting interprocess RPC receive thread") + while not self.terminate.isSet(): + if self._mainpipe.poll(self.poll_wait): + key, data = self._mainpipe.recv() + if key in self._recv_dict: + self.logger.error("Duplicate key in received data: %s" % + key) + self._mainpipe.close() + self._recv_dict[key] = data + self.logger.info("Interprocess RPC receive thread stopped") + + def recv(self, key): + """ Receive data with the given unique key """ + self.poll(key, timeout=None) + return self._recv_dict.pop(key) + + def poll(self, key, timeout=_sentinel): + """ Poll for data with the given unique key. See + :func:`multiprocessing.Connection.poll` for the possible + values of ``timeout``. """ + if timeout is self._sentinel: + return key in self._recv_dict + + abort = threading.Event() + + if timeout is not None: + timer = threading.Timer(float(timeout), abort.set) + timer.start() + try: + while not abort.is_set(): + if key in self._recv_dict: + return True + return False + finally: + if timeout is not None: + timer.cancel() + + @staticmethod + def genkey(base): + """ Generate a key suitable for use with + :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` + send() requests, based on the given data. The key is + constructed from the string given, some information about this + thread, and some random data. """ + thread = threading.current_thread() + return "%s-%s-%s-%s" % (base, thread.name, thread.ident, uuid4()) + + class DualEvent(object): """ DualEvent is a clone of :class:`threading.Event` that internally implements both :class:`threading.Event` and @@ -111,101 +243,153 @@ class ChildCore(BaseCore): those, though, if the pipe communication "protocol" were made more robust. 
""" - #: How long to wait while polling for new clients to build. This - #: doesn't affect the speed with which a client is built, but + #: How long to wait while polling for new RPC commands. This + #: doesn't affect the speed with which a command is processed, but #: setting it too high will result in longer shutdown times, since #: we only check for the termination event from the main process #: every ``poll_wait`` seconds. - poll_wait = 5.0 + poll_wait = 3.0 - def __init__(self, setup, render_pipe, command_queue, terminate): + def __init__(self, name, setup, rpc_pipe, cmd_q, terminate): """ + :param name: The name of this child + :type name: string :param setup: A Bcfg2 options dict :type setup: Bcfg2.Options.OptionParser - :param pipe: The pipe to which client hostnames are added for - ChildCore objects to build configurations, and to - which client configurations are added after - having been built by ChildCore objects. - :type pipe: multiprocessing.Pipe + :param rpc_pipe: The pipe used for RPC communication with the + parent process + :type rpc_pipe: multiprocessing.Pipe + :param cmd_q: The queue used for one-way pub-sub + communications from the parent process + :type cmd_q: multiprocessing.Queue :param terminate: An event that flags ChildCore objects to shut themselves down. :type terminate: multiprocessing.Event """ BaseCore.__init__(self, setup) - #: The pipe to which client hostnames are added for ChildCore - #: objects to build configurations, and to which client - #: configurations are added after having been built by - #: ChildCore objects. - self.render_pipe = render_pipe + #: The name of this child + self.name = name + + #: The pipe used for RPC communication with the parent + self.rpc_pipe = rpc_pipe - #: The queue from which other commands are received - self.command_queue = command_queue + #: The queue used to receive pub-sub commands + self.cmd_q = cmd_q #: The :class:`multiprocessing.Event` that will be monitored #: to determine when this child should shut down. self.terminate = terminate - #: The :class:`threading.Thread` used to process commands - #: received via the :class:`multiprocessing.Queue` RPC - #: interface - self.command_thread = \ - threading.Thread(name="CommandThread", - target=self._command_queue_thread) + # a list of all rendering threads + self._threads = [] - def _daemonize(self): - return True + # the thread used to process publish-subscribe commands + self._command_thread = threading.Thread(name="CommandThread", + target=self._dispatch_commands) + + # override this setting so that the child doesn't try to write + # the pidfile + self.setup['daemon'] = False + + # ensure that the child doesn't start a perflog thread + self.perflog_thread = None def _run(self): - try: - self.command_thread.start() - except: - self.shutdown() - raise + self._command_thread.start() return True - def render(self): - """ Process client configuration render requests """ - if self.render_pipe.poll(self.poll_wait): - if not self.metadata.use_database: - # handle FAM events, in case (for instance) the - # client has just been added to clients.xml, or a - # profile has just been asserted. but really, you - # should be using the metadata database if you're - # using this core. 
- self.fam.handle_events_in_interval(0.1) - client = self.render_pipe.recv() - self.logger.debug("Building configuration for %s" % client) - self.render_pipe.send( - lxml.etree.tostring(self.BuildConfiguration(client))) + def _daemonize(self): + return True + + def _dispatch_commands(self): + """ Dispatch commands received via the pub-sub queue interface + """ + self.logger.debug("Starting %s RPC subscription thread" % self.name) + while not self.terminate.is_set(): + try: + data = self.cmd_q.get(True, self.poll_wait) + self.logger.debug("%s: Processing asynchronous command: %s" % + (self.name, data[0])) + self._dispatch(data) + except Empty: + pass + self.logger.info("%s RPC subscription thread stopped" % self.name) + + def _dispatch_render(self): + """ Dispatch render requests received via the RPC pipe + interface """ + key, data = self.rpc_pipe.recv() + self.rpc_pipe.send((key, self._dispatch(data))) + + def _reap_threads(self): + """ Reap rendering threads that have completed """ + for thread in self._threads[:]: + if not thread.is_alive(): + self._threads.remove(thread) + + def _dispatch(self, data): + """ Generic method dispatcher used for commands received from + either the pub-sub queue or the RPC pipe. """ + method, args, kwargs = data + if not hasattr(self, method): + self.logger.error("%s: Method %s does not exist" % (self.name, + method)) + return None + + func = getattr(self, method) + if func.exposed: + self.logger.debug("%s: Calling RPC method %s" % (self.name, + method)) + return func(*args, **kwargs) + else: + self.logger.error("%s: Method %s is not exposed" % (self.name, + method)) + return None def _block(self): while not self.terminate.isSet(): try: - self.render() + if self.rpc_pipe.poll(self.poll_wait): + rpc_thread = threading.Thread( + name="Renderer%s" % len(self._threads), + target=self._dispatch_render) + self._threads.append(rpc_thread) + rpc_thread.start() + self._reap_threads() except KeyboardInterrupt: break self.shutdown() - def _command_queue_thread(self): - """ Process commands received on the command queue thread """ - while not self.terminate.isSet(): - method, args, kwargs = self.command_queue.get() - if hasattr(self, method): - func = getattr(self, method) - if func.exposed: - self.logger.debug("Child calling RPC method %s" % method) - func(*args, **kwargs) - else: - self.logger.error("Method %s is not exposed" % method) - else: - self.logger.error("Method %s does not exist" % method) + def shutdown(self): + BaseCore.shutdown(self) + self._reap_threads() + while len(threading.enumerate()) > 1: + threads = [t for t in threading.enumerate() + if t != threading.current_thread()] + self.logger.info("%s: Waiting for %d thread(s): %s" % + (self.name, len(threads), + [t.name for t in threads])) + time.sleep(1) + self._reap_threads() + self.logger.info("%s: All threads stopped" % self.name) + + @exposed + def set_debug(self, address, debug): + BaseCore.set_debug(self, address, debug) @exposed def expire_cache(self, client=None): """ Expire the metadata cache for a client """ self.metadata_cache.expire(client) + @exposed + def GetConfig(self, client): + """ Render the configuration for a client """ + self.logger.debug("%s: Building configuration for %s" % + (self.name, client)) + return lxml.etree.tostring(self.BuildConfiguration(client)) + class Core(BuiltinCore): """ A multiprocessing core that delegates building the actual @@ -224,84 +408,102 @@ class Core(BuiltinCore): if setup['children'] is None: setup['children'] = multiprocessing.cpu_count() - #: A dict 
of child name -> one end of the - #: :class:`multiprocessing.Pipe` object used to submit render - #: requests to that child. (The child is given the other end - #: of the Pipe.) - self.render_pipes = dict() - - #: A dict of child name -> :class:`multiprocessing.Queue` - #: object used to pass commands to that child. - self.command_queues = dict() - - #: A queue that keeps track of which children are available to - #: render a configuration. A child is popped from the queue - #: when it starts to render a config, then it's pushed back on - #: when it's done. This lets us use a blocking call to - #: :func:`Queue.Queue.get` when waiting for an available - #: child. - self.available_children = Queue(maxsize=self.setup['children']) - - # sigh. multiprocessing was added in py2.6, which is when the - # camelCase methods for threading objects were deprecated in - # favor of the Pythonic under_score methods. So - # multiprocessing.Event *only* has is_set(), while - # threading.Event has *both* isSet() and is_set(). In order - # to make the core work with Python 2.4+, and with both - # multiprocessing and threading Event objects, we just - # monkeypatch self.terminate to have isSet(). + #: A dict of child name -> + #: :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` + #: objects used to pass render requests to that child. (The + #: child is given the other end of the Pipe.) + self.pipes = dict() + + #: A + #: :class:`Bcfg2.Server.MultiprocessingCore.PublishSubscribeQueue` + #: object used to publish commands to all children. + self.cmd_q = PublishSubscribeQueue() + + #: The flag that indicates when to stop child threads and + #: processes self.terminate = DualEvent(threading_event=self.terminate) - self.metadata_cache = DispatchingCache() + self.metadata_cache = DispatchingCache(queue=self.cmd_q) + + #: A list of children that will be cycled through + self._all_children = [] + + #: An iterator that each child will be taken from in sequence, + #: to provide a round-robin distribution of render requests + self.children = None def _run(self): for cnum in range(self.setup['children']): name = "Child-%s" % cnum - # create Pipe for render requests and results - (mainpipe, childpipe) = multiprocessing.Pipe() - self.render_pipes[name] = mainpipe - - # create Queue for other commands - cmd_q = multiprocessing.Queue() - self.command_queues[name] = cmd_q - self.metadata_cache.command_queues[name] = cmd_q + # create Pipe for render requests + dispatcher = ThreadSafePipeDispatcher(self.terminate) + self.pipes[name] = dispatcher self.logger.debug("Starting child %s" % name) - childcore = ChildCore(self.setup, childpipe, cmd_q, self.terminate) + childcore = ChildCore(name, self.setup, dispatcher.pipe[1], + self.cmd_q.add_subscriber(), self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, child.pid)) - self.available_children.put(name) + self._all_children.append(name) + self.logger.debug("Started %s children: %s" % (len(self._all_children), + self._all_children)) + self.children = cycle(self._all_children) return BuiltinCore._run(self) def shutdown(self): BuiltinCore.shutdown(self) - for child in multiprocessing.active_children(): - self.logger.debug("Shutting down child %s" % child.name) - child.join(self.shutdown_timeout) - if child.is_alive(): + self.logger.debug("Closing RPC command queues") + self.cmd_q.close() + + def term_children(): + for child in multiprocessing.active_children(): 
self.logger.error("Waited %s seconds to shut down %s, " "terminating" % (self.shutdown_timeout, child.name)) child.terminate() - else: - self.logger.debug("Child %s shut down" % child.name) - self.logger.debug("All children shut down") + + timer = threading.Timer(self.shutdown_timeout, term_children) + timer.start() + while len(multiprocessing.active_children()): + self.logger.info("Waiting for %s child(ren): %s" % + (len(multiprocessing.active_children()), + [c.name + for c in multiprocessing.active_children()])) + time.sleep(1) + timer.cancel() + self.logger.info("All children shut down") + + while len(threading.enumerate()) > 1: + threads = [t for t in threading.enumerate() + if t != threading.current_thread()] + self.logger.info("Waiting for %s thread(s): %s" % + (len(threads), [t.name for t in threads])) + time.sleep(1) + self.logger.info("Shutdown complete") @exposed def set_debug(self, address, debug): + self.cmd_q.put(("set_debug", [address, debug], dict())) self.metadata_cache.set_debug(debug) + for pipe in self.pipes.values(): + pipe.set_debug(debug) return BuiltinCore.set_debug(self, address, debug) @exposed def GetConfig(self, address): client = self.resolve_client(address)[0] - childname = self.available_children.get() - self.logger.debug("Building configuration on child %s" % childname) - pipe = self.render_pipes[childname] - pipe.send(client) - config = pipe.recv() - self.available_children.put_nowait(childname) - return config + childname = self.children.next() + self.logger.debug("Building configuration for %s on %s" % (client, + childname)) + key = ThreadSafePipeDispatcher.genkey(client) + pipe = self.pipes[childname] + pipe.send(key, ("GetConfig", [client], dict())) + if pipe.poll(key, timeout=self.setup['client_timeout']): + return pipe.recv(key) + else: + self.logger.error("Building configuration for %s on %s timed out" % + (client, childname)) + return None -- cgit v1.2.3-1-g7c22 From 8ad614f3d2c19037e9aaa8895119adad7d699060 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 30 Jul 2013 13:54:50 -0400 Subject: Options: fix parsing of POSIX GID blacklist/whitelist --- src/lib/Bcfg2/Options.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Options.py b/src/lib/Bcfg2/Options.py index 51af84cb1..673fb125c 100644 --- a/src/lib/Bcfg2/Options.py +++ b/src/lib/Bcfg2/Options.py @@ -1265,9 +1265,9 @@ DRIVER_OPTIONS = \ yum_verify_fail_action=CLIENT_YUM_VERIFY_FAIL_ACTION, yum_verify_flags=CLIENT_YUM_VERIFY_FLAGS, posix_uid_whitelist=CLIENT_POSIX_UID_WHITELIST, - posix_gid_whitelist=CLIENT_POSIX_UID_WHITELIST, + posix_gid_whitelist=CLIENT_POSIX_GID_WHITELIST, posix_uid_blacklist=CLIENT_POSIX_UID_BLACKLIST, - posix_gid_blacklist=CLIENT_POSIX_UID_BLACKLIST) + posix_gid_blacklist=CLIENT_POSIX_GID_BLACKLIST) CLIENT_COMMON_OPTIONS = \ dict(extra=CLIENT_EXTRA_DISPLAY, -- cgit v1.2.3-1-g7c22 From 0f4103c197df36ed67b3f39b2be61bd86ba39ebd Mon Sep 17 00:00:00 2001 From: "Chris St. 
Pierre" Date: Tue, 30 Jul 2013 14:22:17 -0400 Subject: MultiprocessingCore: added missing doc strings --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index af8f6a56e..882ff4fb8 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -60,22 +60,31 @@ class PublishSubscribeQueue(object): Note that, since this is the publishing end, there's no support for getting. """ + def __init__(self): self._queues = [] def add_subscriber(self): + """ Add a subscriber to the queue. This returns a + :class:`multiprocessing.Queue` object that is used as the + subscription end of the queue. """ new_q = multiprocessing.Queue() self._queues.append(new_q) return new_q - def put(self, data, block=True, timeout=None): + def put(self, obj, block=True, timeout=None): + """ Put ``obj`` into the queue. See + :func:`multiprocessing.Queue.put` for more details.""" for queue in self._queues: - queue.put(data, block=block, timeout=timeout) + queue.put(obj, block=block, timeout=timeout) - def put_nowait(self, data): - self.put(data, block=False) + def put_nowait(self, obj): + """ Equivalent to ``put(obj, False)``. """ + self.put(obj, block=False) def close(self): + """ Close the queue. See :func:`multiprocessing.Queue.close` + for more details. """ for queue in self._queues: queue.close() @@ -459,6 +468,7 @@ class Core(BuiltinCore): self.cmd_q.close() def term_children(): + """ Terminate all remaining multiprocessing children. """ for child in multiprocessing.active_children(): self.logger.error("Waited %s seconds to shut down %s, " "terminating" % (self.shutdown_timeout, -- cgit v1.2.3-1-g7c22 From 2a93140e6f9ea5679e4cf8241c193f31bcb87f9b Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 31 Jul 2013 08:13:14 -0400 Subject: bcfg2-yum-helper: populate group cache during makecache Fixes #128 --- src/sbin/bcfg2-yum-helper | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'src') diff --git a/src/sbin/bcfg2-yum-helper b/src/sbin/bcfg2-yum-helper index 59b9f5e78..b8c99953d 100755 --- a/src/sbin/bcfg2-yum-helper +++ b/src/sbin/bcfg2-yum-helper @@ -255,6 +255,10 @@ class CacheManager(YumHelper): for repo in self.yumbase.repos.listEnabled(): # this populates the cache as a side effect repo.repoXML # pylint: disable=W0104 + try: + repo.getGroups() + except yum.Errors.RepoMDError: + pass # this repo has no groups self.yumbase.repos.populateSack(mdtype='metadata', cacheonly=1) self.yumbase.repos.populateSack(mdtype='filelists', cacheonly=1) self.yumbase.repos.populateSack(mdtype='otherdata', cacheonly=1) -- cgit v1.2.3-1-g7c22 From 7c274b35552e631d7b5359b13192e7463eace645 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 31 Jul 2013 09:11:08 -0400 Subject: Yum: record helper location in class variable so it's only detected once --- src/lib/Bcfg2/Server/Plugins/Packages/Yum.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py index 48c5b1f65..9ec7ac122 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py @@ -262,6 +262,8 @@ class YumCollection(Collection): .. 
private-include: _add_gpg_instances, _get_pulp_consumer """ + _helper = None + #: Options that are included in the [packages:yum] section of the #: config but that should not be included in the temporary #: yum.conf we write out @@ -280,7 +282,6 @@ class YumCollection(Collection): #: external commands self.cmd = Executor() - self._helper = None if self.use_yum: #: Define a unique cache file for this collection to use #: for cached yum metadata -- cgit v1.2.3-1-g7c22 From 04c8ed7554f711c718b1952ea5ea83eac99c85bc Mon Sep 17 00:00:00 2001 From: Sol Jerome Date: Wed, 31 Jul 2013 13:05:29 -0500 Subject: Utils: Silence bogus pylint errors Signed-off-by: Sol Jerome --- src/lib/Bcfg2/Utils.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Utils.py b/src/lib/Bcfg2/Utils.py index 4293f3f69..ab1276178 100644 --- a/src/lib/Bcfg2/Utils.py +++ b/src/lib/Bcfg2/Utils.py @@ -244,9 +244,9 @@ class Executor(object): # py3k fixes if not isinstance(stdout, str): - stdout = stdout.decode('utf-8') + stdout = stdout.decode('utf-8') # pylint: disable=E1103 if not isinstance(stderr, str): - stderr = stderr.decode('utf-8') + stderr = stderr.decode('utf-8') # pylint: disable=E1103 for line in stdout.splitlines(): # pylint: disable=E1103 self.logger.debug('< %s' % line) -- cgit v1.2.3-1-g7c22 From 604b5d2998ce4d93dee3945cc458feada3602d44 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 31 Jul 2013 11:46:14 -0400 Subject: MultiprocessingCore: timing out GetConfig() calls can cause memory issues --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 882ff4fb8..03394edf9 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -511,9 +511,4 @@ class Core(BuiltinCore): key = ThreadSafePipeDispatcher.genkey(client) pipe = self.pipes[childname] pipe.send(key, ("GetConfig", [client], dict())) - if pipe.poll(key, timeout=self.setup['client_timeout']): - return pipe.recv(key) - else: - self.logger.error("Building configuration for %s on %s timed out" % - (client, childname)) - return None + return pipe.recv(key) -- cgit v1.2.3-1-g7c22 From aca1228a808990644d239b2e4c4bc06dfc5ab955 Mon Sep 17 00:00:00 2001 From: "Chris St. 
Pierre" Date: Wed, 31 Jul 2013 14:10:05 -0400 Subject: MultiprocessingCore: dispatch "bcfg2-admin perf" calls to children --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 38 ++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 03394edf9..ce3343808 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -29,8 +29,8 @@ from itertools import cycle from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue, Empty from Bcfg2.Server.Core import BaseCore, exposed -from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +from Bcfg2.Server.Plugin import Debuggable, track_statistics class DispatchingCache(Cache, Debuggable): @@ -144,8 +144,7 @@ class ThreadSafePipeDispatcher(Debuggable): self.logger.debug("Starting interprocess RPC send thread") while not self.terminate.isSet(): try: - data = self._send_queue.get(True, self.poll_wait) - self._mainpipe.send(data) + self._mainpipe.send(self._send_queue.get(True, self.poll_wait)) except Empty: pass self.logger.info("Interprocess RPC send thread stopped") @@ -331,6 +330,7 @@ class ChildCore(BaseCore): key, data = self.rpc_pipe.recv() self.rpc_pipe.send((key, self._dispatch(data))) + @track_statistics() def _reap_threads(self): """ Reap rendering threads that have completed """ for thread in self._threads[:]: @@ -512,3 +512,35 @@ class Core(BuiltinCore): pipe = self.pipes[childname] pipe.send(key, ("GetConfig", [client], dict())) return pipe.recv(key) + + @exposed + def get_statistics(self, address): + stats = dict() + + def _aggregate_statistics(newstats, prefix=None): + for statname, vals in newstats.items(): + if statname.startswith("ChildCore:"): + statname = statname[5:] + if prefix: + prettyname = "%s:%s" % (prefix, statname) + else: + prettyname = statname + stats[prettyname] = vals + totalname = "Total:%s" % statname + if totalname not in stats: + stats[totalname] = vals + else: + newmin = min(stats[totalname][0], vals[0]) + newmax = max(stats[totalname][1], vals[1]) + newcount = stats[totalname][3] + vals[3] + newmean = ((stats[totalname][2] * stats[totalname][3]) + + (vals[2] * vals[3])) / newcount + stats[totalname] = (newmin, newmax, newmean, newcount) + + key = ThreadSafePipeDispatcher.genkey("get_statistics") + stats = dict() + for childname, pipe in self.pipes.items(): + pipe.send(key, ("get_statistics", [address], dict())) + _aggregate_statistics(pipe.recv(key), prefix=childname) + _aggregate_statistics(BuiltinCore.get_statistics(self, address)) + return stats -- cgit v1.2.3-1-g7c22 From 45e6697e82d28ccb9d5519b19c99411caf108811 Mon Sep 17 00:00:00 2001 From: "Chris St. 
Pierre" Date: Wed, 31 Jul 2013 14:42:08 -0400 Subject: Core: update VCS revision only when events are handled --- src/lib/Bcfg2/Server/Core.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index e37c0b4e3..de8015a3f 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -311,10 +311,11 @@ class BaseCore(object): else: if not self.fam.pending(): terminate.wait(15) + if self.fam.pending(): + self._update_vcs_revision() self.fam.handle_event_set(self.lock) except: continue - self._update_vcs_revision() self.logger.info("File monitor thread terminated") @track_statistics() -- cgit v1.2.3-1-g7c22 From c3b296c7d29c8c6cb0a81a5ad3ac44eab5ee33f1 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 31 Jul 2013 15:42:21 -0400 Subject: Packages: reduce source_from_xml() calls on startup --- src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py index 332f0bbab..c47e18201 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/PackagesSources.py @@ -88,13 +88,12 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile, :type event: Bcfg2.Server.FileMonitor.Event :returns: None """ - Bcfg2.Server.Plugin.StructFile.HandleEvent(self, event=event) if event and event.filename != self.name: for fpath in self.extras: if fpath == os.path.abspath(event.filename): self.parsed.add(fpath) break - + Bcfg2.Server.Plugin.StructFile.HandleEvent(self, event=event) if self.loaded: self.logger.info("Reloading Packages plugin") self.pkg_obj.Reload() @@ -111,10 +110,11 @@ class PackagesSources(Bcfg2.Server.Plugin.StructFile, def Index(self): Bcfg2.Server.Plugin.StructFile.Index(self) self.entries = [] - for xsource in self.xdata.findall('.//Source'): - source = self.source_from_xml(xsource) - if source is not None: - self.entries.append(source) + if self.loaded: + for xsource in self.xdata.findall('.//Source'): + source = self.source_from_xml(xsource) + if source is not None: + self.entries.append(source) Index.__doc__ = Bcfg2.Server.Plugin.StructFile.Index.__doc__ + """ ``Index`` is responsible for calling :func:`source_from_xml` -- cgit v1.2.3-1-g7c22 From 8b0ee3a67064bf810b3e6621ce128853eaf8a6f0 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 1 Aug 2013 08:51:34 -0400 Subject: Yum: Fixed class-scope variable assignments --- src/lib/Bcfg2/Server/Plugins/Packages/Yum.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py index 9ec7ac122..b26fb6870 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py @@ -317,7 +317,8 @@ class YumCollection(Collection): self.logger.error("Could not create Pulp consumer " "cert directory at %s: %s" % (certdir, err)) - self.pulp_cert_set = PulpCertificateSet(certdir, self.fam) + self.__class__.pulp_cert_set = PulpCertificateSet(certdir, + self.fam) @property def disableMetaData(self): @@ -353,15 +354,16 @@ class YumCollection(Collection): the default location. 
""" if not self._helper: try: - self._helper = self.setup.cfp.get("packages:yum", "helper") + self.__class__._helper = self.setup.cfp.get("packages:yum", + "helper") except (ConfigParser.NoOptionError, ConfigParser.NoSectionError): # first see if bcfg2-yum-helper is in PATH try: self.debug_log("Checking for bcfg2-yum-helper in $PATH") self.cmd.run(['bcfg2-yum-helper']) - self._helper = 'bcfg2-yum-helper' + self.__class__._helper = 'bcfg2-yum-helper' except OSError: - self._helper = "/usr/sbin/bcfg2-yum-helper" + self.__class__._helper = "/usr/sbin/bcfg2-yum-helper" return self._helper @property -- cgit v1.2.3-1-g7c22 From 605c1962c5cb71d2bbb26767d2a9629d2fe4af61 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 1 Aug 2013 09:14:52 -0400 Subject: MultiprocessingCore: added missing docstring --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index ce3343808..775131188 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -518,6 +518,12 @@ class Core(BuiltinCore): stats = dict() def _aggregate_statistics(newstats, prefix=None): + """ Aggregate a set of statistics from a child or parent + server core. This adds the statistics to the overall + statistics dict (optionally prepending a prefix, such as + "Child-1", to uniquely identify this set of statistics), + and aggregates it with the set of running totals that are + kept from all cores. """ for statname, vals in newstats.items(): if statname.startswith("ChildCore:"): statname = statname[5:] -- cgit v1.2.3-1-g7c22 From 9c86ddc5cdd166ef416b7fa2fab6f883f70e4a66 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Thu, 1 Aug 2013 09:15:08 -0400 Subject: Yum: suppress warning about class variable assignment --- src/lib/Bcfg2/Server/Plugins/Packages/Yum.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py index b26fb6870..4187f2812 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py @@ -353,6 +353,7 @@ class YumCollection(Collection): forking, but apparently not); finally we check in /usr/sbin, the default location. 
""" if not self._helper: + # pylint: disable=W0212 try: self.__class__._helper = self.setup.cfp.get("packages:yum", "helper") @@ -364,6 +365,7 @@ class YumCollection(Collection): self.__class__._helper = 'bcfg2-yum-helper' except OSError: self.__class__._helper = "/usr/sbin/bcfg2-yum-helper" + # pylint: enable=W0212 return self._helper @property -- cgit v1.2.3-1-g7c22 From b95a5c40d3d145e6a27ea4efcbc483e31f6aa635 Mon Sep 17 00:00:00 2001 From: Sol Jerome Date: Fri, 2 Aug 2013 17:59:10 -0500 Subject: POSIXUsers: Handle unicode gecos attributes Signed-off-by: Sol Jerome --- src/lib/Bcfg2/Client/Tools/POSIXUsers.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Client/Tools/POSIXUsers.py b/src/lib/Bcfg2/Client/Tools/POSIXUsers.py index bb684899d..49fc704e4 100644 --- a/src/lib/Bcfg2/Client/Tools/POSIXUsers.py +++ b/src/lib/Bcfg2/Client/Tools/POSIXUsers.py @@ -196,7 +196,10 @@ class POSIXUsers(Bcfg2.Client.Tools.Tool): # automatically determine one -- i.e., it always # verifies continue - if val != entry.get(attr): + entval = entry.get(attr) + if not isinstance(entval, str): + entval = entval.encode('utf-8') + if val != entval: errors.append("%s for %s %s is incorrect. Current %s is " "%s, but should be %s" % (attr.title(), entry.tag, entry.get("name"), -- cgit v1.2.3-1-g7c22 From ec7ea9d2a7e1c7c15e52c866f8fc9bb84bb0920d Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Fri, 2 Aug 2013 08:36:22 -0400 Subject: bcfg2-crypt: prevent traceback for certain decrypt failures --- src/sbin/bcfg2-crypt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/sbin/bcfg2-crypt b/src/sbin/bcfg2-crypt index 7102d06a9..0ba84fa0a 100755 --- a/src/sbin/bcfg2-crypt +++ b/src/sbin/bcfg2-crypt @@ -158,6 +158,7 @@ class CfgDecryptor(Decryptor): except Bcfg2.Encryption.EVPError: self.logger.info("Could not decrypt %s with any passphrase" % self.filename) + return False def get_destination_filename(self, original_filename): if original_filename.endswith(".crypt"): @@ -417,7 +418,7 @@ def main(): # pylint: disable=R0912,R0915 if data is None: data = getattr(tool, mode)() - if data is False: + if not data: logger.error("Failed to %s %s, skipping" % (mode, fname)) continue if setup['crypt_stdout']: -- cgit v1.2.3-1-g7c22 From e1f045ff3c56b09ff06e11e6d4f9677bf63d051f Mon Sep 17 00:00:00 2001 From: "Chris St. 
Pierre" Date: Mon, 5 Aug 2013 13:44:57 -0400 Subject: Yum: better errors when yum helper output isn't valid JSON --- src/lib/Bcfg2/Server/Plugins/Packages/Yum.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py index 4187f2812..66f8e9dbe 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/Yum.py @@ -946,9 +946,14 @@ class YumCollection(Collection): try: return json.loads(result.stdout) except ValueError: - err = sys.exc_info()[1] - self.logger.error("Packages: error reading bcfg2-yum-helper " - "output: %s" % err) + if result.stdout: + err = sys.exc_info()[1] + self.logger.error("Packages: Error reading bcfg2-yum-helper " + "output: %s" % err) + self.logger.error("Packages: bcfg2-yum-helper output: %s" % + result.stdout) + else: + self.logger.error("Packages: No bcfg2-yum-helper output") raise def setup_data(self, force_update=False): -- cgit v1.2.3-1-g7c22 From c8d71e18c16039593b309bc35e4ceffc50a0107d Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 5 Aug 2013 13:53:20 -0400 Subject: MultiprocessingCore: greatly simplified parent-child RPC, removed non-thread-safe bits --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 407 ++++++++++------------------ 1 file changed, 145 insertions(+), 262 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 775131188..4c304d28c 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -4,33 +4,24 @@ processes. As such, it requires Python 2.6+. The parent communicates with the children over -:class:`multiprocessing.Pipe` objects that are wrapped in a -:class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` to -make them thread-safe. Each command passed over the Pipe should be in -the following format:: +:class:`multiprocessing.Queue` objects via a +:class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object. - (, , ) - -The parent can also communicate with children over a one-way -:class:`multiprocessing.Queue` object that is used for -publish-subscribe communications, i.e., most XML-RPC commands. -(Setting debug, e.g., doesn't require a response from the children.) - -The method must be exposed by the child by decorating it with -:func:`Bcfg2.Server.Core.exposed`. +A method being called via the RPCQueue must be exposed by the child by +decorating it with :func:`Bcfg2.Server.Core.exposed`. 
""" import time import threading import lxml.etree import multiprocessing -from uuid import uuid4 from itertools import cycle from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue, Empty +from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.Core import BaseCore, exposed from Bcfg2.Server.BuiltinCore import Core as BuiltinCore -from Bcfg2.Server.Plugin import Debuggable, track_statistics +from multiprocessing.connection import Listener, Client class DispatchingCache(Cache, Debuggable): @@ -41,166 +32,103 @@ class DispatchingCache(Cache, Debuggable): method = "expire_cache" def __init__(self, *args, **kwargs): - self.cmd_q = kwargs.pop("queue") + self.rpc_q = kwargs.pop("queue") Debuggable.__init__(self) Cache.__init__(self, *args, **kwargs) def expire(self, key=None): - self.cmd_q.put((self.method, [key], dict())) + self.rpc_q.publish(self.method, args=[key]) Cache.expire(self, key=key) -class PublishSubscribeQueue(object): +class RPCQueue(Debuggable): """ An implementation of a :class:`multiprocessing.Queue` designed - for publish-subscribe use patterns. I.e., a single node adds items - to the queue, and every other node retrieves the item. This is - the 'publish' end; the subscribers can deal with this as a normal - Queue with no special handling. + for several additional use patterns: - Note that, since this is the publishing end, there's no support - for getting. + * Random-access reads, based on a key that identifies the data; + * Publish-subscribe, where a datum is sent to all hosts. + + The subscribers can deal with this as a normal Queue with no + special handling. """ + poll_wait = 3.0 def __init__(self): - self._queues = [] - - def add_subscriber(self): - """ Add a subscriber to the queue. This returns a - :class:`multiprocessing.Queue` object that is used as the - subscription end of the queue. """ - new_q = multiprocessing.Queue() - self._queues.append(new_q) - return new_q - - def put(self, obj, block=True, timeout=None): - """ Put ``obj`` into the queue. See - :func:`multiprocessing.Queue.put` for more details.""" - for queue in self._queues: - queue.put(obj, block=block, timeout=timeout) - - def put_nowait(self, obj): - """ Equivalent to ``put(obj, False)``. """ - self.put(obj, block=False) + Debuggable.__init__(self) + self._terminate = threading.Event() + self._queues = dict() + self._available_listeners = Queue() + self._blocking_listeners = [] + + def add_subscriber(self, name): + """ Add a subscriber to the queue. This returns the + :class:`multiprocessing.Queue` object that the subscriber + should read from. """ + self._queues[name] = multiprocessing.Queue() + return self._queues[name] + + def publish(self, method, args=None, kwargs=None): + """ Publish an RPC call to the queue for consumption by all + subscribers. """ + for queue in self._queues.values(): + queue.put((None, (method, args or [], kwargs or dict()))) + + def rpc(self, dest, method, args=None, kwargs=None): + """ Make an RPC call to the named subscriber, expecting a + response. This opens a + :class:`multiprocessing.connection.Listener` and passes the + Listener address to the child as part of the RPC call, so that + the child can connect to the Listener to submit its results. + + Listeners are reused when possible to minimize overhead. 
+ """ + try: + listener = self._available_listeners.get_nowait() + self.logger.debug("Reusing existing RPC listener at %s" % + listener.address) + except Empty: + listener = Listener() + self.logger.debug("Created new RPC listener at %s" % + listener.address) + self._blocking_listeners.append(listener) + try: + self._queues[dest].put((listener.address, + (method, args or [], kwargs or dict()))) + conn = listener.accept() + self._blocking_listeners.remove(listener) + try: + while not self._terminate.is_set(): + if conn.poll(self.poll_wait): + return conn.recv() + finally: + conn.close() + finally: + self._available_listeners.put(listener) def close(self): - """ Close the queue. See :func:`multiprocessing.Queue.close` - for more details. """ - for queue in self._queues: + """ Close queues and connections. """ + self._terminate.set() + self.logger.debug("Closing RPC queues") + for name, queue in self._queues.items(): + self.logger.debug("Closing RPC queue to %s" % name) queue.close() + # close any listeners that are waiting for connections + self.logger.debug("Closing RPC connections") + for listener in self._blocking_listeners: + self.logger.debug("Closing RPC connection at %s" % + listener.address) + listener.close() -class ThreadSafePipeDispatcher(Debuggable): - """ This is a wrapper around :class:`multiprocessing.Pipe` objects - that allows them to be used in multithreaded applications. When - performing a ``send()``, a key is included that will be used to - identify the response. As responses are received from the Pipe, - they are added to a dict that is used to get the appropriate - response for a given thread. - - The remote end of the Pipe must deal with the key being sent with - the data in a tuple of ``(key, data)``, and it must include the - key with its response. - - It is the responsibility of the user to ensure that the key is - unique. - - Note that this adds a bottleneck -- all communication over the - actual Pipe happens in a single thread. But for our purposes, - Pipe communication is fairly minimal and that's an acceptable - bottleneck.""" - - #: How long to wait while polling for new data to send. This - #: doesn't affect the speed with which data is sent, but - #: setting it too high will result in longer shutdown times, since - #: we only check for the termination event from the main process - #: every ``poll_wait`` seconds. - poll_wait = 2.0 - - _sentinel = object() - - def __init__(self, terminate): - Debuggable.__init__(self) - - #: The threading flag that is used to determine when the - #: threads should stop. 
- self.terminate = terminate - - #: The :class:`multiprocessing.Pipe` tuple used by this object - self.pipe = multiprocessing.Pipe() - - self._mainpipe = self.pipe[0] - self._recv_dict = dict() - self._send_queue = Queue() - - self.send_thread = threading.Thread(name="PipeSendThread", - target=self._send_thread) - self.send_thread.start() - self.recv_thread = threading.Thread(name="PipeRecvThread", - target=self._recv_thread) - self.recv_thread.start() - - def _send_thread(self): - """ Run the single thread through which send requests are passed """ - self.logger.debug("Starting interprocess RPC send thread") - while not self.terminate.isSet(): - try: - self._mainpipe.send(self._send_queue.get(True, self.poll_wait)) - except Empty: - pass - self.logger.info("Interprocess RPC send thread stopped") - - def send(self, key, data): - """ Send data with the given unique key """ - self._send_queue.put((key, data)) - - def _recv_thread(self): - """ Run the single thread through which recv requests are passed """ - self.logger.debug("Starting interprocess RPC receive thread") - while not self.terminate.isSet(): - if self._mainpipe.poll(self.poll_wait): - key, data = self._mainpipe.recv() - if key in self._recv_dict: - self.logger.error("Duplicate key in received data: %s" % - key) - self._mainpipe.close() - self._recv_dict[key] = data - self.logger.info("Interprocess RPC receive thread stopped") - - def recv(self, key): - """ Receive data with the given unique key """ - self.poll(key, timeout=None) - return self._recv_dict.pop(key) - - def poll(self, key, timeout=_sentinel): - """ Poll for data with the given unique key. See - :func:`multiprocessing.Connection.poll` for the possible - values of ``timeout``. """ - if timeout is self._sentinel: - return key in self._recv_dict - - abort = threading.Event() - - if timeout is not None: - timer = threading.Timer(float(timeout), abort.set) - timer.start() + self.logger.debug("Closing RPC listeners") try: - while not abort.is_set(): - if key in self._recv_dict: - return True - return False - finally: - if timeout is not None: - timer.cancel() - - @staticmethod - def genkey(base): - """ Generate a key suitable for use with - :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` - send() requests, based on the given data. The key is - constructed from the string given, some information about this - thread, and some random data. """ - thread = threading.current_thread() - return "%s-%s-%s-%s" % (base, thread.name, thread.ident, uuid4()) + while True: + listener = self._available_listeners.get_nowait() + self.logger.debug("Closing RPC listener at %s" % + listener.address) + listener.close() + except Empty: + pass class DualEvent(object): @@ -258,18 +186,18 @@ class ChildCore(BaseCore): #: every ``poll_wait`` seconds. poll_wait = 3.0 - def __init__(self, name, setup, rpc_pipe, cmd_q, terminate): + def __init__(self, name, setup, rpc_q, terminate): """ :param name: The name of this child :type name: string :param setup: A Bcfg2 options dict :type setup: Bcfg2.Options.OptionParser - :param rpc_pipe: The pipe used for RPC communication with the - parent process - :type rpc_pipe: multiprocessing.Pipe - :param cmd_q: The queue used for one-way pub-sub - communications from the parent process - :type cmd_q: multiprocessing.Queue + :param rpc_q: The queue the child will read from for RPC + communications from the parent process. Results of + RPC calls are returned over a separate connection + whose address accompanies each call.
+ :type rpc_q: multiprocessing.Queue :param terminate: An event that flags ChildCore objects to shut themselves down. :type terminate: multiprocessing.Event @@ -279,22 +207,12 @@ class ChildCore(BaseCore): #: The name of this child self.name = name - #: The pipe used for RPC communication with the parent - self.rpc_pipe = rpc_pipe - - #: The queue used to receive pub-sub commands - self.cmd_q = cmd_q - #: The :class:`multiprocessing.Event` that will be monitored #: to determine when this child should shut down. self.terminate = terminate - # a list of all rendering threads - self._threads = [] - - # the thread used to process publish-subscribe commands - self._command_thread = threading.Thread(name="CommandThread", - target=self._dispatch_commands) + #: The queue used for RPC communication + self.rpc_q = rpc_q # override this setting so that the child doesn't try to write # the pidfile @@ -304,75 +222,60 @@ class ChildCore(BaseCore): self.perflog_thread = None def _run(self): - self._command_thread.start() return True def _daemonize(self): return True - def _dispatch_commands(self): - """ Dispatch commands received via the pub-sub queue interface - """ - self.logger.debug("Starting %s RPC subscription thread" % self.name) - while not self.terminate.is_set(): - try: - data = self.cmd_q.get(True, self.poll_wait) - self.logger.debug("%s: Processing asynchronous command: %s" % - (self.name, data[0])) - self._dispatch(data) - except Empty: - pass - self.logger.info("%s RPC subscription thread stopped" % self.name) - - def _dispatch_render(self): - """ Dispatch render requests received via the RPC pipe - interface """ - key, data = self.rpc_pipe.recv() - self.rpc_pipe.send((key, self._dispatch(data))) - - @track_statistics() - def _reap_threads(self): - """ Reap rendering threads that have completed """ - for thread in self._threads[:]: - if not thread.is_alive(): - self._threads.remove(thread) - - def _dispatch(self, data): - """ Generic method dispatcher used for commands received from - either the pub-sub queue or the RPC pipe. """ + def _dispatch(self, address, data): + """ Method dispatcher used for commands received from + the RPC queue. """ + if address is not None: + # if the address is None, then no response is expected.
we + # make the return connection before dispatching the actual + # RPC call so that the parent is blocking for a connection + # as briefly as possible + self.logger.error("Connecting to parent via %s" % address) + client = Client(address) method, args, kwargs = data + rv = None if not hasattr(self, method): self.logger.error("%s: Method %s does not exist" % (self.name, method)) - return None - - func = getattr(self, method) - if func.exposed: - self.logger.debug("%s: Calling RPC method %s" % (self.name, - method)) - return func(*args, **kwargs) else: - self.logger.error("%s: Method %s is not exposed" % (self.name, - method)) - return None + func = getattr(self, method) + if func.exposed: + self.logger.debug("%s: Calling RPC method %s" % (self.name, + method)) + rv = func(*args, **kwargs) + else: + self.logger.error("%s: Method %s is not exposed" % (self.name, + method)) + if address is not None: + # if the address is None, then no response is expected + self.logger.error("Returning data to parent via %s" % address) + client.send(rv) def _block(self): - while not self.terminate.isSet(): + while not self.terminate.is_set(): try: - if self.rpc_pipe.poll(self.poll_wait): - rpc_thread = threading.Thread( - name="Renderer%s" % len(self._threads), - target=self._dispatch_render) - self._threads.append(rpc_thread) - rpc_thread.start() - self._reap_threads() + address, data = self.rpc_q.get(timeout=self.poll_wait) + threadname = "-".join(str(i) for i in data) + rpc_thread = threading.Thread(name=threadname, + target=self._dispatch, + args=[address, data]) + rpc_thread.start() + except Empty: + pass except KeyboardInterrupt: break self.shutdown() def shutdown(self): BaseCore.shutdown(self) - self._reap_threads() + self.logger.info("%s: Closing RPC command queue" % self.name) + self.rpc_q.close() + while len(threading.enumerate()) > 1: threads = [t for t in threading.enumerate() if t != threading.current_thread()] @@ -380,13 +283,8 @@ class ChildCore(BaseCore): (self.name, len(threads), [t.name for t in threads])) time.sleep(1) - self._reap_threads() self.logger.info("%s: All threads stopped" % self.name) - @exposed - def set_debug(self, address, debug): - BaseCore.set_debug(self, address, debug) - @exposed def expire_cache(self, client=None): """ Expire the metadata cache for a client """ @@ -417,22 +315,15 @@ class Core(BuiltinCore): if setup['children'] is None: setup['children'] = multiprocessing.cpu_count() - #: A dict of child name -> - #: :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` - #: objects used to pass render requests to that child. (The - #: child is given the other end of the Pipe.) - self.pipes = dict() - - #: A - #: :class:`Bcfg2.Server.MultiprocessingCore.PublishSubscribeQueue` - #: object used to publish commands to all children. - self.cmd_q = PublishSubscribeQueue() - #: The flag that indicates when to stop child threads and #: processes self.terminate = DualEvent(threading_event=self.terminate) - self.metadata_cache = DispatchingCache(queue=self.cmd_q) + #: A :class:`Bcfg2.Server.MultiprocessingCore.RPCQueue` object + #: used to send or publish commands to children.
+ self.rpc_q = RPCQueue() + + self.metadata_cache = DispatchingCache(queue=self.rpc_q) #: A list of children that will be cycled through self._all_children = [] @@ -445,13 +336,9 @@ class Core(BuiltinCore): for cnum in range(self.setup['children']): name = "Child-%s" % cnum - # create Pipe for render requests - dispatcher = ThreadSafePipeDispatcher(self.terminate) - self.pipes[name] = dispatcher - self.logger.debug("Starting child %s" % name) - childcore = ChildCore(name, self.setup, dispatcher.pipe[1], - self.cmd_q.add_subscriber(), self.terminate) + child_q = self.rpc_q.add_subscriber(name) + childcore = ChildCore(name, self.setup, child_q, self.terminate) child = multiprocessing.Process(target=childcore.run, name=name) child.start() self.logger.debug("Child %s started with PID %s" % (name, @@ -464,8 +351,8 @@ class Core(BuiltinCore): def shutdown(self): BuiltinCore.shutdown(self) - self.logger.debug("Closing RPC command queues") - self.cmd_q.close() + self.logger.info("Closing RPC command queues") + self.rpc_q.close() def term_children(): """ Terminate all remaining multiprocessing children. """ @@ -496,10 +383,9 @@ class Core(BuiltinCore): @exposed def set_debug(self, address, debug): - self.cmd_q.put(("set_debug", [address, debug], dict())) + self.rpc_q.set_debug(debug) + self.rpc_q.publish("set_debug", args=[address, debug]) self.metadata_cache.set_debug(debug) - for pipe in self.pipes.values(): - pipe.set_debug(debug) return BuiltinCore.set_debug(self, address, debug) @exposed @@ -508,10 +394,7 @@ class Core(BuiltinCore): childname = self.children.next() self.logger.debug("Building configuration for %s on %s" % (client, childname)) - key = ThreadSafePipeDispatcher.genkey(client) - pipe = self.pipes[childname] - pipe.send(key, ("GetConfig", [client], dict())) - return pipe.recv(key) + return self.rpc_q.rpc(childname, "GetConfig", args=[client]) @exposed def get_statistics(self, address): @@ -543,10 +426,10 @@ class Core(BuiltinCore): (vals[2] * vals[3])) / newcount stats[totalname] = (newmin, newmax, newmean, newcount) - key = ThreadSafePipeDispatcher.genkey("get_statistics") stats = dict() - for childname, pipe in self.pipes.items(): - pipe.send(key, ("get_statistics", [address], dict())) - _aggregate_statistics(pipe.recv(key), prefix=childname) + for childname in self._all_children: + _aggregate_statistics( + self.rpc_q.rpc(childname, "get_statistics", args=[address]), + prefix=childname) _aggregate_statistics(BuiltinCore.get_statistics(self, address)) return stats -- cgit v1.2.3-1-g7c22 From dd97411cd650bfc157a7835ed9b859f5def5abb0 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 5 Aug 2013 14:36:59 -0400 Subject: Core: track statistics on resolve_client --- src/lib/Bcfg2/Server/Core.py | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index de8015a3f..d06277fa2 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -903,6 +903,7 @@ class BaseCore(object): state.get('state'))) self.client_run_hook("end_statistics", meta) + @track_statistics() def resolve_client(self, address, cleanup_cache=False, metadata=True): """ Given a client address, get the client hostname and optionally metadata. -- cgit v1.2.3-1-g7c22 From cb8b6e95e010ae322cb6be9b79e1a89009f45948 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 5 Aug 2013 14:50:23 -0400 Subject: Multiprocessing: proxy RecvProbeData calls This proxies RecvProbeData calls to child cores to expire the probe cache. 
The probe data itself is not relayed, just the fact that there was probe data received from a given client. Fixes #129. --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 15 +++++++++++++++ src/lib/Bcfg2/Server/Plugins/Probes.py | 19 ++++++++++++++----- 2 files changed, 29 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 4c304d28c..8031c69d4 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -290,6 +290,13 @@ class ChildCore(BaseCore): """ Expire the metadata cache for a client """ self.metadata_cache.expire(client) + @exposed + def RecvProbeData(self, address, _): + """ Expire the probe cache for a client """ + if 'Probes' in self.plugins: + client = self.resolve_client(address, metadata=False) + self.plugins['Probes'].load_data(client) + @exposed def GetConfig(self, client): """ Render the configuration for a client """ @@ -388,6 +395,14 @@ class Core(BuiltinCore): self.metadata_cache.set_debug(debug) return BuiltinCore.set_debug(self, address, debug) + @exposed + def RecvProbeData(self, address, probedata): + rv = BuiltinCore.RecvProbeData(self, address, probedata) + # we don't want the children to actually process probe data, + # so we don't send the data, just the fact that we got some. + self.rpc_q.publish("RecvProbeData", args=[address, None]) + return rv + @exposed def GetConfig(self, address): client = self.resolve_client(address)[0] diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py index 407cfc2d4..b58fbf715 100644 --- a/src/lib/Bcfg2/Server/Plugins/Probes.py +++ b/src/lib/Bcfg2/Server/Plugins/Probes.py @@ -266,12 +266,14 @@ class Probes(Bcfg2.Server.Plugin.Probing, hostname=client.hostname).exclude( group__in=self.cgroups[client.hostname]).delete() - def load_data(self): + def load_data(self, client=None): """ Load probe data from the appropriate backend (probed.xml or the database) """ if self._use_db: - return self._load_data_db() + return self._load_data_db(client) else: + # the XML backend doesn't support loading data for single + # clients, so it reloads all data return self._load_data_xml() def _load_data_xml(self): @@ -296,16 +298,23 @@ class Probes(Bcfg2.Server.Plugin.Probing, elif pdata.tag == 'Group': self.cgroups[client.get('name')].append(pdata.get('name')) - def _load_data_db(self): + def _load_data_db(self, client=None): """ Load probe data from the database """ self.probedata = {} self.cgroups = {} - for pdata in ProbesDataModel.objects.all(): + if client is None: + probedata = ProbesDataModel.objects.all() + groupdata = ProbesGroupsModel.objects.all() + else: + probedata = ProbesDataModel.objects.filter(hostname=client) + groupdata = ProbesGroupsModel.objects.filter(hostname=client) + + for pdata in probedata: if pdata.hostname not in self.probedata: self.probedata[pdata.hostname] = ClientProbeDataSet( timestamp=time.mktime(pdata.timestamp.timetuple())) self.probedata[pdata.hostname][pdata.probe] = ProbeData(pdata.data) - for pgroup in ProbesGroupsModel.objects.all(): + for pgroup in groupdata: if pgroup.hostname not in self.cgroups: self.cgroups[pgroup.hostname] = [] self.cgroups[pgroup.hostname].append(pgroup.group) -- cgit v1.2.3-1-g7c22 From 6c86b70c58d05bd6d1e6e9630d9490e36dccfe92 Mon Sep 17 00:00:00 2001 From: "Chris St. 
Pierre" Date: Mon, 5 Aug 2013 15:13:39 -0400 Subject: Probes: fixed unit test --- src/lib/Bcfg2/Server/Plugins/Probes.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py index b58fbf715..8c552a90b 100644 --- a/src/lib/Bcfg2/Server/Plugins/Probes.py +++ b/src/lib/Bcfg2/Server/Plugins/Probes.py @@ -270,7 +270,7 @@ class Probes(Bcfg2.Server.Plugin.Probing, """ Load probe data from the appropriate backend (probed.xml or the database) """ if self._use_db: - return self._load_data_db(client) + return self._load_data_db(client=client) else: # the XML backend doesn't support loading data for single # clients, so it reloads all data -- cgit v1.2.3-1-g7c22 From 6c500a1660c8012be88638785d3befcd4564216a Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 5 Aug 2013 16:32:30 -0400 Subject: Multiprocessing: fixed resolve_client call --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 8031c69d4..f67161fc5 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -294,7 +294,7 @@ class ChildCore(BaseCore): def RecvProbeData(self, address, _): """ Expire the probe cache for a client """ if 'Probes' in self.plugins: - client = self.resolve_client(address, metadata=False) + client = self.resolve_client(address, metadata=False)[0] self.plugins['Probes'].load_data(client) @exposed -- cgit v1.2.3-1-g7c22 From 54d86177513c26f52e4aa7947e5bbc8ba91969ff Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Mon, 5 Aug 2013 19:15:37 -0400 Subject: Metadata: Don't update XML on gratuitous profile update Check to see if the profile that is being set by set_profile exactly matches the existing profile list. If it does, then avoid writing out a new clients.xml. This simple optimization reduces the amount of clients.xml rewriting that occurs if you have a bunch of clients running bcfg2 -p at the same time (for example, during a cluster rebuild). 
--- src/lib/Bcfg2/Server/Plugins/Metadata.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py index e8962d707..973acb89a 100644 --- a/src/lib/Bcfg2/Server/Plugins/Metadata.py +++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py @@ -980,9 +980,10 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, profiles = [g for g in self.clientgroups[client] if g in self.groups and self.groups[g].is_profile] - self.logger.info("Changing %s profile from %s to %s" % + if profiles != [profile]: + self.logger.info("Changing %s profile from %s to %s" % (client, profiles, profile)) - self.update_client(client, dict(profile=profile)) + self.update_client(client, dict(profile=profile)) if client in self.clientgroups: for prof in profiles: self.clientgroups[client].remove(prof) -- cgit v1.2.3-1-g7c22 From 8b6a550d6dfa086935bf695ded1cdad89a383ff4 Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Mon, 5 Aug 2013 19:29:25 -0400 Subject: Make pylint happy --- src/lib/Bcfg2/Server/Plugins/Metadata.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py index 973acb89a..512090ac5 100644 --- a/src/lib/Bcfg2/Server/Plugins/Metadata.py +++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py @@ -982,7 +982,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, if g in self.groups and self.groups[g].is_profile] if profiles != [profile]: self.logger.info("Changing %s profile from %s to %s" % - (client, profiles, profile)) + (client, profiles, profile)) self.update_client(client, dict(profile=profile)) if client in self.clientgroups: for prof in profiles: -- cgit v1.2.3-1-g7c22 From a96a3301d5a2b0630795709c00e80f938fed10a4 Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Mon, 5 Aug 2013 19:55:56 -0400 Subject: Found a stray write that should be part of the new client case --- src/lib/Bcfg2/Server/Plugins/Metadata.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py index 512090ac5..0e2277239 100644 --- a/src/lib/Bcfg2/Server/Plugins/Metadata.py +++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py @@ -1005,8 +1005,8 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, self.add_client(client, dict(profile=profile)) self.clients.append(client) self.clientgroups[client] = [profile] - if not self._use_db: - self.clients_xml.write() + if not self._use_db: + self.clients_xml.write() def set_version(self, client, version): """Set version for provided client.""" -- cgit v1.2.3-1-g7c22 From 936b8aa2b6329ddf45c1032171c16d0eaea99b37 Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Mon, 5 Aug 2013 21:19:41 -0400 Subject: Make updating the structures conditional on profile changing Based on the expectations of the tests, I am reasonably confident that updating the in memory structures is logically part of changing the client's profile so I put it in the if block --- src/lib/Bcfg2/Server/Plugins/Metadata.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py index 0e2277239..e62b3f77f 100644 --- a/src/lib/Bcfg2/Server/Plugins/Metadata.py +++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py @@ -984,12 +984,12 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, self.logger.info("Changing %s profile 
from %s to %s" % (client, profiles, profile)) self.update_client(client, dict(profile=profile)) - if client in self.clientgroups: - for prof in profiles: - self.clientgroups[client].remove(prof) - self.clientgroups[client].append(profile) - else: - self.clientgroups[client] = [profile] + if client in self.clientgroups: + for prof in profiles: + self.clientgroups[client].remove(prof) + self.clientgroups[client].append(profile) + else: + self.clientgroups[client] = [profile] else: self.logger.info("Creating new client: %s, profile %s" % (client, profile)) -- cgit v1.2.3-1-g7c22 From 2ebb02acd1d53e1bd88793ad67ea9d0c4354c03f Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 6 Aug 2013 10:20:50 -0400 Subject: bcfg2-yum-helper: fully download group metadata during makecache --- src/sbin/bcfg2-yum-helper | 1 + 1 file changed, 1 insertion(+) (limited to 'src') diff --git a/src/sbin/bcfg2-yum-helper b/src/sbin/bcfg2-yum-helper index b8c99953d..03b672ac7 100755 --- a/src/sbin/bcfg2-yum-helper +++ b/src/sbin/bcfg2-yum-helper @@ -262,6 +262,7 @@ class CacheManager(YumHelper): self.yumbase.repos.populateSack(mdtype='metadata', cacheonly=1) self.yumbase.repos.populateSack(mdtype='filelists', cacheonly=1) self.yumbase.repos.populateSack(mdtype='otherdata', cacheonly=1) + self.yumbase.comps def main(): -- cgit v1.2.3-1-g7c22 From ddaac2a41e5f0521385e989760e6c85511e1b9f5 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 6 Aug 2013 10:35:55 -0400 Subject: bcfg2-yum-helper: disable pylint check --- src/sbin/bcfg2-yum-helper | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'src') diff --git a/src/sbin/bcfg2-yum-helper b/src/sbin/bcfg2-yum-helper index 03b672ac7..49baeb9c3 100755 --- a/src/sbin/bcfg2-yum-helper +++ b/src/sbin/bcfg2-yum-helper @@ -262,7 +262,8 @@ class CacheManager(YumHelper): self.yumbase.repos.populateSack(mdtype='metadata', cacheonly=1) self.yumbase.repos.populateSack(mdtype='filelists', cacheonly=1) self.yumbase.repos.populateSack(mdtype='otherdata', cacheonly=1) - self.yumbase.comps + # this does something with the groups cache as a side effect + self.yumbase.comps # pylint: disable=W0104 def main(): -- cgit v1.2.3-1-g7c22 From 4e60e190a4267e69365f1968351f558e1102db94 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 6 Aug 2013 14:19:59 -0400 Subject: Core: log when a cached metadata object is used --- src/lib/Bcfg2/Server/Core.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index d06277fa2..1b56099df 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -876,6 +876,9 @@ class BaseCore(object): imd.query.by_name = self.build_metadata if self.metadata_cache_mode in ['cautious', 'aggressive']: self.metadata_cache[client_name] = imd + else: + self.logger.debug("Using cached metadata object for %s" % + client_name) return imd def process_statistics(self, client_name, statistics): -- cgit v1.2.3-1-g7c22 From 794d1cb59f9cf7b8006b261bcbb6aa3863c52975 Mon Sep 17 00:00:00 2001 From: "Chris St. 
Pierre" Date: Tue, 6 Aug 2013 14:21:38 -0400 Subject: MultiprocessingCore: log some messages as debug, not error --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index f67161fc5..b690242f8 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -235,7 +235,7 @@ class ChildCore(BaseCore): # make the return connection before dispatching the actual # RPC call so that the parent is blocking for a connection # as briefly as possible - self.logger.error("Connecting to parent via %s" % address) + self.logger.debug("Connecting to parent via %s" % address) client = Client(address) method, args, kwargs = data rv = None @@ -253,7 +253,7 @@ class ChildCore(BaseCore): method)) if address is not None: # if the key is None, then no response is expected - self.logger.error("Returning data to parent via %s" % address) + self.logger.debug("Returning data to parent via %s" % address) client.send(rv) def _block(self): -- cgit v1.2.3-1-g7c22 From e13dc6c4cce99fb2ab63fb5fbc60addd0688355d Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 6 Aug 2013 14:24:44 -0400 Subject: Plugin: quiet down set_debug, but keep it useful --- src/lib/Bcfg2/Server/Plugin/base.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugin/base.py b/src/lib/Bcfg2/Server/Plugin/base.py index c825a57b5..244c73aa8 100644 --- a/src/lib/Bcfg2/Server/Plugin/base.py +++ b/src/lib/Bcfg2/Server/Plugin/base.py @@ -34,9 +34,6 @@ class Debuggable(object): :returns: bool - The new value of the debug flag """ self.debug_flag = debug - self.debug_log("%s: debug = %s" % (self.__class__.__name__, - self.debug_flag), - flag=True) return debug def toggle_debug(self): @@ -136,6 +133,8 @@ class Plugin(Debuggable): self.running = False def set_debug(self, debug): + self.debug_log("%s: debug = %s" % (self.name, self.debug_flag), + flag=True) for entry in self.Entries.values(): if isinstance(entry, Debuggable): entry.set_debug(debug) -- cgit v1.2.3-1-g7c22 From 159b152fcaecbbd69ad3665f8dd00c37d81af4e4 Mon Sep 17 00:00:00 2001 From: "Chris St. 
Pierre" Date: Tue, 6 Aug 2013 14:26:44 -0400 Subject: Probes: expire metadata cache after loading data --- src/lib/Bcfg2/Server/Plugins/Probes.py | 6 ++++++ 1 file changed, 6 insertions(+) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py index 8c552a90b..87688a804 100644 --- a/src/lib/Bcfg2/Server/Plugins/Probes.py +++ b/src/lib/Bcfg2/Server/Plugins/Probes.py @@ -298,6 +298,9 @@ class Probes(Bcfg2.Server.Plugin.Probing, elif pdata.tag == 'Group': self.cgroups[client.get('name')].append(pdata.get('name')) + if self.core.metadata_cache_mode in ['cautious', 'aggressive']: + self.core.metadata_cache.expire() + def _load_data_db(self, client=None): """ Load probe data from the database """ self.probedata = {} @@ -319,6 +322,9 @@ class Probes(Bcfg2.Server.Plugin.Probing, self.cgroups[pgroup.hostname] = [] self.cgroups[pgroup.hostname].append(pgroup.group) + if self.core.metadata_cache_mode in ['cautious', 'aggressive']: + self.core.metadata_cache.expire(client) + @Bcfg2.Server.Plugin.track_statistics() def GetProbes(self, meta): return self.probes.get_probe_data(meta) -- cgit v1.2.3-1-g7c22 From 99c680e94132d5bf3110bd14bfabc9e407b1dae9 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 6 Aug 2013 14:50:37 -0400 Subject: Probes: properly clear cache When reloading probe groups/data for a single client from the database, only clear data for that client, not for all clients. --- src/lib/Bcfg2/Server/Plugins/Probes.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py index 87688a804..bf59809f7 100644 --- a/src/lib/Bcfg2/Server/Plugins/Probes.py +++ b/src/lib/Bcfg2/Server/Plugins/Probes.py @@ -303,12 +303,14 @@ class Probes(Bcfg2.Server.Plugin.Probing, def _load_data_db(self, client=None): """ Load probe data from the database """ - self.probedata = {} - self.cgroups = {} if client is None: + self.probedata = {} + self.cgroups = {} probedata = ProbesDataModel.objects.all() groupdata = ProbesGroupsModel.objects.all() else: + self.probedata.pop(client, None) + self.cgroups.pop(client, None) probedata = ProbesDataModel.objects.filter(hostname=client) groupdata = ProbesGroupsModel.objects.filter(hostname=client) -- cgit v1.2.3-1-g7c22 From 4bf1c82868357e5c357e0b656419af37a99a8545 Mon Sep 17 00:00:00 2001 From: Michael Fenn Date: Tue, 6 Aug 2013 16:01:12 -0400 Subject: Get profile by building metadata instead of guessing Rather than doing some ad-hoc lookups of internal data structures stpierre suggested that it'd be better to use the normal metadata build procedures. This implements that and adjusts the tests. 
--- src/lib/Bcfg2/Server/Plugins/Metadata.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py index e62b3f77f..cc0456334 100644 --- a/src/lib/Bcfg2/Server/Plugins/Metadata.py +++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py @@ -978,18 +978,21 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, self.logger.error(msg) raise Bcfg2.Server.Plugin.PluginExecutionError(msg) - profiles = [g for g in self.clientgroups[client] - if g in self.groups and self.groups[g].is_profile] - if profiles != [profile]: + metadata = self.core.build_metadata(client) + if metadata.profile != profile: self.logger.info("Changing %s profile from %s to %s" % - (client, profiles, profile)) + (client, metadata.profile, profile)) self.update_client(client, dict(profile=profile)) if client in self.clientgroups: - for prof in profiles: - self.clientgroups[client].remove(prof) + if metadata.profile in self.clientgroups[client]: + self.clientgroups[client].remove(metadata.profile) self.clientgroups[client].append(profile) else: self.clientgroups[client] = [profile] + else: + self.logger.debug( + "Ignoring %s request to change profile from %s to %s" + % (client, metadata.profile, profile)) else: self.logger.info("Creating new client: %s, profile %s" % (client, profile)) -- cgit v1.2.3-1-g7c22 From 441a96457ba55529c0316a7459dcb295988824b0 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 7 Aug 2013 11:23:03 -0400 Subject: Packages: cache group and package selections --- src/lib/Bcfg2/Server/Plugins/Packages/__init__.py | 37 ++++++++++++++++++++--- 1 file changed, 32 insertions(+), 5 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py index cd1bb70b0..470a52bbc 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py @@ -157,8 +157,21 @@ class Packages(Bcfg2.Server.Plugin.Plugin, #: object when one is requested, so each entry is very #: short-lived -- it's purged at the end of each client run. self.clients = dict() - # pylint: enable=C0301 + #: groupcache caches group lookups. It maps Collections (via + #: :attr:`Bcfg2.Server.Plugins.Packages.Collection.Collection.cachekey`) + #: to sets of package groups, and thence to the packages + #: indicated by those groups. + self.groupcache = dict() + + #: pkgcache caches complete package sets. 
It maps Collections + #: (via + #: :attr:`Bcfg2.Server.Plugins.Packages.Collection.Collection.cachekey`) + #: to sets of initial packages, and thence to the final + #: (complete) package selections resolved from the initial + #: packages + self.pkgcache = dict() + # pylint: enable=C0301 __init__.__doc__ = Bcfg2.Server.Plugin.Plugin.__init__.__doc__ def set_debug(self, debug): @@ -402,14 +415,24 @@ class Packages(Bcfg2.Server.Plugin.Plugin, for el in to_remove: el.getparent().remove(el) - gpkgs = collection.get_groups(groups) - for pkgs in gpkgs.values(): + groups.sort() + # check for this set of groups in the group cache + gkey = hash(tuple(groups)) + if gkey not in self.groupcache[collection.cachekey]: + self.groupcache[collection.cachekey][gkey] = \ + collection.get_groups(groups) + for pkgs in self.groupcache[collection.cachekey][gkey].values(): base.update(pkgs) # essential pkgs are those marked as such by the distribution base.update(collection.get_essential()) - packages, unknown = collection.complete(base) + # check for this set of packages in the package cache + pkey = hash(tuple(base)) + if pkey not in self.pkgcache[collection.cachekey]: + self.pkgcache[collection.cachekey][pkey] = \ + collection.complete(base) + packages, unknown = self.pkgcache[collection.cachekey][pkey] if unknown: self.logger.info("Packages: Got %d unknown entries" % len(unknown)) self.logger.info("Packages: %s" % list(unknown)) @@ -462,9 +485,11 @@ class Packages(Bcfg2.Server.Plugin.Plugin, if not self.disableMetaData: collection.setup_data(force_update) - # clear Collection caches + # clear Collection and package caches self.clients = dict() self.collections = dict() + self.groupcache = dict() + self.pkgcache = dict() for source in self.sources.entries: cachefiles.add(source.cachefile) @@ -578,6 +603,8 @@ class Packages(Bcfg2.Server.Plugin.Plugin, if cclass != Collection: self.clients[metadata.hostname] = ckey self.collections[ckey] = collection + self.groupcache.setdefault(ckey, dict()) + self.pkgcache.setdefault(ckey, dict()) return collection def get_additional_data(self, metadata): -- cgit v1.2.3-1-g7c22 From 7e9787c947e99b68317f5420951a296cea858daa Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 7 Aug 2013 11:37:38 -0400 Subject: Plugin: added new Caching interface This gives a single unified interface for expiring caches, no matter the plugin. This will be particularly useful with the MultiprocessingCore, as certain calls must be dispatched to child processes to expire their caches. --- src/lib/Bcfg2/Server/Core.py | 18 ++++++++++++++++-- src/lib/Bcfg2/Server/MultiprocessingCore.py | 16 ++++++++-------- src/lib/Bcfg2/Server/Plugin/interfaces.py | 8 ++++++++ src/lib/Bcfg2/Server/Plugins/Metadata.py | 7 ++++++- src/lib/Bcfg2/Server/Plugins/Packages/__init__.py | 5 +++++ src/lib/Bcfg2/Server/Plugins/Probes.py | 12 +++++++++--- src/lib/Bcfg2/Server/Plugins/PuppetENC.py | 2 +- src/lib/Bcfg2/Server/Plugins/SSHbase.py | 5 +++++ src/sbin/bcfg2-info | 5 +++-- 9 files changed, 61 insertions(+), 17 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index 1b56099df..b577f65e1 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -263,6 +263,20 @@ class BaseCore(object): #: metadata self.metadata_cache = Cache() + def expire_caches_by_type(self, base_cls, key=None): + """ Expire caches for all + :class:`Bcfg2.Server.Plugin.interfaces.Caching` plugins that + are instances of ``base_cls``. 
+ + :param base_cls: The base plugin interface class to match (see + :mod:`Bcfg2.Server.Plugin.interfaces`) + :type base_cls: type + :param key: The cache key to expire + """ + for plugin in self.plugins_by_type(base_cls): + if isinstance(plugin, Bcfg2.Server.Plugin.Caching): + plugin.expire_cache(key) + def plugins_by_type(self, base_cls): """ Return a list of loaded plugins that match the passed type. @@ -728,7 +742,7 @@ class BaseCore(object): if event.code2str() == 'deleted': return self.setup.reparse() - self.metadata_cache.expire() + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) def block_for_fam_events(self, handle_events=False): """ Block until all fam events have been handleed, optionally @@ -1084,7 +1098,7 @@ class BaseCore(object): # that's created for RecvProbeData doesn't get cached. # I.e., the next metadata object that's built, after probe # data is processed, is cached. - self.metadata_cache.expire(client) + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) try: xpdata = lxml.etree.XML(probedata.encode('utf-8'), parser=Bcfg2.Server.XMLParser) diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index b690242f8..c185a5893 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -15,16 +15,16 @@ import time import threading import lxml.etree import multiprocessing +import Bcfg2.Server.Plugin from itertools import cycle from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue, Empty -from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.Core import BaseCore, exposed from Bcfg2.Server.BuiltinCore import Core as BuiltinCore from multiprocessing.connection import Listener, Client -class DispatchingCache(Cache, Debuggable): +class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable): """ Implementation of :class:`Bcfg2.Cache.Cache` that propagates cache expiration events to child nodes. 
""" @@ -33,7 +33,7 @@ class DispatchingCache(Cache, Debuggable): def __init__(self, *args, **kwargs): self.rpc_q = kwargs.pop("queue") - Debuggable.__init__(self) + Bcfg2.Server.Plugin.Debuggable.__init__(self) Cache.__init__(self, *args, **kwargs) def expire(self, key=None): @@ -41,7 +41,7 @@ class DispatchingCache(Cache, Debuggable): Cache.expire(self, key=key) -class RPCQueue(Debuggable): +class RPCQueue(Bcfg2.Server.Plugin.Debuggable): """ An implementation of a :class:`multiprocessing.Queue` designed for several additional use patterns: @@ -54,7 +54,7 @@ class RPCQueue(Debuggable): poll_wait = 3.0 def __init__(self): - Debuggable.__init__(self) + Bcfg2.Server.Plugin.Debuggable.__init__(self) self._terminate = threading.Event() self._queues = dict() self._available_listeners = Queue() @@ -293,9 +293,9 @@ class ChildCore(BaseCore): @exposed def RecvProbeData(self, address, _): """ Expire the probe cache for a client """ - if 'Probes' in self.plugins: - client = self.resolve_client(address, metadata=False)[0] - self.plugins['Probes'].load_data(client) + self.expire_caches_by_type(Bcfg2.Server.Plugin.Probing, + key=self.resolve_client(address, + metadata=False)[0]) @exposed def GetConfig(self, client): diff --git a/src/lib/Bcfg2/Server/Plugin/interfaces.py b/src/lib/Bcfg2/Server/Plugin/interfaces.py index 222b94fe3..2dbf75f42 100644 --- a/src/lib/Bcfg2/Server/Plugin/interfaces.py +++ b/src/lib/Bcfg2/Server/Plugin/interfaces.py @@ -598,3 +598,11 @@ class ClientRunHooks(object): :returns: None """ pass + + +class Caching(object): + """ A plugin that caches more than just the data received from the + FAM. This presents a unified interface to clear the cache. """ + + def expire_cache(self, key=None): + raise NotImplementedError diff --git a/src/lib/Bcfg2/Server/Plugins/Metadata.py b/src/lib/Bcfg2/Server/Plugins/Metadata.py index cc0456334..03323d64b 100644 --- a/src/lib/Bcfg2/Server/Plugins/Metadata.py +++ b/src/lib/Bcfg2/Server/Plugins/Metadata.py @@ -487,6 +487,7 @@ class MetadataGroup(tuple): # pylint: disable=E0012,R0924 class Metadata(Bcfg2.Server.Plugin.Metadata, + Bcfg2.Server.Plugin.Caching, Bcfg2.Server.Plugin.ClientRunHooks, Bcfg2.Server.Plugin.DatabaseBacked): """This class contains data for bcfg2 server metadata.""" @@ -495,6 +496,7 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, def __init__(self, core, datastore, watch_clients=True): Bcfg2.Server.Plugin.Metadata.__init__(self) + Bcfg2.Server.Plugin.Caching.__init__(self) Bcfg2.Server.Plugin.ClientRunHooks.__init__(self) Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore) self.watch_clients = watch_clients @@ -934,13 +936,16 @@ class Metadata(Bcfg2.Server.Plugin.Metadata, self.groups[gname] self.states['groups.xml'] = True + def expire_cache(self, key=None): + self.core.metadata_cache.expire(key) + def HandleEvent(self, event): """Handle update events for data files.""" for handles, event_handler in self.handlers.items(): if handles(event): # clear the entire cache when we get an event for any # metadata file - self.core.metadata_cache.expire() + self.expire_cache() event_handler(event) if False not in list(self.states.values()) and self.debug_flag: diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py index 470a52bbc..3e4fb33ec 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py @@ -70,6 +70,7 @@ class OnDemandDict(MutableMapping): class Packages(Bcfg2.Server.Plugin.Plugin, + 
Bcfg2.Server.Plugin.Caching, Bcfg2.Server.Plugin.StructureValidator, Bcfg2.Server.Plugin.Generator, Bcfg2.Server.Plugin.Connector, @@ -94,6 +95,7 @@ class Packages(Bcfg2.Server.Plugin.Plugin, def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) + Bcfg2.Server.Plugin.Caching.__init__(self) Bcfg2.Server.Plugin.StructureValidator.__init__(self) Bcfg2.Server.Plugin.Generator.__init__(self) Bcfg2.Server.Plugin.Connector.__init__(self) @@ -458,6 +460,9 @@ class Packages(Bcfg2.Server.Plugin.Plugin, self._load_config() return True + def expire_cache(self, _=None): + self.Reload() + def _load_config(self, force_update=False): """ Load the configuration data and setup sources diff --git a/src/lib/Bcfg2/Server/Plugins/Probes.py b/src/lib/Bcfg2/Server/Plugins/Probes.py index bf59809f7..b9f93052a 100644 --- a/src/lib/Bcfg2/Server/Plugins/Probes.py +++ b/src/lib/Bcfg2/Server/Plugins/Probes.py @@ -181,14 +181,16 @@ class ProbeSet(Bcfg2.Server.Plugin.EntrySet): class Probes(Bcfg2.Server.Plugin.Probing, + Bcfg2.Server.Plugin.Caching, Bcfg2.Server.Plugin.Connector, Bcfg2.Server.Plugin.DatabaseBacked): """ A plugin to gather information from a client machine """ __author__ = 'bcfg-dev@mcs.anl.gov' def __init__(self, core, datastore): - Bcfg2.Server.Plugin.Connector.__init__(self) Bcfg2.Server.Plugin.Probing.__init__(self) + Bcfg2.Server.Plugin.Caching.__init__(self) + Bcfg2.Server.Plugin.Connector.__init__(self) Bcfg2.Server.Plugin.DatabaseBacked.__init__(self, core, datastore) try: @@ -266,6 +268,9 @@ class Probes(Bcfg2.Server.Plugin.Probing, hostname=client.hostname).exclude( group__in=self.cgroups[client.hostname]).delete() + def expire_cache(self, key=None): + self.load_data(client=key) + def load_data(self, client=None): """ Load probe data from the appropriate backend (probed.xml or the database) """ @@ -299,7 +304,7 @@ class Probes(Bcfg2.Server.Plugin.Probing, self.cgroups[client.get('name')].append(pdata.get('name')) if self.core.metadata_cache_mode in ['cautious', 'aggressive']: - self.core.metadata_cache.expire() + self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) def _load_data_db(self, client=None): """ Load probe data from the database """ @@ -325,7 +330,8 @@ class Probes(Bcfg2.Server.Plugin.Probing, self.cgroups[pgroup.hostname].append(pgroup.group) if self.core.metadata_cache_mode in ['cautious', 'aggressive']: - self.core.metadata_cache.expire(client) + self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata, + key=client) @Bcfg2.Server.Plugin.track_statistics() def GetProbes(self, meta): diff --git a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py index 801e7006d..072f3f7e7 100644 --- a/src/lib/Bcfg2/Server/Plugins/PuppetENC.py +++ b/src/lib/Bcfg2/Server/Plugins/PuppetENC.py @@ -127,7 +127,7 @@ class PuppetENC(Bcfg2.Server.Plugin.Plugin, self.logger.warning("PuppetENC is incompatible with aggressive " "client metadata caching, try 'cautious' or " "'initial' instead") - self.core.cache.expire() + self.core.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) def end_statistics(self, metadata): self.end_client_run(self, metadata) diff --git a/src/lib/Bcfg2/Server/Plugins/SSHbase.py b/src/lib/Bcfg2/Server/Plugins/SSHbase.py index d8b3104b7..2deea5f07 100644 --- a/src/lib/Bcfg2/Server/Plugins/SSHbase.py +++ b/src/lib/Bcfg2/Server/Plugins/SSHbase.py @@ -92,6 +92,7 @@ class KnownHostsEntrySet(Bcfg2.Server.Plugin.EntrySet): class SSHbase(Bcfg2.Server.Plugin.Plugin, + Bcfg2.Server.Plugin.Caching, 
Bcfg2.Server.Plugin.Generator, Bcfg2.Server.Plugin.PullTarget): """ @@ -125,6 +126,7 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin, def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) + Bcfg2.Server.Plugin.Caching.__init__(self) Bcfg2.Server.Plugin.Generator.__init__(self) Bcfg2.Server.Plugin.PullTarget.__init__(self) self.ipcache = {} @@ -149,6 +151,9 @@ class SSHbase(Bcfg2.Server.Plugin.Plugin, HostKeyEntrySet(keypattern, self.data) self.Entries['Path']["/etc/ssh/" + keypattern] = self.build_hk + def expire_cache(self, key=None): + self.__skn = False + def get_skn(self): """Build memory cache of the ssh known hosts file.""" if not self.__skn: diff --git a/src/sbin/bcfg2-info b/src/sbin/bcfg2-info index 451d8e49c..6008f8896 100755 --- a/src/sbin/bcfg2-info +++ b/src/sbin/bcfg2-info @@ -488,9 +488,10 @@ Bcfg2 client itself.""") alist = args.split() if len(alist): for client in self._get_client_list(alist): - self.metadata_cache.expire(client) + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata, + key=client) else: - self.metadata_cache.expire() + self.expire_caches_by_type(Bcfg2.Server.Plugin.Metadata) def do_probes(self, args): """ probes [-p] - Get probe list for the given -- cgit v1.2.3-1-g7c22 From 4b09a72355d6fea244ad6b8dcfb2fd151a5ada6b Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 7 Aug 2013 13:22:14 -0400 Subject: MultiprocessingCore: added a way to dispatch RMI calls to child processes --- src/lib/Bcfg2/Server/Core.py | 6 +- src/lib/Bcfg2/Server/MultiprocessingCore.py | 76 ++++++++++++++++++++--- src/lib/Bcfg2/Server/Plugin/base.py | 18 ++++++ src/lib/Bcfg2/Server/Plugins/Guppy.py | 1 + src/lib/Bcfg2/Server/Plugins/Packages/__init__.py | 3 + 5 files changed, 90 insertions(+), 14 deletions(-) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index b577f65e1..6357baae4 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -976,12 +976,10 @@ class BaseCore(object): def _get_rmi(self): """ Get a list of RMI calls exposed by plugins """ rmi = dict() - for pname, pinst in list(self.plugins.items()): + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: for mname in pinst.__rmi__: rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname) - famname = self.fam.__class__.__name__ - for mname in self.fam.__rmi__: - rmi["%s.%s" % (famname, mname)] = getattr(self.fam, mname) return rmi def _resolve_exposed_method(self, method_name): diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index c185a5893..e79207291 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -18,7 +18,7 @@ import multiprocessing import Bcfg2.Server.Plugin from itertools import cycle from Bcfg2.Cache import Cache -from Bcfg2.Compat import Queue, Empty +from Bcfg2.Compat import Queue, Empty, wraps from Bcfg2.Server.Core import BaseCore, exposed from Bcfg2.Server.BuiltinCore import Core as BuiltinCore from multiprocessing.connection import Listener, Client @@ -29,7 +29,7 @@ class DispatchingCache(Cache, Bcfg2.Server.Plugin.Debuggable): cache expiration events to child nodes. 
""" #: The method to send over the pipe to expire the cache - method = "expire_cache" + method = "expire_metadata_cache" def __init__(self, *args, **kwargs): self.rpc_q = kwargs.pop("queue") @@ -221,6 +221,8 @@ class ChildCore(BaseCore): # ensure that the child doesn't start a perflog thread self.perflog_thread = None + self._rmi = dict() + def _run(self): return True @@ -238,25 +240,34 @@ class ChildCore(BaseCore): self.logger.debug("Connecting to parent via %s" % address) client = Client(address) method, args, kwargs = data + func = None rv = None - if not hasattr(self, method): + if "." in method: + if method in self._rmi: + func = self._rmi[method] + else: + self.logger.error("%s: Method %s does not exist" % (self.name, + method)) + elif not hasattr(self, method): self.logger.error("%s: Method %s does not exist" % (self.name, method)) - else: + else: # method is not a plugin RMI, and exists func = getattr(self, method) - if func.exposed: - self.logger.debug("%s: Calling RPC method %s" % (self.name, - method)) - rv = func(*args, **kwargs) - else: + if not func.exposed: self.logger.error("%s: Method %s is not exposed" % (self.name, method)) + func = None + if func is not None: + self.logger.debug("%s: Calling RPC method %s" % (self.name, + method)) + rv = func(*args, **kwargs) if address is not None: # if the key is None, then no response is expected self.logger.debug("Returning data to parent via %s" % address) client.send(rv) def _block(self): + self._rmi = self._get_rmi() while not self.terminate.is_set(): try: address, data = self.rpc_q.get(timeout=self.poll_wait) @@ -285,8 +296,20 @@ class ChildCore(BaseCore): time.sleep(1) self.logger.info("%s: All threads stopped" % self.name) + def _get_rmi(self): + rmi = dict() + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: + for crmi in pinst.__child_rmi__: + if isinstance(crmi, tuple): + mname = crmi[1] + else: + mname = crmi + rmi["%s.%s" % (pname, mname)] = getattr(pinst, mname) + return rmi + @exposed - def expire_cache(self, client=None): + def expire_metadata_cache(self, client=None): """ Expire the metadata cache for a client """ self.metadata_cache.expire(client) @@ -388,6 +411,39 @@ class Core(BuiltinCore): time.sleep(1) self.logger.info("Shutdown complete") + def _get_rmi(self): + child_rmi = dict() + for pname, pinst in self.plugins.items() + \ + [(self.fam.__class__.__name__, self.fam)]: + for crmi in pinst.__child_rmi__: + if isinstance(crmi, tuple): + parentname, childname = crmi + else: + parentname = childname = crmi + child_rmi["%s.%s" % (pname, parentname)] = \ + "%s.%s" % (pname, childname) + + rmi = BuiltinCore._get_rmi(self) + for method in rmi.keys(): + if method in child_rmi: + rmi[method] = self._child_rmi_wrapper(method, + rmi[method], + child_rmi[method]) + return rmi + + def _child_rmi_wrapper(self, method, parent_rmi, child_rmi): + """ Returns a callable that dispatches a call to the given + child RMI to child processes, and calls the parent RMI locally + (i.e., in the parent process). 
""" + @wraps(parent_rmi) + def inner(*args, **kwargs): + self.logger.debug("Dispatching RMI call to %s to children: %s" % + (method, child_rmi)) + self.rpc_q.publish(child_rmi, args=args, kwargs=kwargs) + return parent_rmi(*args, **kwargs) + + return inner + @exposed def set_debug(self, address, debug): self.rpc_q.set_debug(debug) diff --git a/src/lib/Bcfg2/Server/Plugin/base.py b/src/lib/Bcfg2/Server/Plugin/base.py index 244c73aa8..03feceb6f 100644 --- a/src/lib/Bcfg2/Server/Plugin/base.py +++ b/src/lib/Bcfg2/Server/Plugin/base.py @@ -12,6 +12,10 @@ class Debuggable(object): #: List of names of methods to be exposed as XML-RPC functions __rmi__ = ['toggle_debug', 'set_debug'] + #: How exposed XML-RPC functions should be dispatched to child + #: processes. + __child_rmi__ = __rmi__[:] + def __init__(self, name=None): """ :param name: The name of the logger object to get. If none is @@ -91,6 +95,20 @@ class Plugin(Debuggable): #: List of names of methods to be exposed as XML-RPC functions __rmi__ = Debuggable.__rmi__ + #: How exposed XML-RPC functions should be dispatched to child + #: processes, if :mod:`Bcfg2.Server.MultiprocessingCore` is in + #: use. Items ``__child_rmi__`` can either be strings (in which + #: case the same function is called on child processes as on the + #: parent) or 2-tuples, in which case the first element is the + #: name of the RPC function called on the parent process, and the + #: second element is the name of the function to call on child + #: processes. Functions that are not listed in the list will not + #: be dispatched to child processes, i.e., they will only be + #: called on the parent. A function must be listed in ``__rmi__`` + #: in order to be exposed; functions listed in ``_child_rmi__`` + #: but not ``__rmi__`` will be ignored. + __child_rmi__ = Debuggable.__child_rmi__ + def __init__(self, core, datastore): """ :param core: The Bcfg2.Server.Core initializing the plugin diff --git a/src/lib/Bcfg2/Server/Plugins/Guppy.py b/src/lib/Bcfg2/Server/Plugins/Guppy.py index 4f2601f15..3c9b8a459 100644 --- a/src/lib/Bcfg2/Server/Plugins/Guppy.py +++ b/src/lib/Bcfg2/Server/Plugins/Guppy.py @@ -37,6 +37,7 @@ class Guppy(Bcfg2.Server.Plugin.Plugin): experimental = True __rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Enable', 'Disable'] + __child_rmi__ = __rmi__[:] def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) diff --git a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py index 3e4fb33ec..4096153a3 100644 --- a/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py +++ b/src/lib/Bcfg2/Server/Plugins/Packages/__init__.py @@ -93,6 +93,9 @@ class Packages(Bcfg2.Server.Plugin.Plugin, #: and :func:`Reload` __rmi__ = Bcfg2.Server.Plugin.Plugin.__rmi__ + ['Refresh', 'Reload'] + __child_rmi__ = Bcfg2.Server.Plugin.Plugin.__child_rmi__ + \ + [('Refresh', 'expire_cache'), ('Reload', 'expire_cache')] + def __init__(self, core, datastore): Bcfg2.Server.Plugin.Plugin.__init__(self, core, datastore) Bcfg2.Server.Plugin.Caching.__init__(self) -- cgit v1.2.3-1-g7c22 From eef441c1acdf1d3d483647b153f721cbab4a8517 Mon Sep 17 00:00:00 2001 From: "Chris St. 
Pierre" Date: Wed, 7 Aug 2013 14:20:55 -0400 Subject: Plugin: added missing docstring --- src/lib/Bcfg2/Server/Plugin/interfaces.py | 11 +++++++++++ 1 file changed, 11 insertions(+) (limited to 'src') diff --git a/src/lib/Bcfg2/Server/Plugin/interfaces.py b/src/lib/Bcfg2/Server/Plugin/interfaces.py index 2dbf75f42..376030792 100644 --- a/src/lib/Bcfg2/Server/Plugin/interfaces.py +++ b/src/lib/Bcfg2/Server/Plugin/interfaces.py @@ -605,4 +605,15 @@ class Caching(object): FAM. This presents a unified interface to clear the cache. """ def expire_cache(self, key=None): + """ Expire the cache associated with the given key. + + :param key: The key to expire the cache for. Because cache + implementations vary tremendously between plugins, + this could be any number of things, but generally + a hostname. It also may or may not be possible to + expire the cache for a single host; this interface + does not require any guarantee about that. + :type key: varies + :returns: None + """ raise NotImplementedError -- cgit v1.2.3-1-g7c22