1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
|
import atexit
import daemon
import logging
import time
import traceback
import threading
# pylint: disable=E0611
try:
from lockfile.pidlockfile import PIDLockFile
from lockfile import Error as PIDFileError
except ImportError:
from daemon.pidlockfile import PIDLockFile, PIDFileError
# pylint: enable=E0611
import Bcfg2.Logger
from Bcfg2.Reporting.Transport import load_transport_from_config, \
TransportError, TransportImportError
from Bcfg2.Reporting.Transport.DirectStore import DirectStore
from Bcfg2.Reporting.Storage import load_storage_from_config, \
StorageError, StorageImportError
class ReportingError(Exception):
"""Generic reporting exception"""
pass
class ReportingCollector(object):
"""The collecting process for reports"""
def __init__(self, setup):
"""Setup the collector. This may be called by the daemon or though
bcfg2-admin"""
self.setup = setup
self.datastore = setup['repo']
self.encoding = setup['encoding']
self.terminate = None
self.context = None
if setup['debug']:
level = logging.DEBUG
elif setup['verbose']:
level = logging.INFO
else:
level = logging.WARNING
Bcfg2.Logger.setup_logging('bcfg2-report-collector',
to_console=logging.INFO,
to_syslog=setup['syslog'],
to_file=setup['logging'],
level=level)
self.logger = logging.getLogger('bcfg2-report-collector')
try:
self.transport = load_transport_from_config(setup)
self.storage = load_storage_from_config(setup)
except TransportError:
self.logger.error("Failed to load transport: %s" %
traceback.format_exc().splitlines()[-1])
raise ReportingError
except StorageError:
self.logger.error("Failed to load storage: %s" %
traceback.format_exc().splitlines()[-1])
raise ReportingError
if isinstance(self.transport, DirectStore):
self.logger.error("DirectStore cannot be used with the collector. "
"Use LocalFilesystem instead")
self.shutdown()
raise ReportingError
try:
self.logger.debug("Validating storage %s" %
self.storage.__class__.__name__)
self.storage.validate()
except:
self.logger.error("Storage backed %s failed to validate: %s" %
(self.storage.__class__.__name__,
traceback.format_exc().splitlines()[-1]))
def run(self):
"""Startup the processing and go!"""
self.terminate = threading.Event()
atexit.register(self.shutdown)
self.context = daemon.DaemonContext()
if self.setup['daemon']:
self.logger.debug("Daemonizing")
try:
self.context.pidfile = PIDLockFile(self.setup['daemon'])
self.context.open()
except PIDFileError:
self.logger.error("Error writing pid file: %s" %
traceback.format_exc().splitlines()[-1])
self.shutdown()
return
self.logger.info("Starting daemon")
self.transport.start_monitor(self)
while not self.terminate.isSet():
try:
interaction = self.transport.fetch()
if not interaction:
continue
try:
start = time.time()
self.storage.import_interaction(interaction)
self.logger.info("Imported interaction for %s in %ss" %
(interaction.get('hostname', '<unknown>'),
time.time() - start))
except:
#TODO requeue?
raise
except (SystemExit, KeyboardInterrupt):
self.logger.info("Shutting down")
self.shutdown()
except:
self.logger.error("Unhandled exception in main loop %s" %
traceback.format_exc().splitlines()[-1])
def shutdown(self):
"""Cleanup and go"""
if self.terminate:
# this wil be missing if called from bcfg2-admin
self.terminate.set()
if self.transport:
self.transport.shutdown()
if self.storage:
self.storage.shutdown()
|