summary refs log tree commit diff stats
path: root/src/lib/Bcfg2/Reporting/Transport/DirectStore.py
blob: b9d17212edd72fe49d790fdf5f06e8bcdcc8c820 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
""" Reporting Transport that stores statistics data directly in the
storage backend """

import os
import sys
import time
import threading
import Bcfg2.Options
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
from Bcfg2.Compat import Queue, Full, Empty, cPickle


class DirectStore(TransportBase, threading.Thread):
    """ Reporting transport that imports interactions into the storage
    backend from a background thread.

    :func:`store` places each interaction on a bounded in-memory queue
    and returns immediately; the thread started by the constructor
    drains the queue into ``self.storage``.  Interactions still queued
    when the thread stops are pickled to ``self.save_file`` and are
    re-queued the next time a DirectStore thread starts. """

    options = TransportBase.options + [Bcfg2.Options.Common.reporting_storage]

    def __init__(self):
        """ Set up the storage backend and queue, then start the
        worker thread.  Note that the thread is started from the
        constructor, so creating an instance has side effects. """
        TransportBase.__init__(self)
        threading.Thread.__init__(self)
        # self.data is not set in this class; presumably provided by
        # TransportBase -- verify against the base class
        self.save_file = os.path.join(self.data, ".saved")

        self.storage = Bcfg2.Options.setup.reporting_storage()
        self.storage.validate()

        # bounded so that a stalled backend cannot exhaust memory;
        # store() drops data once the bound is reached
        self.queue = Queue(100000)
        self.terminate = threading.Event()
        self.debug_log("Reporting: Starting %s thread" %
                       self.__class__.__name__)
        self.start()

    def shutdown(self):
        """ Ask the worker thread to stop.  This only sets the
        terminate flag; it does not wait for the thread to exit. """
        self.terminate.set()

    def store(self, hostname, metadata, stats):
        """ Queue one interaction for import by the worker thread.

        Never blocks: if the queue is full the interaction is dropped
        and a warning is logged.

        :param hostname: stored under the ``hostname`` key
        :param metadata: stored under the ``metadata`` key
        :param stats: stored under the ``stats`` key
        """
        try:
            self.queue.put_nowait(dict(hostname=hostname,
                                       metadata=metadata,
                                       stats=stats))
        except Full:
            self.logger.warning("Reporting: Queue is full, "
                                "dropping statistics")

    def run(self):
        """ Thread body: reload any saved interactions, then import
        queued interactions until :func:`shutdown` is called, and
        finally save whatever is still queued. """
        if not self._load():
            self.logger.warning("Reporting: Failed to load saved data, "
                                "DirectStore thread exiting")
            return
        while not self.terminate.isSet() and self.queue is not None:
            try:
                # the timeout bounds how long a shutdown request can go
                # unnoticed while the queue is idle
                interaction = self.queue.get(block=True,
                                             timeout=self.timeout)
                start = time.time()
                self.storage.import_interaction(interaction)
                self.logger.info("Imported data for %s in %s seconds" %
                                 (interaction.get('hostname', '<unknown>'),
                                  time.time() - start))
            except Empty:
                self.debug_log("Reporting: Queue is empty")
                continue
            except:  # pylint: disable=W0702
                # a single bad interaction must not kill the thread
                err = sys.exc_info()[1]
                self.logger.error("Reporting: Could not import interaction: %s"
                                  % err)
                continue
        self.debug_log("Reporting: Stopping %s thread" %
                       self.__class__.__name__)
        if self.queue is not None and not self.queue.empty():
            self._save()

    def fetch(self):
        """ no collector is necessary with this backend """
        pass

    def start_monitor(self, collector):
        """ no collector is necessary with this backend """
        pass

    def rpc(self, method, *args, **kwargs):
        """ Call the named method on the storage backend directly.

        :param method: name of the attribute of ``self.storage`` to call
        :returns: whatever the storage method returns
        :raises: TransportError if the lookup or the call fails for
                 any reason
        """
        try:
            return getattr(self.storage, method)(*args, **kwargs)
        except:  # pylint: disable=W0702
            msg = "Reporting: RPC method %s failed: %s" % (method,
                                                           sys.exc_info()[1])
            self.logger.error(msg)
            raise TransportError(msg)

    def _save(self):
        """ Drain the queue and pickle its contents to
        ``self.save_file``.

        Failures are logged, not raised.  The queue has already been
        drained by the time the file is written, so a failed write
        loses the pending interactions. """
        self.debug_log("Reporting: Saving pending data to %s" %
                       self.save_file)
        saved_data = []
        try:
            while not self.queue.empty():
                saved_data.append(self.queue.get_nowait())
        except Empty:
            pass

        try:
            # pickle data is bytes; a text-mode file makes dump() raise
            # TypeError on Python 3, which would silently discard the data
            savefile = open(self.save_file, 'wb')
            try:
                cPickle.dump(saved_data, savefile)
            finally:
                # close even if dump() fails so the handle is not leaked
                savefile.close()
            self.logger.info("Saved pending Reporting data")
        except (IOError, TypeError, cPickle.PicklingError):
            err = sys.exc_info()[1]
            self.logger.warning("Failed to save pending data: %s" % err)

    def _load(self):
        """ Re-queue interactions saved by a previous :func:`_save`.

        NOTE: the save file is unpickled, so it must only ever be
        writable by trusted users.

        :returns: bool - True if there was nothing to load or the data
                  was loaded; False if the file could not be read or
                  shutdown was requested while loading.  The save file
                  is only removed when True is returned after a load.
        """
        if not os.path.exists(self.save_file):
            self.debug_log("Reporting: No saved data to load")
            return True
        saved_data = []
        try:
            # must match the binary mode used by _save()
            savefile = open(self.save_file, 'rb')
            try:
                saved_data = cPickle.load(savefile)
            finally:
                savefile.close()
        except (IOError, EOFError, AttributeError, ImportError, IndexError,
                ValueError, cPickle.UnpicklingError):
            # a truncated or corrupt file can raise more than
            # UnpicklingError; treat all of them as a failed load
            # instead of letting the thread die with a traceback
            err = sys.exc_info()[1]
            self.logger.warning("Failed to load saved data: %s" % err)
            return False
        for interaction in saved_data:
            # check that shutdown wasnt called early
            if self.terminate.isSet():
                self.logger.warning("Reporting: Shutdown called while loading "
                                    " saved data")
                return False

            try:
                self.queue.put_nowait(interaction)
            except Full:
                # interactions that do not fit are discarded; the save
                # file is still removed below
                self.logger.warning("Reporting: Queue is full, failed to "
                                    "load saved interaction data")
                break
        try:
            os.unlink(self.save_file)
        except OSError:
            self.logger.error("Reporting: Failed to unlink save file: %s" %
                              self.save_file)
        self.logger.info("Reporting: Loaded saved interaction data")
        return True