summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/MultiprocessingCore.py
blob: ce33438087fed9e6b23b004954c0943158a61173 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
""" The multiprocessing server core is a reimplementation of the
:mod:`Bcfg2.Server.BuiltinCore` that uses the Python
:mod:`multiprocessing` library to offload work to multiple child
processes.  As such, it requires Python 2.6+.

The parent communicates with the children over
:class:`multiprocessing.Pipe` objects that are wrapped in a
:class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher` to
make them thread-safe.  Each command passed over the Pipe should be in
the following format::

    (<method>, <args>, <kwargs>)

The parent can also communicate with children over a one-way
:class:`multiprocessing.Queue` object that is used for
publish-subscribe communications, i.e., most XML-RPC commands.
(Setting debug, e.g., doesn't require a response from the children.)

The ``<method>`` named in a command must be exposed by the child by
decorating it with :func:`Bcfg2.Server.Core.exposed`.
"""

import time
import threading
import lxml.etree
import multiprocessing
from uuid import uuid4
from itertools import cycle
from Bcfg2.Cache import Cache
from Bcfg2.Compat import Queue, Empty
from Bcfg2.Server.Core import BaseCore, exposed
from Bcfg2.Server.BuiltinCore import Core as BuiltinCore
from Bcfg2.Server.Plugin import Debuggable, track_statistics


class DispatchingCache(Cache, Debuggable):
    """ A :class:`Bcfg2.Cache.Cache` whose expiration events are also
    published on a command queue, so that whoever consumes that queue
    is told to expire the same key. """

    #: Name of the remote method placed in every published command
    method = "expire_cache"

    def __init__(self, *args, **kwargs):
        """ Takes the same arguments as the parent cache, plus a
        mandatory ``queue`` keyword argument: the object whose
        ``put()`` is called to publish expiration commands. """
        # strip our own keyword before the remaining arguments are
        # handed on to Cache
        self.cmd_q = kwargs.pop("queue")
        Debuggable.__init__(self)
        Cache.__init__(self, *args, **kwargs)

    def expire(self, key=None):
        """ Publish a ``(method, [key], {})`` command on the queue,
        then expire ``key`` in this cache. """
        command = (self.method, [key], {})
        self.cmd_q.put(command)
        Cache.expire(self, key=key)


class PublishSubscribeQueue(object):
    """ The publishing end of a publish-subscribe queue built on
    :class:`multiprocessing.Queue`.  One node puts items in; every
    subscriber receives its own copy of each item.  Subscribers are
    handed a plain :class:`multiprocessing.Queue` and need no special
    handling.

    Because this object is only the publishing end, it offers no way
    to get items back out.
    """

    def __init__(self):
        # one multiprocessing.Queue per subscriber, in subscription order
        self._queues = list()

    def add_subscriber(self):
        """ Register a new subscriber and return the
        :class:`multiprocessing.Queue` from which that subscriber
        reads published items. """
        subscription = multiprocessing.Queue()
        self._queues.append(subscription)
        return subscription

    def put(self, obj, block=True, timeout=None):
        """ Publish ``obj`` to every subscriber.  ``block`` and
        ``timeout`` are passed to each underlying
        :func:`multiprocessing.Queue.put` call. """
        for subscription in self._queues:
            subscription.put(obj, block=block, timeout=timeout)

    def put_nowait(self, obj):
        """ Shorthand for ``put(obj, False)``. """
        self.put(obj, block=False)

    def close(self):
        """ Close every subscriber queue.  See
        :func:`multiprocessing.Queue.close` for more details. """
        for subscription in self._queues:
            subscription.close()


