From 44638176067df5231bf0be30801e36863391cd1f Mon Sep 17 00:00:00 2001
From: Tim Laszlo
Date: Mon, 8 Oct 2012 10:38:02 -0500
Subject: Reporting: Merge new reporting data

Move reporting data to a new schema
Use south for django migrations
Add bcfg2-report-collector daemon

Conflicts:
	doc/development/index.txt
	doc/server/plugins/connectors/properties.txt
	doc/server/plugins/generators/packages.txt
	setup.py
	src/lib/Bcfg2/Client/Tools/SELinux.py
	src/lib/Bcfg2/Compat.py
	src/lib/Bcfg2/Encryption.py
	src/lib/Bcfg2/Options.py
	src/lib/Bcfg2/Server/Admin/Init.py
	src/lib/Bcfg2/Server/Admin/Reports.py
	src/lib/Bcfg2/Server/BuiltinCore.py
	src/lib/Bcfg2/Server/Core.py
	src/lib/Bcfg2/Server/FileMonitor/Inotify.py
	src/lib/Bcfg2/Server/Plugin/base.py
	src/lib/Bcfg2/Server/Plugin/interfaces.py
	src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py
	src/lib/Bcfg2/Server/Plugins/FileProbes.py
	src/lib/Bcfg2/Server/Plugins/Ohai.py
	src/lib/Bcfg2/Server/Plugins/Packages/Collection.py
	src/lib/Bcfg2/Server/Plugins/Packages/Source.py
	src/lib/Bcfg2/Server/Plugins/Packages/Yum.py
	src/lib/Bcfg2/Server/Plugins/Packages/__init__.py
	src/lib/Bcfg2/Server/Plugins/Probes.py
	src/lib/Bcfg2/Server/Plugins/Properties.py
	src/lib/Bcfg2/Server/Reports/backends.py
	src/lib/Bcfg2/Server/Reports/manage.py
	src/lib/Bcfg2/Server/Reports/nisauth.py
	src/lib/Bcfg2/settings.py
	src/sbin/bcfg2-crypt
	src/sbin/bcfg2-yum-helper
	testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
	testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py
---
 src/lib/Bcfg2/Reporting/Storage/DjangoORM.py | 316 +++++++++++++++++++++++++++
 src/lib/Bcfg2/Reporting/Storage/__init__.py  |  32 +++
 src/lib/Bcfg2/Reporting/Storage/base.py      |  51 +++++
 3 files changed, 399 insertions(+)
 create mode 100644 src/lib/Bcfg2/Reporting/Storage/DjangoORM.py
 create mode 100644 src/lib/Bcfg2/Reporting/Storage/__init__.py
 create mode 100644 src/lib/Bcfg2/Reporting/Storage/base.py

(limited to 'src/lib/Bcfg2/Reporting/Storage')

diff --git a/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py b/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py
new file mode 100644
index 000000000..17eb52f66
--- /dev/null
+++ b/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py
@@ -0,0 +1,316 @@
+"""
+The base for the original DjangoORM (DBStats)
+"""
+
+import os
+import traceback
+from lxml import etree
+from datetime import datetime
+from time import strptime
+
+os.environ['DJANGO_SETTINGS_MODULE'] = 'Bcfg2.settings'
+from Bcfg2 import settings
+
+from Bcfg2.Compat import md5
+from Bcfg2.Reporting.Storage.base import StorageBase, StorageError
+from Bcfg2.Server.Plugin.exceptions import PluginExecutionError
+from django.core import management
+from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
+from django.core.cache import cache
+from django.db import transaction
+
+#Used by GetCurrentEntry
+import difflib
+from Bcfg2.Compat import b64decode
+from Bcfg2.Reporting.models import *
+
+
+class DjangoORM(StorageBase):
+    def __init__(self, setup):
+        super(DjangoORM, self).__init__(setup)
+        self.size_limit = setup.get('reporting_size_limit')
+
+    @transaction.commit_on_success
+    def _import_interaction(self, interaction):
+        """Real import function"""
+        hostname = interaction['hostname']
+        stats = etree.fromstring(interaction['stats'])
+        metadata = interaction['metadata']
+        server = metadata['server']
+
+        client = cache.get(hostname)
+        if not client:
+            client, created = Client.objects.get_or_create(name=hostname)
+            if created:
+                self.logger.debug("Client %s added to the db" % hostname)
+            cache.set(hostname, client)
+
+        timestamp = datetime(*strptime(stats.get('time'))[0:6])
+        if len(Interaction.objects.filter(client=client, timestamp=timestamp)) > 0:
+            self.logger.warn("Interaction for %s at %s already exists" %
+                             (hostname, timestamp))
+            return
+
+        profile, created = Group.objects.get_or_create(name=metadata['profile'])
+        inter = Interaction(client=client,
+                            timestamp=timestamp,
+                            state=stats.get('state', default="unknown"),
+                            repo_rev_code=stats.get('revision',
+                                                    default="unknown"),
+                            good_count=stats.get('good', default="0"),
+                            total_count=stats.get('total', default="0"),
+                            server=server,
+                            profile=profile)
+        inter.save()
+        self.logger.debug("Interaction for %s at %s INSERTED into db" %
+                          (client.id, timestamp))
+
+        #FIXME - this should be more efficient
+        for group_name in metadata['groups']:
+            group = cache.get("GROUP_" + group_name)
+            if not group:
+                group, created = Group.objects.get_or_create(name=group_name)
+                if created:
+                    self.logger.debug("Added group %s" % group)
+                cache.set("GROUP_" + group_name, group)
+
+            inter.groups.add(group)
+        for bundle_name in metadata['bundles']:
+            bundle = cache.get("BUNDLE_" + bundle_name)
+            if not bundle:
+                bundle, created = Bundle.objects.get_or_create(name=bundle_name)
+                if created:
+                    self.logger.debug("Added bundle %s" % bundle)
+                cache.set("BUNDLE_" + bundle_name, bundle)
+            inter.bundles.add(bundle)
+        inter.save()
+
+        counter_fields = {TYPE_BAD: 0,
+                          TYPE_MODIFIED: 0,
+                          TYPE_EXTRA: 0}
+        pattern = [('Bad/*', TYPE_BAD),
+                   ('Extra/*', TYPE_EXTRA),
+                   ('Modified/*', TYPE_MODIFIED)]
+        updates = dict(failures=[], paths=[], packages=[], actions=[], services=[])
+        for (xpath, state) in pattern:
+            for entry in stats.findall(xpath):
+                counter_fields[state] = counter_fields[state] + 1
+
+                entry_type = entry.tag
+                name = entry.get('name')
+                exists = entry.get('current_exists', default="true").lower() == "true"
+
+                # handle server failures differently
+                failure = entry.get('failure', '')
+                if failure:
+                    act_dict = dict(name=name, entry_type=entry_type,
+                                    message=failure)
+                    newact = FailureEntry.entry_get_or_create(act_dict)
+                    updates['failures'].append(newact)
+                    continue
+
+                act_dict = dict(name=name, state=state, exists=exists)
+
+                if entry_type == 'Action':
+                    act_dict['status'] = entry.get('status', default="check")
+                    act_dict['output'] = entry.get('rc', default=-1)
+                    self.logger.debug("Adding action %s" % name)
+                    updates['actions'].append(ActionEntry.entry_get_or_create(act_dict))
+                elif entry_type == 'Package':
+                    act_dict['target_version'] = entry.get('version', default='')
+                    act_dict['current_version'] = entry.get('current_version', default='')
+
+                    # extra entries are a bit different. They can have Instance objects
+                    if not act_dict['target_version']:
+                        for instance in entry.findall("Instance"):
+                            #TODO - this probably only works for rpms
+                            release = instance.get('release', '')
+                            arch = instance.get('arch', '')
+                            act_dict['current_version'] = instance.get('version')
+                            if release:
+                                act_dict['current_version'] += "-" + release
+                            if arch:
+                                act_dict['current_version'] += "." + arch
+                            self.logger.debug("Adding package %s %s" % (name, act_dict['current_version']))
+                            updates['packages'].append(PackageEntry.entry_get_or_create(act_dict))
+                    else:
+
+                        self.logger.debug("Adding package %s %s" % (name, act_dict['target_version']))
+
+                        # not implemented yet
+                        act_dict['verification_details'] = entry.get('verification_details', '')
+                        updates['packages'].append(PackageEntry.entry_get_or_create(act_dict))
+
+                elif entry_type == 'Path':
+                    path_type = entry.get("type").lower()
+                    act_dict['path_type'] = path_type
+
+                    target_dict = dict(
+                        owner=entry.get('owner', default="root"),
+                        group=entry.get('group', default="root"),
+                        perms=entry.get('perms', default=""),
+                    )
+                    fperm, created = FilePerms.objects.get_or_create(**target_dict)
+                    act_dict['target_perms'] = fperm
+
+                    current_dict = dict(
+                        owner=entry.get('current_owner', default=""),
+                        group=entry.get('current_group', default=""),
+                        perms=entry.get('current_perms', default=""),
+                    )
+                    fperm, created = FilePerms.objects.get_or_create(**current_dict)
+                    act_dict['current_perms'] = fperm
+
+                    if path_type in ('symlink', 'hardlink'):
+                        act_dict['target_path'] = entry.get('to', default="")
+                        act_dict['current_path'] = entry.get('current_to', default="")
+                        self.logger.debug("Adding link %s" % name)
+                        updates['paths'].append(LinkEntry.entry_get_or_create(act_dict))
+                        continue
+                    elif path_type == 'device':
+                        #TODO devices
+                        self.logger.warn("device path types are not supported yet")
+                        continue
+
+                    # TODO - vcs output
+                    act_dict['detail_type'] = PathEntry.DETAIL_UNUSED
+                    if path_type == 'directory' and entry.get('prune', 'false') == 'true':
+                        unpruned_elist = [e.get('path') for e in entry.findall('Prune')]
+                        if unpruned_elist:
+                            act_dict['detail_type'] = PathEntry.DETAIL_PRUNED
+                            act_dict['details'] = "\n".join(unpruned_elist)
+                    elif entry.get('sensitive', 'false').lower() == 'true':
+                        act_dict['detail_type'] = PathEntry.DETAIL_SENSITIVE
+                    else:
+                        cdata = None
+                        if entry.get('current_bfile', None):
+                            act_dict['detail_type'] = PathEntry.DETAIL_BINARY
+                            cdata = entry.get('current_bfile')
+                        elif entry.get('current_bdiff', None):
+                            act_dict['detail_type'] = PathEntry.DETAIL_DIFF
+                            cdata = b64decode(entry.get('current_bdiff'))
+                        elif entry.get('current_diff', None):
+                            act_dict['detail_type'] = PathEntry.DETAIL_DIFF
+                            cdata = entry.get('current_diff')
+                        if cdata:
+                            if len(cdata) > self.size_limit:
+                                act_dict['detail_type'] = PathEntry.DETAIL_SIZE_LIMIT
+                                act_dict['details'] = md5(cdata).hexdigest()
+                            else:
+                                act_dict['details'] = cdata
+                    self.logger.debug("Adding path %s" % name)
+                    updates['paths'].append(PathEntry.entry_get_or_create(act_dict))
+
+
+                    #TODO - secontext
+                    #TODO - acls
+
+                elif entry_type == 'Service':
+                    act_dict['target_status'] = entry.get('status', default='')
+                    act_dict['current_status'] = entry.get('current_status', default='')
+                    self.logger.debug("Adding service %s" % name)
+                    updates['services'].append(ServiceEntry.entry_get_or_create(act_dict))
+                elif entry_type == 'SELinux':
+                    self.logger.info("SELinux not implemented yet")
+                else:
+                    self.logger.error("Unknown type %s not handled by reporting yet" % entry_type)
+
+        inter.bad_count = counter_fields[TYPE_BAD]
+        inter.modified_count = counter_fields[TYPE_MODIFIED]
+        inter.extra_count = counter_fields[TYPE_EXTRA]
+        inter.save()
+        for entry_type in updates.keys():
+            getattr(inter, entry_type).add(*updates[entry_type])
+
+        # performance metrics
+        for times in stats.findall('OpStamps'):
+            for metric, value in list(times.items()):
+                Performance(interaction=inter,
+                            metric=metric, value=value).save()
+
+
+    def import_interaction(self, interaction):
+        """Import the data into the backend"""
+
+        try:
+            self._import_interaction(interaction)
+        except:
+            self.logger.error("Failed to import interaction: %s" %
+                              traceback.format_exc().splitlines()[-1])
+
+
+    def validate(self):
+        """Validate backend storage. Should be called once when loaded"""
+
+        settings.read_config(repo=self.setup['repo'])
+
+        # verify our database schema
+        try:
+            if self.setup['verbose'] or self.setup['debug']:
+                vrb = 2
+            else:
+                vrb = 0
+            management.call_command("syncdb", verbosity=vrb, interactive=False)
+            management.call_command("migrate", verbosity=vrb, interactive=False)
+        except:
+            self.logger.error("Failed to update database schema: %s" % \
+                              traceback.format_exc().splitlines()[-1])
+            raise StorageError
+
+    def GetExtra(self, client):
+        """Fetch extra entries for a client"""
+        try:
+            c_inst = Client.objects.get(name=client)
+            return [(ent.entry_type, ent.name) for ent in
+                    c_inst.current_interaction.extra()]
+        except ObjectDoesNotExist:
+            return []
+        except MultipleObjectsReturned:
+            self.logger.error("%s Inconsistency: Multiple entries for %s." %
+                              (self.__class__.__name__, client))
+            return []
+
+    def GetCurrentEntry(self, client, e_type, e_name):
+        """GetCurrentEntry: Used by PullSource"""
+        try:
+            c_inst = Client.objects.get(name=client)
+        except ObjectDoesNotExist:
+            self.logger.error("Unknown client: %s" % client)
+            raise PluginExecutionError
+        except MultipleObjectsReturned:
+            self.logger.error("%s Inconsistency: Multiple entries for %s." %
+                              (self.__class__.__name__, client))
+            raise PluginExecutionError
+        try:
+            cls = BaseEntry.entry_from_name(e_type + "Entry")
+            result = cls.objects.filter(name=e_name, state=TYPE_BAD,
+                                        interaction=c_inst.current_interaction)
+        except ValueError:
+            self.logger.error("Unhandled type %s" % e_type)
+            raise PluginExecutionError
+        if not result:
+            raise PluginExecutionError
+        entry = result[0]
+        ret = []
+        for p_entry in ('owner', 'group', 'perms'):
+            this_entry = getattr(entry.current_perms, p_entry)
+            if this_entry == '':
+                ret.append(getattr(entry.target_perms, p_entry))
+            else:
+                ret.append(this_entry)
+        if entry.entry_type == 'Path':
+            if entry.is_sensitive():
+                raise PluginExecutionError
+            elif entry.detail_type == PathEntry.DETAIL_PRUNED:
+                ret.append('\n'.join(entry.details))
+            elif entry.is_binary():
+                ret.append(b64decode(entry.details))
+            elif entry.is_diff():
+                ret.append('\n'.join(difflib.restore(\
+                    entry.details.split('\n'), 1)))
+            elif entry.is_too_large():
+                # If len is zero the object was too large to store
+                raise PluginExecutionError
+            else:
+                ret.append(None)
+        return ret
+
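A usage sketch of the backend above, assuming an already-configured Bcfg2/Django database. The interaction payload handed to import_interaction() is a dict carrying 'hostname', 'metadata' (server, profile, groups, bundles) and the client's statistics XML as a string; the setup keys mirror what the code reads, while the literal values (hostname, repo path, size limit) are made up for illustration:

    from Bcfg2.Reporting.Storage.DjangoORM import DjangoORM

    # Keys mirror what DjangoORM/StorageBase read from `setup`;
    # the literal values here are illustrative only.
    setup = {'encoding': 'UTF-8',
             'reporting_size_limit': 1 << 20,   # cap on stored diff/file data
             'repo': '/var/lib/bcfg2',
             'verbose': False,
             'debug': False}

    storage = DjangoORM(setup)
    storage.validate()   # runs syncdb and (south) migrate on the reporting schema

    interaction = {
        'hostname': 'client.example.com',
        'metadata': {'server': 'bcfg2.example.com',
                     'profile': 'basic',
                     'groups': ['basic', 'debian'],
                     'bundles': ['ssh']},
        # the client's <Statistics> element, serialized to a string
        'stats': '<Statistics time="Mon Oct  8 10:38:02 2012" state="clean" '
                 'revision="deadbeef" good="10" total="10"/>',
    }
    storage.import_interaction(interaction)

Note that import_interaction() traps all exceptions and logs only the last line of the traceback, so import failures show up in the collector's log rather than propagating to the caller.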
diff --git a/src/lib/Bcfg2/Reporting/Storage/__init__.py b/src/lib/Bcfg2/Reporting/Storage/__init__.py
new file mode 100644
index 000000000..85356fcfe
--- /dev/null
+++ b/src/lib/Bcfg2/Reporting/Storage/__init__.py
@@ -0,0 +1,32 @@
+"""
+Public storage routines
+"""
+
+import traceback
+
+from Bcfg2.Reporting.Storage.base import StorageError, \
+    StorageImportError
+
+def load_storage(storage_name, setup):
+    """
+    Try to load the storage. Raise StorageImportError on failure
+    """
+    try:
+        mod_name = "%s.%s" % (__name__, storage_name)
+        mod = getattr(__import__(mod_name).Reporting.Storage, storage_name)
+    except ImportError:
+        try:
+            mod = __import__(storage_name)
+        except:
+            raise StorageImportError("Unavailable")
+    try:
+        cls = getattr(mod, storage_name)
+        return cls(setup)
+    except:
+        raise StorageImportError("Storage unavailable: %s" %
+                                 traceback.format_exc().splitlines()[-1])
+
+def load_storage_from_config(setup):
+    """Load the storage in the config... eventually"""
+    return load_storage('DjangoORM', setup)
+
diff --git a/src/lib/Bcfg2/Reporting/Storage/base.py b/src/lib/Bcfg2/Reporting/Storage/base.py
new file mode 100644
index 000000000..92cc3a68b
--- /dev/null
+++ b/src/lib/Bcfg2/Reporting/Storage/base.py
@@ -0,0 +1,51 @@
+"""
+The base for all Storage backends
+"""
+
+import logging
+
+class StorageError(Exception):
+    """Generic StorageError"""
+    pass
+
+class StorageImportError(StorageError):
+    """Raised when a storage module fails to import"""
+    pass
+
+class StorageBase(object):
+    """The base for all storages"""
+
+    __rmi__ = ['Ping', 'GetExtra', 'GetCurrentEntry']
+
+    def __init__(self, setup):
+        """Do something here"""
+        clsname = self.__class__.__name__
+        self.logger = logging.getLogger(clsname)
+        self.logger.debug("Loading %s storage" % clsname)
+        self.setup = setup
+        self.encoding = setup['encoding']
+
+    def import_interaction(self, interaction):
+        """Import the data into the backend"""
+        raise NotImplementedError
+
+    def validate(self):
+        """Validate backend storage. Should be called once when loaded"""
+        raise NotImplementedError
+
+    def shutdown(self):
+        """Called at program exit"""
+        pass
+
+    def Ping(self):
+        """Test for communication with reporting collector"""
+        return "Pong"
+
+    def GetExtra(self, client):
+        """Return a list of extra entries for a client. Minestruct"""
+        raise NotImplementedError
+
+    def GetCurrentEntry(self, client, e_type, e_name):
+        """Get the current status of an entry on the client"""
+        raise NotImplementedError
+
-- 
cgit v1.2.3-1-g7c22
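To illustrate the interface defined in base.py and consumed by the loader in __init__.py, here is a minimal sketch of an alternative backend. The module/class name LogOnly is hypothetical; load_storage() expects the backend class to carry the same name as its module:

    """LogOnly.py - toy reporting storage backend that only logs (sketch)."""
    from Bcfg2.Reporting.Storage.base import StorageBase

    class LogOnly(StorageBase):
        """Record interactions in the log instead of a database."""

        def validate(self):
            """No schema to verify for this backend."""
            return True

        def import_interaction(self, interaction):
            self.logger.info("Interaction received from %s" %
                             interaction['hostname'])

        def GetExtra(self, client):
            return []

        def GetCurrentEntry(self, client, e_type, e_name):
            return None

Loading it goes through the same code path that load_storage_from_config() uses for DjangoORM; only the 'encoding' key is required by the base class constructor:

    from Bcfg2.Reporting.Storage import load_storage

    storage = load_storage('LogOnly', {'encoding': 'UTF-8'})
    storage.Ping()   # returns "Pong"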