summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-07-31 14:10:05 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-07-31 15:42:29 -0400
commitaca1228a808990644d239b2e4c4bc06dfc5ab955 (patch)
tree972c4320ca51bf34e78a13c041fcc1290f37c11c /src/lib/Bcfg2/Server/MultiprocessingCore.py
parent604b5d2998ce4d93dee3945cc458feada3602d44 (diff)
downloadbcfg2-aca1228a808990644d239b2e4c4bc06dfc5ab955.tar.gz
bcfg2-aca1228a808990644d239b2e4c4bc06dfc5ab955.tar.bz2
bcfg2-aca1228a808990644d239b2e4c4bc06dfc5ab955.zip
MultiprocessingCore: dispatch "bcfg2-admin perf" calls to children
Diffstat (limited to 'src/lib/Bcfg2/Server/MultiprocessingCore.py')
-rw-r--r--src/lib/Bcfg2/Server/MultiprocessingCore.py38
1 files changed, 35 insertions, 3 deletions
diff --git a/src/lib/Bcfg2/Server/MultiprocessingCore.py b/src/lib/Bcfg2/Server/MultiprocessingCore.py
index 03394edf9..ce3343808 100644
--- a/src/lib/Bcfg2/Server/MultiprocessingCore.py
+++ b/src/lib/Bcfg2/Server/MultiprocessingCore.py
@@ -29,8 +29,8 @@ from itertools import cycle
from Bcfg2.Cache import Cache
from Bcfg2.Compat import Queue, Empty
from Bcfg2.Server.Core import BaseCore, exposed
-from Bcfg2.Server.Plugin import Debuggable
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
+from Bcfg2.Server.Plugin import Debuggable, track_statistics
class DispatchingCache(Cache, Debuggable):
@@ -144,8 +144,7 @@ class ThreadSafePipeDispatcher(Debuggable):
self.logger.debug("Starting interprocess RPC send thread")
while not self.terminate.isSet():
try:
- data = self._send_queue.get(True, self.poll_wait)
- self._mainpipe.send(data)
+ self._mainpipe.send(self._send_queue.get(True, self.poll_wait))
except Empty:
pass
self.logger.info("Interprocess RPC send thread stopped")
@@ -331,6 +330,7 @@ class ChildCore(BaseCore):
key, data = self.rpc_pipe.recv()
self.rpc_pipe.send((key, self._dispatch(data)))
+ @track_statistics()
def _reap_threads(self):
""" Reap rendering threads that have completed """
for thread in self._threads[:]:
@@ -512,3 +512,35 @@ class Core(BuiltinCore):
pipe = self.pipes[childname]
pipe.send(key, ("GetConfig", [client], dict()))
return pipe.recv(key)
+
+ @exposed
+ def get_statistics(self, address):
+ stats = dict()
+
+ def _aggregate_statistics(newstats, prefix=None):
+ for statname, vals in newstats.items():
+ if statname.startswith("ChildCore:"):
+ statname = statname[5:]
+ if prefix:
+ prettyname = "%s:%s" % (prefix, statname)
+ else:
+ prettyname = statname
+ stats[prettyname] = vals
+ totalname = "Total:%s" % statname
+ if totalname not in stats:
+ stats[totalname] = vals
+ else:
+ newmin = min(stats[totalname][0], vals[0])
+ newmax = max(stats[totalname][1], vals[1])
+ newcount = stats[totalname][3] + vals[3]
+ newmean = ((stats[totalname][2] * stats[totalname][3]) +
+ (vals[2] * vals[3])) / newcount
+ stats[totalname] = (newmin, newmax, newmean, newcount)
+
+ key = ThreadSafePipeDispatcher.genkey("get_statistics")
+ stats = dict()
+ for childname, pipe in self.pipes.items():
+ pipe.send(key, ("get_statistics", [address], dict()))
+ _aggregate_statistics(pipe.recv(key), prefix=childname)
+ _aggregate_statistics(BuiltinCore.get_statistics(self, address))
+ return stats