class ThreadSafePipeDispatcher(Debuggable):
    """ This is a wrapper around :class:`multiprocessing.Pipe` objects
    that allows them to be used in multithreaded applications.  When
    performing a ``send()``, a key is included that will be used to
    identify the response.  As responses are received from the Pipe,
    they are added to a dict that is used to get the appropriate
    response for a given thread.

    The remote end of the Pipe must deal with the key being sent with
    the data in a tuple of ``(key, data)``, and it must include the
    key with its response.

    It is the responsibility of the user to ensure that the key is
    unique.

    Note that this adds a bottleneck -- all communication over the
    actual Pipe happens in a single thread.  But for our purposes,
    Pipe communication is fairly minimal and that's an acceptable
    bottleneck."""

    #: How long to wait while polling for new data to send.  This
    #: doesn't affect the speed with which data is sent, but
    #: setting it too high will result in longer shutdown times, since
    #: we only check for the termination event from the main process
    #: every ``poll_wait`` seconds.
    poll_wait = 2.0

    #: Marker for "no timeout argument given" in :func:`poll`; needed
    #: because ``None`` is itself a meaningful timeout (block forever).
    _sentinel = object()

    def __init__(self, terminate):
        """
        :param terminate: An event-like object (must provide
                          ``isSet()``); once it is set, the send and
                          receive threads exit their loops.
        """
        Debuggable.__init__(self)

        #: The threading flag that is used to determine when the
        #: threads should stop.
        self.terminate = terminate

        #: The :class:`multiprocessing.Pipe` tuple used by this object
        self.pipe = multiprocessing.Pipe()

        # this object talks over end 0; end 1 is left for the caller
        # to hand to the remote side
        self._mainpipe = self.pipe[0]

        # responses received so far, indexed by request key
        self._recv_dict = dict()

        # signalled by the receive thread each time a response is
        # stored, so that poll() can sleep instead of spinning.  Must
        # exist before the receive thread is started below.
        self._recv_cond = threading.Condition()

        # outgoing (key, data) tuples waiting for the send thread
        self._send_queue = Queue()

        self.send_thread = threading.Thread(name="PipeSendThread",
                                            target=self._send_thread)
        self.send_thread.start()
        self.recv_thread = threading.Thread(name="PipeRecvThread",
                                            target=self._recv_thread)
        self.recv_thread.start()

    def _send_thread(self):
        """ Run the single thread through which send requests are passed """
        self.logger.debug("Starting interprocess RPC send thread")
        while not self.terminate.isSet():
            try:
                # wake up at least every poll_wait seconds so that the
                # terminate flag gets re-checked
                self._mainpipe.send(self._send_queue.get(True, self.poll_wait))
            except Empty:
                pass
        self.logger.info("Interprocess RPC send thread stopped")

    def send(self, key, data):
        """ Send data with the given unique key.  The ``(key, data)``
        tuple is only queued here; the send thread writes it to the
        Pipe. """
        self._send_queue.put((key, data))

    def _recv_thread(self):
        """ Run the single thread through which recv requests are passed """
        self.logger.debug("Starting interprocess RPC receive thread")
        while not self.terminate.isSet():
            if self._mainpipe.poll(self.poll_wait):
                key, data = self._mainpipe.recv()
                if key in self._recv_dict:
                    self.logger.error("Duplicate key in received data: %s" %
                                      key)
                    # NOTE(review): the pipe is closed but the data is
                    # still stored below; presumably a deliberate
                    # hard failure on key collisions -- confirm
                    self._mainpipe.close()
                with self._recv_cond:
                    self._recv_dict[key] = data
                    # wake every thread blocked in poll(); each one
                    # re-checks for its own key
                    self._recv_cond.notify_all()
        self.logger.info("Interprocess RPC receive thread stopped")

    def recv(self, key):
        """ Receive data with the given unique key.  Blocks until a
        response carrying ``key`` has arrived, then removes it from
        the pending responses and returns it. """
        self.poll(key, timeout=None)
        with self._recv_cond:
            return self._recv_dict.pop(key)

    def poll(self, key, timeout=_sentinel):
        """ Poll for data with the given unique key.  See
        :func:`multiprocessing.Connection.poll` for the possible
        values of ``timeout``.

        :param key: The key of the response to look for
        :param timeout: If omitted, return immediately.  If ``None``,
                        block until the response arrives.  Otherwise,
                        the maximum number of seconds to wait.
        :returns: bool - True if a response for ``key`` is available
        """
        if timeout is self._sentinel:
            return key in self._recv_dict

        if timeout is None:
            deadline = None
        else:
            deadline = time.time() + float(timeout)

        # Sleep on the condition rather than spinning on the dict:
        # the receive thread notifies us whenever a response arrives.
        with self._recv_cond:
            while key not in self._recv_dict:
                if deadline is None:
                    remaining = None
                else:
                    remaining = deadline - time.time()
                    if remaining <= 0:
                        return False
                self._recv_cond.wait(remaining)
            return True

    @staticmethod
    def genkey(base):
        """ Generate a key suitable for use with
        :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher`
        send() requests, based on the given data.  The key is
        constructed from the string given, some information about this
        thread, and some random data.

        :param base: The string the key starts with
        :returns: string - ``<base>-<thread name>-<thread ident>-<uuid4>``
        """
        thread = threading.current_thread()
        return "%s-%s-%s-%s" % (base, thread.name, thread.ident, uuid4())


