summaryrefslogtreecommitdiffstats
path: root/testsuite
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-08-14 14:44:08 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-08-14 15:43:32 -0400
commitbe4f6ad065fa17ae34c810f2c09bc9f5fa4d9c23 (patch)
tree0e4a2172ef4fa94320ddf35756f92d8089e1a980 /testsuite
parent554022ae751777a6f853f54663dc5e9fab818dce (diff)
downloadbcfg2-be4f6ad065fa17ae34c810f2c09bc9f5fa4d9c23.tar.gz
bcfg2-be4f6ad065fa17ae34c810f2c09bc9f5fa4d9c23.tar.bz2
bcfg2-be4f6ad065fa17ae34c810f2c09bc9f5fa4d9c23.zip
added unit tests for EntrySet
Diffstat (limited to 'testsuite')
-rw-r--r--testsuite/Testlib/TestServer/TestPlugin.py399
-rw-r--r--testsuite/Testlib/TestServer/TestPlugins/TestProbes.py15
2 files changed, 397 insertions, 17 deletions
diff --git a/testsuite/Testlib/TestServer/TestPlugin.py b/testsuite/Testlib/TestServer/TestPlugin.py
index 9ca3243a3..e5baf9a0b 100644
--- a/testsuite/Testlib/TestServer/TestPlugin.py
+++ b/testsuite/Testlib/TestServer/TestPlugin.py
@@ -1,4 +1,5 @@
import os
+import re
import copy
import logging
import unittest
@@ -164,32 +165,39 @@ class TestPluginDatabaseModel(Bcfg2TestCase):
class TestGenerator(Bcfg2TestCase):
- def test_HandleEntry(self):
- g = Generator()
- self.assertRaises(NotImplementedError,
- g.HandleEntry, None, None)
+ test_obj = Generator
class TestStructure(Bcfg2TestCase):
+ test_obj = Structure
+
+ def get_obj(self):
+ return self.test_obj()
+
def test_BuildStructures(self):
- s = Structure()
+ s = self.get_obj()
self.assertRaises(NotImplementedError,
s.BuildStructures, None)
class TestMetadata(Bcfg2TestCase):
+ test_obj = Metadata
+
+ def get_obj(self):
+ return self.test_obj()
+
def test_get_initial_metadata(self):
- m = Metadata()
+ m = self.get_obj()
self.assertRaises(NotImplementedError,
m.get_initial_metadata, None)
def test_merge_additional_data(self):
- m = Metadata()
+ m = self.get_obj()
self.assertRaises(NotImplementedError,
m.merge_additional_data, None, None, None)
def test_merge_additional_groups(self):
- m = Metadata()
+ m = self.get_obj()
self.assertRaises(NotImplementedError,
m.merge_additional_groups, None, None)
@@ -1511,12 +1519,12 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked):
pd.get_attrs, entry, metadata)
-class SpecificityError(Bcfg2TestCase):
+class TestSpecificityError(Bcfg2TestCase):
""" placeholder for future tests """
pass
-class Specificity(Bcfg2TestCase):
+class TestSpecificity(Bcfg2TestCase):
test_obj = Specificity
def get_obj(self, **kwargs):
@@ -1560,7 +1568,6 @@ class Specificity(Bcfg2TestCase):
for i in range(len(specs)):
for j in range(len(specs)):
- print "i=%s, j=%s" % (i, j)
if i < j:
self.assertGreater(specs[i], specs[j])
self.assertLess(specs[j], specs[i])
@@ -1578,7 +1585,7 @@ class Specificity(Bcfg2TestCase):
self.assertGreaterEqual(specs[j], specs[i])
-class SpecificData(Bcfg2TestCase):
+class TestSpecificData(Bcfg2TestCase):
test_obj = SpecificData
def get_obj(self, name="/test.txt", specific=None, encoding=None):
@@ -1608,12 +1615,376 @@ class SpecificData(Bcfg2TestCase):
class TestEntrySet(TestDebuggable):
test_obj = EntrySet
+ # filenames that should be matched successfully by the EntrySet
+ # 'specific' regex. these are filenames alone -- a specificity
+ # will be added to these
+ basenames = ["test", "test.py", "test with spaces.txt",
+ "test.multiple.dots.py", "test_underscores.and.dots",
+ "really_misleading.G10_test",
+ "name$with*regex(special){chars}",
+ "misleading.H_hostname.test.com"]
+ # filenames that do not match any of the basenames (or the
+ # basename regex, if applicable)
+ bogus_names = ["bogus"]
+ # filenames that should be ignored
+ ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx",
+ "test.txt.genshi_include", "test.G_foo.genshi_include"]
+
- def get_obj(self, basename="test", path=datastore, entry_type=SpecificData,
+ def get_obj(self, basename="test", path=datastore, entry_type=MagicMock(),
encoding=None):
return self.test_obj(basename, path, entry_type, encoding)
+ def test__init(self):
+ for basename in self.basenames:
+ eset = self.get_obj(basename=basename)
+ self.assertIsInstance(eset.specific, re._pattern_type)
+ self.assertTrue(eset.specific.match(os.path.join(datastore,
+ basename)))
+ ppath = os.path.join(datastore, "Plugin", basename)
+ self.assertTrue(eset.specific.match(ppath))
+ self.assertTrue(eset.specific.match(ppath + ".G20_foo"))
+ self.assertTrue(eset.specific.match(ppath + ".G1_foo"))
+ self.assertTrue(eset.specific.match(ppath + ".G32768_foo"))
+ # a group named '_'
+ self.assertTrue(eset.specific.match(ppath + ".G10__"))
+ self.assertTrue(eset.specific.match(ppath + ".H_hostname"))
+ self.assertTrue(eset.specific.match(ppath + ".H_fqdn.subdomain.example.com"))
+ self.assertTrue(eset.specific.match(ppath + ".G20_group_with_underscores"))
+
+ self.assertFalse(eset.specific.match(ppath + ".G20_group with spaces"))
+ self.assertFalse(eset.specific.match(ppath + ".G_foo"))
+ self.assertFalse(eset.specific.match(ppath + ".G_"))
+ self.assertFalse(eset.specific.match(ppath + ".G20_"))
+ self.assertFalse(eset.specific.match(ppath + ".H_"))
+
+ for bogus in self.bogus_names:
+ self.assertFalse(eset.specific.match(os.path.join(datastore,
+ "Plugin",
+ bogus)))
+
+ for ignore in self.ignore:
+ self.assertTrue(eset.ignore.match(ignore))
+
+ self.assertFalse(eset.ignore.match(basename))
+ self.assertFalse(eset.ignore.match(basename + ".G20_foo"))
+ self.assertFalse(eset.ignore.match(basename + ".G1_foo"))
+ self.assertFalse(eset.ignore.match(basename + ".G32768_foo"))
+ self.assertFalse(eset.ignore.match(basename + ".G10__"))
+ self.assertFalse(eset.ignore.match(basename + ".H_hostname"))
+ self.assertFalse(eset.ignore.match(basename + ".H_fqdn.subdomain.example.com"))
+ self.assertFalse(eset.ignore.match(basename + ".G20_group_with_underscores"))
+
+ def test_get_matching(self):
+ items = {0: Mock(), 1: Mock(), 2: Mock(), 3: Mock(), 4: Mock(),
+ 5: Mock()}
+ items[0].specific.matches.return_value = False
+ items[1].specific.matches.return_value = True
+ items[2].specific.matches.return_value = False
+ items[3].specific.matches.return_value = False
+ items[4].specific.matches.return_value = True
+ items[5].specific.matches.return_value = True
+ metadata = Mock()
+ eset = self.get_obj()
+ eset.entries = items
+ self.assertItemsEqual(eset.get_matching(metadata),
+ [items[1], items[4], items[5]])
+ for i in items.values():
+ i.specific.matches.assert_called_with(metadata)
+
+ @patch("Bcfg2.Server.Plugin.%s.get_matching" % test_obj.__name__)
+ def test_best_matching(self, mock_get_matching):
+ eset = self.get_obj()
+ metadata = Mock()
+ matching = []
+
+ def reset():
+ mock_get_matching.reset_mock()
+ metadata.reset_mock()
+ for m in matching:
+ m.reset_mock()
+
+ def specific(all=False, group=False, prio=None, host=False):
+ spec = MagicMock()
+ spec.all = all
+ spec.group = group
+ spec.prio = prio
+ spec.host = host
+ if prio:
+ spec.__cmp__ = lambda o: cmp(spec.prio, o.prio)
+ return spec
+
+ self.assertRaises(PluginExecutionError,
+ eset.best_matching, metadata, matching=[])
+
+ reset()
+ mock_get_matching.return_value = matching
+ self.assertRaises(PluginExecutionError,
+ eset.best_matching, metadata)
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with a single file for all
+ reset()
+ matching.insert(0, specific(all=True))
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata),
+ matching[0])
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with a single group-specific file
+ reset()
+ matching.insert(0, specific(group=True, prio=10))
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata),
+ matching[0])
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with multiple group-specific files
+ reset()
+ matching.insert(0, specific(group=True, prio=20))
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata),
+ matching[0])
+ mock_get_matching.assert_called_with(metadata)
+
+ # test with host-specific file
+ reset()
+ matching.insert(0, specific(host=True))
+ mock_get_matching.return_value = matching
+ self.assertEqual(eset.best_matching(metadata),
+ matching[0])
+ mock_get_matching.assert_called_with(metadata)
+
+ @patch("Bcfg2.Server.Plugin.%s.entry_init" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.reset_metadata" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.update_metadata" % test_obj.__name__)
+ def test_handle_event(self, mock_update_md, mock_reset_md, mock_init):
+ def reset():
+ mock_update_md.reset_mock()
+ mock_reset_md.reset_mock()
+ mock_init.reset_mock()
+
+ eset = self.get_obj()
+ for fname in ["info", "info.xml", ":info"]:
+ for evt in ["exists", "created", "changed"]:
+ reset()
+ event = Mock()
+ event.code2str.return_value = evt
+ event.filename = fname
+ eset.handle_event(event)
+ mock_update_md.assert_called_with(event)
+ self.assertFalse(mock_init.called)
+ self.assertFalse(mock_reset_md.called)
+
+ reset()
+ event = Mock()
+ event.code2str.return_value = "deleted"
+ event.filename = fname
+ eset.handle_event(event)
+ mock_reset_md.assert_called_with(event)
+ self.assertFalse(mock_init.called)
+ self.assertFalse(mock_update_md.called)
+
+ for evt in ["exists", "created", "changed"]:
+ reset()
+ event = Mock()
+ event.code2str.return_value = evt
+ event.filename = "test.txt"
+ eset.handle_event(event)
+ mock_init.assert_called_with(event)
+ self.assertFalse(mock_reset_md.called)
+ self.assertFalse(mock_update_md.called)
+
+ reset()
+ entry = Mock()
+ eset.entries["test.txt"] = entry
+ event = Mock()
+ event.code2str.return_value = "changed"
+ event.filename = "test.txt"
+ eset.handle_event(event)
+ entry.handle_event.assert_called_with(event)
+ self.assertFalse(mock_init.called)
+ self.assertFalse(mock_reset_md.called)
+ self.assertFalse(mock_update_md.called)
+
+ reset()
+ entry = Mock()
+ eset.entries["test.txt"] = entry
+ event = Mock()
+ event.code2str.return_value = "deleted"
+ event.filename = "test.txt"
+ eset.handle_event(event)
+ self.assertNotIn("test.txt", eset.entries)
+
+ @patch("Bcfg2.Server.Plugin.%s.specificity_from_filename" %
+ test_obj.__name__)
+ def test_entry_init(self, mock_spec):
+ eset = self.get_obj()
+
+ def reset():
+ eset.entry_type.reset_mock()
+ mock_spec.reset_mock()
+
+ event = Mock()
+ event.code2str.return_value = "created"
+ event.filename = "test.txt"
+ eset.entry_init(event)
+ mock_spec.assert_called_with("test.txt", specific=None)
+ eset.entry_type.assert_called_with(os.path.join(eset.path, "test.txt"),
+ mock_spec.return_value, None)
+ eset.entry_type.return_value.handle_event.assert_called_with(event)
+ self.assertIn("test.txt", eset.entries)
+
+ # test duplicate add
+ reset()
+ eset.entry_init(event)
+ self.assertFalse(mock_spec.called)
+ self.assertFalse(eset.entry_type.called)
+ eset.entries["test.txt"].handle_event.assert_called_with(event)
+
+ # test keyword args
+ etype = Mock()
+ specific = Mock()
+ event = Mock()
+ event.code2str.return_value = "created"
+ event.filename = "test2.txt"
+ eset.entry_init(event, entry_type=etype, specific=specific)
+ mock_spec.assert_called_with("test2.txt", specific=specific)
+ etype.assert_called_with(os.path.join(eset.path, "test2.txt"),
+ mock_spec.return_value, None)
+ etype.return_value.handle_event.assert_called_with(event)
+ self.assertIn("test2.txt", eset.entries)
+
+ # test specificity error
+ event = Mock()
+ event.code2str.return_value = "created"
+ event.filename = "test3.txt"
+ mock_spec.side_effect = SpecificityError
+ eset.entry_init(event)
+ mock_spec.assert_called_with("test3.txt", specific=None)
+ self.assertFalse(eset.entry_type.called)
+
+ @patch("Bcfg2.Server.Plugin.Specificity")
+ def test_specificity_from_filename(self, mock_spec):
+ def test(eset, fname, **kwargs):
+ mock_spec.reset_mock()
+ if "specific" in kwargs:
+ specific = kwargs['specific']
+ del kwargs['specific']
+ else:
+ specific = None
+ self.assertEqual(eset.specificity_from_filename(fname,
+ specific=specific),
+ mock_spec.return_value)
+ mock_spec.assert_called_with(**kwargs)
+
+ def fails(eset, fname, specific=None):
+ mock_spec.reset_mock()
+ self.assertRaises(SpecificityError,
+ eset.specificity_from_filename, fname,
+ specific=specific)
+
+ for basename in self.basenames:
+ eset = self.get_obj(basename=basename)
+ ppath = os.path.join(datastore, "Plugin", basename)
+ test(eset, ppath, all=True)
+ test(eset, ppath + ".G20_foo", group="foo", prio=20)
+ test(eset, ppath + ".G1_foo", group="foo", prio=1)
+ test(eset, ppath + ".G32768_foo", group="foo", prio=32768)
+ test(eset, ppath + ".G10__", group="_", prio=10)
+ test(eset, ppath + ".H_hostname", hostname="hostname")
+ test(eset, ppath + ".H_fqdn.subdomain.example.com",
+ hostname="fqdn.subdomain.example.com")
+ test(eset, ppath + ".G20_group_with_underscores",
+ group="group_with_underscores", prio=20)
+
+ for bogus in self.bogus_names:
+ fails(eset, bogus)
+ fails(eset, ppath + ".G_group with spaces")
+ fails(eset, ppath + ".G_foo")
+ fails(eset, ppath + ".G_")
+ fails(eset, ppath + ".G20_")
+ fails(eset, ppath + ".H_")
+
+ @patch("__builtin__.open")
+ @patch("Bcfg2.Server.Plugin.InfoXML")
+ def test_update_metadata(self, mock_InfoXML, mock_open):
+ eset = self.get_obj()
+
+ # add info.xml
+ event = Mock()
+ event.filename = "info.xml"
+ eset.update_metadata(event)
+ mock_InfoXML.assert_called_with(os.path.join(eset.path, "info.xml"),
+ True)
+ mock_InfoXML.return_value.HandleEvent.assert_called_with(event)
+ self.assertEqual(eset.infoxml, mock_InfoXML.return_value)
+
+ # modify info.xml
+ mock_InfoXML.reset_mock()
+ eset.update_metadata(event)
+ self.assertFalse(mock_InfoXML.called)
+ eset.infoxml.HandleEvent.assert_called_with(event)
+
+ for fname in [':info', 'info']:
+ event = Mock()
+ event.filename = fname
+
+ idata = ["owner:owner",
+ "group: GROUP",
+ "perms: 775",
+ "important: true",
+ "bogus: line"]
+ mock_open.return_value.readlines.return_value = idata
+ eset.update_metadata(event)
+ expected = default_file_metadata.copy()
+ expected['owner'] = 'owner'
+ expected['group'] = 'GROUP'
+ expected['perms'] = '0775'
+ expected['important'] = 'true'
+ self.assertItemsEqual(eset.metadata,
+ expected)
+
+ def test_reset_metadata(self):
+ eset = self.get_obj()
+
+ # test info.xml
+ event = Mock()
+ event.filename = "info.xml"
+ eset.infoxml = Mock()
+ eset.reset_metadata(event)
+ self.assertIsNone(eset.infoxml)
+
+ for fname in [':info', 'info']:
+ event = Mock()
+ event.filename = fname
+ eset.metadata = Mock()
+ eset.reset_metadata(event)
+ self.assertItemsEqual(eset.metadata, default_file_metadata)
+
+ @patch("Bcfg2.Server.Plugin.bind_info")
+ def test_bind_info_to_entry(self, mock_bind_info):
+ eset = self.get_obj()
+ entry = Mock()
+ metadata = Mock()
+ eset.bind_info_to_entry(entry, metadata)
+ mock_bind_info.assert_called_with(entry, metadata,
+ infoxml=eset.infoxml,
+ default=eset.metadata)
+
+ @patch("Bcfg2.Server.Plugin.%s.best_matching" % test_obj.__name__)
+ @patch("Bcfg2.Server.Plugin.%s.bind_info_to_entry" % test_obj.__name__)
+ def test_bind_entry(self, mock_bind_info, mock_best_matching):
+ eset = self.get_obj()
+ entry = Mock()
+ metadata = Mock()
+ eset.bind_entry(entry, metadata)
+ mock_bind_info.assert_called_with(entry, metadata)
+ mock_best_matching.assert_called_with(metadata)
+ mock_best_matching.return_value.bind_entry.assert_called_with(entry,
+ metadata)
+
-class GroupSpool(TestPlugin, TestGenerator):
+class TestGroupSpool(TestPlugin, TestGenerator):
test_obj = GroupSpool
pass
diff --git a/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py
index f90096210..0bcb65dc4 100644
--- a/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py
+++ b/testsuite/Testlib/TestServer/TestPlugins/TestProbes.py
@@ -3,7 +3,7 @@ import sys
import time
import unittest
import lxml.etree
-from mock import Mock, patch
+from mock import Mock, MagicMock, patch
from ....common import *
import Bcfg2.Server
import Bcfg2.Server.Plugin
@@ -75,18 +75,27 @@ class TestProbeData(Bcfg2TestCase):
class TestProbeSet(TestEntrySet):
test_obj = ProbeSet
+ basenames = ["test", "_test", "test-test"]
+ ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"]
+ bogus_names = ["test.py"]
def get_obj(self, path=datastore, fam=None, encoding=None,
- plugin_name="Probes"):
+ plugin_name="Probes", basename=None):
+ # get_obj() accepts the basename argument, accepted by the
+ # parent get_obj() method, and just throws it away, since
+ # ProbeSet uses a regex for the "basename"
if fam is None:
fam = Mock()
- return self.test_obj(path, fam, encoding, plugin_name)
+ rv = self.test_obj(path, fam, encoding, plugin_name)
+ rv.entry_type = MagicMock()
+ return rv
def test__init(self):
fam = Mock()
ps = self.get_obj(fam=fam)
self.assertEqual(ps.plugin_name, "Probes")
fam.AddMonitor.assert_called_with(datastore, ps)
+ TestEntrySet.test__init(self)
def test_HandleEvent(self):
ps = self.get_obj()