1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
|
import atexit
import daemon
import logging
import time
import traceback
import threading
# pylint: disable=E0611
try:
from lockfile.pidlockfile import PIDLockFile
from lockfile import Error as PIDFileError
except ImportError:
from daemon.pidlockfile import PIDLockFile, PIDFileError
# pylint: enable=E0611
import Bcfg2.Logger
from Bcfg2.Reporting.Transport import load_transport_from_config, \
TransportError, TransportImportError
from Bcfg2.Reporting.Transport.DirectStore import DirectStore
from Bcfg2.Reporting.Storage import load_storage_from_config, \
StorageError, StorageImportError
class ReportingError(Exception):
    """Generic failure raised by the report collector.

    The collector raises this (without a message) after it has logged
    the underlying problem, e.g. when a transport or storage backend
    cannot be loaded.
    """
class ReportingStoreThread(threading.Thread):
    """Worker thread that hands a single interaction to the storage backend."""

    def __init__(self, interaction, storage, group=None, target=None,
                 name=None, args=(), kwargs=None):
        """Create the worker.

        ``interaction`` is the object to import and ``storage`` is the
        engine whose ``import_interaction`` method will receive it.  The
        remaining arguments are forwarded unchanged to
        ``threading.Thread``; a missing/empty ``kwargs`` becomes a fresh
        dict.
        """
        thread_kwargs = kwargs if kwargs else dict()
        threading.Thread.__init__(self, group=group, target=target,
                                  name=name, args=args,
                                  kwargs=thread_kwargs)
        self.logger = logging.getLogger('bcfg2-report-collector')
        self.storage = storage
        self.interaction = interaction

    def run(self):
        """Import the interaction into storage and log how long it took.

        Nothing is allowed to escape: any failure is reduced to the final
        line of its traceback and logged as an error.
        """
        try:
            began = time.time()
            self.storage.import_interaction(self.interaction)
            hostname = self.interaction.get('hostname', '<unknown>')
            elapsed = time.time() - began
            self.logger.info("Imported interaction for %s in %ss" %
                             (hostname, elapsed))
        except:
            #TODO requeue?
            # Catch-all on purpose: this is the top of the worker thread.
            summary = traceback.format_exc().splitlines()[-1]
            self.logger.error("Unhandled exception in import thread %s" %
                              summary)
class ReportingCollector(object):
    """The collecting process for reports.

    ``__init__`` loads the transport and storage backends from the
    configuration; ``run`` then fetches interactions from the transport
    and starts one ``ReportingStoreThread`` per interaction; ``shutdown``
    releases both backends.
    """

    def __init__(self, setup):
        """Setup the collector.

        This may be called by the daemon or through bcfg2-admin.

        ``setup`` is the option mapping.  The keys read here are
        ``repo``, ``encoding``, ``debug``, ``verbose``, ``syslog`` and
        ``logging``; ``run`` additionally reads ``daemon``.

        Raises ``ReportingError`` (after logging the cause) when the
        transport or storage cannot be loaded, or when the configured
        transport is ``DirectStore``.
        """
        self.setup = setup
        self.datastore = setup['repo']
        self.encoding = setup['encoding']
        self.terminate = None
        self.context = None
        # Defined before anything can fail so that shutdown() is always
        # safe to call from the error paths below.
        self.transport = None
        self.storage = None

        if setup['debug']:
            level = logging.DEBUG
        elif setup['verbose']:
            level = logging.INFO
        else:
            level = logging.WARNING

        Bcfg2.Logger.setup_logging('bcfg2-report-collector',
                                   to_console=logging.INFO,
                                   to_syslog=setup['syslog'],
                                   to_file=setup['logging'],
                                   level=level)
        self.logger = logging.getLogger('bcfg2-report-collector')

        try:
            self.transport = load_transport_from_config(setup)
            self.storage = load_storage_from_config(setup)
        except TransportError:
            self.logger.error("Failed to load transport: %s",
                              self._last_error_line())
            raise ReportingError
        except StorageError:
            self.logger.error("Failed to load storage: %s",
                              self._last_error_line())
            # The transport was loaded before the storage failed; release
            # it the same way the DirectStore failure path below does.
            self.shutdown()
            raise ReportingError

        if isinstance(self.transport, DirectStore):
            self.logger.error("DirectStore cannot be used with the collector. "
                              "Use LocalFilesystem instead")
            self.shutdown()
            raise ReportingError

        try:
            self.logger.debug("Validating storage %s",
                              self.storage.__class__.__name__)
            self.storage.validate()
        except:
            # NOTE(review): a validation failure is only logged, the
            # collector is still constructed.  The bare except also
            # swallows SystemExit/KeyboardInterrupt raised by the backend;
            # kept as-is because backends may depend on it -- confirm.
            self.logger.error("Storage backed %s failed to validate: %s",
                              self.storage.__class__.__name__,
                              self._last_error_line())

    @staticmethod
    def _last_error_line():
        """Return the final line of the traceback currently being handled."""
        return traceback.format_exc().splitlines()[-1]

    def run(self):
        """Startup the processing and go!

        Creates the termination event, registers ``shutdown`` with
        ``atexit`` and, when ``setup['daemon']`` is set, daemonizes using
        that value as the pid file path.  It then starts the transport
        monitor and loops until the termination event is set.  If the pid
        file cannot be written the collector is shut down and the method
        returns without processing anything.
        """
        self.terminate = threading.Event()
        atexit.register(self.shutdown)
        self.context = daemon.DaemonContext(detach_process=True)

        if self.setup['daemon']:
            self.logger.debug("Daemonizing")
            try:
                self.context.pidfile = PIDLockFile(self.setup['daemon'])
                self.context.open()
            except PIDFileError:
                self.logger.error("Error writing pid file: %s",
                                  self._last_error_line())
                self.shutdown()
                return
            self.logger.info("Starting daemon")

        self.transport.start_monitor(self)

        while not self.terminate.isSet():
            try:
                interaction = self.transport.fetch()
                if not interaction:
                    # Nothing fetched this round; poll again.
                    continue

                store_thread = ReportingStoreThread(interaction, self.storage)
                store_thread.start()
            except (SystemExit, KeyboardInterrupt):
                # shutdown() sets the termination event, ending the loop.
                self.logger.info("Shutting down")
                self.shutdown()
            except:
                # Keep the collector alive on any other failure.
                self.logger.error("Unhandled exception in main loop %s",
                                  self._last_error_line())

    def shutdown(self):
        """Cleanup and go.

        Signals the main loop to stop (when it exists), then shuts down
        the transport and the storage.  An ``OSError`` from the transport
        is ignored; any other transport error still propagates, but only
        after the storage has been shut down as well.
        """
        if self.terminate:
            # this will be missing if called from bcfg2-admin
            self.terminate.set()

        try:
            if self.transport:
                try:
                    self.transport.shutdown()
                except OSError:
                    pass
        finally:
            # Always release the storage, even if the transport failed.
            if self.storage:
                self.storage.shutdown()
|