class DualEvent(object):
    """ An object with the :class:`threading.Event` interface that
    keeps a :class:`threading.Event` and a
    :class:`multiprocessing.Event` in step: setting or clearing it
    sets or clears both. """

    def __init__(self, threading_event=None, multiprocessing_event=None):
        """
        :param threading_event: An existing event to wrap; a new one
                                is created if omitted.
        :type threading_event: threading.Event
        :param multiprocessing_event: An existing event to wrap; a new
                                      one is created if omitted.
        :type multiprocessing_event: multiprocessing.Event
        """
        if threading_event:
            self._threading_event = threading_event
        else:
            self._threading_event = threading.Event()

        if multiprocessing_event:
            self._multiproc_event = multiprocessing_event
        else:
            self._multiproc_event = multiprocessing.Event()

        supplied = threading_event or multiprocessing_event
        if supplied:
            # a caller-supplied event may already be set; start from
            # a cleared flag no matter what state it was passed in
            self.clear()

    def is_set(self):
        """ Return true if and only if the internal flag is true.
        Only the threading event is consulted. """
        return self._threading_event.is_set()

    isSet = is_set

    def set(self):
        """ Set the internal flag to true on both events. """
        self._threading_event.set()
        self._multiproc_event.set()

    def clear(self):
        """ Reset the internal flag to false on both events. """
        self._threading_event.clear()
        self._multiproc_event.clear()

    def wait(self, timeout=None):
        """ Block until the internal flag is true, or until the
        optional timeout occurs.  The wait is performed on the
        threading event. """
        return self._threading_event.wait(timeout=timeout)


