From aca1228a808990644d239b2e4c4bc06dfc5ab955 Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Wed, 31 Jul 2013 14:10:05 -0400 Subject: MultiprocessingCore: dispatch "bcfg2-admin perf" calls to children --- src/lib/Bcfg2/Server/MultiprocessingCore.py | 38 ++++++++++++++++++++++++++--- 1 file changed, 35 insertions(+), 3 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py index 03394edf9..ce3343808 100644 --- a/src/lib/Bcfg2/Server/MultiprocessingCore.py +++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py @@ -29,8 +29,8 @@ from itertools import cycle from Bcfg2.Cache import Cache from Bcfg2.Compat import Queue, Empty from Bcfg2.Server.Core import BaseCore, exposed -from Bcfg2.Server.Plugin import Debuggable from Bcfg2.Server.BuiltinCore import Core as BuiltinCore +from Bcfg2.Server.Plugin import Debuggable, track_statistics class DispatchingCache(Cache, Debuggable): @@ -144,8 +144,7 @@ class ThreadSafePipeDispatcher(Debuggable): self.logger.debug("Starting interprocess RPC send thread") while not self.terminate.isSet(): try: - data = self._send_queue.get(True, self.poll_wait) - self._mainpipe.send(data) + self._mainpipe.send(self._send_queue.get(True, self.poll_wait)) except Empty: pass self.logger.info("Interprocess RPC send thread stopped") @@ -331,6 +330,7 @@ class ChildCore(BaseCore): key, data = self.rpc_pipe.recv() self.rpc_pipe.send((key, self._dispatch(data))) + @track_statistics() def _reap_threads(self): """ Reap rendering threads that have completed """ for thread in self._threads[:]: @@ -512,3 +512,35 @@ class Core(BuiltinCore): pipe = self.pipes[childname] pipe.send(key, ("GetConfig", [client], dict())) return pipe.recv(key) + + @exposed + def get_statistics(self, address): + stats = dict() + + def _aggregate_statistics(newstats, prefix=None): + for statname, vals in newstats.items(): + if statname.startswith("ChildCore:"): + statname = statname[5:] + if prefix: + prettyname = "%s:%s" % (prefix, statname) + else: + prettyname = statname + stats[prettyname] = vals + totalname = "Total:%s" % statname + if totalname not in stats: + stats[totalname] = vals + else: + newmin = min(stats[totalname][0], vals[0]) + newmax = max(stats[totalname][1], vals[1]) + newcount = stats[totalname][3] + vals[3] + newmean = ((stats[totalname][2] * stats[totalname][3]) + + (vals[2] * vals[3])) / newcount + stats[totalname] = (newmin, newmax, newmean, newcount) + + key = ThreadSafePipeDispatcher.genkey("get_statistics") + stats = dict() + for childname, pipe in self.pipes.items(): + pipe.send(key, ("get_statistics", [address], dict())) + _aggregate_statistics(pipe.recv(key), prefix=childname) + _aggregate_statistics(BuiltinCore.get_statistics(self, address)) + return stats -- cgit v1.2.3-1-g7c22