summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Reporting/Transport/LocalFilesystem.py
blob: 189967cb0de11c2e506082a46df446d43367a3af (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
"""
The local transport.  Stats are pickled and written to
<data>/work/<hostname>-<timestamp>, where <data> is the transport's
``data`` path.

Leans on FileMonitor to detect changes
"""

import os
import select
import time
import traceback
import Bcfg2.Options
import Bcfg2.Server.FileMonitor
from Bcfg2.Reporting.Collector import ReportingCollector, ReportingError
from Bcfg2.Reporting.Transport.base import TransportBase, TransportError
from Bcfg2.Compat import cPickle


class LocalFilesystem(TransportBase):
    """Transport that passes interactions through the local filesystem.

    :meth:`store` pickles each interaction into its own file under
    ``work_path``; :meth:`fetch` relies on a file monitor watching that
    directory to pick the files up again, one event per call.

    ``self.data``, ``self.timeout``, ``self.debug_flag``, ``self.logger``
    and ``self.debug_log`` are not defined in this class; presumably they
    are provided by ``TransportBase`` (confirm against the base class).
    """
    options = TransportBase.options + [Bcfg2.Options.Common.filemonitor]

    def __init__(self):
        """Compute the work directory and create it if it is missing.

        :raises: TransportError - if the work directory cannot be created
        """
        super(LocalFilesystem, self).__init__()

        self.work_path = "%s/work" % self.data
        self.debug_log("LocalFilesystem: work path %s" % self.work_path)
        # The file monitor is only created by start_monitor(), and the
        # collector only on the first rpc() call.
        self.fmon = None
        self._phony_collector = None

        # set up our local paths or die
        if not os.path.exists(self.work_path):
            try:
                os.makedirs(self.work_path)
            except OSError:
                # Grab the reason first, before any other call can touch
                # the active exception state.
                reason = traceback.format_exc().splitlines()[-1]
                # Another process may have created the directory between
                # the exists() check and makedirs(); that is not an error.
                if not os.path.isdir(self.work_path):
                    msg = "%s: Unable to create storage: %s" % \
                        (self.__class__.__name__, reason)
                    self.logger.error(msg)
                    raise TransportError(msg)

    def set_debug(self, debug):
        """Set the debug flag here and on the file monitor, if started.

        :param debug: value passed through to ``TransportBase.set_debug``
                      and to the file monitor's ``set_debug``
        :returns: whatever ``TransportBase.set_debug`` returns
        """
        rv = TransportBase.set_debug(self, debug)
        if self.fmon is not None:
            self.fmon.set_debug(debug)
        return rv

    def start_monitor(self, collector):
        """Start the file monitor.  Most of this comes from BaseCore

        The monitor is pointed at ``work_path`` with this object
        registered as the handler.

        :param collector: not used by this transport
        :raises: TransportError - if the file monitor cannot be
                 instantiated
        """
        try:
            self.fmon = Bcfg2.Server.FileMonitor.get_fam()
        except IOError:
            msg = "Failed to instantiate fam driver %s" % \
                Bcfg2.Options.setup.filemonitor
            self.logger.error(msg, exc_info=1)
            raise TransportError(msg)

        if self.debug_flag:
            self.fmon.set_debug(self.debug_flag)
        self.fmon.start()
        self.fmon.AddMonitor(self.work_path, self)

    def store(self, hostname, metadata, stats):
        """Pickle an interaction and write it into the work directory.

        The payload is a dict with the keys ``hostname``, ``metadata``
        and ``stats``.  It is written to a dot-prefixed temporary file
        and then renamed to ``<hostname>-<timestamp>``, because
        :meth:`fetch` ignores dot-prefixed names and therefore never
        sees a partially written file.

        :param hostname: client name; used as the file name prefix
        :param metadata: stored under the ``metadata`` key
        :param stats: stored under the ``stats`` key
        :raises: TransportError - if the payload cannot be pickled, a
                 file of the same name already exists, or the file
                 cannot be written
        """
        try:
            payload = cPickle.dumps(dict(hostname=hostname,
                                         metadata=metadata,
                                         stats=stats))
        except Exception:  # pylint: disable=W0703
            # Pickling arbitrary objects can fail in many ways, so this
            # stays broad, but KeyboardInterrupt/SystemExit now propagate.
            msg = "%s: Failed to build interaction object: %s" % \
                (self.__class__.__name__,
                 traceback.format_exc().splitlines()[-1])
            self.logger.error(msg)
            raise TransportError(msg)

        fname = "%s-%s" % (hostname, time.time())
        save_file = os.path.join(self.work_path, fname)
        tmp_file = os.path.join(self.work_path, "." + fname)
        if os.path.exists(save_file):
            msg = "%s: Oops.. duplicate statistic in directory." % \
                self.__class__.__name__
            self.logger.error(msg)
            raise TransportError(msg)

        # using a tmpfile to hopefully avoid the file monitor from grabbing too
        # soon
        try:
            saved = open(tmp_file, 'wb')
            try:
                saved.write(payload)
            finally:
                # Always release the handle, and do so before any unlink
                # or rename of the file.
                saved.close()
            os.rename(tmp_file, save_file)
        except (IOError, OSError):
            msg = "Failed to store interaction for %s: %s" % \
                (hostname, traceback.format_exc().splitlines()[-1])
            self.logger.error(msg)
            try:
                os.unlink(tmp_file)
            except OSError:
                # Best-effort cleanup: the temporary file may never have
                # been created, and the original failure is what matters.
                pass
            raise TransportError(msg)

    def fetch(self):
        """Fetch the next object

        Handles at most one file monitor event per call.  If nothing is
        pending, waits up to ``self.timeout`` (via ``select`` when the
        monitor exposes a file descriptor, otherwise by sleeping).

        :returns: the unpickled interaction for a ``created`` or
                  ``exists`` event on a non-hidden file, otherwise None
        :raises: TransportError - if the payload cannot be unpickled
        """
        event = None
        fmonfd = self.fmon.fileno()
        if self.fmon.pending():
            event = self.fmon.get_event()
        elif fmonfd:
            select.select([fmonfd], [], [], self.timeout)
            if self.fmon.pending():
                event = self.fmon.get_event()
        else:
            # pseudo.. if nothings pending sleep and loop
            time.sleep(self.timeout)

        if not event or event.filename == self.work_path:
            return None

        # deviate from the normal routines here we only want one event
        etype = event.code2str()
        self.debug_log("Recieved event %s for %s" % (etype, event.filename))
        # Dot-prefixed names are store()'s in-progress temporary files.
        # startswith() also copes with an empty basename.
        if os.path.basename(event.filename).startswith('.'):
            return None
        if etype in ('created', 'exists'):
            self.debug_log("Handling event %s" % event.filename)
            payload = os.path.join(self.work_path, event.filename)
            try:
                payloadfd = open(payload, "rb")
                try:
                    # NOTE(security): unpickling executes code embedded in
                    # the file, so the work directory must only be
                    # writable by trusted users.
                    interaction = cPickle.load(payloadfd)
                finally:
                    # Close on every path, including read errors.
                    payloadfd.close()
                os.unlink(payload)
                return interaction
            except IOError:
                self.logger.error("Failed to read payload: %s" %
                    traceback.format_exc().splitlines()[-1])
            except (cPickle.UnpicklingError, EOFError, AttributeError,
                    ImportError, IndexError):
                # Besides UnpicklingError, a truncated or corrupt pickle
                # can raise any of these.  The file is deliberately left
                # in place.
                msg = "Failed to unpickle payload: %s" % \
                    traceback.format_exc().splitlines()[-1]
                self.logger.error(msg)
                raise TransportError(msg)
        return None

    def shutdown(self):
        """Called at program exit"""
        if self.fmon:
            self.fmon.shutdown()
        if self._phony_collector:
            self._phony_collector.shutdown()

    def rpc(self, method, *args, **kwargs):
        """
        Here this is more of a dummy.  Rather then start a layer
        which doesn't exist or muck with files, start the collector

        This will all change when other layers are added

        The collector is created on first use and then reused.

        :param method: name of the storage method to call; it must be
                       listed in the storage class's ``__rmi__`` and
                       exist on the storage object
        :returns: whatever the storage method returns
        :raises: TransportError - if the collector cannot be loaded, the
                 method is unknown, or the call itself fails
        """
        try:
            if not self._phony_collector:
                self._phony_collector = ReportingCollector()
        except ReportingError:
            # No logging here, as in the original; presumably the
            # collector has already reported the problem -- TODO confirm.
            raise TransportError
        except Exception:  # pylint: disable=W0703
            self.logger.error("Failed to load collector: %s" %
                traceback.format_exc().splitlines()[-1])
            raise TransportError

        storage = self._phony_collector.storage
        if method not in storage.__class__.__rmi__ or \
                not hasattr(storage, method):
            self.logger.error("Unknown method %s called on storage engine %s" %
                (method, storage.__class__.__name__))
            raise TransportError

        try:
            cls_method = getattr(storage, method)
            return cls_method(*args, **kwargs)
        except Exception:  # pylint: disable=W0703
            # Any storage failure becomes a TransportError, but
            # KeyboardInterrupt/SystemExit are no longer swallowed.
            self.logger.error("RPC method %s failed: %s" %
                (method, traceback.format_exc().splitlines()[-1]))
            raise TransportError