class ChildCore(BaseCore):
    """ A child process for :class:`Bcfg2.MultiprocessingCore.Core`.
    This core builds configurations from a given
    :class:`multiprocessing.Pipe`.  Note that this is a full-fledged
    server core; the only input it gets from the parent process is the
    hostnames of clients to render.  All other state comes from the
    FAM. However, this core only is used to render configs; it doesn't
    handle anything else (authentication, probes, etc.) because those
    are all much faster.  There's no reason that it couldn't handle
    those, though, if the pipe communication "protocol" were made more
    robust. """

    #: How long to wait while polling for new RPC commands.  This
    #: doesn't affect the speed with which a command is processed, but
    #: setting it too high will result in longer shutdown times, since
    #: we only check for the termination event from the main process
    #: every ``poll_wait`` seconds.
    poll_wait = 3.0

    def __init__(self, name, setup, rpc_pipe, cmd_q, terminate):
        """
        :param name: The name of this child
        :type name: string
        :param setup: A Bcfg2 options dict
        :type setup: Bcfg2.Options.OptionParser
        :param rpc_pipe: The pipe used for RPC communication with the
                         parent process
        :type rpc_pipe: multiprocessing.Pipe
        :param cmd_q: The queue used for one-way pub-sub
                      communications from the parent process
        :type cmd_q: multiprocessing.Queue
        :param terminate: An event that flags ChildCore objects to shut
                          themselves down.
        :type terminate: multiprocessing.Event
        """
        BaseCore.__init__(self, setup)

        #: The name of this child
        self.name = name

        #: The pipe used for RPC communication with the parent
        self.rpc_pipe = rpc_pipe

        #: The queue used to receive pub-sub commands
        self.cmd_q = cmd_q

        #: The :class:`multiprocessing.Event` that will be monitored
        #: to determine when this child should shut down.
        self.terminate = terminate

        # a list of all rendering threads
        self._threads = []

        # the thread used to process publish-subscribe commands
        self._command_thread = threading.Thread(name="CommandThread",
                                                target=self._dispatch_commands)

        # override this setting so that the child doesn't try to write
        # the pidfile
        self.setup['daemon'] = False

        # ensure that the child doesn't start a perflog thread
        self.perflog_thread = None

    def _run(self):
        """ Start the pub-sub command thread.  Always returns True. """
        self._command_thread.start()
        return True

    def _daemonize(self):
        """ Do nothing and report success. """
        return True

    def _dispatch_commands(self):
        """ Dispatch commands received via the pub-sub queue interface
        """
        self.logger.debug("Starting %s RPC subscription thread" % self.name)
        while not self.terminate.is_set():
            try:
                # time out every poll_wait seconds so that the
                # terminate flag gets re-checked
                data = self.cmd_q.get(True, self.poll_wait)
                self.logger.debug("%s: Processing asynchronous command: %s" %
                                  (self.name, data[0]))
                # pub-sub is one-way, so the return value is discarded
                self._dispatch(data)
            except Empty:
                pass
        self.logger.info("%s RPC subscription thread stopped" % self.name)

    def _dispatch_render(self):
        """ Dispatch render requests received via the RPC pipe
        interface.  Reads one ``(key, data)`` request and sends back
        ``(key, result)`` so that the sender can match the response to
        its request. """
        key, data = self.rpc_pipe.recv()
        self.rpc_pipe.send((key, self._dispatch(data)))

    @track_statistics()
    def _reap_threads(self):
        """ Reap rendering threads that have completed """
        # iterate over a copy, since the list is modified in the loop
        for thread in self._threads[:]:
            if not thread.is_alive():
                self._threads.remove(thread)

    def _dispatch(self, data):
        """ Generic method dispatcher used for commands received from
        either the pub-sub queue or the RPC pipe.

        :param data: The command, as a tuple of
                     ``(<method>, <args>, <kwargs>)``
        :type data: tuple
        :returns: The return value of the method called, or None if
                  the method does not exist or is not exposed
        """
        method, args, kwargs = data
        if not hasattr(self, method):
            self.logger.error("%s: Method %s does not exist" % (self.name,
                                                                method))
            return None

        func = getattr(self, method)
        # Attributes that were never marked as exposed carry no
        # ``exposed`` attribute at all, so a plain ``func.exposed``
        # would raise AttributeError rather than reach the error
        # branch below.  Treat a missing marker as "not exposed".
        if getattr(func, "exposed", False):
            self.logger.debug("%s: Calling RPC method %s" % (self.name,
                                                             method))
            return func(*args, **kwargs)
        else:
            self.logger.error("%s: Method %s is not exposed" % (self.name,
                                                                method))
            return None

    def _block(self):
        """ Run until the terminate event is set or a
        KeyboardInterrupt is received: start a rendering thread
        whenever the RPC pipe has data, reap finished threads, and
        finally shut down. """
        while not self.terminate.isSet():
            try:
                if self.rpc_pipe.poll(self.poll_wait):
                    # NOTE(review): poll() may report the same request
                    # again before the new thread has read it --
                    # confirm whether extra threads are a concern
                    rpc_thread = threading.Thread(
                        name="Renderer%s" % len(self._threads),
                        target=self._dispatch_render)
                    self._threads.append(rpc_thread)
                    rpc_thread.start()
                self._reap_threads()
            except KeyboardInterrupt:
                break
        self.shutdown()

    def shutdown(self):
        """ Shut down the core, then wait, logging once a second,
        until the calling thread is the only thread left in this
        process. """
        BaseCore.shutdown(self)
        self._reap_threads()
        while len(threading.enumerate()) > 1:
            threads = [t for t in threading.enumerate()
                       if t != threading.current_thread()]
            self.logger.info("%s: Waiting for %d thread(s): %s" %
                             (self.name, len(threads),
                              [t.name for t in threads]))
            time.sleep(1)
            self._reap_threads()
        self.logger.info("%s: All threads stopped" % self.name)

    @exposed
    def set_debug(self, address, debug):
        """ Exposed wrapper around :func:`BaseCore.set_debug`; the
        result of that call is not returned. """
        BaseCore.set_debug(self, address, debug)

    @exposed
    def expire_cache(self, client=None):
        """ Expire the metadata cache for a client """
        self.metadata_cache.expire(client)

    @exposed
    def GetConfig(self, client):
        """ Render the configuration for a client and return it
        serialized with :func:`lxml.etree.tostring` """
        self.logger.debug("%s: Building configuration for %s" %
                          (self.name, client))
        return lxml.etree.tostring(self.BuildConfiguration(client))


