summaryrefslogtreecommitdiffstats
path: root/src/lib/Bcfg2/Reporting/Storage
diff options
context:
space:
mode:
authorTim Laszlo <tim.laszlo@gmail.com>2012-10-08 10:38:02 -0500
committerTim Laszlo <tim.laszlo@gmail.com>2012-10-08 10:38:02 -0500
commit44638176067df5231bf0be30801e36863391cd1f (patch)
tree6aaba73d03f9a5532047518b9a3e8ef3e63d3f9f /src/lib/Bcfg2/Reporting/Storage
parent1a3ced3f45423d79e08ca7d861e8118e8618d3b2 (diff)
downloadbcfg2-44638176067df5231bf0be30801e36863391cd1f.tar.gz
bcfg2-44638176067df5231bf0be30801e36863391cd1f.tar.bz2
bcfg2-44638176067df5231bf0be30801e36863391cd1f.zip
Reporting: Merge new reporting data
Move reporting data to a new schema Use south for django migrations Add bcfg2-report-collector daemon Conflicts: doc/development/index.txt doc/server/plugins/connectors/properties.txt doc/server/plugins/generators/packages.txt setup.py src/lib/Bcfg2/Client/Tools/SELinux.py src/lib/Bcfg2/Compat.py src/lib/Bcfg2/Encryption.py src/lib/Bcfg2/Options.py src/lib/Bcfg2/Server/Admin/Init.py src/lib/Bcfg2/Server/Admin/Reports.py src/lib/Bcfg2/Server/BuiltinCore.py src/lib/Bcfg2/Server/Core.py src/lib/Bcfg2/Server/FileMonitor/Inotify.py src/lib/Bcfg2/Server/Plugin/base.py src/lib/Bcfg2/Server/Plugin/interfaces.py src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py src/lib/Bcfg2/Server/Plugins/FileProbes.py src/lib/Bcfg2/Server/Plugins/Ohai.py src/lib/Bcfg2/Server/Plugins/Packages/Collection.py src/lib/Bcfg2/Server/Plugins/Packages/Source.py src/lib/Bcfg2/Server/Plugins/Packages/Yum.py src/lib/Bcfg2/Server/Plugins/Packages/__init__.py src/lib/Bcfg2/Server/Plugins/Probes.py src/lib/Bcfg2/Server/Plugins/Properties.py src/lib/Bcfg2/Server/Reports/backends.py src/lib/Bcfg2/Server/Reports/manage.py src/lib/Bcfg2/Server/Reports/nisauth.py src/lib/Bcfg2/settings.py src/sbin/bcfg2-crypt src/sbin/bcfg2-yum-helper testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestSEModules.py
Diffstat (limited to 'src/lib/Bcfg2/Reporting/Storage')
-rw-r--r--src/lib/Bcfg2/Reporting/Storage/DjangoORM.py316
-rw-r--r--src/lib/Bcfg2/Reporting/Storage/__init__.py32
-rw-r--r--src/lib/Bcfg2/Reporting/Storage/base.py51
3 files changed, 399 insertions, 0 deletions
diff --git a/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py b/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py
new file mode 100644
index 000000000..17eb52f66
--- /dev/null
+++ b/src/lib/Bcfg2/Reporting/Storage/DjangoORM.py
@@ -0,0 +1,316 @@
+"""
+The base for the original DjangoORM (DBStats)
+"""
+
+import os
+import traceback
+from lxml import etree
+from datetime import datetime
+from time import strptime
+
+os.environ['DJANGO_SETTINGS_MODULE'] = 'Bcfg2.settings'
+from Bcfg2 import settings
+
+from Bcfg2.Compat import md5
+from Bcfg2.Reporting.Storage.base import StorageBase, StorageError
+from Bcfg2.Server.Plugin.exceptions import PluginExecutionError
+from django.core import management
+from django.core.exceptions import ObjectDoesNotExist, MultipleObjectsReturned
+from django.core.cache import cache
+from django.db import transaction
+
+#Used by GetCurrentEntry
+import difflib
+from Bcfg2.Compat import b64decode
+from Bcfg2.Reporting.models import *
+
+
+class DjangoORM(StorageBase):
+ def __init__(self, setup):
+ super(DjangoORM, self).__init__(setup)
+ self.size_limit = setup.get('reporting_size_limit')
+
+ @transaction.commit_on_success
+ def _import_interaction(self, interaction):
+ """Real import function"""
+ hostname = interaction['hostname']
+ stats = etree.fromstring(interaction['stats'])
+ metadata = interaction['metadata']
+ server = metadata['server']
+
+ client = cache.get(hostname)
+ if not client:
+ client, created = Client.objects.get_or_create(name=hostname)
+ if created:
+ self.logger.debug("Client %s added to the db" % hostname)
+ cache.set(hostname, client)
+
+ timestamp = datetime(*strptime(stats.get('time'))[0:6])
+ if len(Interaction.objects.filter(client=client, timestamp=timestamp)) > 0:
+ self.logger.warn("Interaction for %s at %s already exists" %
+ (hostname, timestamp))
+ return
+
+ profile, created = Group.objects.get_or_create(name=metadata['profile'])
+ inter = Interaction(client=client,
+ timestamp=timestamp,
+ state=stats.get('state', default="unknown"),
+ repo_rev_code=stats.get('revision',
+ default="unknown"),
+ good_count=stats.get('good', default="0"),
+ total_count=stats.get('total', default="0"),
+ server=server,
+ profile=profile)
+ inter.save()
+ self.logger.debug("Interaction for %s at %s with INSERTED in to db" %
+ (client.id, timestamp))
+
+ #FIXME - this should be more efficient
+ for group_name in metadata['groups']:
+ group = cache.get("GROUP_" + group_name)
+ if not group:
+ group, created = Group.objects.get_or_create(name=group_name)
+ if created:
+ self.logger.debug("Added group %s" % group)
+ cache.set("GROUP_" + group_name, group)
+
+ inter.groups.add(group)
+ for bundle_name in metadata['bundles']:
+ bundle = cache.get("BUNDLE_" + bundle_name)
+ if not bundle:
+ bundle, created = Bundle.objects.get_or_create(name=bundle_name)
+ if created:
+ self.logger.debug("Added bundle %s" % bundle)
+ cache.set("BUNDLE_" + bundle_name, bundle)
+ inter.bundles.add(bundle)
+ inter.save()
+
+ counter_fields = {TYPE_BAD: 0,
+ TYPE_MODIFIED: 0,
+ TYPE_EXTRA: 0}
+ pattern = [('Bad/*', TYPE_BAD),
+ ('Extra/*', TYPE_EXTRA),
+ ('Modified/*', TYPE_MODIFIED)]
+ updates = dict(failures=[], paths=[], packages=[], actions=[], services=[])
+ for (xpath, state) in pattern:
+ for entry in stats.findall(xpath):
+ counter_fields[state] = counter_fields[state] + 1
+
+ entry_type = entry.tag
+ name = entry.get('name')
+ exists = entry.get('current_exists', default="true").lower() == "true"
+
+ # handle server failures differently
+ failure = entry.get('failure', '')
+ if failure:
+ act_dict = dict(name=name, entry_type=entry_type,
+ message=failure)
+ newact = FailureEntry.entry_get_or_create(act_dict)
+ updates['failures'].append(newact)
+ continue
+
+ act_dict = dict(name=name, state=state, exists=exists)
+
+ if entry_type == 'Action':
+ act_dict['status'] = entry.get('status', default="check")
+ act_dict['output'] = entry.get('rc', default=-1)
+ self.logger.debug("Adding action %s" % name)
+ updates['actions'].append(ActionEntry.entry_get_or_create(act_dict))
+ elif entry_type == 'Package':
+ act_dict['target_version'] = entry.get('version', default='')
+ act_dict['current_version'] = entry.get('current_version', default='')
+
+ # extra entries are a bit different. They can have Instance objects
+ if not act_dict['target_version']:
+ for instance in entry.findall("Instance"):
+ #TODO - this probably only works for rpms
+ release = instance.get('release', '')
+ arch = instance.get('arch', '')
+ act_dict['current_version'] = instance.get('version')
+ if release:
+ act_dict['current_version'] += "-" + release
+ if arch:
+ act_dict['current_version'] += "." + arch
+ self.logger.debug("Adding package %s %s" % (name, act_dict['current_version']))
+ updates['packages'].append(PackageEntry.entry_get_or_create(act_dict))
+ else:
+
+ self.logger.debug("Adding package %s %s" % (name, act_dict['target_version']))
+
+ # not implemented yet
+ act_dict['verification_details'] = entry.get('verification_details', '')
+ updates['packages'].append(PackageEntry.entry_get_or_create(act_dict))
+
+ elif entry_type == 'Path':
+ path_type = entry.get("type").lower()
+ act_dict['path_type'] = path_type
+
+ target_dict = dict(
+ owner=entry.get('owner', default="root"),
+ group=entry.get('group', default="root"),
+ perms=entry.get('perms', default=""),
+ )
+ fperm, created = FilePerms.objects.get_or_create(**target_dict)
+ act_dict['target_perms'] = fperm
+
+ current_dict = dict(
+ owner=entry.get('current_owner', default=""),
+ group=entry.get('current_group', default=""),
+ perms=entry.get('current_perms', default=""),
+ )
+ fperm, created = FilePerms.objects.get_or_create(**current_dict)
+ act_dict['current_perms'] = fperm
+
+ if path_type in ('symlink', 'hardlink'):
+ act_dict['target_path'] = entry.get('to', default="")
+ act_dict['current_path'] = entry.get('current_to', default="")
+ self.logger.debug("Adding link %s" % name)
+ updates['paths'].append(LinkEntry.entry_get_or_create(act_dict))
+ continue
+ elif path_type == 'device':
+ #TODO devices
+ self.logger.warn("device path types are not supported yet")
+ continue
+
+ # TODO - vcs output
+ act_dict['detail_type'] = PathEntry.DETAIL_UNUSED
+ if path_type == 'directory' and entry.get('prune', 'false') == 'true':
+ unpruned_elist = [e.get('path') for e in entry.findall('Prune')]
+ if unpruned_elist:
+ act_dict['detail_type'] = PathEntry.DETAIL_PRUNED
+ act_dict['details'] = "\n".join(unpruned_elist)
+ elif entry.get('sensitive', 'false').lower() == 'true':
+ act_dict['detail_type'] = PathEntry.DETAIL_SENSITIVE
+ else:
+ cdata = None
+ if entry.get('current_bfile', None):
+ act_dict['detail_type'] = PathEntry.DETAIL_BINARY
+ cdata = entry.get('current_bfile')
+ elif entry.get('current_bdiff', None):
+ act_dict['detail_type'] = PathEntry.DETAIL_DIFF
+ cdata = b64decode(entry.get('current_bdiff'))
+ elif entry.get('current_diff', None):
+ act_dict['detail_type'] = PathEntry.DETAIL_DIFF
+ cdata = entry.get('current_bdiff')
+ if cdata:
+ if len(cdata) > self.size_limit:
+ act_dict['detail_type'] = PathEntry.DETAIL_SIZE_LIMIT
+ act_dict['details'] = md5(cdata).hexdigest()
+ else:
+ act_dict['details'] = cdata
+ self.logger.debug("Adding path %s" % name)
+ updates['paths'].append(PathEntry.entry_get_or_create(act_dict))
+
+
+ #TODO - secontext
+ #TODO - acls
+
+ elif entry_type == 'Service':
+ act_dict['target_status'] = entry.get('status', default='')
+ act_dict['current_status'] = entry.get('current_status', default='')
+ self.logger.debug("Adding service %s" % name)
+ updates['services'].append(ServiceEntry.entry_get_or_create(act_dict))
+ elif entry_type == 'SELinux':
+ self.logger.info("SELinux not implemented yet")
+ else:
+ self.logger.error("Unknown type %s not handled by reporting yet" % entry_type)
+
+ inter.bad_count = counter_fields[TYPE_BAD]
+ inter.modified_count = counter_fields[TYPE_MODIFIED]
+ inter.extra_count = counter_fields[TYPE_EXTRA]
+ inter.save()
+ for entry_type in updates.keys():
+ getattr(inter, entry_type).add(*updates[entry_type])
+
+ # performance metrics
+ for times in stats.findall('OpStamps'):
+ for metric, value in list(times.items()):
+ Performance(interaction=inter, metric=metric, value=value).save()
+
+
+ def import_interaction(self, interaction):
+ """Import the data into the backend"""
+
+ try:
+ self._import_interaction(interaction)
+ except:
+ self.logger.error("Failed to import interaction: %s" %
+ traceback.format_exc().splitlines()[-1])
+
+
+ def validate(self):
+ """Validate backend storage. Should be called once when loaded"""
+
+ settings.read_config(repo=self.setup['repo'])
+
+ # verify our database schema
+ try:
+ if self.setup['verbose'] or self.setup['debug']:
+ vrb = 2
+ else:
+ vrb = 0
+ management.call_command("syncdb", verbosity=vrb, interactive=False)
+ management.call_command("migrate", verbosity=vrb, interactive=False)
+ except:
+ self.logger.error("Failed to update database schema: %s" % \
+ traceback.format_exc().splitlines()[-1])
+ raise StorageError
+
+ def GetExtra(self, client):
+ """Fetch extra entries for a client"""
+ try:
+ c_inst = Client.objects.get(name=client)
+ return [(ent.entry_type, ent.name) for ent in
+ c_inst.current_interaction.extra()]
+ except ObjectDoesNotExist:
+ return []
+ except MultipleObjectsReturned:
+ self.logger.error("%s Inconsistency: Multiple entries for %s." %
+ (self.__class__.__name__, client))
+ return []
+
+ def GetCurrentEntry(self, client, e_type, e_name):
+ """"GetCurrentEntry: Used by PullSource"""
+ try:
+ c_inst = Client.objects.get(name=client)
+ except ObjectDoesNotExist:
+ self.logger.error("Unknown client: %s" % client)
+ raise PluginExecutionError
+ except MultipleObjectsReturned:
+ self.logger.error("%s Inconsistency: Multiple entries for %s." %
+ (self.__class__.__name__, client))
+ raise PluginExecutionError
+ try:
+ cls = BaseEntry.entry_from_name(e_type + "Entry")
+ result = cls.objects.filter(name=e_name, state=TYPE_BAD,
+ interaction=c_inst.current_interaction)
+ except ValueError:
+ self.logger.error("Unhandled type %s" % e_type)
+ raise PluginExecutionError
+ if not result:
+ raise PluginExecutionError
+ entry = result[0]
+ ret = []
+ for p_entry in ('owner', 'group', 'perms'):
+ this_entry = getattr(entry.current_perms, p_entry)
+ if this_entry == '':
+ ret.append(getattr(entry.target_perms, p_entry))
+ else:
+ ret.append(this_entry)
+ if entry.entry_type == 'Path':
+ if entry.is_sensitive():
+ raise PluginExecutionError
+ elif entry.detail_type == PathEntry.DETAIL_PRUNED:
+ ret.append('\n'.join(entry.details))
+ elif entry.is_binary():
+ ret.append(b64decode(entry.details))
+ elif entry.is_diff():
+ ret.append('\n'.join(difflib.restore(\
+ entry.details.split('\n'), 1)))
+ elif entry.is_too_large():
+ # If len is zero the object was too large to store
+ raise PluginExecutionError
+ else:
+ ret.append(None)
+ return ret
+
diff --git a/src/lib/Bcfg2/Reporting/Storage/__init__.py b/src/lib/Bcfg2/Reporting/Storage/__init__.py
new file mode 100644
index 000000000..85356fcfe
--- /dev/null
+++ b/src/lib/Bcfg2/Reporting/Storage/__init__.py
@@ -0,0 +1,32 @@
+"""
+Public storage routines
+"""
+
+import traceback
+
+from Bcfg2.Reporting.Storage.base import StorageError, \
+ StorageImportError
+
+def load_storage(storage_name, setup):
+ """
+ Try to load the storage. Raise StorageImportError on failure
+ """
+ try:
+ mod_name = "%s.%s" % (__name__, storage_name)
+ mod = getattr(__import__(mod_name).Reporting.Storage, storage_name)
+ except ImportError:
+ try:
+ mod = __import__(storage_name)
+ except:
+ raise StorageImportError("Unavailable")
+ try:
+ cls = getattr(mod, storage_name)
+ return cls(setup)
+ except:
+ raise StorageImportError("Storage unavailable: %s" %
+ traceback.format_exc().splitlines()[-1])
+
+def load_storage_from_config(setup):
+ """Load the storage in the config... eventually"""
+ return load_storage('DjangoORM', setup)
+
diff --git a/src/lib/Bcfg2/Reporting/Storage/base.py b/src/lib/Bcfg2/Reporting/Storage/base.py
new file mode 100644
index 000000000..92cc3a68b
--- /dev/null
+++ b/src/lib/Bcfg2/Reporting/Storage/base.py
@@ -0,0 +1,51 @@
+"""
+The base for all Storage backends
+"""
+
+import logging
+
+class StorageError(Exception):
+ """Generic StorageError"""
+ pass
+
+class StorageImportError(StorageError):
+ """Raised when a storage module fails to import"""
+ pass
+
+class StorageBase(object):
+ """The base for all storages"""
+
+ __rmi__ = ['Ping', 'GetExtra', 'GetCurrentEntry']
+
+ def __init__(self, setup):
+ """Do something here"""
+ clsname = self.__class__.__name__
+ self.logger = logging.getLogger(clsname)
+ self.logger.debug("Loading %s storage" % clsname)
+ self.setup = setup
+ self.encoding = setup['encoding']
+
+ def import_interaction(self, interaction):
+ """Import the data into the backend"""
+ raise NotImplementedError
+
+ def validate(self):
+ """Validate backend storage. Should be called once when loaded"""
+ raise NotImplementedError
+
+ def shutdown(self):
+ """Called at program exit"""
+ pass
+
+ def Ping(self):
+ """Test for communication with reporting collector"""
+ return "Pong"
+
+ def GetExtra(self, client):
+ """Return a list of extra entries for a client. Minestruct"""
+ raise NotImplementedError
+
+ def GetCurrentEntry(self, client, e_type, e_name):
+ """Get the current status of an entry on the client"""
+ raise NotImplementedError
+