From 6d4d8df68717780239fad273dd722359db10e64b Mon Sep 17 00:00:00 2001 From: "Chris St. Pierre" Date: Mon, 24 Sep 2012 13:07:15 -0400 Subject: expanded pylint tests --- src/lib/Bcfg2/Client/Tools/POSIX/base.py | 272 ++++++++++++++++++++----------- 1 file changed, 176 insertions(+), 96 deletions(-) (limited to 'src/lib/Bcfg2/Client/Tools/POSIX/base.py') diff --git a/src/lib/Bcfg2/Client/Tools/POSIX/base.py b/src/lib/Bcfg2/Client/Tools/POSIX/base.py index 2fe6791ae..6e7eb70b0 100644 --- a/src/lib/Bcfg2/Client/Tools/POSIX/base.py +++ b/src/lib/Bcfg2/Client/Tools/POSIX/base.py @@ -1,3 +1,5 @@ +""" Base class for tools that handle POSIX (Path) entries """ + import os import sys import pwd @@ -9,47 +11,51 @@ import Bcfg2.Client.XML try: import selinux - has_selinux = True + HAS_SELINUX = True except ImportError: - has_selinux = False + HAS_SELINUX = False try: import posix1e - has_acls = True + HAS_ACLS = True # map between permissions characters and numeric ACL constants - acl_map = dict(r=posix1e.ACL_READ, + ACL_MAP = dict(r=posix1e.ACL_READ, w=posix1e.ACL_WRITE, x=posix1e.ACL_EXECUTE) except ImportError: - has_acls = False - acl_map = dict(r=4, w=2, x=1) + HAS_ACLS = False + ACL_MAP = dict(r=4, w=2, x=1) # map between dev_type attribute and stat constants -device_map = dict(block=stat.S_IFBLK, +device_map = dict(block=stat.S_IFBLK, # pylint: disable=C0103 char=stat.S_IFCHR, fifo=stat.S_IFIFO) class POSIXTool(Bcfg2.Client.Tools.Tool): - def fully_specified(self, entry): + """ Base class for tools that handle POSIX (Path) entries """ + def fully_specified(self, entry): # pylint: disable=W0613 + """ return True if the entry is fully specified """ # checking is done by __req__ return True - def verify(self, entry, modlist): + def verify(self, entry, modlist): # pylint: disable=W0613 + """ return True if the entry is correct on disk """ if not self._verify_metadata(entry): return False if entry.get('recursive', 'false').lower() == 'true': # verify ownership information recursively for root, dirs, files in os.walk(entry.get('name')): - for p in dirs + files: + for path in dirs + files: if not self._verify_metadata(entry, - path=os.path.join(root, p)): + path=os.path.join(root, + path)): return False return True def install(self, entry): - plist = [entry.get('name')] + """ Install the given entry. Return True on success. """ rv = True rv &= self._set_perms(entry) if entry.get('recursive', 'false').lower() == 'true': @@ -60,28 +66,31 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): return rv def _exists(self, entry, remove=False): + """ check for existing paths and optionally remove them. if + the path exists, return the lstat of it """ try: - # check for existing paths and optionally remove them ondisk = os.lstat(entry.get('name')) if remove: if os.path.isdir(entry.get('name')): - rm = shutil.rmtree + remove = shutil.rmtree else: - rm = os.unlink + remove = os.unlink try: - rm(entry.get('name')) - return False + remove(entry.get('name')) + return None except OSError: err = sys.exc_info()[1] self.logger.warning('POSIX: Failed to unlink %s: %s' % (entry.get('name'), err)) - return ondisk # probably still exists + return ondisk # probably still exists else: return ondisk except OSError: - return False + return None def _set_perms(self, entry, path=None): + """ set permissions on the given entry, or on the given path + according to the given entry """ if path is None: path = entry.get("name") @@ -105,13 +114,13 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): rv = False if entry.get("perms"): - configPerms = int(entry.get('perms'), 8) + wanted_perms = int(entry.get('perms'), 8) if entry.get('dev_type'): - configPerms |= device_map[entry.get('dev_type')] + wanted_perms |= device_map[entry.get('dev_type')] try: self.logger.debug("POSIX: Setting permissions on %s to %s" % - (path, oct(configPerms))) - os.chmod(path, configPerms) + (path, oct(wanted_perms))) + os.chmod(path, wanted_perms) except (OSError, KeyError): self.logger.error('POSIX: Failed to change permissions on %s' % path) @@ -129,10 +138,35 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): rv &= self._set_acls(entry, path=path) return rv + def _apply_acl(self, acl, path, atype=posix1e.ACL_TYPE_ACCESS): + """ Apply the given ACL to the given path """ + if atype == posix1e.ACL_TYPE_ACCESS: + atype_str = "access" + else: + atype_str = "default" + if acl.valid(): + self.logger.debug("POSIX: Applying %s ACL to %s:" % (atype_str, + path)) + for line in str(acl).splitlines(): + self.logger.debug(" " + line) + try: + acl.applyto(path, atype) + return True + except OSError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to set ACLs on %s: %s" % + (path, err)) + return False + else: + self.logger.warning("POSIX: %s ACL created for %s was invalid:" + % (atype_str.title(), path)) + for line in str(acl).splitlines(): + self.logger.warning(" " + line) + return False - def _set_acls(self, entry, path=None): + def _set_acls(self, entry, path=None): # pylint: disable=R0912 """ set POSIX ACLs on the file on disk according to the config """ - if not has_acls: + if not HAS_ACLS: if entry.findall("ACL"): self.logger.debug("POSIX: ACLs listed for %s but no pylibacl " "library installed" % entry.get('name')) @@ -182,20 +216,20 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): self.logger.warning("POSIX: Cannot set default ACLs on " "non-directory %s" % path) continue - entry = posix1e.Entry(defacl) + aclentry = posix1e.Entry(defacl) else: - entry = posix1e.Entry(acl) - for perm in acl_map.values(): + aclentry = posix1e.Entry(acl) + for perm in ACL_MAP.values(): if perm & perms: - entry.permset.add(perm) - entry.tag_type = scope + aclentry.permset.add(perm) + aclentry.tag_type = scope try: if scope == posix1e.ACL_USER: scopename = "user" - entry.qualifier = self._norm_uid(qualifier) + aclentry.qualifier = self._norm_uid(qualifier) elif scope == posix1e.ACL_GROUP: scopename = "group" - entry.qualifier = self._norm_gid(qualifier) + aclentry.qualifier = self._norm_gid(qualifier) except (OSError, KeyError): err = sys.exc_info()[1] self.logger.error("POSIX: Could not resolve %s %s: %s" % @@ -203,41 +237,16 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): continue acl.calc_mask() - def _apply_acl(acl, path, atype=posix1e.ACL_TYPE_ACCESS): - if atype == posix1e.ACL_TYPE_ACCESS: - atype_str = "access" - else: - atype_str = "default" - if acl.valid(): - self.logger.debug("POSIX: Applying %s ACL to %s:" % (atype_str, - path)) - for line in str(acl).splitlines(): - self.logger.debug(" " + line) - try: - acl.applyto(path, atype) - return True - except: - err = sys.exc_info()[1] - self.logger.error("POSIX: Failed to set ACLs on %s: %s" % - (path, err)) - return False - else: - self.logger.warning("POSIX: %s ACL created for %s was invalid:" - % (atype_str.title(), path)) - for line in str(acl).splitlines(): - self.logger.warning(" " + line) - return False - - rv = _apply_acl(acl, path) + rv = self._apply_acl(acl, path) if defacl: defacl.calc_mask() - rv &= _apply_acl(defacl, path, posix1e.ACL_TYPE_DEFAULT) + rv &= self._apply_acl(defacl, path, posix1e.ACL_TYPE_DEFAULT) return rv def _set_secontext(self, entry, path=None): """ set the SELinux context of the file on disk according to the config""" - if not has_selinux: + if not HAS_SELINUX: return True if path is None: @@ -251,7 +260,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): try: selinux.restorecon(path) rv = True - except: + except OSError: err = sys.exc_info()[1] self.logger.error("POSIX: Failed to restore SELinux context " "for %s: %s" % (path, err)) @@ -259,7 +268,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): else: try: rv = selinux.lsetfilecon(path, context) == 0 - except: + except OSError: err = sys.exc_info()[1] self.logger.error("POSIX: Failed to restore SELinux context " "for %s: %s" % (path, err)) @@ -275,12 +284,15 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): return int(grp.getgrnam(gid)[2]) def _norm_entry_gid(self, entry): + """ Given an entry, return the GID number of the desired group """ try: return self._norm_gid(entry.get('group')) except (OSError, KeyError): err = sys.exc_info()[1] - self.logger.error('POSIX: GID normalization failed for %s on %s: %s' - % (entry.get('group'), entry.get('name'), err)) + self.logger.error('POSIX: GID normalization failed for %s on %s: ' + '%s' % (entry.get('group'), + entry.get('name'), + err)) return 0 def _norm_uid(self, uid): @@ -292,12 +304,15 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): return int(pwd.getpwnam(uid)[2]) def _norm_entry_uid(self, entry): + """ Given an entry, return the UID number of the desired owner """ try: return self._norm_uid(entry.get("owner")) except (OSError, KeyError): err = sys.exc_info()[1] - self.logger.error('POSIX: UID normalization failed for %s on %s: %s' - % (entry.get('owner'), entry.get('name'), err)) + self.logger.error('POSIX: UID normalization failed for %s on %s: ' + '%s' % (entry.get('owner'), + entry.get('name'), + err)) return 0 def _norm_acl_perms(self, perms): @@ -307,7 +322,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): 'w', 'x', or '-' characters, or a posix1e.Permset object""" if hasattr(perms, 'test'): # Permset object - return sum([p for p in acl_map.values() + return sum([p for p in ACL_MAP.values() if perms.test(p)]) try: @@ -329,17 +344,19 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): for char in perms: if char == '-': continue - elif char not in acl_map: + elif char not in ACL_MAP: self.logger.warning("POSIX: Unknown permissions character " "in ACL: %s" % char) - elif rv & acl_map[char]: + elif rv & ACL_MAP[char]: self.logger.warning("POSIX: Duplicate permissions " "character in ACL: %s" % perms) else: - rv |= acl_map[char] + rv |= ACL_MAP[char] return rv def _acl2string(self, aclkey, perms): + """ Get a string representation of the given ACL. aclkey must + be a tuple of (, , ) """ atype, scope, qualifier = aclkey acl_str = [] if atype == 'default': @@ -353,15 +370,19 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): return ":".join(acl_str) def _acl_perm2string(self, perm): + """ Turn an octal permissions integer into a string suitable + for use with ACLs """ rv = [] for char in 'rwx': - if acl_map[char] & perm: + if ACL_MAP[char] & perm: rv.append(char) else: rv.append('-') return ''.join(rv) def _gather_data(self, path): + """ Get data on the existing state of -- e.g., whether + or not it exists, owner, group, permissions, etc. """ try: ondisk = os.stat(path) except OSError: @@ -394,11 +415,11 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): perms = oct(ondisk[stat.ST_MODE])[-4:] except (OSError, KeyError, TypeError): err = sys.exc_info()[1] - self.logger.debug("POSIX: Could not get current permissions of %s: " - "%s" % (path, err)) + self.logger.debug("POSIX: Could not get current permissions of " + "%s: %s" % (path, err)) perms = None - if has_selinux: + if HAS_SELINUX: try: secontext = selinux.getfilecon(path)[1].split(":")[2] except (OSError, KeyError): @@ -409,13 +430,13 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): else: secontext = None - if has_acls: + if HAS_ACLS: acls = self._list_file_acls(path) else: acls = None return (ondisk, owner, group, perms, secontext, acls) - def _verify_metadata(self, entry, path=None): + def _verify_metadata(self, entry, path=None): # pylint: disable=R0912 """ generic method to verify perms, owner, group, secontext, acls, and mtime """ # allow setting an alternate path for recursive permissions checking @@ -423,9 +444,9 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): path = entry.get('name') attrib = dict() ondisk, attrib['current_owner'], attrib['current_group'], \ - attrib['current_perms'], attrib['current_secontext'], acls = \ - self._gather_data(path) - + attrib['current_perms'], attrib['current_secontext'] = \ + self._gather_data(path)[0:5] + if not ondisk: entry.set('current_exists', 'false') return False @@ -438,31 +459,31 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): # symlink and hardlink entries, which have SELinux contexts # but not other permissions, optional secontext and mtime # attrs, and so on. - configOwner, configGroup, configPerms, mtime = None, None, None, -1 + wanted_owner, wanted_group, wanted_perms, mtime = None, None, None, -1 if entry.get('mtime', '-1') != '-1': mtime = str(ondisk[stat.ST_MTIME]) if entry.get("owner"): - configOwner = str(self._norm_entry_uid(entry)) + wanted_owner = str(self._norm_entry_uid(entry)) if entry.get("group"): - configGroup = str(self._norm_entry_gid(entry)) + wanted_group = str(self._norm_entry_gid(entry)) if entry.get("perms"): while len(entry.get('perms', '')) < 4: entry.set('perms', '0' + entry.get('perms', '')) - configPerms = int(entry.get('perms'), 8) + wanted_perms = int(entry.get('perms'), 8) errors = [] - if configOwner and attrib['current_owner'] != configOwner: + if wanted_owner and attrib['current_owner'] != wanted_owner: errors.append("Owner for path %s is incorrect. " "Current owner is %s but should be %s" % (path, attrib['current_owner'], entry.get('owner'))) - if configGroup and attrib['current_group'] != configGroup: + if wanted_group and attrib['current_group'] != wanted_group: errors.append("Group for path %s is incorrect. " "Current group is %s but should be %s" % (path, attrib['current_group'], entry.get('group'))) - if (configPerms and - oct(int(attrib['current_perms'], 8)) != oct(configPerms)): + if (wanted_perms and + oct(int(attrib['current_perms'], 8)) != oct(wanted_perms)): errors.append("Permissions for path %s are incorrect. " "Current permissions are %s but should be %s" % (path, attrib['current_perms'], entry.get('perms'))) @@ -474,16 +495,17 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): "Current mtime is %s but should be %s" % (path, mtime, entry.get('mtime'))) - if has_selinux and entry.get("secontext"): + if HAS_SELINUX and entry.get("secontext"): if entry.get("secontext") == "__default__": - configContext = selinux.matchpathcon(path, 0)[1].split(":")[2] + wanted_secontext = \ + selinux.matchpathcon(path, 0)[1].split(":")[2] else: - configContext = entry.get("secontext") - if attrib['current_secontext'] != configContext: + wanted_secontext = entry.get("secontext") + if attrib['current_secontext'] != wanted_secontext: errors.append("SELinux context for path %s is incorrect. " "Current context is %s but should be %s" % (path, attrib['current_secontext'], - configContext)) + wanted_secontext)) if errors: for error in errors: @@ -494,10 +516,11 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): if val is not None: entry.set(attr, str(val)) - aclVerifies = self._verify_acls(entry, path=path) - return aclVerifies and len(errors) == 0 + return self._verify_acls(entry, path=path) and len(errors) == 0 def _list_entry_acls(self, entry): + """ Given an entry, get a dict of POSIX ACLs described in that + entry. """ wanted = dict() for acl in entry.findall("ACL"): if acl.get("scope") == "user": @@ -513,7 +536,14 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): return wanted def _list_file_acls(self, path): + """ Given a path, get a dict of existing POSIX ACLs on that + path. The dict keys are a tuple of (, , . values are the permissions of + the described ACL. """ def _process_acl(acl, atype): + """ Given an ACL object, process it appropriately and add + it to the return value """ try: if acl.tag_type == posix1e.ACL_USER: qual = pwd.getpwuid(acl.qualifier)[0] @@ -550,7 +580,9 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): return existing def _verify_acls(self, entry, path=None): - if not has_acls: + """ verify POSIX ACLs on the given entry. return True if all + ACLS are correct, false otherwise """ + if not HAS_ACLS: if entry.findall("ACL"): self.logger.debug("POSIX: ACLs listed for %s but no pylibacl " "library installed" % entry.get('name')) @@ -593,7 +625,7 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): for aclkey, perms in existing.items(): if aclkey not in wanted: extra.append(self._acl2string(aclkey, perms)) - + msg = [] if missing: msg.append("%s ACLs are missing: %s" % (len(missing), @@ -640,3 +672,51 @@ class POSIXTool(Bcfg2.Client.Tools.Tool): for cpath in created: rv &= self._set_perms(entry, path=cpath) return rv + + +class POSIXLinkTool(POSIXTool): + """ Base handler for link (symbolic and hard) entries """ + __req__ = ['name', 'to'] + __linktype__ = None + + def verify(self, entry, modlist): + rv = True + + try: + if not self._verify(entry): + msg = "%s %s is incorrect" % (self.__linktype__.title(), + entry.get('name')) + self.logger.debug("POSIX: " + msg) + entry.set('qtext', "\n".join([entry.get('qtext', ''), msg])) + rv = False + except OSError: + self.logger.debug("POSIX: %s %s does not exist" % + (entry.tag, entry.get("name"))) + entry.set('current_exists', 'false') + return False + + return POSIXTool.verify(self, entry, modlist) and rv + + def _verify(self, entry): + """ perform actual verification of the link entry """ + raise NotImplementedError + + def install(self, entry): + ondisk = self._exists(entry, remove=True) + if ondisk: + self.logger.info("POSIX: %s %s cleanup failed" % + (self.__linktype__.title(), entry.get('name'))) + try: + self._link(entry) + rv = True + except OSError: + err = sys.exc_info()[1] + self.logger.error("POSIX: Failed to create %s %s to %s: %s" % + (self.__linktype__, entry.get('name'), + entry.get('to'), err)) + rv = False + return POSIXTool.install(self, entry) and rv + + def _link(self, entry): + """ create the link """ + raise NotImplementedError -- cgit v1.2.3-1-g7c22