class Core(BuiltinCore):
    """ A multiprocessing core that delegates building the actual
    client configurations to
    :class:`Bcfg2.Server.MultiprocessingCore.ChildCore` objects.  The
    parent process doesn't build any children itself; all calls to
    :func:`GetConfig` are delegated to children. All other calls are
    handled by the parent process. """

    #: How long to wait for a child process to shut down cleanly
    #: before it is terminated.
    shutdown_timeout = 10.0

    def __init__(self, setup):
        """
        :param setup: A Bcfg2 options dict.  If ``setup['children']``
                      is None, it is set to the number of CPUs.
        :type setup: Bcfg2.Options.OptionParser
        """
        BuiltinCore.__init__(self, setup)
        if setup['children'] is None:
            setup['children'] = multiprocessing.cpu_count()

        #: A dict of child name ->
        #: :class:`Bcfg2.Server.MultiprocessingCore.ThreadSafePipeDispatcher`
        #: objects used to pass render requests to that child.  (The
        #: child is given the other end of the Pipe.)
        self.pipes = dict()

        #: A
        #: :class:`Bcfg2.Server.MultiprocessingCore.PublishSubscribeQueue`
        #: object used to publish commands to all children.
        self.cmd_q = PublishSubscribeQueue()

        #: The flag that indicates when to stop child threads and
        #: processes
        # wraps the terminate event already present on this object, so
        # that setting it also sets a multiprocessing event
        self.terminate = DualEvent(threading_event=self.terminate)

        self.metadata_cache = DispatchingCache(queue=self.cmd_q)

        #: A list of children that will be cycled through
        self._all_children = []

        #: An iterator that each child will be taken from in sequence,
        #: to provide a round-robin distribution of render requests
        self.children = None

    def _run(self):
        """ Start ``setup['children']`` child processes, each with its
        own RPC pipe and pub-sub subscription, then run the builtin
        core.

        :returns: The return value of :func:`BuiltinCore._run`
        """
        for cnum in range(self.setup['children']):
            name = "Child-%s" % cnum

            # create Pipe for render requests
            dispatcher = ThreadSafePipeDispatcher(self.terminate)
            self.pipes[name] = dispatcher

            self.logger.debug("Starting child %s" % name)
            childcore = ChildCore(name, self.setup, dispatcher.pipe[1],
                                  self.cmd_q.add_subscriber(), self.terminate)
            child = multiprocessing.Process(target=childcore.run, name=name)
            child.start()
            self.logger.debug("Child %s started with PID %s" % (name,
                                                                child.pid))
            self._all_children.append(name)
        self.logger.debug("Started %s children: %s" % (len(self._all_children),
                                                       self._all_children))
        self.children = cycle(self._all_children)
        return BuiltinCore._run(self)

    def shutdown(self):
        """ Shut down the parent core, close the command queues, wait
        for all child processes to exit (terminating any that are
        still alive after ``shutdown_timeout`` seconds), and finally
        wait for all other threads in this process to stop. """
        BuiltinCore.shutdown(self)
        self.logger.debug("Closing RPC command queues")
        self.cmd_q.close()

        def term_children():
            """ Terminate all remaining multiprocessing children. """
            for child in multiprocessing.active_children():
                self.logger.error("Waited %s seconds to shut down %s, "
                                  "terminating" % (self.shutdown_timeout,
                                                   child.name))
                child.terminate()

        # the timer only fires if the wait loop below is still running
        # after shutdown_timeout seconds
        timer = threading.Timer(self.shutdown_timeout, term_children)
        timer.start()
        while len(multiprocessing.active_children()):
            self.logger.info("Waiting for %s child(ren): %s" %
                             (len(multiprocessing.active_children()),
                              [c.name
                               for c in multiprocessing.active_children()]))
            time.sleep(1)
        timer.cancel()
        self.logger.info("All children shut down")

        while len(threading.enumerate()) > 1:
            threads = [t for t in threading.enumerate()
                       if t != threading.current_thread()]
            self.logger.info("Waiting for %s thread(s): %s" %
                             (len(threads), [t.name for t in threads]))
            time.sleep(1)
        self.logger.info("Shutdown complete")

    @exposed
    def set_debug(self, address, debug):
        """ Set the debug flag on the children (via the pub-sub
        queue), the metadata cache, every pipe dispatcher, and this
        core.

        :returns: The return value of :func:`BuiltinCore.set_debug`
        """
        self.cmd_q.put(("set_debug", [address, debug], dict()))
        self.metadata_cache.set_debug(debug)
        for pipe in self.pipes.values():
            pipe.set_debug(debug)
        return BuiltinCore.set_debug(self, address, debug)

    @exposed
    def GetConfig(self, address):
        """ Have the next child in the round-robin rotation build the
        configuration for the client at ``address``, and return that
        child's response. """
        client = self.resolve_client(address)[0]
        # the builtin next() works on Python 2.6+ and on Python 3,
        # where iterators have no .next() method
        childname = next(self.children)
        self.logger.debug("Building configuration for %s on %s" % (client,
                                                                   childname))
        key = ThreadSafePipeDispatcher.genkey(client)
        pipe = self.pipes[childname]
        pipe.send(key, ("GetConfig", [client], dict()))
        return pipe.recv(key)

    @exposed
    def get_statistics(self, address):
        """ Collect statistics from every child and from this core.

        :returns: dict - each child's statistics under
                  ``<childname>:<statname>``, this core's under
                  ``<statname>``, and a combined ``Total:<statname>``
                  entry for every statistic.  Values are indexed as
                  ``(min, max, mean, count)``.
        """
        stats = dict()

        def _aggregate_statistics(newstats, prefix=None):
            """ Merge ``newstats`` into ``stats``, both under their
            (optionally prefixed) own names and into the running
            ``Total:`` entries. """
            for statname, vals in newstats.items():
                if statname.startswith("ChildCore:"):
                    # drop the leading "Child", leaving "Core:...";
                    # presumably so that child statistics share names
                    # (and totals) with the parent's -- confirm
                    statname = statname[5:]
                if prefix:
                    prettyname = "%s:%s" % (prefix, statname)
                else:
                    prettyname = statname
                stats[prettyname] = vals
                totalname = "Total:%s" % statname
                if totalname not in stats:
                    stats[totalname] = vals
                else:
                    newmin = min(stats[totalname][0], vals[0])
                    newmax = max(stats[totalname][1], vals[1])
                    newcount = stats[totalname][3] + vals[3]
                    # combined mean, weighting each mean by its count
                    newmean = ((stats[totalname][2] * stats[totalname][3]) +
                               (vals[2] * vals[3])) / newcount
                    stats[totalname] = (newmin, newmax, newmean, newcount)

        # one key serves for all children, since each child has its
        # own dispatcher
        key = ThreadSafePipeDispatcher.genkey("get_statistics")
        for childname, pipe in self.pipes.items():
            pipe.send(key, ("get_statistics", [address], dict()))
            _aggregate_statistics(pipe.recv(key), prefix=childname)
        _aggregate_statistics(BuiltinCore.get_statistics(self, address))
        return stats