summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Server/Lint/Comments.py
blob: bb1217f924459e48aa41fe0c1daaf27a2bfe5612 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
""" check files for various required comments """

import os
import lxml.etree
import Bcfg2.Server.Lint
from Bcfg2.Server.Plugins.Cfg.CfgPlaintextGenerator \
    import CfgPlaintextGenerator
from Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator import CfgGenshiGenerator
from Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator import CfgCheetahGenerator
from Bcfg2.Server.Plugins.Cfg.CfgInfoXML import CfgInfoXML


class Comments(Bcfg2.Server.Lint.ServerPlugin):
    """ check files for various required headers """
    def __init__(self, *args, **kwargs):
        """ initialise the base lint plugin with the given arguments
        and start with an empty cache of required items """
        Bcfg2.Server.Lint.ServerPlugin.__init__(self, *args, **kwargs)
        # filled lazily by required_items(); keyed first by item type
        # and then by file type
        self.config_cache = dict()

    def Run(self):
        self.check_bundles()
        self.check_properties()
        self.check_metadata()
        self.check_cfg()
        self.check_probes()

    @classmethod
    def Errors(cls):
        return {"unexpanded-keywords": "warning",
                "keywords-not-found": "warning",
                "comments-not-found": "warning"}

    def required_keywords(self, rtype):
        """ given a file type, fetch the list of required VCS keywords
        from the bcfg2-lint config """
        return self.required_items(rtype, "keyword")

    def required_comments(self, rtype):
        """ given a file type, fetch the list of required comments
        from the bcfg2-lint config """
        return self.required_items(rtype, "comment")

    def required_items(self, rtype, itype):
        """ given a file type and item type (comment or keyword),
        fetch the list of required items from the bcfg2-lint config """
        if itype not in self.config_cache:
            self.config_cache[itype] = {}

        if rtype not in self.config_cache[itype]:
            rv = []
            global_item = "global_%ss" % itype
            if global_item in self.config:
                rv.extend(self.config[global_item].split(","))

            item = "%s_%ss" % (rtype.lower(), itype)
            if item in self.config:
                if self.config[item]:
                    rv.extend(self.config[item].split(","))
                else:
                    # config explicitly specifies nothing
                    rv = []
            self.config_cache[itype][rtype] = rv
        return self.config_cache[itype][rtype]

    def check_bundles(self):
        """ check bundle files for required headers """
        if 'Bundler' in self.core.plugins:
            for bundle in self.core.plugins['Bundler'].entries.values():
                xdata = None
                rtype = ""
                try:
                    xdata = lxml.etree.XML(bundle.data)
                    rtype = "bundler"
                except (lxml.etree.XMLSyntaxError, AttributeError):
                    xdata = \
                        lxml.etree.parse(bundle.template.filepath).getroot()
                    rtype = "sgenshi"

                self.check_xml(bundle.name, xdata, rtype)

    def check_properties(self):
        """ check properties files for required headers """
        if 'Properties' in self.core.plugins:
            props = self.core.plugins['Properties']
            for propfile, pdata in props.store.entries.items():
                if os.path.splitext(propfile)[1] == ".xml":
                    self.check_xml(pdata.name, pdata.xdata, 'properties')

    def check_metadata(self):
        """ check metadata files for required headers """
        if self.has_all_xincludes("groups.xml"):
            self.check_xml(os.path.join(self.metadata.data, "groups.xml"),
                           self.metadata.groups_xml.data,
                           "metadata")
        if self.has_all_xincludes("clients.xml"):
            self.check_xml(os.path.join(self.metadata.data, "clients.xml"),
                           self.metadata.clients_xml.data,
                           "metadata")

    def check_cfg(self):
        """ check Cfg files and info.xml files for required headers """
        if 'Cfg' in self.core.plugins:
            for entryset in self.core.plugins['Cfg'].entries.values():
                for entry in entryset.entries.values():
                    rtype = None
                    if isinstance(entry, CfgGenshiGenerator):
                        rtype = "tgenshi"
                    elif isinstance(entry, CfgPlaintextGenerator):
                        rtype = "cfg"
                    elif isinstance(entry, CfgCheetahGenerator):
                        rtype = "tcheetah"
                    elif isinstance(entry, CfgInfoXML):
                        self.check_xml(entry.infoxml.name,
                                       entry.infoxml.pnode.data,
                                       "infoxml")
                        continue
                    if rtype:
                        self.check_plaintext(entry.name, entry.data, rtype)

    def check_probes(self):
        """ check probes for required headers """
        if 'Probes' in self.core.plugins:
            for probe in self.core.plugins['Probes'].probes.entries.values():
                self.check_plaintext(probe.name, probe.data, "probes")

    def check_xml(self, filename, xdata, rtype):
        """ run the required-header check on an XML document; only
        the text of its comment nodes is searched """
        comments = []
        for node in xdata.getiterator(lxml.etree.Comment):
            comments.append(str(node))
        # a real list is passed on because check_lines() walks the
        # lines more than once
        self.check_lines(filename, comments, rtype)

    def check_plaintext(self, filename, data, rtype):
        """ check generic plaintex files for required headers """
        self.check_lines(filename, data.splitlines(), rtype)

    def check_lines(self, filename, lines, rtype):
        """ generic header check for a set of lines """
        if self.HandlesFile(filename):
            # found is trivalent:
            # False == not found
            # None == found but not expanded
            # True == found and expanded
            found = dict((k, False) for k in self.required_keywords(rtype))

            for line in lines:
                # we check for both '$<keyword>:' and '$<keyword>$' to see
                # if the keyword just hasn't been expanded
                for (keyword, status) in found.items():
                    if not status:
                        if '$%s:' % keyword in line:
                            found[keyword] = True
                        elif '$%s$' % keyword in line:
                            found[keyword] = None

            unexpanded = [keyword for (keyword, status) in found.items()
                          if status is None]
            if unexpanded:
                self.LintError("unexpanded-keywords",
                               "%s: Required keywords(s) found but not "
                               "expanded: %s" %
                               (filename, ", ".join(unexpanded)))
            missing = [keyword for (keyword, status) in found.items()
                       if status is False]
            if missing:
                self.LintError("keywords-not-found",
                               "%s: Required keywords(s) not found: $%s$" %
                               (filename, "$, $".join(missing)))

            # next, check for required comments.  found is just
            # boolean
            found = dict((k, False) for k in self.required_comments(rtype))

            for line in lines:
                for (comment, status) in found.items():
                    if not status:
                        found[comment] = comment in line

            missing = [comment for (comment, status) in found.items()
                       if status is False]
            if missing:
                self.LintError("comments-not-found",
                               "%s: Required comments(s) not found: %s" %
                               (filename, ", ".join(missing)))