diff options
Diffstat (limited to 'src/lib/Bcfg2/Server/Lint/RequiredAttrs.py')
-rw-r--r-- | src/lib/Bcfg2/Server/Lint/RequiredAttrs.py | 163 |
1 files changed, 126 insertions, 37 deletions
diff --git a/src/lib/Bcfg2/Server/Lint/RequiredAttrs.py b/src/lib/Bcfg2/Server/Lint/RequiredAttrs.py index 6f76cf2db..fcb7c6c28 100644 --- a/src/lib/Bcfg2/Server/Lint/RequiredAttrs.py +++ b/src/lib/Bcfg2/Server/Lint/RequiredAttrs.py @@ -1,32 +1,105 @@ -import os.path +import os +import re import lxml.etree import Bcfg2.Server.Lint +import Bcfg2.Client.Tools.POSIX +import Bcfg2.Client.Tools.VCS from Bcfg2.Server.Plugins.Packages import Apt, Yum +try: + from Bcfg2.Server.Plugins.Bundler import BundleTemplateFile + has_genshi = True +except ImportError: + has_genshi = False + +# format verifying functions +def is_filename(val): + return val.startswith("/") and len(val) > 1 + +def is_selinux_type(val): + return re.match(r'^[a-z_]+_t', val) + +def is_selinux_user(val): + return re.match(r'^[a-z_]+_u', val) + +def is_octal_mode(val): + return re.match(r'[0-7]{3,4}', val) + +def is_username(val): + return re.match(r'^([a-z]\w{0,30}|\d+)$', val) + +def is_device_mode(val): + try: + # checking upper bound seems like a good way to discover some + # obscure OS with >8-bit device numbers + return int(val) > 0 + except: + return False class RequiredAttrs(Bcfg2.Server.Lint.ServerPlugin): """ verify attributes for configuration entries (as defined in doc/server/configurationentries) """ def __init__(self, *args, **kwargs): Bcfg2.Server.Lint.ServerPlugin.__init__(self, *args, **kwargs) - self.required_attrs = { - 'Path': { - 'device': ['name', 'owner', 'group', 'dev_type'], - 'directory': ['name', 'owner', 'group', 'perms'], - 'file': ['name', 'owner', 'group', 'perms', '__text__'], - 'hardlink': ['name', 'to'], - 'symlink': ['name', 'to'], - 'ignore': ['name'], - 'nonexistent': ['name'], - 'permissions': ['name', 'owner', 'group', 'perms'], - 'vcs': ['vcstype', 'revision', 'sourceurl']}, - 'Service': { - 'chkconfig': ['name'], - 'deb': ['name'], - 'rc-update': ['name'], - 'smf': ['name', 'FMRI'], - 'upstart': ['name']}, - 'Action': ['name', 'timing', 'when', 'status', 'command'], - 'Package': ['name']} + self.required_attrs = dict( + Path=dict( + device=dict(name=is_filename, owner=is_username, + group=is_username, + dev_type=lambda v: \ + v in Bcfg2.Client.Tools.POSIX.device_map), + directory=dict(name=is_filename, owner=is_username, + group=is_username, perms=is_octal_mode), + file=dict(name=is_filename, owner=is_username, + group=is_username, perms=is_octal_mode, + __text__=None), + hardlink=dict(name=is_filename, to=is_filename), + symlink=dict(name=is_filename, to=is_filename), + ignore=dict(name=is_filename), + nonexistent=dict(name=is_filename), + permissions=dict(name=is_filename, owner=is_username, + group=is_username, perms=is_octal_mode), + vcs=dict(vcstype=lambda v: (v != 'Path' and + hasattr(Bcfg2.Client.Tools.VCS, + "Install%s" % v)), + revision=None, sourceurl=None)), + Service={ + "chkconfig": dict(name=None), + "deb": dict(name=None), + "rc-update": dict(name=None), + "smf": dict(name=None, FMRI=None), + "upstart": dict(name=None)}, + Action={None: dict(name=None, + timing=lambda v: v in ['pre', 'post', 'both'], + when=lambda v: v in ['modified', 'always'], + status=lambda v: v in ['ignore', 'check'], + command=None)}, + ACL=dict( + default=dict(scope=lambda v: v in ['user', 'group'], + perms=lambda v: re.match('^([0-7]|[rwx\-]{0,3}', + v)), + access=dict(scope=lambda v: v in ['user', 'group'], + perms=lambda v: re.match('^([0-7]|[rwx\-]{0,3}', + v)), + mask=dict(perms=lambda v: re.match('^([0-7]|[rwx\-]{0,3}', v))), + Package={None: dict(name=None)}, + SELinux=dict( + boolean=dict(name=None, + value=lambda v: v in ['on', 'off']), + module=dict(name=None, __text__=None), + port=dict(name=lambda v: re.match(r'^\d+(-\d+)?/(tcp|udp)', v), + selinuxtype=is_selinux_type), + fcontext=dict(name=None, selinuxtype=is_selinux_type), + node=dict(name=lambda v: "/" in v, + selinuxtype=is_selinux_type, + proto=lambda v: v in ['ipv6', 'ipv4']), + login=dict(name=is_username, + selinuxuser=is_selinux_user), + user=dict(name=is_selinux_user, + roles=lambda v: all(is_selinux_user(u) + for u in " ".split(v)), + prefix=None), + interface=dict(name=None, selinuxtype=is_selinux_type), + permissive=dict(name=is_selinux_type)) + ) def Run(self): self.check_packages() @@ -42,9 +115,9 @@ class RequiredAttrs(Bcfg2.Server.Lint.ServerPlugin): return {"unknown-entry-type":"error", "unknown-entry-tag":"error", "required-attrs-missing":"error", + "required-attr-format":"error", "extra-attrs":"warning"} - def check_packages(self): """ check package sources for Source entries with missing attrs """ if 'Packages' in self.core.plugins: @@ -85,13 +158,17 @@ class RequiredAttrs(Bcfg2.Server.Lint.ServerPlugin): """ check bundles for BoundPath entries with missing attrs """ if 'Bundler' in self.core.plugins: for bundle in self.core.plugins['Bundler'].entries.values(): - try: - xdata = lxml.etree.XML(bundle.data) - except (lxml.etree.XMLSyntaxError, AttributeError): - xdata = lxml.etree.parse(bundle.template.filepath).getroot() + if (self.HandlesFile(bundle.name) and + (not has_genshi or + not isinstance(bundle, BundleTemplateFile))): + try: + xdata = lxml.etree.XML(bundle.data) + except (lxml.etree.XMLSyntaxError, AttributeError): + xdata = \ + lxml.etree.parse(bundle.template.filepath).getroot() - for path in xdata.xpath("//*[substring(name(), 1, 5) = 'Bound']"): - self.check_entry(path, bundle.name) + for path in xdata.xpath("//*[substring(name(), 1, 5) = 'Bound']"): + self.check_entry(path, bundle.name) def check_entry(self, entry, filename): """ generic entry check """ @@ -103,43 +180,55 @@ class RequiredAttrs(Bcfg2.Server.Lint.ServerPlugin): if tag not in self.required_attrs: self.LintError("unknown-entry-tag", "Unknown entry tag '%s': %s" % - (entry.tag, self.RenderXML(entry))) + (tag, self.RenderXML(entry))) if isinstance(self.required_attrs[tag], dict): etype = entry.get('type') if etype in self.required_attrs[tag]: - required_attrs = set(self.required_attrs[tag][etype] + - ['type']) + required_attrs = self.required_attrs[tag][etype] else: self.LintError("unknown-entry-type", "Unknown %s type %s: %s" % (tag, etype, self.RenderXML(entry))) return else: - required_attrs = set(self.required_attrs[tag]) + required_attrs = self.required_attrs[tag] attrs = set(entry.attrib.keys()) if 'dev_type' in required_attrs: dev_type = entry.get('dev_type') if dev_type in ['block', 'char']: # check if major/minor are specified - required_attrs |= set(['major', 'minor']) + required_attrs['major'] = is_device_mode + required_attrs['minor'] = is_device_mode + + if tag == 'ACL' and 'scope' in required_attrs: + required_attrs[entry.get('scope')] = is_username if '__text__' in required_attrs: - required_attrs.remove('__text__') + del required_attrs['__text__'] if (not entry.text and not entry.get('empty', 'false').lower() == 'true'): self.LintError("required-attrs-missing", "Text missing for %s %s in %s: %s" % - (entry.tag, name, filename, + (tag, name, filename, self.RenderXML(entry))) - if not attrs.issuperset(required_attrs): + if not attrs.issuperset(required_attrs.keys()): self.LintError("required-attrs-missing", "The following required attribute(s) are " "missing for %s %s in %s: %s\n%s" % - (entry.tag, name, filename, + (tag, name, filename, ", ".join([attr for attr in - required_attrs.difference(attrs)]), + set(required_attrs.keys()).difference(attrs)]), self.RenderXML(entry))) + + for attr, fmt in required_attrs.items(): + if fmt and attr in attrs and not fmt(entry.attrib[attr]): + self.LintError("required-attr-format", + "The %s attribute of %s %s in %s is " + "malformed\n%s" % + (attr, tag, name, filename, + self.RenderXML(entry))) + |