summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Reporting/Collector.py
blob: df82248d06a8695ecb9eb5cab99a661937a24801 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
import atexit
import daemon
import logging
import time
import traceback
import threading

# pylint: disable=E0611
try:
    from lockfile.pidlockfile import PIDLockFile
    from lockfile import Error as PIDFileError
except ImportError:
    from daemon.pidlockfile import PIDLockFile, PIDFileError
# pylint: enable=E0611

import Bcfg2.Logger
from Bcfg2.Reporting.Transport import load_transport_from_config, \
    TransportError, TransportImportError
from Bcfg2.Reporting.Transport.DirectStore import DirectStore
from Bcfg2.Reporting.Storage import load_storage_from_config, \
    StorageError, StorageImportError

class ReportingError(Exception):
    """Base error for the reporting collector.

    ReportingCollector raises this when its transport or storage cannot
    be loaded, or when an unsupported transport is configured.
    """

class ReportingCollector(object):
    """The collecting process for reports.

    Fetches interactions from the configured transport and imports them
    into the configured storage backend until asked to terminate.
    """

    def __init__(self, setup):
        """Set up the collector.

        This may be called by the daemon or through bcfg2-admin.

        Arguments:
        setup -- option mapping.  The keys read here are 'repo',
                 'encoding', 'debug', 'verbose', 'syslog' and 'logging';
                 run() additionally reads 'daemon'.  The mapping is also
                 handed to the transport and storage loaders.

        Raises ReportingError if the transport or the storage cannot be
        loaded, or if the loaded transport is a DirectStore.
        """
        self.setup = setup
        self.datastore = setup['repo']
        self.encoding = setup['encoding']
        self.terminate = None
        self.context = None
        # Define these before anything can fail so that shutdown() is
        # safe to call no matter how far initialisation got.
        self.transport = None
        self.storage = None
        self._shutdown_done = False

        if setup['debug']:
            level = logging.DEBUG
        elif setup['verbose']:
            level = logging.INFO
        else:
            level = logging.WARNING

        Bcfg2.Logger.setup_logging('bcfg2-report-collector',
                                   to_console=logging.INFO,
                                   to_syslog=setup['syslog'],
                                   to_file=setup['logging'],
                                   level=level)
        self.logger = logging.getLogger('bcfg2-report-collector')

        try:
            self.transport = load_transport_from_config(setup)
            self.storage = load_storage_from_config(setup)
        except TransportError:
            self.logger.error("Failed to load transport: %s",
                              traceback.format_exc().splitlines()[-1])
            raise ReportingError
        except StorageError:
            self.logger.error("Failed to load storage: %s",
                              traceback.format_exc().splitlines()[-1])
            # The transport may already have been loaded; release it
            # rather than leaking it when we bail out.
            self.shutdown()
            raise ReportingError

        if isinstance(self.transport, DirectStore):
            self.logger.error("DirectStore cannot be used with the collector. "
                              "Use LocalFilesystem instead")
            self.shutdown()
            raise ReportingError

        try:
            self.logger.debug("Validating storage %s",
                              self.storage.__class__.__name__)
            self.storage.validate()
        except Exception:
            # Validation is best-effort: a failure is reported but does
            # not prevent the collector from being constructed.
            self.logger.error("Storage backed %s failed to validate: %s",
                              self.storage.__class__.__name__,
                              traceback.format_exc().splitlines()[-1])

    def run(self):
        """Start processing and loop until terminated.

        Registers shutdown() to run at interpreter exit.  When
        setup['daemon'] is set, its value is used as the pid file path
        and the daemon context is opened; if the pid file cannot be
        written the collector shuts down and this method returns.  The
        transport monitor is then started and interactions are fetched
        and imported until the terminate event is set.
        """
        self.terminate = threading.Event()
        atexit.register(self.shutdown)
        self.context = daemon.DaemonContext()

        if self.setup['daemon']:
            self.logger.debug("Daemonizing")
            try:
                self.context.pidfile = PIDLockFile(self.setup['daemon'])
                self.context.open()
            except PIDFileError:
                self.logger.error("Error writing pid file: %s",
                                  traceback.format_exc().splitlines()[-1])
                self.shutdown()
                return
            self.logger.info("Starting daemon")

        self.transport.start_monitor(self)

        while not self.terminate.isSet():
            try:
                interaction = self.transport.fetch()
                if not interaction:
                    continue
                start = time.time()
                # TODO: requeue the interaction if the import fails?
                self.storage.import_interaction(interaction)
                self.logger.info("Imported interaction for %s in %ss",
                                 interaction.get('hostname', '<unknown>'),
                                 time.time() - start)
            except (SystemExit, KeyboardInterrupt):
                self.logger.info("Shutting down")
                # shutdown() sets the terminate event, ending the loop.
                self.shutdown()
            except Exception:
                # Keep the collector alive on per-interaction failures.
                self.logger.error("Unhandled exception in main loop %s",
                                  traceback.format_exc().splitlines()[-1])

    def shutdown(self):
        """Clean up and go.

        Sets the terminate event (if there is one) and shuts down the
        transport and the storage.  The transport/storage teardown runs
        at most once even though this method is both registered with
        atexit and called explicitly.  The storage is shut down even if
        shutting down the transport raises; that exception still
        propagates to the caller.
        """
        if self.terminate:
            # this will be missing if called from bcfg2-admin
            self.terminate.set()

        # getattr: tolerate instances that were built without __init__.
        if getattr(self, '_shutdown_done', False):
            return
        self._shutdown_done = True

        try:
            if self.transport:
                self.transport.shutdown()
        finally:
            if self.storage:
                self.storage.shutdown()