From 3417435d20e0ec7002ee7fd7a33e5efbec2bc0cf Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Tue, 4 Sep 2012 16:30:13 -0400 Subject: Core: fixed threading issues preventing successful daemonization of builtin core --- src/lib/Bcfg2/Server/BuiltinCore.py | 39 +++++++++++++--------- src/lib/Bcfg2/Server/Core.py | 50 +++++++++++++--------------- src/lib/Bcfg2/Server/FileMonitor/Inotify.py | 27 ++++++++++++--- src/lib/Bcfg2/Server/FileMonitor/__init__.py | 5 +++ 4 files changed, 75 insertions(+), 46 deletions(-) (limited to 'src/lib') diff --git a/src/lib/Bcfg2/Server/BuiltinCore.py b/src/lib/Bcfg2/Server/BuiltinCore.py index d15c4eebc..df25a24a8 100644 --- a/src/lib/Bcfg2/Server/BuiltinCore.py +++ b/src/lib/Bcfg2/Server/BuiltinCore.py @@ -4,6 +4,7 @@ import os import sys import time import socket +import daemon import logging from Bcfg2.Server.Core import BaseCore from Bcfg2.Compat import xmlrpclib, urlparse @@ -17,6 +18,11 @@ class NoExposedMethod (Exception): class Core(BaseCore): name = 'bcfg2-server' + + def __init__(self, setup, start_fam_thread=False): + BaseCore.__init__(self, setup, start_fam_thread=start_fam_thread) + self.server = None + self.context = daemon.DaemonContext() def _resolve_exposed_method(self, method_name): """Resolve an exposed method. @@ -65,34 +71,37 @@ class Core(BaseCore): raise xmlrpclib.Fault(getattr(e, "fault_code", 1), str(e)) return result - def run(self): - if self.setup['daemon']: - self._daemonize() - + def _daemonize(self): + self.context.open() + self.logger.info("%s daemonized" % self.name) + + def _run(self): hostname, port = urlparse(self.setup['location'])[1].split(':') server_address = socket.getaddrinfo(hostname, port, socket.AF_UNSPEC, socket.SOCK_STREAM)[0][4] try: - server = XMLRPCServer(self.setup['listen_all'], - server_address, - keyfile=self.setup['key'], - certfile=self.setup['cert'], - register=False, - timeout=1, - ca=self.setup['ca'], - protocol=self.setup['protocol']) + self.server = XMLRPCServer(self.setup['listen_all'], + server_address, + keyfile=self.setup['key'], + certfile=self.setup['cert'], + register=False, + timeout=1, + ca=self.setup['ca'], + protocol=self.setup['protocol']) except: err = sys.exc_info()[1] self.logger.error("Server startup failed: %s" % err) os._exit(1) - server.register_instance(self) + self.server.register_instance(self) + def _block(self): try: - server.serve_forever() + self.server.serve_forever() finally: - server.server_close() + self.server.server_close() + self.context.close() self.shutdown() def methodHelp(self, method_name): diff --git a/src/lib/Bcfg2/Server/Core.py b/src/lib/Bcfg2/Server/Core.py index e7ff2552a..8b9cd4916 100644 --- a/src/lib/Bcfg2/Server/Core.py +++ b/src/lib/Bcfg2/Server/Core.py @@ -192,10 +192,7 @@ class BaseCore(object): threading.Thread(name="%sFAMThread" % setup['filemonitor'], target=self._file_monitor_thread) self.lock = threading.Lock() - - if start_fam_thread: - self.fam_thread.start() - self.fam.AddMonitor(self.cfile, self.setup) + self.start_fam_thread = start_fam_thread self.stats = Statistics() @@ -453,33 +450,34 @@ class BaseCore(object): (client, time.time() - start)) return config - def run(self, **kwargs): - """ run the server core """ - raise NotImplementedError + def run(self): + """ run the server core. note that it is the responsibility of + the server core implementation to call shutdown() """ + if self.setup['daemon']: + self._daemonize() + open(self.setup['daemon'], "w").write("%s\n" % os.getpid()) - def _daemonize(self): - child_pid = os.fork() - if child_pid != 0: - return + self._run() + + self.fam.start() + if self.start_fam_thread: + self.fam_thread.start() + self.fam.AddMonitor(self.cfile, self.setup) - os.setsid() + self._block() - child_pid = os.fork() - if child_pid != 0: - os._exit(0) - - redirect_file = open("/dev/null", "w+") - os.dup2(redirect_file.fileno(), sys.__stdin__.fileno()) - os.dup2(redirect_file.fileno(), sys.__stdout__.fileno()) - os.dup2(redirect_file.fileno(), sys.__stderr__.fileno()) + def _daemonize(self): + """ daemonize the server """ + raise NotImplementedError - os.chdir(os.sep) - - pidfile = open(self.setup['daemon'] or "/dev/null", "w") - pidfile.write("%s\n" % os.getpid()) - pidfile.close() + def _run(self): + """ start up the server; this method should return immediately """ + raise NotImplementedError - return os.getpid() + def _block(self): + """ enter the infinite loop. this method should not return + until the server is killed """ + raise NotImplementedError def critical_error(self, operation): """ this should be overridden by child classes """ diff --git a/src/lib/Bcfg2/Server/FileMonitor/Inotify.py b/src/lib/Bcfg2/Server/FileMonitor/Inotify.py index 32390c4eb..e4948ea8d 100644 --- a/src/lib/Bcfg2/Server/FileMonitor/Inotify.py +++ b/src/lib/Bcfg2/Server/FileMonitor/Inotify.py @@ -1,10 +1,10 @@ """ Inotify driver for file alteration events """ +import os +import sys import logging import operator -import os import pyinotify -import sys from Bcfg2.Compat import reduce from Bcfg2.Server.FileMonitor import Event from Bcfg2.Server.FileMonitor.Pseudo import Pseudo @@ -22,11 +22,22 @@ class Inotify(Pseudo, pyinotify.ProcessEvent): def __init__(self, ignore=None, debug=False): Pseudo.__init__(self, ignore=ignore, debug=debug) + self.event_filter = dict() + self.watches_by_path = dict() + # these are created in start() after the server is done forking + self.notifier = None + self.wm = None + self.started = False + self.add_q = [] + + def start(self): self.wm = pyinotify.WatchManager() self.notifier = pyinotify.ThreadedNotifier(self.wm, self) self.notifier.start() - self.event_filter = dict() - self.watches_by_path = dict() + self.started = True + for monitor in self.add_q: + self.AddMonitor(*monitor) + self.add_q = [] def fileno(self): return self.wm.get_fd() @@ -75,6 +86,11 @@ class Inotify(Pseudo, pyinotify.ProcessEvent): def AddMonitor(self, path, obj): # strip trailing slashes path = path.rstrip("/") + + if not self.started: + self.add_q.append((path, obj)) + return path + if not os.path.isdir(path): # inotify is a little wonky about watching files. for # instance, if you watch /tmp/foo, and then do 'mv @@ -123,4 +139,5 @@ class Inotify(Pseudo, pyinotify.ProcessEvent): return path def shutdown(self): - self.notifier.stop() + if self.started: + self.notifier.stop() diff --git a/src/lib/Bcfg2/Server/FileMonitor/__init__.py b/src/lib/Bcfg2/Server/FileMonitor/__init__.py index c490acc81..a9bf50ccf 100644 --- a/src/lib/Bcfg2/Server/FileMonitor/__init__.py +++ b/src/lib/Bcfg2/Server/FileMonitor/__init__.py @@ -44,6 +44,11 @@ class FileMonitor(object): def __repr__(self): return "%s (%s events, fd %s)" % (str(self), len(self.events), self.fileno) + def start(self): + """ start threads or anything else that needs to be done after + the server forks and daemonizes """ + pass + def debug_log(self, msg): if self.debug: logger.info(msg) -- cgit v1.2.3-1-g7c22