summaryrefslogtreecommitdiffstats
path: root/testsuite/Testsrc/Testlib/TestServer
diff options
context:
space:
mode:
Diffstat (limited to 'testsuite/Testsrc/Testlib/TestServer')
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestCache.py54
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestEncryption.py149
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testbase.py54
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py1225
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py17
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py223
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py111
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py55
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py59
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py23
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py89
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py21
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py31
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py225
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py41
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py349
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py2
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py237
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py60
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py29
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py326
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py679
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py326
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py259
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py2
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestStatistics.py44
26 files changed, 2307 insertions, 2383 deletions
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestCache.py b/testsuite/Testsrc/Testlib/TestServer/TestCache.py
new file mode 100644
index 000000000..7c26e52b8
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestCache.py
@@ -0,0 +1,54 @@
+import os
+import sys
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+
+from Bcfg2.Server.Cache import *
+
+
+class TestCache(Bcfg2TestCase):
+ def test_cache(self):
+ md_cache = Cache("Metadata")
+ md_cache['foo.example.com'] = 'foo metadata'
+ md_cache['bar.example.com'] = 'bar metadata'
+ self.assertItemsEqual(list(iter(md_cache)),
+ ["foo.example.com", "bar.example.com"])
+
+ probe_cache = Cache("Probes", "data")
+ probe_cache['foo.example.com'] = 'foo probe data'
+ probe_cache['bar.example.com'] = 'bar probe data'
+ self.assertItemsEqual(list(iter(probe_cache)),
+ ["foo.example.com", "bar.example.com"])
+
+ md_cache.expire("foo.example.com")
+ self.assertItemsEqual(list(iter(md_cache)), ["bar.example.com"])
+ self.assertItemsEqual(list(iter(probe_cache)),
+ ["foo.example.com", "bar.example.com"])
+
+ probe_cache.expire("bar.example.com")
+ self.assertItemsEqual(list(iter(md_cache)), ["bar.example.com"])
+ self.assertItemsEqual(list(iter(probe_cache)),
+ ["foo.example.com"])
+
+ probe_cache['bar.example.com'] = 'bar probe data'
+ self.assertItemsEqual(list(iter(md_cache)), ["bar.example.com"])
+ self.assertItemsEqual(list(iter(probe_cache)),
+ ["foo.example.com", "bar.example.com"])
+
+ expire("bar.example.com")
+ self.assertEqual(len(md_cache), 0)
+ self.assertItemsEqual(list(iter(probe_cache)),
+ ["foo.example.com"])
+
+ probe_cache2 = Cache("Probes", "data")
+ self.assertItemsEqual(list(iter(probe_cache)),
+ list(iter(probe_cache2)))
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestEncryption.py b/testsuite/Testsrc/Testlib/TestServer/TestEncryption.py
new file mode 100644
index 000000000..cfb0c023b
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestEncryption.py
@@ -0,0 +1,149 @@
+# -*- coding: utf-8 -*-
+import os
+import sys
+from Bcfg2.Compat import b64decode
+from mock import Mock, MagicMock, patch
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+
+try:
+ from Bcfg2.Server.Encryption import *
+ HAS_CRYPTO = True
+except ImportError:
+ HAS_CRYPTO = False
+
+
+class TestEncryption(Bcfg2TestCase):
+ plaintext = """foo bar
+baz
+รถ
+\t\tquux
+""" + "a" * 16384 # 16K is completely arbitrary
+ iv = "0123456789ABCDEF"
+ salt = "01234567"
+ algo = "des_cbc"
+
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found")
+ def setUp(self):
+ Bcfg2.Options.setup.algorithm = "aes_256_cbc"
+
+ def test_str_crypt(self):
+ """ test str_encrypt/str_decrypt """
+ key = "a simple key"
+
+ # simple symmetrical test with no options
+ crypted = str_encrypt(self.plaintext, key)
+ self.assertEqual(self.plaintext, str_decrypt(crypted, key))
+
+ # symmetrical test with lots of options
+ crypted = str_encrypt(self.plaintext, key,
+ iv=self.iv, salt=self.salt,
+ algorithm=self.algo)
+ self.assertEqual(self.plaintext,
+ str_decrypt(crypted, key, iv=self.iv,
+ algorithm=self.algo))
+
+ # test that different algorithms are actually used
+ self.assertNotEqual(str_encrypt(self.plaintext, key),
+ str_encrypt(self.plaintext, key,
+ algorithm=self.algo))
+
+ # test that different keys are actually used
+ self.assertNotEqual(str_encrypt(self.plaintext, key),
+ str_encrypt(self.plaintext, "different key"))
+
+ # test that different IVs are actually used
+ self.assertNotEqual(str_encrypt(self.plaintext, key, iv=self.iv),
+ str_encrypt(self.plaintext, key))
+
+ # test that errors are raised on bad decrypts
+ crypted = str_encrypt(self.plaintext, key, algorithm=self.algo)
+ self.assertRaises(EVPError, str_decrypt,
+ crypted, "bogus key", algorithm=self.algo)
+ self.assertRaises(EVPError, str_decrypt,
+ crypted, key) # bogus algorithm
+
+ def test_ssl_crypt(self):
+ """ test ssl_encrypt/ssl_decrypt """
+ passwd = "a simple passphrase"
+
+ # simple symmetrical test
+ crypted = ssl_encrypt(self.plaintext, passwd)
+ self.assertEqual(self.plaintext, ssl_decrypt(crypted, passwd))
+
+ # more complex symmetrical test
+ crypted = ssl_encrypt(self.plaintext, passwd, algorithm=self.algo,
+ salt=self.salt)
+ self.assertEqual(self.plaintext,
+ ssl_decrypt(crypted, passwd, algorithm=self.algo))
+
+ # test that different algorithms are actually used
+ self.assertNotEqual(ssl_encrypt(self.plaintext, passwd),
+ ssl_encrypt(self.plaintext, passwd,
+ algorithm=self.algo))
+
+ # test that different passwords are actually used
+ self.assertNotEqual(ssl_encrypt(self.plaintext, passwd),
+ ssl_encrypt(self.plaintext, "different pass"))
+
+ # there's no reasonable test we can do to see if the
+ # output is base64-encoded, unfortunately, but if it's
+ # obviously not we fail
+ crypted = ssl_encrypt(self.plaintext, passwd)
+ self.assertRegexpMatches(crypted, r'^[A-Za-z0-9+/]+[=]{0,2}$')
+
+ # test that errors are raised on bad decrypts
+ crypted = ssl_encrypt(self.plaintext, passwd,
+ algorithm=self.algo)
+ self.assertRaises(EVPError, ssl_decrypt,
+ crypted, "bogus passwd", algorithm=self.algo)
+ self.assertRaises(EVPError, ssl_decrypt,
+ crypted, passwd) # bogus algorithm
+
+ def test_bruteforce_decrypt(self):
+ passwd = "a simple passphrase"
+ crypted = ssl_encrypt(self.plaintext, passwd)
+
+ # test with no passphrases given nor in config
+ Bcfg2.Options.setup.passphrases = dict()
+ self.assertRaises(EVPError,
+ bruteforce_decrypt, crypted)
+
+ # test with good passphrase given in function call
+ self.assertEqual(self.plaintext,
+ bruteforce_decrypt(crypted,
+ passphrases=["bogus pass",
+ passwd,
+ "also bogus"]))
+
+ # test with no good passphrase given nor in config
+ self.assertRaises(EVPError,
+ bruteforce_decrypt,
+ crypted, passphrases=["bogus", "also bogus"])
+
+ # test with good passphrase in config file
+ Bcfg2.Options.setup.passphrases = dict(bogus="bogus",
+ real=passwd,
+ bogus2="also bogus")
+ self.assertEqual(self.plaintext,
+ bruteforce_decrypt(crypted))
+
+ # test that passphrases given in function call take
+ # precedence over config
+ self.assertRaises(EVPError,
+ bruteforce_decrypt, crypted,
+ passphrases=["bogus", "also bogus"])
+
+ # test that different algorithms are used
+ crypted = ssl_encrypt(self.plaintext, passwd, algorithm=self.algo)
+ self.assertEqual(self.plaintext,
+ bruteforce_decrypt(crypted, algorithm=self.algo))
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testbase.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testbase.py
index 870983f60..f135a0197 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testbase.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testbase.py
@@ -1,6 +1,5 @@
import os
import sys
-import logging
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugin.base import *
@@ -14,62 +13,23 @@ while path != '/':
break
path = os.path.dirname(path)
from common import *
-
-
-class TestDebuggable(Bcfg2TestCase):
- test_obj = Debuggable
-
- def get_obj(self):
- return self.test_obj()
-
- def test__init(self):
- d = self.get_obj()
- self.assertIsInstance(d.logger, logging.Logger)
- self.assertFalse(d.debug_flag)
-
- def test_set_debug(self):
- d = self.get_obj()
- self.assertEqual(True, d.set_debug(True))
- self.assertEqual(d.debug_flag, True)
-
- self.assertEqual(False, d.set_debug(False))
- self.assertEqual(d.debug_flag, False)
-
- def test_toggle_debug(self):
- d = self.get_obj()
- d.set_debug = Mock()
- orig = d.debug_flag
- self.assertEqual(d.toggle_debug(),
- d.set_debug.return_value)
- d.set_debug.assert_called_with(not orig)
-
- def test_debug_log(self):
- d = self.get_obj()
- d.logger = Mock()
- d.debug_flag = False
- d.debug_log("test")
- self.assertFalse(d.logger.error.called)
-
- d.logger.reset_mock()
- d.debug_log("test", flag=True)
- self.assertTrue(d.logger.error.called)
-
- d.logger.reset_mock()
- d.debug_flag = True
- d.debug_log("test")
- self.assertTrue(d.logger.error.called)
+from TestLogger import TestDebuggable
class TestPlugin(TestDebuggable):
test_obj = Plugin
+ def setUp(self):
+ TestDebuggable.setUp(self)
+ set_setup_default("filemonitor", MagicMock())
+
def get_obj(self, core=None):
if core is None:
core = Mock()
- core.setup = MagicMock()
+
@patchIf(not isinstance(os.makedirs, Mock), "os.makedirs", Mock())
def inner():
- return self.test_obj(core, datastore)
+ return self.test_obj(core)
return inner()
@patch("os.makedirs")
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
index ce17cb076..75bd4ec95 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
@@ -1,8 +1,10 @@
import os
import sys
import copy
+import genshi
import lxml.etree
import Bcfg2.Server
+import genshi.core
from Bcfg2.Compat import reduce
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugin.helpers import *
@@ -21,6 +23,11 @@ from common import *
from TestServer.TestPlugin.Testbase import TestPlugin, TestDebuggable
from TestServer.TestPlugin.Testinterfaces import TestGenerator
+try:
+ from Bcfg2.Server.Encryption import EVPError
+except:
+ pass
+
def tostring(el):
return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8')
@@ -32,81 +39,41 @@ class FakeElementTree(lxml.etree._ElementTree):
class TestFunctions(Bcfg2TestCase):
- def test_bind_info(self):
- entry = lxml.etree.Element("Path", name="/test")
- metadata = Mock()
- default = dict(test1="test1", test2="test2")
- # test without infoxml
- bind_info(entry, metadata, default=default)
- self.assertItemsEqual(entry.attrib,
- dict(test1="test1",
- test2="test2",
- name="/test"))
-
- # test with bogus infoxml
- entry = lxml.etree.Element("Path", name="/test")
- infoxml = Mock()
- self.assertRaises(PluginExecutionError,
- bind_info,
- entry, metadata, infoxml=infoxml)
- infoxml.pnode.Match.assert_called_with(metadata, dict(), entry=entry)
-
- # test with valid infoxml
- entry = lxml.etree.Element("Path", name="/test")
- infoxml.reset_mock()
- infodata = {None: {"test3": "test3", "test4": "test4"}}
- def infoxml_rv(metadata, rv, entry=None):
- rv['Info'] = infodata
- infoxml.pnode.Match.side_effect = infoxml_rv
- bind_info(entry, metadata, infoxml=infoxml, default=default)
- # mock objects don't properly track the called-with value of
- # arguments whose value is changed by the function, so it
- # thinks Match() was called with the final value of the mdata
- # arg, not the initial value. makes this test a little less
- # worthwhile, TBH.
- infoxml.pnode.Match.assert_called_with(metadata, dict(Info=infodata),
- entry=entry)
- self.assertItemsEqual(entry.attrib,
- dict(test1="test1",
- test2="test2",
- test3="test3",
- test4="test4",
- name="/test"))
+ def test_removecomment(self):
+ data = [(None, "test", 1),
+ (None, "test2", 2)]
+ stream = [(genshi.core.COMMENT, "test", 0),
+ data[0],
+ (genshi.core.COMMENT, "test3", 0),
+ data[1]]
+ self.assertItemsEqual(list(removecomment(stream)), data)
class TestDatabaseBacked(TestPlugin):
test_obj = DatabaseBacked
- def get_obj(self, core=None):
- if not HAS_DJANGO:
- if core is None:
- core = MagicMock()
- # disable the database
- core.setup.cfp.getboolean.return_value = False
- return TestPlugin.get_obj(self, core=core)
+ def setUp(self):
+ TestPlugin.setUp(self)
+ set_setup_default("%s_db" % self.test_obj.__name__.lower(), False)
@skipUnless(HAS_DJANGO, "Django not found")
def test__use_db(self):
core = Mock()
- core.setup.cfp.getboolean.return_value = True
- db = self.get_obj(core)
+ db = self.get_obj(core=core)
+ attr = "%s_db" % self.test_obj.__name__.lower()
+
+ db.core.database_available = True
+ setattr(Bcfg2.Options.setup, attr, True)
self.assertTrue(db._use_db)
- core = Mock()
- core.setup.cfp.getboolean.return_value = False
- db = self.get_obj(core)
+ setattr(Bcfg2.Options.setup, attr, False)
self.assertFalse(db._use_db)
- Bcfg2.Server.Plugin.helpers.HAS_DJANGO = False
- core = Mock()
- core.setup.cfp.getboolean.return_value = False
- db = self.get_obj(core)
+ db.core.database_available = False
self.assertFalse(db._use_db)
- core = Mock()
- core.setup.cfp.getboolean.return_value = True
+ setattr(Bcfg2.Options.setup, attr, True)
self.assertRaises(PluginInitError, self.get_obj, core)
- Bcfg2.Server.Plugin.helpers.HAS_DJANGO = True
class TestPluginDatabaseModel(Bcfg2TestCase):
@@ -114,14 +81,18 @@ class TestPluginDatabaseModel(Bcfg2TestCase):
pass
-class TestFileBacked(Bcfg2TestCase):
+class TestFileBacked(TestDebuggable):
test_obj = FileBacked
path = os.path.join(datastore, "test")
- def get_obj(self, path=None, fam=None):
+ def setUp(self):
+ TestDebuggable.setUp(self)
+ set_setup_default("filemonitor", MagicMock())
+
+ def get_obj(self, path=None):
if path is None:
path = self.path
- return self.test_obj(path, fam=fam)
+ return self.test_obj(path)
@patch("%s.open" % builtins)
def test_HandleEvent(self, mock_open):
@@ -149,7 +120,7 @@ class TestFileBacked(Bcfg2TestCase):
self.assertFalse(fb.Index.called)
-class TestDirectoryBacked(Bcfg2TestCase):
+class TestDirectoryBacked(TestDebuggable):
test_obj = DirectoryBacked
testpaths = {1: '',
2: '/foo',
@@ -163,6 +134,10 @@ class TestDirectoryBacked(Bcfg2TestCase):
badevents = [] # DirectoryBacked handles all files, so there's no
# such thing as a bad event
+ def setUp(self):
+ TestDebuggable.setUp(self)
+ set_setup_default("filemonitor", MagicMock())
+
def test_child_interface(self):
""" ensure that the child object has the correct interface """
self.assertTrue(hasattr(self.test_obj.__child__, "HandleEvent"))
@@ -177,8 +152,7 @@ class TestDirectoryBacked(Bcfg2TestCase):
Mock())
def inner():
return self.test_obj(os.path.join(datastore,
- self.test_obj.__name__),
- fam)
+ self.test_obj.__name__))
return inner()
@patch("os.makedirs")
@@ -187,8 +161,8 @@ class TestDirectoryBacked(Bcfg2TestCase):
@patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__,
self.test_obj.__name__))
def inner(mock_add_monitor):
+ db = self.test_obj(datastore)
mock_exists.return_value = True
- db = self.test_obj(datastore, Mock())
mock_add_monitor.assert_called_with('')
mock_exists.assert_called_with(db.data)
self.assertFalse(mock_makedirs.called)
@@ -197,7 +171,7 @@ class TestDirectoryBacked(Bcfg2TestCase):
mock_exists.reset_mock()
mock_makedirs.reset_mock()
mock_exists.return_value = False
- db = self.test_obj(datastore, Mock())
+ db = self.test_obj(datastore)
mock_add_monitor.assert_called_with('')
mock_exists.assert_called_with(db.data)
mock_makedirs.assert_called_with(db.data)
@@ -268,10 +242,9 @@ class TestDirectoryBacked(Bcfg2TestCase):
db.fam = Mock()
class MockChild(Mock):
- def __init__(self, path, fam, **kwargs):
+ def __init__(self, path, **kwargs):
Mock.__init__(self, **kwargs)
self.path = path
- self.fam = fam
self.HandleEvent = Mock()
db.__child__ = MockChild
@@ -281,7 +254,6 @@ class TestDirectoryBacked(Bcfg2TestCase):
self.assertIn(path, db.entries)
self.assertEqual(db.entries[path].path,
os.path.join(db.data, path))
- self.assertEqual(db.entries[path].fam, db.fam)
db.entries[path].HandleEvent.assert_called_with(event)
@patch("os.path.isdir")
@@ -419,28 +391,31 @@ class TestXMLFileBacked(TestFileBacked):
should_monitor = None
path = os.path.join(datastore, "test", "test1.xml")
- def get_obj(self, path=None, fam=None, should_monitor=False):
+ def setUp(self):
+ TestFileBacked.setUp(self)
+ set_setup_default("encoding", 'utf-8')
+
+ def get_obj(self, path=None, should_monitor=False):
if path is None:
path = self.path
@patchIf(not isinstance(os.path.exists, Mock),
"os.path.exists", Mock())
def inner():
- return self.test_obj(path, fam=fam, should_monitor=should_monitor)
+ return self.test_obj(path, should_monitor=should_monitor)
return inner()
- def test__init(self):
- fam = Mock()
+ @patch("Bcfg2.Server.FileMonitor.get_fam")
+ def test__init(self, mock_get_fam):
xfb = self.get_obj()
+ self.assertEqual(xfb.fam, mock_get_fam.return_value)
+
if self.should_monitor:
- self.assertIsNotNone(xfb.fam)
- fam.reset_mock()
- xfb = self.get_obj(fam=fam, should_monitor=True)
- fam.AddMonitor.assert_called_with(self.path, xfb)
+ xfb = self.get_obj(should_monitor=True)
+ xfb.fam.AddMonitor.assert_called_with(self.path, xfb)
else:
- self.assertIsNone(xfb.fam)
- xfb = self.get_obj(fam=fam)
- self.assertFalse(fam.AddMonitor.called)
+ xfb = self.get_obj()
+ self.assertFalse(xfb.fam.AddMonitor.called)
@patch("glob.glob")
@patch("lxml.etree.parse")
@@ -609,6 +584,7 @@ class TestXMLFileBacked(TestFileBacked):
test3 = lxml.etree.Element("Test", name="test3")
replacements = {"/test/test2.xml": test2,
"/test/test_dir/test3.xml": test3}
+
def xinclude():
for el in xfb.xdata.findall('//%sinclude' %
Bcfg2.Server.XI_NAMESPACE):
@@ -626,23 +602,26 @@ class TestXMLFileBacked(TestFileBacked):
self.assertItemsEqual([tostring(e) for e in xfb.entries],
[tostring(e) for e in children])
+ @patch("Bcfg2.Server.FileMonitor.get_fam", Mock())
def test_add_monitor(self):
xfb = self.get_obj()
xfb.add_monitor("/test/test2.xml")
self.assertIn("/test/test2.xml", xfb.extra_monitors)
- fam = Mock()
- fam.reset_mock()
- xfb = self.get_obj(fam=fam)
- if xfb.fam:
- xfb.add_monitor("/test/test4.xml")
- fam.AddMonitor.assert_called_with("/test/test4.xml", xfb)
- self.assertIn("/test/test4.xml", xfb.extra_monitors)
+ xfb = self.get_obj()
+ xfb.fam = Mock()
+ xfb.add_monitor("/test/test4.xml")
+ xfb.fam.AddMonitor.assert_called_with("/test/test4.xml", xfb)
+ self.assertIn("/test/test4.xml", xfb.extra_monitors)
class TestStructFile(TestXMLFileBacked):
test_obj = StructFile
+ def setUp(self):
+ TestXMLFileBacked.setUp(self)
+ set_setup_default("lax_decryption", False)
+
def _get_test_data(self):
""" build a very complex set of test data """
# top-level group and client elements
@@ -684,7 +663,8 @@ class TestStructFile(TestXMLFileBacked):
lxml.etree.SubElement(groups[1], "Child", name="c3")
lxml.etree.SubElement(groups[1], "Child", name="c4")
- standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s1"))
+ standalone.append(lxml.etree.SubElement(xdata,
+ "Standalone", name="s1"))
groups[2] = lxml.etree.SubElement(xdata, "Client", name="client2",
include="false")
@@ -706,12 +686,140 @@ class TestStructFile(TestXMLFileBacked):
subchildren[3] = []
lxml.etree.SubElement(children[3][-1], "SubChild", name="subchild")
- standalone.append(lxml.etree.SubElement(xdata, "Standalone", name="s3"))
+ standalone.append(lxml.etree.SubElement(xdata,
+ "Standalone", name="s3"))
lxml.etree.SubElement(standalone[-1], "SubStandalone", name="sub1")
- children[4] = standalone
return (xdata, groups, subgroups, children, subchildren, standalone)
+ def _get_template_test_data(self):
+ (xdata, groups, subgroups, children, subchildren, standalone) = \
+ self._get_test_data()
+ template_xdata = \
+ lxml.etree.Element("Test", name="test",
+ nsmap=dict(py='http://genshi.edgewall.org/'))
+ template_xdata.extend(xdata.getchildren())
+ return (template_xdata, groups, subgroups, children, subchildren,
+ standalone)
+
+ @patch("genshi.template.TemplateLoader")
+ def test_Index(self, mock_TemplateLoader):
+ TestXMLFileBacked.test_Index(self)
+
+ sf = self.get_obj()
+ sf.encryption = False
+ sf.encoding = Mock()
+ (xdata, groups, subgroups, children, subchildren, standalone) = \
+ self._get_test_data()
+ sf.data = lxml.etree.tostring(xdata)
+
+ mock_TemplateLoader.reset_mock()
+ sf.Index()
+ self.assertFalse(mock_TemplateLoader.called)
+
+ mock_TemplateLoader.reset_mock()
+ template_xdata = \
+ lxml.etree.Element("Test", name="test",
+ nsmap=dict(py='http://genshi.edgewall.org/'))
+ template_xdata.extend(xdata.getchildren())
+ sf.data = lxml.etree.tostring(template_xdata)
+ sf.Index()
+ mock_TemplateLoader.assert_called_with()
+ loader = mock_TemplateLoader.return_value
+ loader.load.assert_called_with(sf.name,
+ cls=genshi.template.MarkupTemplate,
+ encoding=Bcfg2.Options.setup.encoding)
+ self.assertEqual(sf.template,
+ loader.load.return_value)
+
+ @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
+ def test_Index_crypto(self):
+ if not self.test_obj.encryption:
+ return
+ Bcfg2.Options.setup.lax_decryption = False
+ sf = self.get_obj()
+ sf._decrypt = Mock()
+ sf._decrypt.return_value = 'plaintext'
+ sf.data = '''
+<EncryptedData>
+ <Group name="test">
+ <Datum encrypted="foo">crypted</Datum>
+ </Group>
+ <Group name="test" negate="true">
+ <Datum>plain</Datum>
+ </Group>
+</EncryptedData>'''
+
+ # test successful decryption
+ sf.Index()
+ self.assertItemsEqual(
+ sf._decrypt.call_args_list,
+ [call(el) for el in sf.xdata.xpath("//*[@encrypted]")])
+ for el in sf.xdata.xpath("//*[@encrypted]"):
+ self.assertEqual(el.text, sf._decrypt.return_value)
+
+ # test failed decryption, strict
+ sf._decrypt.reset_mock()
+ sf._decrypt.side_effect = EVPError
+ self.assertRaises(PluginExecutionError, sf.Index)
+
+ # test failed decryption, lax
+ Bcfg2.Options.setup.lax_decryption = True
+ sf._decrypt.reset_mock()
+ sf.Index()
+ self.assertItemsEqual(
+ sf._decrypt.call_args_list,
+ [call(el) for el in sf.xdata.xpath("//*[@encrypted]")])
+
+ @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.ssl_decrypt")
+ @patchIf(HAS_CRYPTO, "Bcfg2.Server.Encryption.bruteforce_decrypt")
+ def test_decrypt(self, mock_bruteforce, mock_ssl):
+ sf = self.get_obj()
+
+ def reset():
+ mock_bruteforce.reset_mock()
+ mock_ssl.reset_mock()
+
+
+ # test element without text contents
+ Bcfg2.Options.setup.passphrases = dict()
+ self.assertIsNone(sf._decrypt(lxml.etree.Element("Test")))
+ self.assertFalse(mock_bruteforce.called)
+ self.assertFalse(mock_ssl.called)
+
+ # test element with a passphrase in the config file
+ reset()
+ el = lxml.etree.Element("Test", encrypted="foo")
+ el.text = "crypted"
+ Bcfg2.Options.setup.passphrases = dict(foo="foopass", bar="barpass")
+ mock_ssl.return_value = "decrypted with ssl"
+ self.assertEqual(sf._decrypt(el), mock_ssl.return_value)
+ mock_ssl.assert_called_with(el.text, "foopass")
+ self.assertFalse(mock_bruteforce.called)
+
+ # test failure to decrypt element with a passphrase in the config
+ reset()
+ mock_ssl.side_effect = EVPError
+ self.assertRaises(EVPError, sf._decrypt, el)
+ mock_ssl.assert_called_with(el.text, "foopass")
+ self.assertFalse(mock_bruteforce.called)
+
+ # test element without valid passphrase
+ reset()
+ el.set("encrypted", "true")
+ mock_bruteforce.return_value = "decrypted with bruteforce"
+ self.assertEqual(sf._decrypt(el), mock_bruteforce.return_value)
+ mock_bruteforce.assert_called_with(el.text)
+ self.assertFalse(mock_ssl.called)
+
+ # test failure to decrypt element without valid passphrase
+ reset()
+ mock_bruteforce.side_effect = EVPError
+ self.assertRaises(EVPError, sf._decrypt, el)
+ mock_bruteforce.assert_called_with(el.text)
+ self.assertFalse(mock_ssl.called)
+
def test_include_element(self):
sf = self.get_obj()
metadata = Mock()
@@ -744,22 +852,63 @@ class TestStructFile(TestXMLFileBacked):
self.assertTrue(inc("Other"))
- @patch("Bcfg2.Server.Plugin.helpers.%s._include_element" %
- test_obj.__name__)
- def test__match(self, mock_include):
+ def test__match(self):
sf = self.get_obj()
+ sf._include_element = Mock()
metadata = Mock()
- (xdata, groups, subgroups, children, subchildren, standalone) = \
- self._get_test_data()
-
- mock_include.side_effect = \
- lambda x, _: (x.tag not in ['Client', 'Group'] or
+ sf._include_element.side_effect = \
+ lambda x, _: (x.tag not in sf._include_tests.keys() or
x.get("include") == "true")
- for i, group in groups.items():
- actual = sf._match(group, metadata)
- expected = children[i] + subchildren[i]
+ for test_data in [self._get_test_data(),
+ self._get_template_test_data()]:
+ (xdata, groups, subgroups, children, subchildren, standalone) = \
+ test_data
+
+ for i, group in groups.items():
+ actual = sf._match(group, metadata)
+ expected = children[i] + subchildren[i]
+ self.assertEqual(len(actual), len(expected))
+ # easiest way to compare the values is actually to make
+ # them into an XML document and let assertXMLEqual compare
+ # them
+ xactual = lxml.etree.Element("Container")
+ xactual.extend(actual)
+ xexpected = lxml.etree.Element("Container")
+ xexpected.extend(expected)
+ self.assertXMLEqual(xactual, xexpected)
+
+ for el in standalone:
+ self.assertXMLEqual(el, sf._match(el, metadata)[0])
+
+ def test_do_match(self):
+ Bcfg2.Options.setup.lax_decryption = True
+ sf = self.get_obj()
+ sf._match = Mock()
+
+ def match_rv(el, _):
+ if el.tag not in sf._include_tests.keys():
+ return [el]
+ elif el.get("include") == "true":
+ return el.getchildren()
+ else:
+ return []
+ sf._match.side_effect = match_rv
+
+ metadata = Mock()
+
+ for test_data in [self._get_test_data(),
+ self._get_template_test_data()]:
+ (xdata, groups, subgroups, children, subchildren, standalone) = \
+ test_data
+ sf.data = lxml.etree.tostring(xdata)
+ sf.Index()
+
+ actual = sf._do_match(metadata)
+ expected = reduce(lambda x, y: x + y,
+ list(children.values()) + \
+ list(subgroups.values())) + standalone
self.assertEqual(len(actual), len(expected))
# easiest way to compare the values is actually to make
# them into an XML document and let assertXMLEqual compare
@@ -770,428 +919,244 @@ class TestStructFile(TestXMLFileBacked):
xexpected.extend(expected)
self.assertXMLEqual(xactual, xexpected)
- for el in standalone:
- self.assertXMLEqual(el, sf._match(el, metadata)[0])
+ def test__xml_match(self):
+ sf = self.get_obj()
+ sf._include_element = Mock()
+ metadata = Mock()
+
+ sf._include_element.side_effect = \
+ lambda x, _: (x.tag not in sf._include_tests.keys() or
+ x.get("include") == "true")
- @patch("Bcfg2.Server.Plugin.helpers.%s._match" % test_obj.__name__)
- def test_Match(self, mock_match):
+ for test_data in [self._get_test_data(),
+ self._get_template_test_data()]:
+ (xdata, groups, subgroups, children, subchildren, standalone) = \
+ test_data
+
+ actual = copy.deepcopy(xdata)
+ for el in actual.getchildren():
+ sf._xml_match(el, metadata)
+ expected = lxml.etree.Element(xdata.tag, **dict(xdata.attrib))
+ expected.text = xdata.text
+ expected.extend(reduce(lambda x, y: x + y,
+ list(children.values()) + \
+ list(subchildren.values())))
+ expected.extend(standalone)
+ self.assertXMLEqual(actual, expected)
+
+ def test_do_xmlmatch(self):
sf = self.get_obj()
+ sf._xml_match = Mock()
metadata = Mock()
- (xdata, groups, subgroups, children, subchildren, standalone) = \
- self._get_test_data()
- sf.entries.extend(copy.deepcopy(xdata).getchildren())
+ for data_type, test_data in \
+ [("", self._get_test_data()),
+ ("templated ", self._get_template_test_data())]:
+ (xdata, groups, subgroups, children, subchildren, standalone) = \
+ test_data
+ sf.xdata = xdata
+ sf._xml_match.reset_mock()
+
+ sf._do_xmlmatch(metadata)
+ actual = []
+ for call in sf._xml_match.call_args_list:
+ actual.append(call[0][0])
+ self.assertEqual(call[0][1], metadata)
+ expected = list(groups.values()) + standalone
+ # easiest way to compare the values is actually to make
+ # them into an XML document and let assertXMLEqual compare
+ # them
+ xactual = lxml.etree.Element("Container")
+ xactual.extend(actual)
+ xexpected = lxml.etree.Element("Container")
+ xexpected.extend(expected)
+ self.assertXMLEqual(xactual, xexpected,
+ "XMLMatch() calls were incorrect for "
+ "%stest data" % data_type)
+
+ def test_match_ordering(self):
+ """ Match() returns elements in document order """
+ Bcfg2.Options.setup.lax_decryption = True
+ sf = self.get_obj()
+ sf._match = Mock()
def match_rv(el, _):
- if el.tag not in ['Client', 'Group']:
+ if el.tag not in sf._include_tests.keys():
return [el]
elif el.get("include") == "true":
return el.getchildren()
else:
return []
- mock_match.side_effect = match_rv
- actual = sf.Match(metadata)
- expected = reduce(lambda x, y: x + y,
- list(children.values()) + list(subgroups.values()))
- self.assertEqual(len(actual), len(expected))
- # easiest way to compare the values is actually to make
- # them into an XML document and let assertXMLEqual compare
- # them
- xactual = lxml.etree.Element("Container")
- xactual.extend(actual)
- xexpected = lxml.etree.Element("Container")
- xexpected.extend(expected)
- self.assertXMLEqual(xactual, xexpected)
-
- @patch("Bcfg2.Server.Plugin.helpers.%s._include_element" %
- test_obj.__name__)
- def test__xml_match(self, mock_include):
- sf = self.get_obj()
+ sf._match.side_effect = match_rv
+
metadata = Mock()
+ test_data = lxml.etree.Element("Test")
+ group = lxml.etree.SubElement(test_data, "Group", name="group",
+ include="true")
+ first = lxml.etree.SubElement(group, "Element", name="first")
+ second = lxml.etree.SubElement(test_data, "Element", name="second")
+
+ # sanity check to ensure that first and second are in the
+ # correct document order
+ if test_data.xpath("//Element") != [first, second]:
+ skip("lxml.etree does not construct documents in a reliable order")
+
+ sf.data = lxml.etree.tostring(test_data)
+ sf.Index()
+ rv = sf._do_match(metadata)
+ self.assertEqual(len(rv), 2,
+ "Match() seems to be broken, cannot test ordering")
+ msg = "Match() does not return elements in document order:\n" + \
+ "Expected: [%s, %s]\n" % (first, second) + \
+ "Actual: %s" % rv
+ self.assertXMLEqual(rv[0], first, msg)
+ self.assertXMLEqual(rv[1], second, msg)
+
+ # TODO: add tests to ensure that XMLMatch() returns elements
+ # in document order
+
+
+class TestInfoXML(TestStructFile):
+ test_obj = InfoXML
+
+ def _get_test_data(self):
(xdata, groups, subgroups, children, subchildren, standalone) = \
- self._get_test_data()
+ TestStructFile._get_test_data(self)
+ idx = max(groups.keys()) + 1
+ groups[idx] = lxml.etree.SubElement(
+ xdata, "Path", name="path1", include="true")
+ children[idx] = [lxml.etree.SubElement(groups[idx], "Child",
+ name="pc1")]
+ subgroups[idx] = [lxml.etree.SubElement(groups[idx], "Group",
+ name="pg1", include="true"),
+ lxml.etree.SubElement(groups[idx], "Client",
+ name="pc1", include="false")]
+ subchildren[idx] = [lxml.etree.SubElement(subgroups[idx][0],
+ "SubChild", name="sc1")]
+
+ idx += 1
+ groups[idx] = lxml.etree.SubElement(
+ xdata, "Path", name="path2", include="false")
+ children[idx] = []
+ subgroups[idx] = []
+ subchildren[idx] = []
+
+ path2 = lxml.etree.SubElement(groups[0], "Path", name="path2",
+ include="true")
+ subgroups[0].append(path2)
+ subchildren[0].append(lxml.etree.SubElement(path2, "SubChild",
+ name="sc2"))
+ return xdata, groups, subgroups, children, subchildren, standalone
- mock_include.side_effect = \
- lambda x, _: (x.tag not in ['Client', 'Group'] or
- x.get("include") == "true")
+ def test_include_element(self):
+ TestStructFile.test_include_element(self)
- actual = copy.deepcopy(xdata)
- for el in actual.getchildren():
- sf._xml_match(el, metadata)
- expected = lxml.etree.Element(xdata.tag, **dict(xdata.attrib))
- expected.text = xdata.text
- expected.extend(reduce(lambda x, y: x + y,
- list(children.values()) + list(subchildren.values())))
- expected.extend(standalone)
- self.assertXMLEqual(actual, expected)
-
- @patch("Bcfg2.Server.Plugin.helpers.%s._xml_match" % test_obj.__name__)
- def test_XMLMatch(self, mock_xml_match):
- sf = self.get_obj()
+ ix = self.get_obj()
metadata = Mock()
+ entry = lxml.etree.Element("Path", name="/etc/foo.conf")
+ inc = lambda tag, **attrs: \
+ ix._include_element(lxml.etree.Element(tag, **attrs),
+ metadata, entry)
+
+ self.assertFalse(inc("Path", name="/etc/bar.conf"))
+ self.assertFalse(inc("Path", name="/etc/foo.conf", negate="true"))
+ self.assertFalse(inc("Path", name="/etc/foo.conf", negate="tRuE"))
+ self.assertTrue(inc("Path", name="/etc/foo.conf"))
+ self.assertTrue(inc("Path", name="/etc/foo.conf", negate="false"))
+ self.assertTrue(inc("Path", name="/etc/foo.conf", negate="faLSe"))
+ self.assertTrue(inc("Path", name="/etc/bar.conf", negate="true"))
+ self.assertTrue(inc("Path", name="/etc/bar.conf", negate="tRUe"))
- (sf.xdata, groups, subgroups, children, subchildren, standalone) = \
- self._get_test_data()
-
- sf.XMLMatch(metadata)
- actual = []
- for call in mock_xml_match.call_args_list:
- actual.append(call[0][0])
- self.assertEqual(call[0][1], metadata)
- expected = list(groups.values()) + standalone
- # easiest way to compare the values is actually to make
- # them into an XML document and let assertXMLEqual compare
- # them
- xactual = lxml.etree.Element("Container")
- xactual.extend(actual)
- xexpected = lxml.etree.Element("Container")
- xexpected.extend(expected)
- self.assertXMLEqual(xactual, xexpected)
-
-
-class TestINode(Bcfg2TestCase):
- test_obj = INode
-
- # INode.__init__ and INode._load_children() call each other
- # recursively, which makes this class kind of a nightmare to test.
- # we have to first patch INode._load_children so that we can
- # create an INode object with no children loaded, then we unpatch
- # INode._load_children and patch INode.__init__ so that child
- # objects aren't actually created. but in order to test things
- # atomically, we do this umpteen times in order to test with
- # different data. this convenience method makes this a little
- # easier. fun fun fun.
- @patch("Bcfg2.Server.Plugin.helpers.%s._load_children" %
- test_obj.__name__, Mock())
- def _get_inode(self, data, idict):
- return self.test_obj(data, idict)
-
- def test_raw_predicates(self):
+ def test_BindEntry(self):
+ ix = self.get_obj()
+ entry = lxml.etree.Element("Path", name=self.path)
metadata = Mock()
- metadata.groups = ["group1", "group2"]
- metadata.hostname = "foo.example.com"
- entry = None
-
- parent_predicate = lambda m, e: True
- pred = eval(self.test_obj.raw['Client'] % dict(name="foo.example.com"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
- pred = eval(self.test_obj.raw['Client'] % dict(name="bar.example.com"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
-
- pred = eval(self.test_obj.raw['Group'] % dict(name="group1"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
- pred = eval(self.test_obj.raw['Group'] % dict(name="group3"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
-
- pred = eval(self.test_obj.nraw['Client'] % dict(name="foo.example.com"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(self.test_obj.nraw['Client'] % dict(name="bar.example.com"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
-
- pred = eval(self.test_obj.nraw['Group'] % dict(name="group1"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(self.test_obj.nraw['Group'] % dict(name="group3"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
-
- parent_predicate = lambda m, e: False
- pred = eval(self.test_obj.raw['Client'] % dict(name="foo.example.com"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(self.test_obj.raw['Group'] % dict(name="group1"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(self.test_obj.nraw['Client'] % dict(name="bar.example.com"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(self.test_obj.nraw['Group'] % dict(name="group3"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
-
- self.assertItemsEqual(self.test_obj.containers,
- self.test_obj.raw.keys())
- self.assertItemsEqual(self.test_obj.containers,
- self.test_obj.nraw.keys())
-
- @patch("Bcfg2.Server.Plugin.helpers.INode._load_children")
- def test__init(self, mock_load_children):
- data = lxml.etree.Element("Bogus")
- # called with no parent, should not raise an exception; it's a
- # top-level tag in an XML file and so is not expected to be a
- # proper predicate
- INode(data, dict())
- self.assertRaises(PluginExecutionError,
- INode, data, dict(), Mock())
- data = lxml.etree.Element("Client", name="foo.example.com")
- idict = dict()
- inode = INode(data, idict)
- mock_load_children.assert_called_with(data, idict)
- self.assertTrue(inode.predicate(Mock(), Mock()))
+ # test with bogus infoxml
+ ix.Match = Mock()
+ ix.Match.return_value = []
+ self.assertRaises(PluginExecutionError,
+ ix.BindEntry, entry, metadata)
+ ix.Match.assert_called_with(metadata, entry)
- parent = Mock()
- parent.predicate = lambda m, e: True
- metadata = Mock()
- metadata.groups = ["group1", "group2"]
- metadata.hostname = "foo.example.com"
- entry = None
-
- # test setting predicate with parent object
- mock_load_children.reset_mock()
- inode = INode(data, idict, parent=parent)
- mock_load_children.assert_called_with(data, idict)
- self.assertTrue(inode.predicate(metadata, entry))
-
- # test negation
- data = lxml.etree.Element("Client", name="foo.example.com",
- negate="true")
- mock_load_children.reset_mock()
- inode = INode(data, idict, parent=parent)
- mock_load_children.assert_called_with(data, idict)
- self.assertFalse(inode.predicate(metadata, entry))
-
- # test failure of a matching predicate (client names do not match)
- data = lxml.etree.Element("Client", name="foo.example.com")
- metadata.hostname = "bar.example.com"
- mock_load_children.reset_mock()
- inode = INode(data, idict, parent=parent)
- mock_load_children.assert_called_with(data, idict)
- self.assertFalse(inode.predicate(metadata, entry))
-
- # test that parent predicate is AND'ed in correctly
- parent.predicate = lambda m, e: False
- metadata.hostname = "foo.example.com"
- mock_load_children.reset_mock()
- inode = INode(data, idict, parent=parent)
- mock_load_children.assert_called_with(data, idict)
- self.assertFalse(inode.predicate(metadata, entry))
-
- def test_load_children(self):
- data = lxml.etree.Element("Parent")
- child1 = lxml.etree.SubElement(data, "Client", name="foo.example.com")
- child2 = lxml.etree.SubElement(data, "Group", name="bar", negate="true")
- idict = dict()
-
- inode = self._get_inode(data, idict)
-
- @patch("Bcfg2.Server.Plugin.helpers.%s.__init__" %
- inode.__class__.__name__)
- def inner(mock_init):
- mock_init.return_value = None
- inode._load_children(data, idict)
- self.assertItemsEqual(mock_init.call_args_list,
- [call(child1, idict, inode),
- call(child2, idict, inode)])
- self.assertEqual(idict, dict())
- self.assertItemsEqual(inode.contents, dict())
+ # test with valid infoxml
+ ix.Match.reset_mock()
+ ix.Match.return_value = [lxml.etree.Element("Info",
+ mode="0600", owner="root")]
+ ix.BindEntry(entry, metadata)
+ ix.Match.assert_called_with(metadata, entry)
+ self.assertItemsEqual(entry.attrib,
+ dict(name=self.path, mode="0600", owner="root"))
- inner()
+ def _get_test_data(self):
+ (xdata, groups, subgroups, children, subchildren, standalone) = \
+ TestStructFile._get_test_data(self)
+ idx = max(groups.keys()) + 1
+ groups[idx] = lxml.etree.SubElement(
+ xdata, "Path", name="path1", include="true")
+ children[idx] = [lxml.etree.SubElement(groups[idx], "Child",
+ name="pc1")]
+ subgroups[idx] = [lxml.etree.SubElement(groups[idx], "Group",
+ name="pg1", include="true"),
+ lxml.etree.SubElement(groups[idx], "Client",
+ name="pc1", include="false")]
+ subchildren[idx] = [lxml.etree.SubElement(subgroups[idx][0],
+ "SubChild", name="sc1")]
+
+ idx += 1
+ groups[idx] = lxml.etree.SubElement(
+ xdata, "Path", name="path2", include="false")
+ children[idx] = []
+ subgroups[idx] = []
+ subchildren[idx] = []
+
+ path2 = lxml.etree.SubElement(groups[0], "Path", name="path2",
+ include="true")
+ subgroups[0].append(path2)
+ subchildren[0].append(lxml.etree.SubElement(path2, "SubChild",
+ name="sc2"))
+ return xdata, groups, subgroups, children, subchildren, standalone
- data = lxml.etree.Element("Parent")
- child1 = lxml.etree.SubElement(data, "Data", name="child1",
- attr="some attr")
- child1.text = "text"
- subchild1 = lxml.etree.SubElement(child1, "SubChild", name="subchild")
- child2 = lxml.etree.SubElement(data, "Group", name="bar", negate="true")
- idict = dict()
-
- inode = self._get_inode(data, idict)
- inode.ignore = []
-
- @patch("Bcfg2.Server.Plugin.helpers.%s.__init__" %
- inode.__class__.__name__)
- def inner2(mock_init):
- mock_init.return_value = None
- inode._load_children(data, idict)
- mock_init.assert_called_with(child2, idict, inode)
- tag = child1.tag
- name = child1.get("name")
- self.assertEqual(idict, dict(Data=[name]))
- self.assertIn(tag, inode.contents)
- self.assertIn(name, inode.contents[tag])
- self.assertItemsEqual(inode.contents[tag][name],
- dict(name=name,
- attr=child1.get('attr'),
- __text__=child1.text,
- __children__=[subchild1]))
-
- inner2()
-
- # test ignore. no ignore is set on INode by default, so we
- # have to set one
- old_ignore = copy.copy(self.test_obj.ignore)
- self.test_obj.ignore.append("Data")
- idict = dict()
-
- inode = self._get_inode(data, idict)
-
- @patch("Bcfg2.Server.Plugin.helpers.%s.__init__" %
- inode.__class__.__name__)
- def inner3(mock_init):
- mock_init.return_value = None
- inode._load_children(data, idict)
- mock_init.assert_called_with(child2, idict, inode)
- self.assertEqual(idict, dict())
- self.assertItemsEqual(inode.contents, dict())
-
- inner3()
- self.test_obj.ignore = old_ignore
-
- def test_Match(self):
- idata = lxml.etree.Element("Parent")
- contents = lxml.etree.SubElement(idata, "Data", name="contents",
- attr="some attr")
- child = lxml.etree.SubElement(idata, "Group", name="bar", negate="true")
-
- inode = INode(idata, dict())
- inode.predicate = Mock()
- inode.predicate.return_value = False
+ def test_include_element(self):
+ TestStructFile.test_include_element(self)
+ ix = self.get_obj()
metadata = Mock()
- metadata.groups = ['foo']
- data = dict()
- entry = child
-
- inode.Match(metadata, data, entry=child)
- self.assertEqual(data, dict())
- inode.predicate.assert_called_with(metadata, child)
-
- inode.predicate.reset_mock()
- inode.Match(metadata, data)
- self.assertEqual(data, dict())
- # can't easily compare XML args without the original
- # object, and we're testing that Match() works without an
- # XML object passed in, so...
- self.assertEqual(inode.predicate.call_args[0][0],
- metadata)
- self.assertXMLEqual(inode.predicate.call_args[0][1],
- lxml.etree.Element("None"))
-
- inode.predicate.reset_mock()
- inode.predicate.return_value = True
- inode.Match(metadata, data, entry=child)
- self.assertEqual(data, inode.contents)
- inode.predicate.assert_called_with(metadata, child)
-
-
-class TestInfoNode(TestINode):
- __test__ = True
- test_obj = InfoNode
-
- def test_raw_predicates(self):
- TestINode.test_raw_predicates(self)
- metadata = Mock()
- entry = lxml.etree.Element("Path", name="/tmp/foo",
- realname="/tmp/bar")
-
- parent_predicate = lambda m, d: True
- pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
- pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
- pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bogus"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
-
- pred = eval(self.test_obj.nraw['Path'] % dict(name="/tmp/foo"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bar"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"),
- dict(predicate=parent_predicate))
- self.assertTrue(pred(metadata, entry))
-
- parent_predicate = lambda m, d: False
- pred = eval(self.test_obj.raw['Path'] % dict(name="/tmp/foo"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(InfoNode.raw['Path'] % dict(name="/tmp/bar"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
- pred = eval(InfoNode.nraw['Path'] % dict(name="/tmp/bogus"),
- dict(predicate=parent_predicate))
- self.assertFalse(pred(metadata, entry))
-
-
-class TestXMLSrc(TestXMLFileBacked):
- test_obj = XMLSrc
-
- def test_node_interface(self):
- # ensure that the node object has the necessary interface
- self.assertTrue(hasattr(self.test_obj.__node__, "Match"))
+ entry = lxml.etree.Element("Path", name="/etc/foo.conf")
+ inc = lambda tag, **attrs: \
+ ix._include_element(lxml.etree.Element(tag, **attrs),
+ metadata, entry)
+
+ self.assertFalse(inc("Path", name="/etc/bar.conf"))
+ self.assertFalse(inc("Path", name="/etc/foo.conf", negate="true"))
+ self.assertFalse(inc("Path", name="/etc/foo.conf", negate="tRuE"))
+ self.assertTrue(inc("Path", name="/etc/foo.conf"))
+ self.assertTrue(inc("Path", name="/etc/foo.conf", negate="false"))
+ self.assertTrue(inc("Path", name="/etc/foo.conf", negate="faLSe"))
+ self.assertTrue(inc("Path", name="/etc/bar.conf", negate="true"))
+ self.assertTrue(inc("Path", name="/etc/bar.conf", negate="tRUe"))
- @patch("lxml.etree.parse")
- def test_HandleEvent(self, mock_parse):
- xdata = lxml.etree.Element("Test")
- lxml.etree.SubElement(xdata, "Path", name="path", attr="whatever")
-
- xsrc = self.get_obj("/test/foo.xml")
- xsrc.__node__ = Mock()
- mock_parse.return_value = xdata.getroottree()
-
- if xsrc.__priority_required__:
- # test with no priority at all
- self.assertRaises(PluginExecutionError,
- xsrc.HandleEvent, Mock())
-
- # test with bogus priority
- xdata.set("priority", "cow")
- mock_parse.return_value = xdata.getroottree()
- self.assertRaises(PluginExecutionError,
- xsrc.HandleEvent, Mock())
-
- # assign a priority to use in future tests
- xdata.set("priority", "10")
- mock_parse.return_value = xdata.getroottree()
-
- mock_parse.reset_mock()
- xsrc = self.get_obj("/test/foo.xml")
- xsrc.__node__ = Mock()
- xsrc.HandleEvent(Mock())
- mock_parse.assert_called_with("/test/foo.xml",
- parser=Bcfg2.Server.XMLParser)
- self.assertXMLEqual(xsrc.__node__.call_args[0][0], xdata)
- self.assertEqual(xsrc.__node__.call_args[0][1], dict())
- self.assertEqual(xsrc.pnode, xsrc.__node__.return_value)
- self.assertEqual(xsrc.cache, None)
-
- @patch("Bcfg2.Server.Plugin.helpers.XMLSrc.HandleEvent")
- def test_Cache(self, mock_HandleEvent):
- xsrc = self.get_obj("/test/foo.xml")
+ def test_BindEntry(self):
+ ix = self.get_obj()
+ entry = lxml.etree.Element("Path", name=self.path)
metadata = Mock()
- xsrc.Cache(metadata)
- mock_HandleEvent.assert_any_call()
-
- xsrc.pnode = Mock()
- xsrc.Cache(metadata)
- xsrc.pnode.Match.assert_called_with(metadata, xsrc.__cacheobj__())
- self.assertEqual(xsrc.cache[0], metadata)
-
- xsrc.pnode.reset_mock()
- xsrc.Cache(metadata)
- self.assertFalse(xsrc.pnode.Mock.called)
- self.assertEqual(xsrc.cache[0], metadata)
-
- xsrc.cache = ("bogus")
- xsrc.Cache(metadata)
- xsrc.pnode.Match.assert_called_with(metadata, xsrc.__cacheobj__())
- self.assertEqual(xsrc.cache[0], metadata)
+ # test with bogus infoxml
+ ix.Match = Mock()
+ ix.Match.return_value = []
+ self.assertRaises(PluginExecutionError,
+ ix.BindEntry, entry, metadata)
+ ix.Match.assert_called_with(metadata, entry)
-class TestInfoXML(TestXMLSrc):
- test_obj = InfoXML
+ # test with valid infoxml
+ ix.Match.reset_mock()
+ ix.Match.return_value = [lxml.etree.Element("Info",
+ mode="0600", owner="root")]
+ ix.BindEntry(entry, metadata)
+ ix.Match.assert_called_with(metadata, entry)
+ self.assertItemsEqual(entry.attrib,
+ dict(name=self.path, mode="0600", owner="root"))
class TestXMLDirectoryBacked(TestDirectoryBacked):
@@ -1203,6 +1168,11 @@ class TestXMLDirectoryBacked(TestDirectoryBacked):
class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked):
test_obj = PrioDir
+ def setUp(self):
+ TestPlugin.setUp(self)
+ TestGenerator.setUp(self)
+ TestXMLDirectoryBacked.setUp(self)
+
def get_obj(self, core=None):
if core is None:
core = Mock()
@@ -1212,7 +1182,7 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked):
Mock())
@patchIf(not isinstance(os.makedirs, Mock), "os.makedirs", Mock())
def inner():
- return self.test_obj(core, datastore)
+ return self.test_obj(core)
return inner()
@@ -1223,13 +1193,20 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked):
Mock())
def inner():
pd = self.get_obj()
- test1 = Mock()
- test1.items = dict(Path=["/etc/foo.conf", "/etc/bar.conf"])
- test2 = Mock()
- test2.items = dict(Path=["/etc/baz.conf"],
- Package=["quux", "xyzzy"])
- pd.entries = {"/test1.xml": test1,
- "/test2.xml": test2}
+ test1 = lxml.etree.Element("Test")
+ lxml.etree.SubElement(test1, "Path", name="/etc/foo.conf")
+ lxml.etree.SubElement(lxml.etree.SubElement(test1,
+ "Group", name="foo"),
+ "Path", name="/etc/bar.conf")
+
+ test2 = lxml.etree.Element("Test")
+ lxml.etree.SubElement(test2, "Path", name="/etc/baz.conf")
+ lxml.etree.SubElement(test2, "Package", name="quux")
+ lxml.etree.SubElement(lxml.etree.SubElement(test2,
+ "Group", name="bar"),
+ "Package", name="xyzzy")
+ pd.entries = {"/test1.xml": Mock(xdata=test1),
+ "/test2.xml": Mock(xdata=test2)}
pd.HandleEvent(Mock())
self.assertItemsEqual(pd.Entries,
dict(Path={"/etc/foo.conf": pd.BindEntry,
@@ -1242,32 +1219,17 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked):
def test__matches(self):
pd = self.get_obj()
- self.assertTrue(pd._matches(lxml.etree.Element("Test",
- name="/etc/foo.conf"),
- Mock(),
- {"/etc/foo.conf": pd.BindEntry,
- "/etc/bar.conf": pd.BindEntry}))
- self.assertFalse(pd._matches(lxml.etree.Element("Test",
- name="/etc/baz.conf"),
- Mock(),
- {"/etc/foo.conf": pd.BindEntry,
- "/etc/bar.conf": pd.BindEntry}))
+ entry = lxml.etree.Element("Test", name="/etc/foo.conf")
+ self.assertTrue(pd._matches(entry, Mock(),
+ lxml.etree.Element("Test",
+ name="/etc/foo.conf")))
+ self.assertFalse(pd._matches(entry, Mock(),
+ lxml.etree.Element("Test",
+ name="/etc/baz.conf")))
def test_BindEntry(self):
pd = self.get_obj()
- pd.get_attrs = Mock(return_value=dict(test1="test1", test2="test2"))
- entry = lxml.etree.Element("Path", name="/etc/foo.conf", test1="bogus")
- metadata = Mock()
- pd.BindEntry(entry, metadata)
- pd.get_attrs.assert_called_with(entry, metadata)
- self.assertItemsEqual(entry.attrib,
- dict(name="/etc/foo.conf",
- test1="test1", test2="test2"))
-
- def test_get_attrs(self):
- pd = self.get_obj()
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- children = [lxml.etree.Element("Child")]
+ children = [lxml.etree.Element("Child", name="child")]
metadata = Mock()
pd.entries = dict()
@@ -1275,58 +1237,59 @@ class TestPrioDir(TestPlugin, TestGenerator, TestXMLDirectoryBacked):
metadata.reset_mock()
for src in pd.entries.values():
src.reset_mock()
- src.cache = None
# test with no matches
- self.assertRaises(PluginExecutionError,
- pd.get_attrs, entry, metadata)
+ self.assertRaises(PluginExecutionError, pd.BindEntry, Mock(), metadata)
- def add_entry(name, data, prio=10):
+ def add_entry(name, data):
path = os.path.join(pd.data, name)
pd.entries[path] = Mock()
- pd.entries[path].priority = prio
- def do_Cache(metadata):
- pd.entries[path].cache = (metadata, data)
- pd.entries[path].Cache.side_effect = do_Cache
-
- add_entry('test1.xml',
- dict(Path={'/etc/foo.conf': dict(attr="attr1",
- __children__=children),
- '/etc/bar.conf': dict()}))
- add_entry('test2.xml',
- dict(Path={'/etc/bar.conf': dict(__text__="text",
- attr="attr1")},
- Package={'quux': dict(),
- 'xyzzy': dict()}),
- prio=20)
- add_entry('test3.xml',
- dict(Path={'/etc/baz.conf': dict()},
- Package={'xyzzy': dict()}),
- prio=20)
-
- # test with exactly one match, __children__
+ pd.entries[path].priority = data.get("priority")
+ pd.entries[path].XMLMatch.return_value = data
+
+ test1 = lxml.etree.Element("Rules", priority="10")
+ path1 = lxml.etree.SubElement(test1, "Path", name="/etc/foo.conf",
+ attr="attr1")
+ path1.extend(children)
+ lxml.etree.SubElement(test1, "Path", name="/etc/bar.conf")
+ add_entry('test1.xml', test1)
+
+ test2 = lxml.etree.Element("Rules", priority="20")
+ path2 = lxml.etree.SubElement(test2, "Path", name="/etc/bar.conf",
+ attr="attr1")
+ path2.text = "text"
+ lxml.etree.SubElement(test2, "Package", name="quux")
+ lxml.etree.SubElement(test2, "Package", name="xyzzy")
+ add_entry('test2.xml', test2)
+
+ test3 = lxml.etree.Element("Rules", priority="20")
+ lxml.etree.SubElement(test3, "Path", name="/etc/baz.conf")
+ lxml.etree.SubElement(test3, "Package", name="xyzzy")
+ add_entry('test3.xml', test3)
+
+ # test with exactly one match, children
reset()
- self.assertItemsEqual(pd.get_attrs(entry, metadata),
- dict(attr="attr1"))
+ entry = lxml.etree.Element("Path", name="/etc/foo.conf")
+ pd.BindEntry(entry, metadata)
+ self.assertXMLEqual(entry, path1)
+ self.assertIsNot(entry, path1)
for src in pd.entries.values():
- src.Cache.assert_called_with(metadata)
- self.assertEqual(len(entry.getchildren()), 1)
- self.assertXMLEqual(entry.getchildren()[0], children[0])
+ src.XMLMatch.assert_called_with(metadata)
- # test with multiple matches with different priorities, __text__
+ # test with multiple matches with different priorities, text
reset()
entry = lxml.etree.Element("Path", name="/etc/bar.conf")
- self.assertItemsEqual(pd.get_attrs(entry, metadata),
- dict(attr="attr1"))
+ pd.BindEntry(entry, metadata)
+ self.assertXMLEqual(entry, path2)
+ self.assertIsNot(entry, path2)
for src in pd.entries.values():
- src.Cache.assert_called_with(metadata)
- self.assertEqual(entry.text, "text")
+ src.XMLMatch.assert_called_with(metadata)
# test with multiple matches with identical priorities
reset()
entry = lxml.etree.Element("Package", name="xyzzy")
self.assertRaises(PluginExecutionError,
- pd.get_attrs, entry, metadata)
+ pd.BindEntry, entry, metadata)
class TestSpecificity(Bcfg2TestCase):
@@ -1390,16 +1353,20 @@ class TestSpecificity(Bcfg2TestCase):
self.assertGreaterEqual(specs[j], specs[i])
-class TestSpecificData(Bcfg2TestCase):
+class TestSpecificData(TestDebuggable):
test_obj = SpecificData
path = os.path.join(datastore, "test.txt")
- def get_obj(self, name=None, specific=None, encoding=None):
+ def setUp(self):
+ TestDebuggable.setUp(self)
+ set_setup_default("encoding", "utf-8")
+
+ def get_obj(self, name=None, specific=None):
if name is None:
name = self.path
if specific is None:
specific = Mock()
- return self.test_obj(name, specific, encoding)
+ return self.test_obj(name, specific)
def test__init(self):
pass
@@ -1411,10 +1378,10 @@ class TestSpecificData(Bcfg2TestCase):
sd = self.get_obj()
sd.handle_event(event)
self.assertFalse(mock_open.called)
- if hasattr(sd, 'data'):
- self.assertIsNone(sd.data)
- else:
+ try:
self.assertFalse(hasattr(sd, 'data'))
+ except AssertionError:
+ self.assertIsNone(sd.data)
event = Mock()
mock_open.return_value.read.return_value = "test"
@@ -1441,9 +1408,18 @@ class TestEntrySet(TestDebuggable):
ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx",
"test.txt.genshi_include", "test.G_foo.genshi_include"]
- def get_obj(self, basename="test", path=datastore, entry_type=MagicMock(),
- encoding=None):
- return self.test_obj(basename, path, entry_type, encoding)
+ def setUp(self):
+ TestDebuggable.setUp(self)
+ set_setup_default("default_owner")
+ set_setup_default("default_group")
+ set_setup_default("default_mode")
+ set_setup_default("default_secontext")
+ set_setup_default("default_important", False)
+ set_setup_default("default_paranoid", False)
+ set_setup_default("default_sensitive", False)
+
+ def get_obj(self, basename="test", entry_type=MagicMock()):
+ return self.test_obj(basename, path, entry_type)
def test__init(self):
for basename in self.basenames:
@@ -1573,25 +1549,25 @@ class TestEntrySet(TestDebuggable):
eset.reset_metadata.reset_mock()
eset.entry_init.reset_mock()
- for fname in ["info", "info.xml", ":info"]:
- for evt in ["exists", "created", "changed"]:
- reset()
- event = Mock()
- event.code2str.return_value = evt
- event.filename = fname
- eset.handle_event(event)
- eset.update_metadata.assert_called_with(event)
- self.assertFalse(eset.entry_init.called)
- self.assertFalse(eset.reset_metadata.called)
-
+ fname = "info.xml"
+ for evt in ["exists", "created", "changed"]:
reset()
event = Mock()
- event.code2str.return_value = "deleted"
+ event.code2str.return_value = evt
event.filename = fname
eset.handle_event(event)
- eset.reset_metadata.assert_called_with(event)
+ eset.update_metadata.assert_called_with(event)
self.assertFalse(eset.entry_init.called)
- self.assertFalse(eset.update_metadata.called)
+ self.assertFalse(eset.reset_metadata.called)
+
+ reset()
+ event = Mock()
+ event.code2str.return_value = "deleted"
+ event.filename = fname
+ eset.handle_event(event)
+ eset.reset_metadata.assert_called_with(event)
+ self.assertFalse(eset.entry_init.called)
+ self.assertFalse(eset.update_metadata.called)
for evt in ["exists", "created", "changed"]:
reset()
@@ -1638,8 +1614,9 @@ class TestEntrySet(TestDebuggable):
eset.entry_init(event)
eset.specificity_from_filename.assert_called_with("test.txt",
specific=None)
- eset.entry_type.assert_called_with(os.path.join(eset.path, "test.txt"),
- eset.specificity_from_filename.return_value, None)
+ eset.entry_type.assert_called_with(
+ os.path.join(eset.path, "test.txt"),
+ eset.specificity_from_filename.return_value)
eset.entry_type.return_value.handle_event.assert_called_with(event)
self.assertIn("test.txt", eset.entries)
@@ -1660,8 +1637,7 @@ class TestEntrySet(TestDebuggable):
eset.specificity_from_filename.assert_called_with("test2.txt",
specific=specific)
etype.assert_called_with(os.path.join(eset.path, "test2.txt"),
- eset.specificity_from_filename.return_value,
- None)
+ eset.specificity_from_filename.return_value)
etype.return_value.handle_event.assert_called_with(event)
self.assertIn("test2.txt", eset.entries)
@@ -1750,26 +1726,8 @@ class TestEntrySet(TestDebuggable):
self.assertFalse(mock_InfoXML.called)
eset.infoxml.HandleEvent.assert_called_with(event)
- for fname in [':info', 'info']:
- event = Mock()
- event.filename = fname
-
- idata = ["owner:owner",
- "group: GROUP",
- "mode: 775",
- "important: true",
- "bogus: line"]
- mock_open.return_value.readlines.return_value = idata
- eset.update_metadata(event)
- expected = DEFAULT_FILE_METADATA.copy()
- expected['owner'] = 'owner'
- expected['group'] = 'GROUP'
- expected['mode'] = '0775'
- expected['important'] = 'true'
- self.assertItemsEqual(eset.metadata,
- expected)
-
- def test_reset_metadata(self):
+ @patch("Bcfg2.Server.Plugin.helpers.default_path_metadata")
+ def test_reset_metadata(self, mock_default_path_metadata):
eset = self.get_obj()
# test info.xml
@@ -1779,29 +1737,22 @@ class TestEntrySet(TestDebuggable):
eset.reset_metadata(event)
self.assertIsNone(eset.infoxml)
- for fname in [':info', 'info']:
- event = Mock()
- event.filename = fname
- eset.metadata = Mock()
- eset.reset_metadata(event)
- self.assertItemsEqual(eset.metadata, DEFAULT_FILE_METADATA)
-
- @patch("Bcfg2.Server.Plugin.helpers.bind_info")
- def test_bind_info_to_entry(self, mock_bind_info):
- # There's a strange scoping issue in py3k that prevents this
- # test from working as expected on sub-classes of EntrySet.
- # No idea what's going on, but until I can figure it out we
- # skip this test on subclasses
- if inPy3k and self.test_obj != EntrySet:
- return skip("Skipping this test for py3k scoping issues")
-
+ def test_bind_info_to_entry(self):
eset = self.get_obj()
- entry = Mock()
+ eset.metadata = dict(owner="root", group="root")
+ entry = lxml.etree.Element("Path", name="/test")
metadata = Mock()
+ eset.infoxml = None
+ eset.bind_info_to_entry(entry, metadata)
+ self.assertItemsEqual(entry.attrib,
+ dict(name="/test", owner="root", group="root"))
+
+ entry = lxml.etree.Element("Path", name="/test")
+ eset.infoxml = Mock()
eset.bind_info_to_entry(entry, metadata)
- mock_bind_info.assert_called_with(entry, metadata,
- infoxml=eset.infoxml,
- default=eset.metadata)
+ self.assertItemsEqual(entry.attrib,
+ dict(name="/test", owner="root", group="root"))
+ eset.infoxml.BindEntry.assert_called_with(entry, metadata)
def test_bind_entry(self):
eset = self.get_obj()
@@ -1820,15 +1771,14 @@ class TestEntrySet(TestDebuggable):
class TestGroupSpool(TestPlugin, TestGenerator):
test_obj = GroupSpool
+ def setUp(self):
+ TestPlugin.setUp(self)
+ TestGenerator.setUp(self)
+ set_setup_default("encoding", "utf-8")
+
def get_obj(self, core=None):
if core is None:
core = MagicMock()
- core.setup = MagicMock()
- else:
- try:
- core.setup['encoding']
- except TypeError:
- core.setup.__getitem__ = MagicMock()
@patch("%s.%s.AddDirectoryMonitor" % (self.test_obj.__module__,
self.test_obj.__name__),
@@ -1843,7 +1793,7 @@ class TestGroupSpool(TestPlugin, TestGenerator):
@patch("%s.%s.AddDirectoryMonitor" % (self.test_obj.__module__,
self.test_obj.__name__))
def inner(mock_Add):
- gs = self.test_obj(MagicMock(), datastore)
+ gs = self.test_obj(MagicMock())
mock_Add.assert_called_with('')
self.assertItemsEqual(gs.Entries, {gs.entry_type: {}})
@@ -1899,8 +1849,7 @@ class TestGroupSpool(TestPlugin, TestGenerator):
self.assertFalse(gs.AddDirectoryMonitor.called)
gs.es_cls.assert_called_with(gs.filename_pattern,
gs.data + ident,
- gs.es_child_cls,
- gs.encoding)
+ gs.es_child_cls)
self.assertIn(ident, gs.entries)
self.assertEqual(gs.entries[ident], gs.es_cls.return_value)
self.assertIn(ident, gs.Entries[gs.entry_type])
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py
index 1f5c4790b..bbfb495c4 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testinterfaces.py
@@ -357,3 +357,20 @@ class TestVersion(TestPlugin):
class TestClientRunHooks(Bcfg2TestCase):
""" placeholder for future tests """
pass
+
+
+class TestClientACLs(Bcfg2TestCase):
+ test_obj = ClientACLs
+
+ def get_obj(self):
+ return self.test_obj()
+
+ def test_check_acl_ip(self):
+ ca = self.get_obj()
+ self.assertIn(ca.check_acl_ip(Mock(), Mock()),
+ [True, False, None])
+
+ def test_check_acl_metadata(self):
+ ca = self.get_obj()
+ self.assertIn(ca.check_acl_metadata(Mock(), Mock()),
+ [True, False])
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py
new file mode 100644
index 000000000..86a960701
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestACL.py
@@ -0,0 +1,223 @@
+import os
+import sys
+import lxml.etree
+import Bcfg2.Server.Plugin
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.ACL import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestPlugin import TestXMLFileBacked, TestStructFile, TestPlugin, \
+ TestClientACLs
+
+
+class TestFunctions(Bcfg2TestCase):
+ def test_rmi_names_equal(self):
+ good_cases = [('*', 'foo'),
+ ('foo', 'foo'),
+ ('foo.*', 'foo.bar'),
+ ('*.*', 'foo.bar'),
+ ('foo.bar', 'foo.bar'),
+ ('*.bar', 'foo.bar'),
+ ('foo.*.bar', 'foo.baz.bar')]
+ bad_cases = [('foo', 'bar'),
+ ('*', 'foo.bar'),
+ ('*.*', 'foo'),
+ ('*.*', 'foo.bar.baz'),
+ ('foo.*', 'bar.foo'),
+ ('*.bar', 'bar.foo'),
+ ('foo.*', 'foobar')]
+ for first, second in good_cases:
+ self.assertTrue(rmi_names_equal(first, second),
+ "rmi_names_equal(%s, %s) unexpectedly False" %
+ (first, second))
+ self.assertTrue(rmi_names_equal(second, first),
+ "rmi_names_equal(%s, %s) unexpectedly False" %
+ (second, first))
+ for first, second in bad_cases:
+ self.assertFalse(rmi_names_equal(first, second),
+ "rmi_names_equal(%s, %s) unexpectedly True" %
+ (first, second))
+ self.assertFalse(rmi_names_equal(second, first),
+ "rmi_names_equal(%s, %s) unexpectedly True" %
+ (second, first))
+
+ def test_ip_matches(self):
+ good_cases = [
+ ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.1")),
+ ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="24")),
+ ("192.168.1.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.0")),
+ ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.224")),
+ ("192.168.1.31", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="27")),
+ ("10.55.67.191", lxml.etree.Element("test", address="10.55.0.0",
+ netmask="16"))]
+ bad_cases = [
+ ("192.168.1.1", lxml.etree.Element("test", address="192.168.1.2")),
+ ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="24")),
+ ("192.168.2.17", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.0")),
+ ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="255.255.255.224")),
+ ("192.168.1.35", lxml.etree.Element("test", address="192.168.1.0",
+ netmask="27")),
+ ("10.56.67.191", lxml.etree.Element("test", address="10.55.0.0",
+ netmask="16"))]
+ for ip, entry in good_cases:
+ self.assertTrue(ip_matches(ip, entry),
+ "ip_matches(%s, %s) unexpectedly False" %
+ (ip, lxml.etree.tostring(entry)))
+ for ip, entry in bad_cases:
+ self.assertFalse(ip_matches(ip, entry),
+ "ip_matches(%s, %s) unexpectedly True" %
+ (ip, lxml.etree.tostring(entry)))
+
+
+class TestIPACLFile(TestXMLFileBacked):
+ test_obj = IPACLFile
+
+ @patch("Bcfg2.Server.Plugins.ACL.ip_matches")
+ @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal")
+ def test_check_acl(self, mock_rmi_names_equal, mock_ip_matches):
+ af = self.get_obj()
+ ip = "10.0.0.8"
+ rmi = "ACL.test"
+
+ def reset():
+ mock_rmi_names_equal.reset_mock()
+ mock_ip_matches.reset_mock()
+
+ # test default defer with no entries
+ af.entries = []
+ self.assertIsNone(af.check_acl(ip, rmi))
+
+ # test explicit allow, deny, and defer
+ entries = dict(Allow=lxml.etree.Element("Allow", method=rmi),
+ Deny=lxml.etree.Element("Deny", method=rmi),
+ Defer=lxml.etree.Element("Defer", method=rmi))
+ af.entries = list(entries.values())
+
+ def get_ip_matches(tag):
+ def ip_matches(ip, entry):
+ return entry.tag == tag
+
+ return ip_matches
+
+ mock_rmi_names_equal.return_value = True
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Allow")
+ self.assertTrue(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Allow'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Deny")
+ self.assertFalse(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Deny'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ reset()
+ mock_ip_matches.side_effect = get_ip_matches("Defer")
+ self.assertIsNone(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, entries['Defer'])
+ mock_rmi_names_equal.assert_called_with(rmi, rmi)
+
+ # test matching RMI names
+ reset()
+ mock_ip_matches.side_effect = lambda i, e: True
+ mock_rmi_names_equal.side_effect = lambda a, b: a == b
+ rmi = "ACL.test2"
+ matching = lxml.etree.Element("Allow", method=rmi)
+ af.entries.append(matching)
+ self.assertTrue(af.check_acl(ip, rmi))
+ mock_ip_matches.assert_called_with(ip, matching)
+ self.assertTrue(
+ call('ACL.test', rmi) in mock_rmi_names_equal.call_args_list or
+ call(rmi, 'ACL.test') in mock_rmi_names_equal.call_args_list)
+
+ # test implicit allow for localhost, defer for others
+ reset()
+ mock_ip_matches.side_effect = lambda i, e: False
+ self.assertIsNone(af.check_acl(ip, rmi))
+
+ reset()
+ self.assertTrue(af.check_acl("127.0.0.1", rmi))
+
+
+class TestMetadataACLFile(TestStructFile):
+ test_obj = MetadataACLFile
+
+ @patch("Bcfg2.Server.Plugins.ACL.rmi_names_equal")
+ def test_check_acl(self, mock_rmi_names_equal):
+ af = self.get_obj()
+ af.Match = Mock()
+ metadata = Mock()
+ mock_rmi_names_equal.side_effect = lambda a, b: a == b
+
+ def reset():
+ af.Match.reset_mock()
+ mock_rmi_names_equal.reset_mock()
+
+ # test default allow
+ af.entries = []
+ self.assertTrue(af.check_acl(metadata, 'ACL.test'))
+
+ # test explicit allow and deny
+ reset()
+ af.entries = [lxml.etree.Element("Allow", method='ACL.test'),
+ lxml.etree.Element("Deny", method='ACL.test2')]
+ af.Match.return_value = af.entries
+ self.assertTrue(af.check_acl(metadata, 'ACL.test'))
+ af.Match.assert_called_with(metadata)
+ self.assertIn(call('ACL.test', 'ACL.test'),
+ mock_rmi_names_equal.call_args_list)
+
+ reset()
+ self.assertFalse(af.check_acl(metadata, 'ACL.test2'))
+ af.Match.assert_called_with(metadata)
+ self.assertIn(call('ACL.test2', 'ACL.test2'),
+ mock_rmi_names_equal.call_args_list)
+
+ # test default deny for non-localhost
+ reset()
+ self.assertFalse(af.check_acl(metadata, 'ACL.test3'))
+ af.Match.assert_called_with(metadata)
+
+ # test default allow for localhost
+ reset()
+ metadata.hostname = 'localhost'
+ self.assertTrue(af.check_acl(metadata, 'ACL.test3'))
+ af.Match.assert_called_with(metadata)
+
+
+class TestACL(TestPlugin, TestClientACLs):
+ test_obj = ACL
+
+ def test_check_acl_ip(self):
+ acl = self.get_obj()
+ acl.ip_acls = Mock()
+ self.assertEqual(acl.check_acl_ip(("192.168.1.10", "12345"),
+ "ACL.test"),
+ acl.ip_acls.check_acl.return_value)
+ acl.ip_acls.check_acl.assert_called_with("192.168.1.10", "ACL.test")
+
+ def test_check_acl_metadata(self):
+ acl = self.get_obj()
+ acl.metadata_acls = Mock()
+ metadata = Mock()
+ self.assertEqual(acl.check_acl_metadata(metadata, "ACL.test"),
+ acl.metadata_acls.check_acl.return_value)
+ acl.metadata_acls.check_acl.assert_called_with(metadata, "ACL.test")
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py
new file mode 100644
index 000000000..cfb379c40
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestBundler.py
@@ -0,0 +1,111 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Bundler import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestPlugin import TestStructFile, TestPlugin, TestStructure, \
+ TestXMLDirectoryBacked
+
+
+class TestBundleFile(TestStructFile):
+ test_obj = BundleFile
+ path = os.path.join(datastore, "test", "test1.xml")
+
+ def test_bundle_name(self):
+ cases = [("foo.xml", "foo"),
+ ("foo.bar.xml", "foo.bar"),
+ ("foo-bar-baz.xml", "foo-bar-baz"),
+ ("foo....xml", "foo..."),
+ ("foo.genshi", "foo")]
+ bf = self.get_obj()
+ for fname, bname in cases:
+ bf.name = fname
+ self.assertEqual(bf.bundle_name, bname)
+
+
+class TestBundler(TestPlugin, TestStructure, TestXMLDirectoryBacked):
+ test_obj = Bundler
+
+ def get_obj(self, core=None):
+ @patch("%s.%s.add_directory_monitor" % (self.test_obj.__module__,
+ self.test_obj.__name__),
+ Mock())
+ def inner():
+ return TestPlugin.get_obj(self, core=core)
+ return inner()
+
+ @patch("Bcfg2.Server.Plugin.XMLDirectoryBacked.HandleEvent")
+ def test_HandleEvent(self, mock_HandleEvent):
+ b = self.get_obj()
+ b.bundles = dict(foo=Mock(), bar=Mock())
+ b.entries = {"foo.xml": BundleFile("foo.xml"),
+ "baz.xml": BundleFile("baz.xml")}
+ event = Mock()
+ b.HandleEvent(event)
+ mock_HandleEvent.assert_called_with(b, event)
+ self.assertItemsEqual(b.bundles,
+ dict(foo=b.entries['foo.xml'],
+ baz=b.entries['baz.xml']))
+
+ def test_BuildStructures(self):
+ b = self.get_obj()
+ b.bundles = dict(error=Mock(), skip=Mock(), xinclude=Mock(),
+ has_dep=Mock(), is_dep=Mock(), indep=Mock())
+ expected = dict()
+
+ b.bundles['error'].XMLMatch.side_effect = TemplateError(None)
+
+ xinclude = lxml.etree.Element("Bundle")
+ lxml.etree.SubElement(lxml.etree.SubElement(xinclude, "Bundle"),
+ "Path", name="/test")
+ b.bundles['xinclude'].XMLMatch.return_value = xinclude
+ expected['xinclude'] = lxml.etree.Element("Bundle", name="xinclude")
+ lxml.etree.SubElement(expected['xinclude'], "Path", name="/test")
+
+ has_dep = lxml.etree.Element("Bundle")
+ lxml.etree.SubElement(has_dep, "Bundle", name="is_dep")
+ lxml.etree.SubElement(has_dep, "Package", name="foo")
+ b.bundles['has_dep'].XMLMatch.return_value = has_dep
+ expected['has_dep'] = lxml.etree.Element("Bundle", name="has_dep")
+ lxml.etree.SubElement(expected['has_dep'], "Package", name="foo")
+
+ is_dep = lxml.etree.Element("Bundle")
+ lxml.etree.SubElement(is_dep, "Package", name="bar")
+ b.bundles['is_dep'].XMLMatch.return_value = is_dep
+ expected['is_dep'] = lxml.etree.Element("Bundle", name="is_dep")
+ lxml.etree.SubElement(expected['is_dep'], "Package", name="bar")
+
+ indep = lxml.etree.Element("Bundle", independent="true")
+ lxml.etree.SubElement(indep, "Service", name="baz")
+ b.bundles['indep'].XMLMatch.return_value = indep
+ expected['indep'] = lxml.etree.Element("Independent", name="indep")
+ lxml.etree.SubElement(expected['indep'], "Service", name="baz")
+
+ metadata = Mock()
+ metadata.bundles = ["error", "xinclude", "has_dep", "indep"]
+
+ rv = b.BuildStructures(metadata)
+ self.assertEqual(len(rv), 4)
+ for bundle in rv:
+ name = bundle.get("name")
+ self.assertIsNotNone(name,
+ "Bundle %s was not built" % name)
+ self.assertIn(name, expected,
+ "Unexpected bundle %s was built" % name)
+ self.assertXMLEqual(bundle, expected[name],
+ "Bundle %s was not built correctly" % name)
+ b.bundles[name].XMLMatch.assert_called_with(metadata)
+
+ b.bundles['error'].XMLMatch.assert_called_with(metadata)
+ self.assertFalse(b.bundles['skip'].XMLMatch.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
index d655a20cd..f41ae8a46 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgAuthorizedKeysGenerator.py
@@ -23,12 +23,16 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
test_obj = CfgAuthorizedKeysGenerator
should_monitor = False
- def get_obj(self, name=None, core=None, fam=None):
+ def setUp(self):
+ TestCfgGenerator.setUp(self)
+ TestStructFile.setUp(self)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.get_cfg")
+ def get_obj(self, mock_get_cfg, name=None, core=None, fam=None):
if name is None:
name = self.path
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG = Mock()
if core is not None:
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CFG.core = core
+ mock_get_cfg.return_value.core = core
return self.test_obj(name)
@patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
@@ -40,33 +44,9 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
mock_HandleEvent.assert_called_with(akg, evt)
mock_handle_event.assert_called_with(akg, evt)
- def test_category(self):
- akg = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.SETUP.cfp = cfp
-
- self.assertIsNone(akg.category)
- cfp.has_section.assert_called_with("sshkeys")
-
- cfp.reset_mock()
- cfp.has_section.return_value = True
- self.assertIsNone(akg.category)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
-
- cfp.reset_mock()
- cfp.has_option.return_value = True
- self.assertEqual(akg.category, cfp.get.return_value)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
- cfp.get.assert_called_with("sshkeys", "category")
-
@patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.ClientMetadata")
- @patch("Bcfg2.Server.Plugins.Cfg.CfgAuthorizedKeysGenerator.CfgAuthorizedKeysGenerator.category", "category")
def test_get_data(self, mock_ClientMetadata):
+ Bcfg2.Options.setup.sshkeys_category = "category"
akg = self.get_obj()
akg.XMLMatch = Mock()
@@ -131,17 +111,18 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
reset()
host = "baz.example.com"
spec = lxml.etree.Element("AuthorizedKeys")
- lxml.etree.SubElement(
- lxml.etree.SubElement(spec,
- "Allow",
- attrib={"from": pubkey, "host": host}),
- "Params", foo="foo", bar="bar=bar")
+ allow = lxml.etree.SubElement(spec, "Allow",
+ attrib={"from": pubkey, "host": host})
+ lxml.etree.SubElement(allow, "Option", name="foo", value="foo")
+ lxml.etree.SubElement(allow, "Option", name="bar")
+ lxml.etree.SubElement(allow, "Option", name="baz", value="baz=baz")
akg.XMLMatch.return_value = spec
params, actual_host, actual_pubkey = akg.get_data(entry,
metadata).split()
self.assertEqual(actual_host, host)
self.assertEqual(actual_pubkey, pubkey)
- self.assertItemsEqual(params.split(","), ["foo=foo", "bar=bar=bar"])
+ self.assertItemsEqual(params.split(","), ["foo=foo", "bar",
+ "baz=baz=baz"])
akg.XMLMatch.assert_called_with(metadata)
akg.core.build_metadata.assert_called_with(host)
self.assertEqual(akg.core.Bind.call_args[0][0].get("name"), pubkey)
@@ -151,10 +132,10 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
spec = lxml.etree.Element("AuthorizedKeys")
text = lxml.etree.SubElement(spec, "Allow")
text.text = "ssh-rsa publickey /foo/bar\n"
- lxml.etree.SubElement(text, "Params", foo="foo")
+ lxml.etree.SubElement(text, "Option", name="foo")
akg.XMLMatch.return_value = spec
self.assertEqual(akg.get_data(entry, metadata),
- "foo=foo %s" % text.text.strip())
+ "foo %s" % text.text.strip())
akg.XMLMatch.assert_called_with(metadata)
self.assertFalse(akg.core.build_metadata.called)
self.assertFalse(akg.core.Bind.called)
@@ -163,7 +144,7 @@ class TestCfgAuthorizedKeysGenerator(TestCfgGenerator, TestStructFile):
lxml.etree.SubElement(spec, "Allow", attrib={"from": pubkey})
akg.XMLMatch.return_value = spec
self.assertItemsEqual(akg.get_data(entry, metadata).splitlines(),
- ["foo=foo %s" % text.text.strip(),
+ ["foo %s" % text.text.strip(),
"profile %s" % pubkey])
akg.XMLMatch.assert_called_with(metadata)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
index fc5d5e53d..93331304a 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
@@ -17,32 +17,39 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if HAS_CHEETAH or can_skip:
- class TestCfgCheetahGenerator(TestCfgGenerator):
- test_obj = CfgCheetahGenerator
+class TestCfgCheetahGenerator(TestCfgGenerator):
+ test_obj = CfgCheetahGenerator
- @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
- def setUp(self):
- pass
+ @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
+ def setUp(self):
+ TestCfgGenerator.setUp(self)
+ set_setup_default("repository", datastore)
- @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template")
- def test_get_data(self, mock_Template):
- ccg = self.get_obj(encoding='UTF-8')
- ccg.data = "data"
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP = MagicMock()
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template")
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.get_template_data")
+ def test_get_data(self, mock_get_template_data, mock_Template):
+ ccg = self.get_obj()
+ ccg.data = "data"
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
- self.assertEqual(ccg.get_data(entry, metadata),
- mock_Template.return_value.respond.return_value)
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.assert_called_with("repo")
- mock_Template.assert_called_with("data".decode(ccg.encoding),
- compilerSettings=ccg.settings)
- tmpl = mock_Template.return_value
- tmpl.respond.assert_called_with()
- self.assertEqual(tmpl.metadata, metadata)
- self.assertEqual(tmpl.name, entry.get("name"))
- self.assertEqual(tmpl.path, entry.get("name"))
- self.assertEqual(tmpl.source_path, ccg.name)
- self.assertEqual(tmpl.repo,
- Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.SETUP.__getitem__.return_value)
+ template_vars = dict(name=entry.get("name"),
+ metadata=metadata,
+ path=ccg.name,
+ source_path=ccg.name,
+ repo=datastore)
+ mock_get_template_data.return_value = template_vars
+
+ self.assertEqual(ccg.get_data(entry, metadata),
+ mock_Template.return_value.respond.return_value)
+ mock_Template.assert_called_with(
+ "data".decode(Bcfg2.Options.setup.encoding),
+ compilerSettings=ccg.settings)
+ tmpl = mock_Template.return_value
+ tmpl.respond.assert_called_with()
+ for key, val in template_vars.items():
+ self.assertEqual(getattr(tmpl, key), val)
+ self.assertItemsEqual(mock_get_template_data.call_args[0],
+ [entry, metadata, ccg.name])
+ self.assertIsInstance(mock_get_template_data.call_args[1]['default'],
+ DefaultCheetahDataProvider)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
index 46062569d..4c987551b 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
@@ -30,18 +30,17 @@ except ImportError:
HAS_CRYPTO = False
-if can_skip or (HAS_CRYPTO and HAS_CHEETAH):
- class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator,
- TestCfgEncryptedGenerator):
- test_obj = CfgEncryptedCheetahGenerator
+class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator,
+ TestCfgEncryptedGenerator):
+ test_obj = CfgEncryptedCheetahGenerator
- @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
- def setUp(self):
- pass
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
+ def setUp(self):
+ pass
- def test_handle_event(self):
- TestCfgEncryptedGenerator.test_handle_event(self)
+ def test_handle_event(self):
+ TestCfgEncryptedGenerator.test_handle_event(self)
- def test_get_data(self):
- TestCfgCheetahGenerator.test_get_data(self)
+ def test_get_data(self):
+ TestCfgCheetahGenerator.test_get_data(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
index 2bfec0e2d..03b9fb0f4 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
@@ -19,60 +19,53 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if can_skip or HAS_CRYPTO:
- class TestCfgEncryptedGenerator(TestCfgGenerator):
- test_obj = CfgEncryptedGenerator
+class TestCfgEncryptedGenerator(TestCfgGenerator):
+ test_obj = CfgEncryptedGenerator
- @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- def setUp(self):
- pass
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ def setUp(self):
+ TestCfgGenerator.setUp(self)
- @patchIf(HAS_CRYPTO,
- "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm")
- @patchIf(HAS_CRYPTO,
- "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt")
- def test_handle_event(self, mock_decrypt, mock_get_algorithm):
- @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
- def inner(mock_handle_event):
- def reset():
- mock_decrypt.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_handle_event.reset_mock()
+ @patchIf(HAS_CRYPTO,
+ "Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt")
+ def test_handle_event(self, mock_decrypt):
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
+ def inner(mock_handle_event):
+ def reset():
+ mock_decrypt.reset_mock()
+ mock_handle_event.reset_mock()
- def get_event_data(obj, event):
- obj.data = "encrypted"
+ def get_event_data(obj, event):
+ obj.data = "encrypted"
- mock_handle_event.side_effect = get_event_data
- mock_decrypt.side_effect = lambda d, **kw: "plaintext"
- event = Mock()
- ceg = self.get_obj()
- ceg.handle_event(event)
- mock_handle_event.assert_called_with(ceg, event)
- mock_decrypt.assert_called_with(
- "encrypted",
- setup=Bcfg2.Server.Plugins.Cfg.SETUP,
- algorithm=mock_get_algorithm.return_value)
- self.assertEqual(ceg.data, "plaintext")
+ mock_handle_event.side_effect = get_event_data
+ mock_decrypt.side_effect = lambda d, **kw: "plaintext"
+ event = Mock()
+ ceg = self.get_obj()
+ ceg.handle_event(event)
+ mock_handle_event.assert_called_with(ceg, event)
+ mock_decrypt.assert_called_with("encrypted")
+ self.assertEqual(ceg.data, "plaintext")
- reset()
- mock_decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError,
- ceg.handle_event, event)
- inner()
+ reset()
+ mock_decrypt.side_effect = EVPError
+ self.assertRaises(PluginExecutionError,
+ ceg.handle_event, event)
+ inner()
- # to perform the tests from the parent test object, we
- # make bruteforce_decrypt just return whatever data was
- # given to it
- mock_decrypt.side_effect = lambda d, **kw: d
- TestCfgGenerator.test_handle_event(self)
+ # to perform the tests from the parent test object, we
+ # make bruteforce_decrypt just return whatever data was
+ # given to it
+ mock_decrypt.side_effect = lambda d, **kw: d
+ TestCfgGenerator.test_handle_event(self)
- def test_get_data(self):
- ceg = self.get_obj()
- ceg.data = None
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
+ def test_get_data(self):
+ ceg = self.get_obj()
+ ceg.data = None
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
- self.assertRaises(PluginExecutionError,
- ceg.get_data, entry, metadata)
+ self.assertRaises(PluginExecutionError,
+ ceg.get_data, entry, metadata)
- TestCfgGenerator.test_get_data(self)
+ TestCfgGenerator.test_get_data(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
index b447a9bb8..0b74e4a60 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
@@ -14,20 +14,13 @@ while path != "/":
path = os.path.dirname(path)
from common import *
-try:
- from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
- TestCfgGenshiGenerator
- HAS_GENSHI = True
-except ImportError:
- TestCfgGenshiGenerator = object
- HAS_GENSHI = False
+from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
+ TestCfgGenshiGenerator
-if can_skip or (HAS_CRYPTO and HAS_GENSHI):
- class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator):
- test_obj = CfgEncryptedGenshiGenerator
+class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator):
+ test_obj = CfgEncryptedGenshiGenerator
- @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
- def setUp(self):
- pass
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ def setUp(self):
+ TestCfgGenshiGenerator.setUp(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
index 0f369113b..7ceedb7c2 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
@@ -21,35 +21,32 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier
class TestCfgExternalCommandVerifier(TestCfgVerifier):
test_obj = CfgExternalCommandVerifier
- @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen")
- def test_verify_entry(self, mock_Popen):
- proc = Mock()
- mock_Popen.return_value = proc
- proc.wait.return_value = 0
- proc.communicate.return_value = ("stdout", "stderr")
+ def test_verify_entry(self):
entry = lxml.etree.Element("Path", name="/test.txt")
metadata = Mock()
ecv = self.get_obj()
ecv.cmd = ["/bin/bash", "-x", "foo"]
+ ecv.exc = Mock()
+ ecv.exc.run.return_value = Mock()
+ ecv.exc.run.return_value.success = True
+
ecv.verify_entry(entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
- proc.communicate.assert_called_with(input="data")
- proc.wait.assert_called_with()
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
- mock_Popen.reset_mock()
- proc.wait.return_value = 13
+ ecv.exc.reset_mock()
+ ecv.exc.run.return_value.success = False
self.assertRaises(CfgVerificationError,
ecv.verify_entry, entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
- proc.communicate.assert_called_with(input="data")
- proc.wait.assert_called_with()
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
+
+ ecv.exc.reset_mock()
- mock_Popen.reset_mock()
- mock_Popen.side_effect = OSError
+ ecv.exc.reset_mock()
+ ecv.exc.run.side_effect = OSError
self.assertRaises(CfgVerificationError,
ecv.verify_entry, entry, metadata, "data")
- self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
+ ecv.exc.run.assert_called_with(ecv.cmd, inputdata="data")
@patch("os.access")
def test_handle_event(self, mock_access):
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
index 2e8b7bfa5..b667d417a 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
@@ -19,111 +19,120 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if can_skip or HAS_GENSHI:
- class TestCfgGenshiGenerator(TestCfgGenerator):
- test_obj = CfgGenshiGenerator
-
- @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
- def setUp(self):
- pass
-
- def test_removecomment(self):
- data = [(None, "test", 1),
- (None, "test2", 2)]
- stream = [(genshi.core.COMMENT, "test", 0),
- data[0],
- (genshi.core.COMMENT, "test3", 0),
- data[1]]
- self.assertItemsEqual(list(removecomment(stream)), data)
-
- def test__init(self):
- TestCfgGenerator.test__init(self)
- cgg = self.get_obj()
- self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
-
- def test_get_data(self):
- cgg = self.get_obj()
- cgg._handle_genshi_exception = Mock()
- cgg.template = Mock()
- fltr = Mock()
- cgg.template.generate.return_value = fltr
- stream = Mock()
- fltr.filter.return_value = stream
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
-
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP = MagicMock()
-
- def reset():
- cgg.template.reset_mock()
- cgg._handle_genshi_exception.reset_mock()
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.reset_mock()
-
- template_vars = dict(
- name=entry.get("name"),
- metadata=metadata,
- path=cgg.name,
- source_path=cgg.name,
- repo=Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.return_value)
-
- self.assertEqual(cgg.get_data(entry, metadata),
- stream.render.return_value)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
-
- reset()
- def render(fmt, **kwargs):
- stream.render.side_effect = None
- raise TypeError
- stream.render.side_effect = render
- self.assertEqual(cgg.get_data(entry, metadata),
- stream.render.return_value)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- self.assertEqual(stream.render.call_args_list,
- [call("text", encoding=cgg.encoding,
- strip_whitespace=False),
- call("text", encoding=cgg.encoding)])
-
- reset()
- stream.render.side_effect = UndefinedError("test")
- self.assertRaises(UndefinedError,
- cgg.get_data, entry, metadata)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
-
- reset()
- stream.render.side_effect = ValueError
- cgg._handle_genshi_exception.side_effect = ValueError
- self.assertRaises(ValueError,
- cgg.get_data, entry, metadata)
- cgg.template.generate.assert_called_with(**template_vars)
- Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.SETUP.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
- self.assertTrue(cgg._handle_genshi_exception.called)
-
- def test_handle_event(self):
- cgg = self.get_obj()
- cgg.loader = Mock()
- event = Mock()
- cgg.handle_event(event)
- cgg.loader.load.assert_called_with(cgg.name,
- cls=NewTextTemplate,
- encoding=cgg.encoding)
-
- cgg.loader.reset_mock()
- cgg.loader.load.side_effect = OSError
- self.assertRaises(PluginExecutionError,
- cgg.handle_event, event)
- cgg.loader.load.assert_called_with(cgg.name,
- cls=NewTextTemplate,
- encoding=cgg.encoding)
+class TestCfgGenshiGenerator(TestCfgGenerator):
+ test_obj = CfgGenshiGenerator
+
+ def setUp(self):
+ TestCfgGenerator.setUp(self)
+ set_setup_default("repository", datastore)
+
+ def test__init(self):
+ TestCfgGenerator.test__init(self)
+ cgg = self.get_obj()
+ self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator.get_template_data")
+ def test_get_data(self, mock_get_template_data):
+ cgg = self.get_obj()
+ cgg._handle_genshi_exception = Mock()
+ cgg.template = Mock()
+ fltr = Mock()
+ cgg.template.generate.return_value = fltr
+ stream = Mock()
+ fltr.filter.return_value = stream
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+
+ def reset():
+ cgg.template.reset_mock()
+ cgg._handle_genshi_exception.reset_mock()
+ mock_get_template_data.reset_mock()
+
+ template_vars = dict(name=entry.get("name"),
+ metadata=metadata,
+ path=cgg.name,
+ source_path=cgg.name,
+ repo=datastore)
+ mock_get_template_data.return_value = template_vars
+
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(**template_vars)
+ self.assertItemsEqual(mock_get_template_data.call_args[0],
+ [entry, metadata, cgg.name])
+ self.assertIsInstance(mock_get_template_data.call_args[1]['default'],
+ DefaultGenshiDataProvider)
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with(
+ "text",
+ encoding=Bcfg2.Options.setup.encoding,
+ strip_whitespace=False)
+
+ reset()
+ def render(fmt, **kwargs):
+ stream.render.side_effect = None
+ raise TypeError
+ stream.render.side_effect = render
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(**template_vars)
+ self.assertItemsEqual(mock_get_template_data.call_args[0],
+ [entry, metadata, cgg.name])
+ self.assertIsInstance(mock_get_template_data.call_args[1]['default'],
+ DefaultGenshiDataProvider)
+ fltr.filter.assert_called_with(removecomment)
+ self.assertEqual(stream.render.call_args_list,
+ [call("text",
+ encoding=Bcfg2.Options.setup.encoding,
+ strip_whitespace=False),
+ call("text",
+ encoding=Bcfg2.Options.setup.encoding)])
+
+ reset()
+ stream.render.side_effect = UndefinedError("test")
+ self.assertRaises(UndefinedError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(**template_vars)
+ self.assertItemsEqual(mock_get_template_data.call_args[0],
+ [entry, metadata, cgg.name])
+ self.assertIsInstance(mock_get_template_data.call_args[1]['default'],
+ DefaultGenshiDataProvider)
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text",
+ encoding=Bcfg2.Options.setup.encoding,
+ strip_whitespace=False)
+
+ reset()
+ stream.render.side_effect = ValueError
+ cgg._handle_genshi_exception.side_effect = ValueError
+ self.assertRaises(ValueError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(**template_vars)
+ self.assertItemsEqual(mock_get_template_data.call_args[0],
+ [entry, metadata, cgg.name])
+ self.assertIsInstance(mock_get_template_data.call_args[1]['default'],
+ DefaultGenshiDataProvider)
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text",
+ encoding=Bcfg2.Options.setup.encoding,
+ strip_whitespace=False)
+ self.assertTrue(cgg._handle_genshi_exception.called)
+
+ def test_handle_event(self):
+ cgg = self.get_obj()
+ cgg.loader = Mock()
+ event = Mock()
+ cgg.handle_event(event)
+ cgg.loader.load.assert_called_with(
+ cgg.name,
+ cls=NewTextTemplate,
+ encoding=Bcfg2.Options.setup.encoding)
+
+ cgg.loader.reset_mock()
+ cgg.loader.load.side_effect = OSError
+ self.assertRaises(PluginExecutionError,
+ cgg.handle_event, event)
+ cgg.loader.load.assert_called_with(
+ cgg.name,
+ cls=NewTextTemplate,
+ encoding=Bcfg2.Options.setup.encoding)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
index 839e9c3b8..349da2213 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
@@ -21,53 +21,26 @@ from TestServer.TestPlugins.TestCfg.Test_init import TestCfgInfo
class TestCfgInfoXML(TestCfgInfo):
test_obj = CfgInfoXML
+ def setUp(self):
+ TestCfgInfo.setUp(self)
+ set_setup_default("filemonitor", MagicMock())
+
def test__init(self):
TestCfgInfo.test__init(self)
ci = self.get_obj()
self.assertIsInstance(ci.infoxml, InfoXML)
def test_bind_info_to_entry(self):
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
ci = self.get_obj()
ci.infoxml = Mock()
- ci._set_info = Mock()
-
- self.assertRaises(PluginExecutionError,
- ci.bind_info_to_entry, entry, metadata)
- ci.infoxml.pnode.Match.assert_called_with(metadata, dict(),
- entry=entry)
- self.assertFalse(ci._set_info.called)
-
- ci.infoxml.reset_mock()
- ci._set_info.reset_mock()
- mdata_value = Mock()
- def set_mdata(metadata, mdata, entry=None):
- mdata['Info'] = {None: mdata_value}
+ entry = Mock()
+ metadata = Mock()
- ci.infoxml.pnode.Match.side_effect = set_mdata
ci.bind_info_to_entry(entry, metadata)
- ci.infoxml.pnode.Match.assert_called_with(metadata,
- dict(Info={None: mdata_value}),
- entry=entry)
- ci._set_info.assert_called_with(entry, mdata_value)
+ ci.infoxml.BindEntry.assert_called_with(entry, metadata)
def test_handle_event(self):
ci = self.get_obj()
ci.infoxml = Mock()
ci.handle_event(Mock)
ci.infoxml.HandleEvent.assert_called_with()
-
- def test__set_info(self):
- @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info")
- def inner(mock_set_info):
- ci = self.get_obj()
- entry = Mock()
- info = {"foo": "foo",
- "__children__": ["one", "two"]}
- ci._set_info(entry, info)
- self.assertItemsEqual(entry.append.call_args_list,
- [call(c) for c in info['__children__']])
-
- inner()
- TestCfgInfo.test__set_info(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
index e139a592b..d64bbaabf 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPrivateKeyCreator.py
@@ -7,7 +7,7 @@ from Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator import *
from Bcfg2.Server.Plugin import PluginExecutionError
import Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator
try:
- from Bcfg2.Encryption import EVPError
+ from Bcfg2.Server.Encryption import EVPError
HAS_CRYPTO = True
except:
HAS_CRYPTO = False
@@ -22,100 +22,36 @@ while path != "/":
break
path = os.path.dirname(path)
from common import *
-from TestServer.TestPlugins.TestCfg.Test_init import TestCfgCreator
-from TestServer.TestPlugin.Testhelpers import TestStructFile
+from TestServer.TestPlugins.TestCfg.Test_init import TestXMLCfgCreator
-class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
+class TestCfgPrivateKeyCreator(TestXMLCfgCreator):
test_obj = CfgPrivateKeyCreator
should_monitor = False
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator.get_cfg", Mock())
def get_obj(self, name=None, fam=None):
- Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator.CFG = Mock()
- return TestCfgCreator.get_obj(self, name=name)
-
- @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event")
- @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent")
- def test_handle_event(self, mock_HandleEvent, mock_handle_event):
- pkc = self.get_obj()
- evt = Mock()
- pkc.handle_event(evt)
- mock_HandleEvent.assert_called_with(pkc, evt)
- mock_handle_event.assert_called_with(pkc, evt)
-
- def test_category(self):
- pkc = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
-
- self.assertIsNone(pkc.category)
- cfp.has_section.assert_called_with("sshkeys")
-
- cfp.reset_mock()
- cfp.has_section.return_value = True
- self.assertIsNone(pkc.category)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
-
- cfp.reset_mock()
- cfp.has_option.return_value = True
- self.assertEqual(pkc.category, cfp.get.return_value)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "category")
- cfp.get.assert_called_with("sshkeys", "category")
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- def test_passphrase(self, mock_get_passphrases):
- pkc = self.get_obj()
- cfp = Mock()
- cfp.has_section.return_value = False
- cfp.has_option.return_value = False
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp = cfp
-
- self.assertIsNone(pkc.passphrase)
- cfp.has_section.assert_called_with("sshkeys")
-
- cfp.reset_mock()
- cfp.has_section.return_value = True
- self.assertIsNone(pkc.passphrase)
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "passphrase")
-
- cfp.reset_mock()
- cfp.get.return_value = "test"
- mock_get_passphrases.return_value = dict(test="foo", test2="bar")
- cfp.has_option.return_value = True
- self.assertEqual(pkc.passphrase, "foo")
- cfp.has_section.assert_called_with("sshkeys")
- cfp.has_option.assert_called_with("sshkeys", "passphrase")
- cfp.get.assert_called_with("sshkeys", "passphrase")
- mock_get_passphrases.assert_called_with(Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
+ return TestXMLCfgCreator.get_obj(self, name=name)
@patch("shutil.rmtree")
@patch("tempfile.mkdtemp")
- @patch("subprocess.Popen")
- def test__gen_keypair(self, mock_Popen, mock_mkdtemp, mock_rmtree):
+ def test__gen_keypair(self, mock_mkdtemp, mock_rmtree):
pkc = self.get_obj()
+ pkc.cmd = Mock()
pkc.XMLMatch = Mock()
mock_mkdtemp.return_value = datastore
metadata = Mock()
- proc = Mock()
- proc.wait.return_value = 0
- proc.communicate.return_value = MagicMock()
- mock_Popen.return_value = proc
+ exc = Mock()
+ exc.success = True
+ pkc.cmd.run.return_value = exc
spec = lxml.etree.Element("PrivateKey")
pkc.XMLMatch.return_value = spec
def reset():
pkc.XMLMatch.reset_mock()
- mock_Popen.reset_mock()
+ pkc.cmd.reset_mock()
mock_mkdtemp.reset_mock()
mock_rmtree.reset_mock()
@@ -123,10 +59,9 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
os.path.join(datastore, "privkey"))
pkc.XMLMatch.assert_called_with(metadata)
mock_mkdtemp.assert_called_with()
- self.assertItemsEqual(mock_Popen.call_args[0][0],
- ["ssh-keygen", "-f",
- os.path.join(datastore, "privkey"),
- "-t", "rsa", "-N", ""])
+ pkc.cmd.run.assert_called_with(["ssh-keygen", "-f",
+ os.path.join(datastore, "privkey"),
+ "-t", "rsa", "-N", ""])
reset()
lxml.etree.SubElement(spec, "Params", bits="768", type="dsa")
@@ -137,73 +72,15 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
os.path.join(datastore, "privkey"))
pkc.XMLMatch.assert_called_with(metadata)
mock_mkdtemp.assert_called_with()
- self.assertItemsEqual(mock_Popen.call_args[0][0],
- ["ssh-keygen", "-f",
- os.path.join(datastore, "privkey"),
- "-t", "dsa", "-b", "768", "-N", "foo"])
+ pkc.cmd.run.assert_called_with(["ssh-keygen", "-f",
+ os.path.join(datastore, "privkey"),
+ "-t", "dsa", "-b", "768", "-N", "foo"])
reset()
- proc.wait.return_value = 1
+ pkc.cmd.run.return_value.success = False
self.assertRaises(CfgCreationError, pkc._gen_keypair, metadata)
mock_rmtree.assert_called_with(datastore)
- def test_get_specificity(self):
- pkc = self.get_obj()
- pkc.XMLMatch = Mock()
-
- metadata = Mock()
-
- def reset():
- pkc.XMLMatch.reset_mock()
- metadata.group_in_category.reset_mock()
-
- category = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.category"
- @patch(category, None)
- def inner():
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(host=metadata.hostname))
- inner()
-
- @patch(category, "foo")
- def inner2():
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(group=metadata.group_in_category.return_value,
- prio=50))
- metadata.group_in_category.assert_called_with("foo")
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
- perhost="true")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(host=metadata.hostname))
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
- category="bar")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(group=metadata.group_in_category.return_value,
- prio=50))
- metadata.group_in_category.assert_called_with("bar")
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey",
- prio="10")
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(group=metadata.group_in_category.return_value,
- prio=10))
- metadata.group_in_category.assert_called_with("foo")
-
- reset()
- pkc.XMLMatch.return_value = lxml.etree.Element("PrivateKey")
- metadata.group_in_category.return_value = ''
- self.assertItemsEqual(pkc.get_specificity(metadata),
- dict(host=metadata.hostname))
- metadata.group_in_category.assert_called_with("foo")
-
- inner2()
-
@patch("shutil.rmtree")
@patch("%s.open" % builtins)
def test_create_data(self, mock_open, mock_rmtree):
@@ -216,7 +93,7 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
# the get_specificity() return value is being used
# appropriately, we put some dummy data in it and test for
# that data
- pkc.get_specificity.side_effect = lambda m, s: dict(group="foo")
+ pkc.get_specificity.side_effect = lambda m: dict(group="foo")
pkc._gen_keypair = Mock()
privkey = os.path.join(datastore, "privkey")
pkc._gen_keypair.return_value = privkey
@@ -242,179 +119,15 @@ class TestCfgPrivateKeyCreator(TestCfgCreator, TestStructFile):
mock_open.return_value.read.side_effect = open_read_rv
reset()
- passphrase = "Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.CfgPrivateKeyCreator.passphrase"
-
- @patch(passphrase, None)
- def inner():
- self.assertEqual(pkc.create_data(entry, metadata), "privatekey")
- pkc.XMLMatch.assert_called_with(metadata)
- pkc.get_specificity.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- pkc._gen_keypair.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- self.assertItemsEqual(mock_open.call_args_list,
- [call(privkey + ".pub"), call(privkey)])
- pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
- pkc.pubkey_creator.write_data.assert_called_with(
- "ssh-rsa publickey pubkey.filename\n", group="foo")
- pkc.write_data.assert_called_with("privatekey", group="foo")
- mock_rmtree.assert_called_with(datastore)
-
- inner()
-
- if HAS_CRYPTO:
- @patch(passphrase, "foo")
- @patch("Bcfg2.Encryption.ssl_encrypt")
- @patch("Bcfg2.Encryption.get_algorithm")
- def inner2(mock_get_algorithm, mock_ssl_encrypt):
- reset()
- mock_ssl_encrypt.return_value = "encryptedprivatekey"
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = True
- self.assertEqual(pkc.create_data(entry, metadata),
- "encryptedprivatekey")
- pkc.XMLMatch.assert_called_with(metadata)
- pkc.get_specificity.assert_called_with(
- metadata,
- pkc.XMLMatch.return_value)
- pkc._gen_keypair.assert_called_with(metadata,
- pkc.XMLMatch.return_value)
- self.assertItemsEqual(mock_open.call_args_list,
- [call(privkey + ".pub"), call(privkey)])
- pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
- pkc.pubkey_creator.write_data.assert_called_with(
- "ssh-rsa publickey pubkey.filename\n", group="foo")
- pkc.write_data.assert_called_with("encryptedprivatekey",
- group="foo", ext=".crypt")
- mock_ssl_encrypt.assert_called_with(
- "privatekey", "foo",
- algorithm=mock_get_algorithm.return_value)
- mock_rmtree.assert_called_with(datastore)
-
- inner2()
-
- def test_Index(self):
- has_crypto = Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = False
- TestStructFile.test_Index(self)
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.HAS_CRYPTO = has_crypto
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_Index_crypto(self):
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = Mock()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "strict"
-
- pkc = self.get_obj()
- pkc._decrypt = Mock()
- pkc._decrypt.return_value = 'plaintext'
- pkc.data = '''
-<PrivateKey>
- <Group name="test">
- <Passphrase encrypted="foo">crypted</Passphrase>
- </Group>
- <Group name="test" negate="true">
- <Passphrase>plain</Passphrase>
- </Group>
-</PrivateKey>'''
-
- # test successful decryption
- pkc.Index()
- self.assertItemsEqual(
- pkc._decrypt.call_args_list,
- [call(el)
- for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
- for el in pkc.xdata.xpath("//Crypted"):
- self.assertEqual(el.text, pkc._decrypt.return_value)
-
- # test failed decryption, strict
- pkc._decrypt.reset_mock()
- pkc._decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError, pkc.Index)
-
- # test failed decryption, lax
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP.cfp.get.return_value = "lax"
- pkc._decrypt.reset_mock()
- pkc.Index()
- self.assertItemsEqual(
- pkc._decrypt.call_args_list,
- [call(el)
- for el in pkc.xdata.xpath("//Passphrase[@encrypted]")])
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
- def test_decrypt(self, mock_bruteforce, mock_get_passphrases,
- mock_get_algorithm, mock_ssl):
- pkc = self.get_obj()
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP = MagicMock()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pkc._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
-
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass",
- bar="barpass")
- mock_get_algorithm.return_value = "bf_cbc"
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pkc._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pkc._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pkc._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Cfg.CfgPrivateKeyCreator.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
+ self.assertEqual(pkc.create_data(entry, metadata), "privatekey")
+ pkc.XMLMatch.assert_called_with(metadata)
+ pkc.get_specificity.assert_called_with(metadata)
+ pkc._gen_keypair.assert_called_with(metadata,
+ pkc.XMLMatch.return_value)
+ self.assertItemsEqual(mock_open.call_args_list,
+ [call(privkey + ".pub"), call(privkey)])
+ pkc.pubkey_creator.get_filename.assert_called_with(group="foo")
+ pkc.pubkey_creator.write_data.assert_called_with(
+ "ssh-rsa publickey pubkey.filename\n", group="foo")
+ pkc.write_data.assert_called_with("privatekey", group="foo")
+ mock_rmtree.assert_called_with(datastore)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py
index ef4610fae..f512a6803 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPublicKeyCreator.py
@@ -25,8 +25,8 @@ class TestCfgPublicKeyCreator(TestCfgCreator, TestStructFile):
test_obj = CfgPublicKeyCreator
should_monitor = False
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator.get_cfg", Mock())
def get_obj(self, name=None, fam=None):
- Bcfg2.Server.Plugins.Cfg.CfgPublicKeyCreator.CFG = Mock()
return TestCfgCreator.get_obj(self, name=name)
@patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event")
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
index fdfb3a9f7..1b55beded 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
@@ -3,7 +3,7 @@ import sys
import errno
import lxml.etree
import Bcfg2.Options
-from Bcfg2.Compat import walk_packages
+from Bcfg2.Compat import walk_packages, ConfigParser
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugins.Cfg import *
from Bcfg2.Server.Plugin import PluginExecutionError, Specificity
@@ -19,7 +19,7 @@ while path != "/":
path = os.path.dirname(path)
from common import *
from TestPlugin import TestSpecificData, TestEntrySet, TestGroupSpool, \
- TestPullTarget
+ TestPullTarget, TestStructFile
class TestCfgBaseFileMatcher(TestSpecificData):
@@ -152,28 +152,13 @@ class TestCfgInfo(TestCfgBaseFileMatcher):
@patch("Bcfg2.Server.Plugins.Cfg.CfgBaseFileMatcher.__init__")
def test__init(self, mock__init):
ci = self.get_obj("test.txt")
- mock__init.assert_called_with(ci, "test.txt", None, None)
+ mock__init.assert_called_with(ci, "test.txt", None)
def test_bind_info_to_entry(self):
ci = self.get_obj()
self.assertRaises(NotImplementedError,
ci.bind_info_to_entry, Mock(), Mock())
- def test__set_info(self):
- ci = self.get_obj()
- entry = Mock()
- entry.attrib = dict()
-
- info = {"foo": "foo",
- "_bar": "bar",
- "bar:baz=quux": "quux",
- "baz__": "baz",
- "__quux": "quux"}
- ci._set_info(entry, info)
- self.assertItemsEqual(entry.attrib,
- dict([(k, v) for k, v in info.items()
- if not k.startswith("__")]))
-
class TestCfgVerifier(TestCfgBaseFileMatcher):
test_obj = CfgVerifier
@@ -187,6 +172,12 @@ class TestCfgVerifier(TestCfgBaseFileMatcher):
class TestCfgCreator(TestCfgBaseFileMatcher):
test_obj = CfgCreator
path = "/foo/bar/test.txt"
+ should_monitor = False
+
+ def setUp(self):
+ TestCfgBaseFileMatcher.setUp(self)
+ set_setup_default("filemonitor", MagicMock())
+ set_setup_default("cfg_passphrase", None)
def get_obj(self, name=None):
if name is None:
@@ -256,62 +247,122 @@ class TestCfgCreator(TestCfgBaseFileMatcher):
self.assertRaises(CfgCreationError, cc.write_data, data)
+class TestXMLCfgCreator(TestCfgCreator, TestStructFile):
+ test_obj = XMLCfgCreator
+
+ def setUp(self):
+ TestCfgCreator.setUp(self)
+ TestStructFile.setUp(self)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCreator.handle_event")
+ @patch("Bcfg2.Server.Plugin.helpers.StructFile.HandleEvent")
+ def test_handle_event(self, mock_HandleEvent, mock_handle_event):
+ cc = self.get_obj()
+ evt = Mock()
+ cc.handle_event(evt)
+ mock_HandleEvent.assert_called_with(cc, evt)
+ mock_handle_event.assert_called_with(cc, evt)
+
+ def test_get_specificity(self):
+ cc = self.get_obj()
+ metadata = Mock()
+
+ def reset():
+ metadata.group_in_category.reset_mock()
+
+ category = "%s.%s.category" % (self.test_obj.__module__,
+ self.test_obj.__name__)
+ @patch(category, None)
+ def inner():
+ cc.xdata = lxml.etree.Element("PrivateKey")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+ inner()
+
+ @patch(category, "foo")
+ def inner2():
+ cc.xdata = lxml.etree.Element("PrivateKey")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=50))
+ metadata.group_in_category.assert_called_with("foo")
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey", perhost="true")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey", category="bar")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=50))
+ metadata.group_in_category.assert_called_with("bar")
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey", prio="10")
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(group=metadata.group_in_category.return_value,
+ prio=10))
+ metadata.group_in_category.assert_called_with("foo")
+
+ reset()
+ cc.xdata = lxml.etree.Element("PrivateKey")
+ metadata.group_in_category.return_value = ''
+ self.assertItemsEqual(cc.get_specificity(metadata),
+ dict(host=metadata.hostname))
+ metadata.group_in_category.assert_called_with("foo")
+
+ inner2()
+
+
class TestCfgDefaultInfo(TestCfgInfo):
test_obj = CfgDefaultInfo
- def get_obj(self, defaults=None):
- if defaults is None:
- defaults = dict()
- return self.test_obj(defaults)
+ def get_obj(self, *_):
+ return self.test_obj()
- @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__")
- def test__init(self, mock__init):
- defaults = Mock()
- cdi = self.get_obj(defaults=defaults)
- mock__init.assert_called_with(cdi, '')
- self.assertEqual(defaults, cdi.defaults)
+ def test__init(self):
+ pass
def test_handle_event(self):
# this CfgInfo handler doesn't handle any events -- it's not
# file-driven, but based on the built-in defaults
pass
- def test_bind_info_to_entry(self):
+ @patch("Bcfg2.Server.Plugin.default_path_metadata")
+ def test_bind_info_to_entry(self, mock_default_path_metadata):
cdi = self.get_obj()
- cdi._set_info = Mock()
- entry = Mock()
+ entry = lxml.etree.Element("Test", name="test")
+ mock_default_path_metadata.return_value = \
+ dict(owner="root", mode="0600")
cdi.bind_info_to_entry(entry, Mock())
- cdi._set_info.assert_called_with(entry, cdi.defaults)
+ self.assertItemsEqual(entry.attrib,
+ dict(owner="root", mode="0600", name="test"))
class TestCfgEntrySet(TestEntrySet):
test_obj = CfgEntrySet
+ def setUp(self):
+ TestEntrySet.setUp(self)
+ set_setup_default("cfg_validation", False)
+ set_setup_default("cfg_handlers", [])
+
def test__init(self):
pass
- def test_handlers(self):
- # this is really really difficult to mock out, so we just get
- # a list of handlers and make sure that it roughly matches
- # what's on the filesystem
- expected = []
- for submodule in walk_packages(path=Bcfg2.Server.Plugins.Cfg.__path__,
- prefix="Bcfg2.Server.Plugins.Cfg."):
- expected.append(submodule[1].rsplit('.', 1)[-1])
- self.assertItemsEqual(expected, [h.__name__ for h in handlers()])
-
- @patch("Bcfg2.Server.Plugins.Cfg.handlers")
- def test_handle_event(self, mock_handlers):
+ def test_handle_event(self):
eset = self.get_obj()
eset.entry_init = Mock()
- mock_handlers.return_value = [Mock(), Mock(), Mock()]
- for hdlr in mock_handlers.return_value:
+ Bcfg2.Options.setup.cfg_handlers = [Mock(), Mock(), Mock()]
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.__name__ = "handler"
eset.entries = dict()
def reset():
eset.entry_init.reset_mock()
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.reset_mock()
# test that a bogus deleted event is discarded
@@ -321,18 +372,19 @@ class TestCfgEntrySet(TestEntrySet):
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
self.assertItemsEqual(eset.entries, dict())
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
# test creation of a new file
for action in ["exists", "created", "changed"]:
+ print("Testing handling of %s events" % action)
evt = Mock()
evt.code2str.return_value = action
evt.filename = os.path.join(datastore, "test.txt")
# test with no handler that handles
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.handles.return_value = False
hdlr.ignore.return_value = False
@@ -340,16 +392,16 @@ class TestCfgEntrySet(TestEntrySet):
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
self.assertItemsEqual(eset.entries, dict())
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.handles.assert_called_with(evt, basename=eset.path)
hdlr.ignore.assert_called_with(evt, basename=eset.path)
# test with a handler that handles the entry
reset()
- mock_handlers.return_value[-1].handles.return_value = True
+ Bcfg2.Options.setup.cfg_handlers[-1].handles.return_value = True
eset.handle_event(evt)
- eset.entry_init.assert_called_with(evt, mock_handlers.return_value[-1])
- for hdlr in mock_handlers.return_value:
+ eset.entry_init.assert_called_with(evt, Bcfg2.Options.setup.cfg_handlers[-1])
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
hdlr.handles.assert_called_with(evt, basename=eset.path)
if not hdlr.return_value:
hdlr.ignore.assert_called_with(evt, basename=eset.path)
@@ -357,14 +409,14 @@ class TestCfgEntrySet(TestEntrySet):
# test with a handler that ignores the entry before one
# that handles it
reset()
- mock_handlers.return_value[0].ignore.return_value = True
+ Bcfg2.Options.setup.cfg_handlers[0].ignore.return_value = True
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
- mock_handlers.return_value[0].handles.assert_called_with(evt,
- basename=eset.path)
- mock_handlers.return_value[0].ignore.assert_called_with(evt,
- basename=eset.path)
- for hdlr in mock_handlers.return_value[1:]:
+ Bcfg2.Options.setup.cfg_handlers[0].handles.assert_called_with(
+ evt, basename=eset.path)
+ Bcfg2.Options.setup.cfg_handlers[0].ignore.assert_called_with(
+ evt, basename=eset.path)
+ for hdlr in Bcfg2.Options.setup.cfg_handlers[1:]:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
@@ -376,7 +428,7 @@ class TestCfgEntrySet(TestEntrySet):
eset.entries[evt.filename] = Mock()
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
eset.entries[evt.filename].handle_event.assert_called_with(evt)
@@ -386,7 +438,7 @@ class TestCfgEntrySet(TestEntrySet):
evt.code2str.return_value = "deleted"
eset.handle_event(evt)
self.assertFalse(eset.entry_init.called)
- for hdlr in mock_handlers.return_value:
+ for hdlr in Bcfg2.Options.setup.cfg_handlers:
self.assertFalse(hdlr.handles.called)
self.assertFalse(hdlr.ignore.called)
self.assertItemsEqual(eset.entries, dict())
@@ -438,15 +490,15 @@ class TestCfgEntrySet(TestEntrySet):
@patch("Bcfg2.Server.Plugins.Cfg.u_str")
@patch("Bcfg2.Server.Plugins.Cfg.b64encode")
def test_bind_entry(self, mock_b64encode, mock_u_str):
- Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False)
-
mock_u_str.side_effect = lambda x: x
+ Bcfg2.Options.setup.cfg_validation = False
eset = self.get_obj()
eset.bind_info_to_entry = Mock()
eset._generate_data = Mock()
eset.get_handlers = Mock()
eset._validate_data = Mock()
+ eset.setup = dict(validate=False)
def reset():
mock_b64encode.reset_mock()
@@ -524,7 +576,7 @@ class TestCfgEntrySet(TestEntrySet):
# test successful validation
entry = reset()
- Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True
+ Bcfg2.Options.setup.cfg_validation = True
bound = eset.bind_entry(entry, metadata)
eset.bind_info_to_entry.assert_called_with(entry, metadata)
eset._generate_data.assert_called_with(entry, metadata)
@@ -546,16 +598,16 @@ class TestCfgEntrySet(TestEntrySet):
def test_get_handlers(self):
eset = self.get_obj()
eset.entries['test1.txt'] = CfgInfo("test1.txt")
- eset.entries['test2.txt'] = CfgGenerator("test2.txt", Mock(), None)
+ eset.entries['test2.txt'] = CfgGenerator("test2.txt", Mock())
eset.entries['test2.txt'].specific.matches.return_value = True
eset.entries['test3.txt'] = CfgInfo("test3.txt")
- eset.entries['test4.txt'] = CfgGenerator("test4.txt", Mock(), None)
+ eset.entries['test4.txt'] = CfgGenerator("test4.txt", Mock())
eset.entries['test4.txt'].specific.matches.return_value = False
- eset.entries['test5.txt'] = CfgGenerator("test5.txt", Mock(), None)
+ eset.entries['test5.txt'] = CfgGenerator("test5.txt", Mock())
eset.entries['test5.txt'].specific.matches.return_value = True
- eset.entries['test6.txt'] = CfgVerifier("test6.txt", Mock(), None)
+ eset.entries['test6.txt'] = CfgVerifier("test6.txt", Mock())
eset.entries['test6.txt'].specific.matches.return_value = True
- eset.entries['test7.txt'] = CfgFilter("test7.txt", Mock(), None)
+ eset.entries['test7.txt'] = CfgFilter("test7.txt", Mock())
eset.entries['test7.txt'].specific.matches.return_value = False
def reset():
@@ -603,24 +655,24 @@ class TestCfgEntrySet(TestEntrySet):
if hasattr(entry.specific.matches, "called"):
self.assertFalse(entry.specific.matches.called)
- def test_bind_info_to_entry(self):
- default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgDefaultInfo")
+ def test_bind_info_to_entry(self, mock_DefaultInfo):
eset = self.get_obj()
eset.get_handlers = Mock()
eset.get_handlers.return_value = []
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock()
metadata = Mock()
def reset():
eset.get_handlers.reset_mock()
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock()
+ mock_DefaultInfo.reset_mock()
return lxml.etree.Element("Path", name="/test.txt")
# test with no info handlers
entry = reset()
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
self.assertEqual(entry.get("type"), "file")
# test with one info handler
@@ -629,7 +681,8 @@ class TestCfgEntrySet(TestEntrySet):
eset.get_handlers.return_value = [handler]
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
handler.bind_info_to_entry.assert_called_with(entry, metadata)
self.assertEqual(entry.get("type"), "file")
@@ -639,7 +692,8 @@ class TestCfgEntrySet(TestEntrySet):
eset.get_handlers.return_value = handlers
eset.bind_info_to_entry(entry, metadata)
eset.get_handlers.assert_called_with(metadata, CfgInfo)
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ mock_DefaultInfo.return_value.bind_info_to_entry.assert_called_with(
+ entry, metadata)
# we don't care which handler gets called as long as exactly
# one of them does
called = 0
@@ -650,8 +704,6 @@ class TestCfgEntrySet(TestEntrySet):
self.assertEqual(called, 1)
self.assertEqual(entry.get("type"), "file")
- Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info
-
def test_create_data(self):
eset = self.get_obj()
eset.best_matching = Mock()
@@ -753,35 +805,16 @@ class TestCfgEntrySet(TestEntrySet):
class TestCfg(TestGroupSpool, TestPullTarget):
test_obj = Cfg
+ def setUp(self):
+ TestGroupSpool.setUp(self)
+ TestPullTarget.setUp(self)
+ set_setup_default("cfg_handlers", [])
+
def get_obj(self, core=None):
if core is None:
core = Mock()
- core.setup = MagicMock()
return TestGroupSpool.get_obj(self, core=core)
- @patch("Bcfg2.Server.Plugin.GroupSpool.__init__")
- @patch("Bcfg2.Server.Plugin.PullTarget.__init__")
- def test__init(self, mock_pulltarget_init, mock_groupspool_init):
- core = Mock()
- core.setup = MagicMock()
- cfg = self.test_obj(core, datastore)
- mock_pulltarget_init.assert_called_with(cfg)
- mock_groupspool_init.assert_called_with(cfg, core, datastore)
- core.setup.add_option.assert_called_with("validate",
- Bcfg2.Options.CFG_VALIDATION)
- core.setup.reparse.assert_called_with()
-
- core.reset_mock()
- core.setup.reset_mock()
- mock_pulltarget_init.reset_mock()
- mock_groupspool_init.reset_mock()
- core.setup.__contains__.return_value = True
- cfg = self.test_obj(core, datastore)
- mock_pulltarget_init.assert_called_with(cfg)
- mock_groupspool_init.assert_called_with(cfg, core, datastore)
- self.assertFalse(core.setup.add_option.called)
- self.assertFalse(core.setup.reparse.called)
-
def test_has_generator(self):
cfg = self.get_obj()
cfg.entries = dict()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py
new file mode 100644
index 000000000..537ceb4ff
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDecisions.py
@@ -0,0 +1,60 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Decisions import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+from TestPlugin import TestStructFile, TestPlugin, TestDecision
+
+
+class TestDecisionFile(TestStructFile):
+ test_obj = DecisionFile
+
+ def test_get_decisions(self):
+ df = self.get_obj()
+ metadata = Mock()
+
+ df.xdata = None
+ self.assertItemsEqual(df.get_decisions(metadata), [])
+
+ df.xdata = lxml.etree.Element("Decisions")
+ df.XMLMatch = Mock()
+ df.XMLMatch.return_value = lxml.etree.Element("Decisions")
+ lxml.etree.SubElement(df.XMLMatch.return_value,
+ "Decision", type="Service", name='*')
+ lxml.etree.SubElement(df.XMLMatch.return_value,
+ "Decision", type="Path",
+ name='/etc/apt/apt.conf')
+
+ self.assertItemsEqual(df.get_decisions(metadata),
+ [("Service", '*'),
+ ("Path", '/etc/apt/apt.conf')])
+ df.XMLMatch.assert_called_with(metadata)
+
+
+class TestDecisions(TestPlugin, TestDecision):
+ test_obj = Decisions
+
+ def test_GetDecisions(self):
+ d = self.get_obj()
+ d.whitelist = Mock()
+ d.blacklist = Mock()
+ metadata = Mock()
+
+ self.assertEqual(d.GetDecisions(metadata, "whitelist"),
+ d.whitelist.get_decision.return_value)
+ d.whitelist.get_decision.assert_called_with(metadata)
+
+ self.assertEqual(d.GetDecisions(metadata, "blacklist"),
+ d.blacklist.get_decision.return_value)
+ d.blacklist.get_decision.assert_called_with(metadata)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py
index 7be3d8e84..9b4a6af88 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestDefaults.py
@@ -1,5 +1,6 @@
import os
import sys
+import copy
import lxml.etree
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugins.Defaults import *
@@ -62,3 +63,31 @@ class TestDefaults(TestRules, TestGoalValidator):
def test__regex_enabled(self):
r = self.get_obj()
self.assertTrue(r._regex_enabled)
+
+ def _do_test(self, name, groups=None):
+ if groups is None:
+ groups = []
+ d = self.get_obj()
+ metadata = Mock(groups=groups)
+ config = lxml.etree.Element("Configuration")
+ struct = lxml.etree.SubElement(config, "Bundle", name=name)
+ entry = copy.deepcopy(self.abstract[name])
+ struct.append(entry)
+ d.validate_goals(metadata, config)
+ self.assertXMLEqual(entry, self.concrete[name])
+
+ def _do_test_failure(self, name, groups=None, handles=None):
+ if groups is None:
+ groups = []
+ d = self.get_obj()
+ metadata = Mock(groups=groups)
+ config = lxml.etree.Element("Configuration")
+ struct = lxml.etree.SubElement(config, "Bundle", name=name)
+ orig = copy.deepcopy(self.abstract[name])
+ entry = copy.deepcopy(self.abstract[name])
+ struct.append(entry)
+ d.validate_goals(metadata, config)
+ self.assertXMLEqual(entry, orig)
+
+ def test_regex(self):
+ self._do_test('regex')
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
index a07fffba1..d3fa15236 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestMetadata.py
@@ -6,7 +6,6 @@ import socket
import lxml.etree
import Bcfg2.Server
import Bcfg2.Server.Plugin
-from Bcfg2.Server.Plugins.Metadata import *
from mock import Mock, MagicMock, patch
# add all parent testsuite directories to sys.path to allow (most)
@@ -19,9 +18,13 @@ while path != "/":
break
path = os.path.dirname(path)
from common import *
+from Bcfg2.Server.Plugins.Metadata import load_django_models
from TestPlugin import TestXMLFileBacked, TestMetadata as _TestMetadata, \
TestClientRunHooks, TestDatabaseBacked
+load_django_models()
+from Bcfg2.Server.Plugins.Metadata import *
+
def get_clients_test_tree():
return lxml.etree.XML('''
@@ -88,18 +91,18 @@ def get_groups_test_tree():
</Groups>''').getroottree()
-def get_metadata_object(core=None, watch_clients=False, use_db=False):
+def get_metadata_object(core=None):
if core is None:
core = Mock()
- core.setup = MagicMock()
core.metadata_cache = MagicMock()
- core.setup.cfp.getboolean = Mock(return_value=use_db)
+ set_setup_default("password")
@patchIf(not isinstance(os.makedirs, Mock), "os.makedirs", Mock())
@patchIf(not isinstance(lxml.etree.Element, Mock),
"lxml.etree.Element", Mock())
+
def inner():
- return Metadata(core, datastore, watch_clients=watch_clients)
+ return Metadata(core)
return inner()
@@ -108,117 +111,117 @@ class TestMetadataDB(DBModelTestCase):
models = [MetadataClientModel]
-if HAS_DJANGO or can_skip:
- class TestClientVersions(TestDatabaseBacked):
- test_clients = dict(client1="1.2.0",
- client2="1.2.2",
- client3="1.3.0pre1",
- client4="1.1.0",
- client5=None,
- client6=None)
-
- @skipUnless(HAS_DJANGO, "Django not found")
- def setUp(self):
- self.test_obj = ClientVersions
- syncdb(TestMetadataDB)
- for client, version in self.test_clients.items():
- MetadataClientModel(hostname=client, version=version).save()
-
- def test__contains(self):
- v = self.get_obj()
- self.assertIn("client1", v)
- self.assertIn("client5", v)
- self.assertNotIn("client__contains", v)
-
- def test_keys(self):
- v = self.get_obj()
- self.assertItemsEqual(self.test_clients.keys(), v.keys())
-
- def test__setitem(self):
- v = self.get_obj()
-
- # test setting version of existing client
- v["client1"] = "1.2.3"
- self.assertIn("client1", v)
- self.assertEqual(v['client1'], "1.2.3")
- client = MetadataClientModel.objects.get(hostname="client1")
- self.assertEqual(client.version, "1.2.3")
-
- # test adding new client
- new = "client__setitem"
- v[new] = "1.3.0"
- self.assertIn(new, v)
- self.assertEqual(v[new], "1.3.0")
- client = MetadataClientModel.objects.get(hostname=new)
- self.assertEqual(client.version, "1.3.0")
-
- # test adding new client with no version
- new2 = "client__setitem_2"
- v[new2] = None
- self.assertIn(new2, v)
- self.assertEqual(v[new2], None)
- client = MetadataClientModel.objects.get(hostname=new2)
- self.assertEqual(client.version, None)
-
- def test__getitem(self):
- v = self.get_obj()
-
- # test getting existing client
- self.assertEqual(v['client2'], "1.2.2")
- self.assertIsNone(v['client5'])
-
- # test exception on nonexistent client
- expected = KeyError
- try:
- v['clients__getitem']
- except expected:
- pass
- except:
- err = sys.exc_info()[1]
- self.assertFalse(True, "%s raised instead of %s" %
- (err.__class__.__name__,
- expected.__class__.__name__))
- else:
- self.assertFalse(True,
- "%s not raised" % expected.__class__.__name__)
+class TestClientVersions(TestDatabaseBacked):
+ test_clients = dict(client1="1.2.0",
+ client2="1.2.2",
+ client3="1.3.0pre1",
+ client4="1.1.0",
+ client5=None,
+ client6=None)
+
+ @skipUnless(HAS_DJANGO, "Django not found")
+ def setUp(self):
+ TestDatabaseBacked.setUp(self)
+ self.test_obj = ClientVersions
+ syncdb(TestMetadataDB)
+ for client, version in self.test_clients.items():
+ MetadataClientModel(hostname=client, version=version).save()
+
+ def test__contains(self):
+ v = self.get_obj()
+ self.assertIn("client1", v)
+ self.assertIn("client5", v)
+ self.assertNotIn("client__contains", v)
+
+ def test_keys(self):
+ v = self.get_obj()
+ self.assertItemsEqual(self.test_clients.keys(), v.keys())
+
+ def test__setitem(self):
+ v = self.get_obj()
+
+ # test setting version of existing client
+ v["client1"] = "1.2.3"
+ self.assertIn("client1", v)
+ self.assertEqual(v['client1'], "1.2.3")
+ client = MetadataClientModel.objects.get(hostname="client1")
+ self.assertEqual(client.version, "1.2.3")
+
+ # test adding new client
+ new = "client__setitem"
+ v[new] = "1.3.0"
+ self.assertIn(new, v)
+ self.assertEqual(v[new], "1.3.0")
+ client = MetadataClientModel.objects.get(hostname=new)
+ self.assertEqual(client.version, "1.3.0")
+
+ # test adding new client with no version
+ new2 = "client__setitem_2"
+ v[new2] = None
+ self.assertIn(new2, v)
+ self.assertEqual(v[new2], None)
+ client = MetadataClientModel.objects.get(hostname=new2)
+ self.assertEqual(client.version, None)
+
+ def test__getitem(self):
+ v = self.get_obj()
+
+ # test getting existing client
+ self.assertEqual(v['client2'], "1.2.2")
+ self.assertIsNone(v['client5'])
+
+ # test exception on nonexistent client
+ expected = KeyError
+ try:
+ v['clients__getitem']
+ except expected:
+ pass
+ except:
+ err = sys.exc_info()[1]
+ self.assertFalse(True, "%s raised instead of %s" %
+ (err.__class__.__name__,
+ expected.__class__.__name__))
+ else:
+ self.assertFalse(True,
+ "%s not raised" % expected.__class__.__name__)
- def test__len(self):
- v = self.get_obj()
- self.assertEqual(len(v), MetadataClientModel.objects.count())
+ def test__len(self):
+ v = self.get_obj()
+ self.assertEqual(len(v), MetadataClientModel.objects.count())
- def test__iter(self):
- v = self.get_obj()
- self.assertItemsEqual([h for h in iter(v)], v.keys())
+ def test__iter(self):
+ v = self.get_obj()
+ self.assertItemsEqual([h for h in iter(v)], v.keys())
- def test__delitem(self):
- v = self.get_obj()
+ def test__delitem(self):
+ v = self.get_obj()
- # test adding new client
- new = "client__delitem"
- v[new] = "1.3.0"
+ # test adding new client
+ new = "client__delitem"
+ v[new] = "1.3.0"
- del v[new]
- self.assertIn(new, v)
- self.assertIsNone(v[new])
+ del v[new]
+ self.assertIn(new, v)
+ self.assertIsNone(v[new])
class TestXMLMetadataConfig(TestXMLFileBacked):
test_obj = XMLMetadataConfig
path = os.path.join(datastore, 'Metadata', 'clients.xml')
- def get_obj(self, basefile="clients.xml", core=None, watch_clients=False):
- self.metadata = get_metadata_object(core=core,
- watch_clients=watch_clients)
+ def get_obj(self, basefile="clients.xml", core=None):
+ self.metadata = get_metadata_object(core=core)
@patchIf(not isinstance(lxml.etree.Element, Mock),
"lxml.etree.Element", Mock())
def inner():
- return XMLMetadataConfig(self.metadata, watch_clients, basefile)
+ return XMLMetadataConfig(self.metadata, basefile)
return inner()
+ @patch("Bcfg2.Server.FileMonitor.get_fam", Mock())
def test__init(self):
xmc = self.get_obj()
- self.assertEqual(self.metadata.core.fam, xmc.fam)
- self.assertFalse(xmc.fam.AddMonitor.called)
+ self.assertNotIn(call(xmc.basefile),
+ xmc.fam.AddMonitor.call_args_list)
def test_xdata(self):
config = self.get_obj()
@@ -262,20 +265,15 @@ class TestXMLMetadataConfig(TestXMLFileBacked):
self.assertEqual(config.base_xdata, "<test/>")
def test_add_monitor(self):
- core = MagicMock()
- config = self.get_obj(core=core)
+ config = self.get_obj()
+ config.fam = Mock()
fname = "test.xml"
fpath = os.path.join(self.metadata.data, fname)
config.extras = []
config.add_monitor(fpath)
- self.assertFalse(core.fam.AddMonitor.called)
- self.assertEqual(config.extras, [fpath])
-
- config = self.get_obj(core=core, watch_clients=True)
- config.add_monitor(fpath)
- core.fam.AddMonitor.assert_called_with(fpath, config.metadata)
+ config.fam.AddMonitor.assert_called_with(fpath, config.metadata)
self.assertItemsEqual(config.extras, [fpath])
def test_Index(self):
@@ -480,11 +478,16 @@ class TestClientMetadata(Bcfg2TestCase):
class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
test_obj = Metadata
- use_db = False
- def get_obj(self, core=None, watch_clients=False):
- return get_metadata_object(core=core, watch_clients=watch_clients,
- use_db=self.use_db)
+ def setUp(self):
+ _TestMetadata.setUp(self)
+ TestClientRunHooks.setUp(self)
+ TestDatabaseBacked.setUp(self)
+ Bcfg2.Options.setup.metadata_db = False
+ Bcfg2.Options.setup.authentication = "cert+password"
+
+ def get_obj(self, core=None):
+ return get_metadata_object(core=core)
@skipUnless(HAS_DJANGO, "Django not found")
def test__use_db(self):
@@ -504,33 +507,24 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
client_name = "%s%s" % (prefix, i)
return client_name
- def test__init(self):
- # test with watch_clients=False
+ @patch("Bcfg2.Server.FileMonitor.get_fam")
+ def test__init(self, mock_get_fam):
core = MagicMock()
metadata = self.get_obj(core=core)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Plugin)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.Metadata)
- self.assertIsInstance(metadata, Bcfg2.Server.Plugin.ClientRunHooks)
- self.assertIsInstance(metadata.clients_xml, XMLMetadataConfig)
- self.assertIsInstance(metadata.groups_xml, XMLMetadataConfig)
- self.assertIsInstance(metadata.query, MetadataQuery)
- self.assertEqual(metadata.states, dict())
-
- # test with watch_clients=True
- core.fam = MagicMock()
- metadata = self.get_obj(core=core, watch_clients=True)
self.assertEqual(len(metadata.states), 2)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "clients.xml"),
- metadata)
-
- core.fam.reset_mock()
- core.fam.AddMonitor = Mock(side_effect=IOError)
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "groups.xml"),
+ metadata)
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "clients.xml"),
+ metadata)
+
+ mock_get_fam.reset_mock()
+ fam = Mock()
+ fam.AddMonitor = Mock(side_effect=IOError)
+ mock_get_fam.return_value = fam
self.assertRaises(Bcfg2.Server.Plugin.PluginInitError,
- self.get_obj, core=core, watch_clients=True)
+ self.get_obj, core=core)
@patch('os.makedirs', Mock())
@patch('%s.open' % builtins)
@@ -591,6 +585,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_add_group(self):
metadata = self.get_obj()
metadata.groups_xml.write = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -623,6 +618,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_update_group(self):
metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -640,6 +636,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_remove_group(self):
metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -655,6 +652,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_add_bundle(self):
metadata = self.get_obj()
metadata.groups_xml.write = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = lxml.etree.XML('<Groups/>').getroottree()
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -678,6 +676,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_remove_bundle(self):
metadata = self.get_obj()
metadata.groups_xml.write_xml = Mock()
+ metadata.groups_xml.load_xml = Mock()
metadata.groups_xml.data = copy.deepcopy(get_groups_test_tree())
metadata.groups_xml.basedata = copy.copy(metadata.groups_xml.data)
@@ -693,6 +692,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_add_client(self):
metadata = self.get_obj()
metadata.clients_xml.write = Mock()
+ metadata.clients_xml.load_xml = Mock()
metadata.clients_xml.data = lxml.etree.XML('<Clients/>').getroottree()
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
@@ -727,6 +727,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
def test_update_client(self):
metadata = self.get_obj()
metadata.clients_xml.write_xml = Mock()
+ metadata.clients_xml.load_xml = Mock()
metadata.clients_xml.data = copy.deepcopy(get_clients_test_tree())
metadata.clients_xml.basedata = copy.copy(metadata.clients_xml.data)
@@ -762,7 +763,7 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
metadata.clients_xml.xdata = copy.deepcopy(get_clients_test_tree())
metadata._handle_clients_xml_event(Mock())
- if not self.use_db:
+ if not Bcfg2.Options.setup.metadata_db:
self.assertItemsEqual(metadata.clients,
dict([(c.get("name"), c.get("profile"))
for c in get_clients_test_tree().findall("//Client")]))
@@ -1251,10 +1252,13 @@ class TestMetadata(_TestMetadata, TestClientRunHooks, TestDatabaseBacked):
class TestMetadataBase(TestMetadata):
""" base test object for testing Metadata with database enabled """
__test__ = False
- use_db = True
@skipUnless(HAS_DJANGO, "Django not found")
def setUp(self):
+ _TestMetadata.setUp(self)
+ TestClientRunHooks.setUp(self)
+ TestDatabaseBacked.setUp(self)
+ Bcfg2.Options.setup.metadata_db = True
syncdb(TestMetadataDB)
def load_clients_data(self, metadata=None, xdata=None):
@@ -1274,25 +1278,24 @@ class TestMetadataBase(TestMetadata):
return client_name
@patch('os.path.exists')
- def test__init(self, mock_exists):
- core = MagicMock()
- core.fam = Mock()
+ @patch('Bcfg2.Server.FileMonitor.get_fam')
+ def test__init(self, mock_get_fam, mock_exists):
mock_exists.return_value = False
- metadata = self.get_obj(core=core, watch_clients=True)
+ metadata = self.get_obj()
self.assertIsInstance(metadata, Bcfg2.Server.Plugin.DatabaseBacked)
- core.fam.AddMonitor.assert_called_once_with(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
+ mock_get_fam.return_value.AddMonitor.assert_called_with(
+ os.path.join(metadata.data, "groups.xml"),
+ metadata)
mock_exists.return_value = True
- core.fam.reset_mock()
- metadata = self.get_obj(core=core, watch_clients=True)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "groups.xml"),
- metadata)
- core.fam.AddMonitor.assert_any_call(os.path.join(metadata.data,
- "clients.xml"),
- metadata)
+ mock_get_fam.reset_mock()
+ metadata = self.get_obj()
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "groups.xml"),
+ metadata)
+ mock_get_fam.return_value.AddMonitor.assert_any_call(
+ os.path.join(metadata.data, "clients.xml"),
+ metadata)
def test_add_group(self):
pass
@@ -1356,12 +1359,7 @@ class TestMetadataBase(TestMetadata):
class TestMetadata_NoClientsXML(TestMetadataBase):
""" test Metadata without a clients.xml. we have to disable or
override tests that rely on client options """
- # only run these tests if it's possible to skip tests or if we
- # have django. otherwise they'll all get run because our fake
- # skipping decorators for python < 2.7 won't work when they
- # decorate setUp()
- if can_skip or HAS_DJANGO:
- __test__ = True
+ __test__ = True
def load_groups_data(self, metadata=None, xdata=None):
if metadata is None:
@@ -1525,17 +1523,13 @@ class TestMetadata_NoClientsXML(TestMetadataBase):
class TestMetadata_ClientsXML(TestMetadataBase):
""" test Metadata with a clients.xml. """
- # only run these tests if it's possible to skip tests or if we
- # have django. otherwise they'll all get run because our fake
- # skipping decorators for python < 2.7 won't work when they
- # decorate setUp()
- if can_skip or HAS_DJANGO:
- __test__ = True
+ __test__ = True
def load_clients_data(self, metadata=None, xdata=None):
if metadata is None:
metadata = self.get_obj()
- metadata.core.fam = Mock()
+ fam = Bcfg2.Server.FileMonitor._FAM
+ Bcfg2.Server.FileMonitor._FAM = MagicMock()
@patchIf(not isinstance(lxml.etree.Element, Mock),
"lxml.etree.Element", Mock())
def inner():
@@ -1543,5 +1537,7 @@ class TestMetadata_ClientsXML(TestMetadataBase):
inner()
metadata = TestMetadata.load_clients_data(self, metadata=metadata,
xdata=xdata)
- return TestMetadataBase.load_clients_data(self, metadata=metadata,
- xdata=xdata)
+ rv = TestMetadataBase.load_clients_data(self, metadata=metadata,
+ xdata=xdata)
+ Bcfg2.Server.FileMonitor._FAM = fam
+ return rv
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
index 2face023f..c9a982bb3 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProbes.py
@@ -1,8 +1,8 @@
import os
import re
import sys
-import copy
-import time
+import shutil
+import tempfile
import lxml.etree
import Bcfg2.version
import Bcfg2.Server
@@ -19,54 +19,22 @@ while path != "/":
break
path = os.path.dirname(path)
from common import *
-from Bcfg2.Server.Plugins.Probes import *
-from TestPlugin import TestEntrySet, TestProbing, TestConnector, \
+from Bcfg2.Server.Plugins.Probes import load_django_models
+from TestPlugin import TestEntrySet, TestPlugin, \
TestDatabaseBacked
-# test data for JSON and YAML tests
-test_data = dict(a=1, b=[1, 2, 3], c="test",
- d=dict(a=1, b=dict(a=1), c=(1, "2", 3)))
-
-
-class FakeElement(lxml.etree._Element):
- getroottree = Mock()
-
- def __init__(self, el):
- self._element = el
-
- def __getattribute__(self, attr):
- el = lxml.etree._Element.__getattribute__(self,
- '__dict__')['_element']
- if attr == "getroottree":
- return FakeElement.getroottree
- elif attr == "_element":
- return el
- else:
- return getattr(el, attr)
-
-
-class StoringElement(object):
- OriginalElement = copy.copy(lxml.etree.Element)
-
- def __init__(self):
- self.element = None
- self.return_value = None
-
- def __call__(self, *args, **kwargs):
- self.element = self.OriginalElement(*args, **kwargs)
- self.return_value = FakeElement(self.element)
- return self.return_value
+load_django_models()
+from Bcfg2.Server.Plugins.Probes import *
+if HAS_JSON:
+ json = json
-class StoringSubElement(object):
- OriginalSubElement = copy.copy(lxml.etree.SubElement)
+if HAS_YAML:
+ yaml = yaml
- def __call__(self, parent, tag, **kwargs):
- try:
- return self.OriginalSubElement(parent._element, tag,
- **kwargs)
- except AttributeError:
- return self.OriginalSubElement(parent, tag, **kwargs)
+# test data for JSON and YAML tests
+test_data = dict(a=1, b=[1, 2, 3], c="test",
+ d=dict(a=1, b=dict(a=1), c=(1, "2", 3)))
class FakeList(list):
@@ -75,19 +43,8 @@ class FakeList(list):
class TestProbesDB(DBModelTestCase):
if HAS_DJANGO:
- models = [ProbesGroupsModel, ProbesDataModel]
-
-
-class TestClientProbeDataSet(Bcfg2TestCase):
- def test__init(self):
- ds = ClientProbeDataSet()
- self.assertLessEqual(ds.timestamp, time.time())
- self.assertIsInstance(ds, dict)
- self.assertNotIn("timestamp", ds)
-
- ds = ClientProbeDataSet(timestamp=123)
- self.assertEqual(ds.timestamp, 123)
- self.assertNotIn("timestamp", ds)
+ models = [ProbesGroupsModel,
+ ProbesDataModel]
class TestProbeData(Bcfg2TestCase):
@@ -109,19 +66,22 @@ class TestProbeData(Bcfg2TestCase):
def test_xdata(self):
xdata = lxml.etree.Element("test")
lxml.etree.SubElement(xdata, "test2")
- data = ProbeData(lxml.etree.tostring(xdata,
- xml_declaration=False).decode('UTF-8'))
+ data = ProbeData(
+ lxml.etree.tostring(xdata,
+ xml_declaration=False).decode('UTF-8'))
self.assertIsNotNone(data.xdata)
self.assertIsNotNone(data.xdata.find("test2"))
- @skipUnless(HAS_JSON, "JSON libraries not found, skipping JSON tests")
+ @skipUnless(HAS_JSON,
+ "JSON libraries not found, skipping JSON tests")
def test_json(self):
jdata = json.dumps(test_data)
data = ProbeData(jdata)
self.assertIsNotNone(data.json)
self.assertItemsEqual(test_data, data.json)
- @skipUnless(HAS_YAML, "YAML libraries not found, skipping YAML tests")
+ @skipUnless(HAS_YAML,
+ "YAML libraries not found, skipping YAML tests")
def test_yaml(self):
jdata = yaml.dump(test_data)
data = ProbeData(jdata)
@@ -135,22 +95,20 @@ class TestProbeSet(TestEntrySet):
ignore = ["foo~", ".#foo", ".foo.swp", ".foo.swx", "probed.xml"]
bogus_names = ["test.py"]
- def get_obj(self, path=datastore, fam=None, encoding=None,
+ def get_obj(self, path=datastore, encoding=None,
plugin_name="Probes", basename=None):
# get_obj() accepts the basename argument, accepted by the
# parent get_obj() method, and just throws it away, since
# ProbeSet uses a regex for the "basename"
- if fam is None:
- fam = Mock()
- rv = self.test_obj(path, fam, encoding, plugin_name)
+ rv = self.test_obj(path, plugin_name)
rv.entry_type = MagicMock()
return rv
- def test__init(self):
- fam = Mock()
- ps = self.get_obj(fam=fam)
+ @patch("Bcfg2.Server.FileMonitor.get_fam")
+ def test__init(self, mock_get_fam):
+ ps = self.get_obj()
self.assertEqual(ps.plugin_name, "Probes")
- fam.AddMonitor.assert_called_with(datastore, ps)
+ mock_get_fam.return_value.AddMonitor.assert_called_with(datastore, ps)
TestEntrySet.test__init(self)
def test_HandleEvent(self):
@@ -243,418 +201,191 @@ group-specific"""
assert False, "Strange probe found in get_probe_data() return"
-class TestProbes(TestProbing, TestConnector, TestDatabaseBacked):
+class TestProbes(TestPlugin):
test_obj = Probes
- def get_obj(self, core=None):
- return TestDatabaseBacked.get_obj(self, core=core)
-
- def get_test_probedata(self):
- test_xdata = lxml.etree.Element("test")
- lxml.etree.SubElement(test_xdata, "test", foo="foo")
- rv = dict()
- rv["foo.example.com"] = ClientProbeDataSet(timestamp=time.time())
- rv["foo.example.com"]["xml"] = \
- ProbeData(lxml.etree.tostring(test_xdata,
- xml_declaration=False).decode('UTF-8'))
- rv["foo.example.com"]["text"] = ProbeData("freeform text")
- rv["foo.example.com"]["multiline"] = ProbeData("""multiple
+ test_xdata = lxml.etree.Element("test")
+ lxml.etree.SubElement(test_xdata, "test", foo="foo")
+ test_xdoc = lxml.etree.tostring(test_xdata,
+ xml_declaration=False).decode('UTF-8')
+
+ data = dict()
+ data['xml'] = "group:group\n" + test_xdoc
+ data['text'] = "freeform text"
+ data['multiline'] = """multiple
lines
of
freeform
text
-""")
- rv["bar.example.com"] = ClientProbeDataSet(timestamp=time.time())
- rv["bar.example.com"]["empty"] = ProbeData("")
- if HAS_JSON:
- rv["bar.example.com"]["json"] = ProbeData(json.dumps(test_data))
- if HAS_YAML:
- rv["bar.example.com"]["yaml"] = ProbeData(yaml.dump(test_data))
- return rv
-
- def get_test_cgroups(self):
- return {"foo.example.com": ["group", "group with spaces",
- "group-with-dashes"],
- "bar.example.com": []}
-
- def get_probes_object(self, use_db=False, load_data=None):
- core = MagicMock()
- core.setup.cfp.getboolean = Mock()
- core.setup.cfp.getboolean.return_value = use_db
- if load_data is None:
- load_data = MagicMock()
- # we have to patch load_data() in a funny way because
- # different versions of Mock have different scopes for
- # patching. in some versions, a patch applied to
- # get_probes_object() would only apply to that function, while
- # in others it would also apply to the calling function (e.g.,
- # test__init(), which relies on being able to check the calls
- # of load_data(), and thus on load_data() being consistently
- # mocked)
- @patch("%s.%s.load_data" % (self.test_obj.__module__,
- self.test_obj.__name__), new=load_data)
- def inner():
- return self.get_obj(core)
-
- rv = inner()
- rv.allowed_cgroups = [re.compile("^.*$")]
- return rv
+group:group-with-dashes
+group: group:with:colons
+"""
+ data['empty'] = ''
+ data['almost_empty'] = 'group: other_group'
+ if HAS_JSON:
+ data['json'] = json.dumps(test_data)
+ if HAS_YAML:
+ data['yaml'] = yaml.dump(test_data)
+
+ def setUp(self):
+ Bcfg2TestCase.setUp(self)
+ set_setup_default("probes_db")
+ set_setup_default("probes_allowed_groups", [re.compile(".*")])
+ self.datastore = None
+ Bcfg2.Server.Cache.expire("Probes")
+
+ def tearDown(self):
+ Bcfg2.Server.Cache.expire("Probes")
+ if self.datastore is not None:
+ shutil.rmtree(self.datastore)
+ self.datastore = None
+ Bcfg2.Options.setup.repository = datastore
+
+ def get_obj(self):
+ if not Bcfg2.Options.setup.probes_db:
+ # actually use a real datastore so we can read and write
+ # probed.xml
+ if self.datastore is None:
+ self.datastore = tempfile.mkdtemp()
+ Bcfg2.Options.setup.repository = self.datastore
+ datadir = os.path.join(self.datastore, self.test_obj.name)
+ if not os.path.exists(datadir):
+ os.makedirs(datadir)
+ return TestPlugin.get_obj(self)
def test__init(self):
- mock_load_data = Mock()
- probes = self.get_probes_object(load_data=mock_load_data)
- probes.core.fam.AddMonitor.assert_called_with(os.path.join(datastore,
- probes.name),
- probes.probes)
- mock_load_data.assert_any_call()
- self.assertEqual(probes.probedata, ClientProbeDataSet())
- self.assertEqual(probes.cgroups, dict())
-
- @patch("Bcfg2.Server.Plugins.Probes.Probes.load_data", Mock())
- def test__use_db(self):
- probes = self.get_probes_object()
- self.assertFalse(probes._use_db)
- probes.core.setup.cfp.getboolean.assert_called_with("probes",
- "use_database",
- default=False)
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock())
- def test_write_data_xml(self):
- probes = self.get_probes_object(use_db=False)
- probes.write_data("test")
- probes._write_data_xml.assert_called_with("test")
- self.assertFalse(probes._write_data_db.called)
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._write_data_xml", Mock())
- def test_write_data_db(self):
- probes = self.get_probes_object(use_db=True)
- probes.write_data("test")
- probes._write_data_db.assert_called_with("test")
- self.assertFalse(probes._write_data_xml.called)
-
- def test__write_data_xml(self):
- probes = self.get_probes_object(use_db=False)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- @patch("lxml.etree.Element")
- @patch("lxml.etree.SubElement", StoringSubElement())
- def inner(mock_Element):
- mock_Element.side_effect = StoringElement()
- probes._write_data_xml(None)
-
- top = mock_Element.side_effect.return_value
- write = top.getroottree.return_value.write
- self.assertEqual(write.call_args[0][0],
- os.path.join(datastore, probes.name,
- "probed.xml"))
-
- data = top._element
- foodata = data.find("Client[@name='foo.example.com']")
- self.assertIsNotNone(foodata)
- self.assertIsNotNone(foodata.get("timestamp"))
- self.assertEqual(len(foodata.findall("Probe")),
- len(probes.probedata['foo.example.com']))
- self.assertEqual(len(foodata.findall("Group")),
- len(probes.cgroups['foo.example.com']))
- xml = foodata.find("Probe[@name='xml']")
- self.assertIsNotNone(xml)
- self.assertIsNotNone(xml.get("value"))
- xdata = lxml.etree.XML(xml.get("value"))
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- text = foodata.find("Probe[@name='text']")
- self.assertIsNotNone(text)
- self.assertIsNotNone(text.get("value"))
- multiline = foodata.find("Probe[@name='multiline']")
- self.assertIsNotNone(multiline)
- self.assertIsNotNone(multiline.get("value"))
- self.assertGreater(len(multiline.get("value").splitlines()), 1)
-
- bardata = data.find("Client[@name='bar.example.com']")
- self.assertIsNotNone(bardata)
- self.assertIsNotNone(bardata.get("timestamp"))
- self.assertEqual(len(bardata.findall("Probe")),
- len(probes.probedata['bar.example.com']))
- self.assertEqual(len(bardata.findall("Group")),
- len(probes.cgroups['bar.example.com']))
- empty = bardata.find("Probe[@name='empty']")
- self.assertIsNotNone(empty)
- self.assertIsNotNone(empty.get("value"))
- self.assertEqual(empty.get("value"), "")
- if HAS_JSON:
- jdata = bardata.find("Probe[@name='json']")
- self.assertIsNotNone(jdata)
- self.assertIsNotNone(jdata.get("value"))
- self.assertItemsEqual(test_data,
- json.loads(jdata.get("value")))
- if HAS_YAML:
- ydata = bardata.find("Probe[@name='yaml']")
- self.assertIsNotNone(ydata)
- self.assertIsNotNone(ydata.get("value"))
- self.assertItemsEqual(test_data,
- yaml.load(ydata.get("value")))
-
- inner()
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test__write_data_db(self):
- syncdb(TestProbesDB)
- probes = self.get_probes_object(use_db=True)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- for cname in ["foo.example.com", "bar.example.com"]:
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- pdata = ProbesDataModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pdata), len(probes.probedata[cname]))
-
- for probe in pdata:
- self.assertEqual(probe.hostname, client.hostname)
- self.assertIsNotNone(probe.data)
- if probe.probe == "xml":
- xdata = lxml.etree.XML(probe.data)
- self.assertIsNotNone(xdata)
- self.assertIsNotNone(xdata.find("test"))
- self.assertEqual(xdata.find("test").get("foo"), "foo")
- elif probe.probe == "text":
- pass
- elif probe.probe == "multiline":
- self.assertGreater(len(probe.data.splitlines()), 1)
- elif probe.probe == "empty":
- self.assertEqual(probe.data, "")
- elif probe.probe == "yaml":
- self.assertItemsEqual(test_data, yaml.load(probe.data))
- elif probe.probe == "json":
- self.assertItemsEqual(test_data, json.loads(probe.data))
- else:
- assert False, "Strange probe found in _write_data_db data"
-
- pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
-
- # test that old probe data is removed properly
- cname = 'foo.example.com'
- del probes.probedata[cname]['text']
- probes.cgroups[cname].pop()
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- pdata = ProbesDataModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pdata), len(probes.probedata[cname]))
- pgroups = ProbesGroupsModel.objects.filter(hostname=cname).all()
- self.assertEqual(len(pgroups), len(probes.cgroups[cname]))
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock())
- def test_load_data_xml(self):
- probes = self.get_probes_object(use_db=False)
- probes.load_data()
- probes._load_data_xml.assert_any_call()
- self.assertFalse(probes._load_data_db.called)
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_db", Mock())
- @patch("Bcfg2.Server.Plugins.Probes.Probes._load_data_xml", Mock())
- def test_load_data_db(self):
- probes = self.get_probes_object(use_db=True)
- probes.load_data()
- probes._load_data_db.assert_any_call(client=None)
- self.assertFalse(probes._load_data_xml.called)
-
- @patch("lxml.etree.parse")
- def test__load_data_xml(self, mock_parse):
- probes = self.get_probes_object(use_db=False)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
-
- # to get the value for lxml.etree.parse to parse, we call
- # _write_data_xml, mock the lxml.etree._ElementTree.write()
- # call, and grab the data that gets "written" to probed.xml
- @patch("lxml.etree.Element")
- @patch("lxml.etree.SubElement", StoringSubElement())
- def inner(mock_Element):
- mock_Element.side_effect = StoringElement()
- probes._write_data_xml(None)
- top = mock_Element.side_effect.return_value
- return top._element
-
- xdata = inner()
- mock_parse.return_value = xdata.getroottree()
- probes.probedata = dict()
- probes.cgroups = dict()
-
- probes._load_data_xml()
- mock_parse.assert_called_with(os.path.join(datastore, probes.name,
- 'probed.xml'),
- parser=Bcfg2.Server.XMLParser)
- self.assertItemsEqual(probes.probedata, self.get_test_probedata())
- self.assertItemsEqual(probes.cgroups, self.get_test_cgroups())
-
- @skipUnless(HAS_DJANGO, "Django not found, skipping")
- def test__load_data_db(self):
- syncdb(TestProbesDB)
- probes = self.get_probes_object(use_db=True)
- probes.probedata = self.get_test_probedata()
- probes.cgroups = self.get_test_cgroups()
- for cname in probes.probedata.keys():
- client = Mock()
- client.hostname = cname
- probes._write_data_db(client)
-
- probes.probedata = dict()
- probes.cgroups = dict()
- probes._load_data_db()
- self.assertItemsEqual(probes.probedata, self.get_test_probedata())
- # the db backend does not store groups at all if a client has
- # no groups set, so we can't just use assertItemsEqual here,
- # because loading saved data may _not_ result in the original
- # data if some clients had no groups set.
- test_cgroups = self.get_test_cgroups()
- for cname, groups in test_cgroups.items():
- if cname in probes.cgroups:
- self.assertEqual(groups, probes.cgroups[cname])
- else:
- self.assertEqual(groups, [])
+ if Bcfg2.Options.setup.probes_db:
+ TestPlugin.test__init(self)
- @patch("Bcfg2.Server.Plugins.Probes.ProbeSet.get_probe_data")
- def test_GetProbes(self, mock_get_probe_data):
- probes = self.get_probes_object()
+ def test_GetProbes(self):
+ p = self.get_obj()
+ p.probes = Mock()
metadata = Mock()
- probes.GetProbes(metadata)
- mock_get_probe_data.assert_called_with(metadata)
-
- @patch("Bcfg2.Server.Plugins.Probes.Probes.write_data")
- @patch("Bcfg2.Server.Plugins.Probes.Probes.ReceiveDataItem")
- def test_ReceiveData(self, mock_ReceiveDataItem, mock_write_data):
- # we use a simple (read: bogus) datalist here to make this
- # easy to test
- datalist = ["a", "b", "c"]
-
- probes = self.get_probes_object()
- probes.core.metadata_cache_mode = 'off'
+ p.GetProbes(metadata)
+ p.probes.get_probe_data.assert_called_with(metadata)
+
+ def additionalDataEqual(self, actual, expected):
+ self.assertItemsEqual(
+ dict([(k, str(d)) for k, d in actual.items()]),
+ expected)
+
+ def test_probes_xml(self):
+ """ Set and retrieve probe data with database disabled """
+ Bcfg2.Options.setup.probes_db = False
+ self._perform_tests()
+
+ @skipUnless(HAS_DJANGO, "Django not found")
+ def test_probes_db(self):
+ """ Set and retrieve probe data with database enabled """
+ Bcfg2.Options.setup.probes_db = True
+ self._perform_tests()
+
+ def test_allowed_cgroups(self):
+ """ Test option to only allow probes to set certain groups """
+ probes = self.get_obj()
+
+ test_text = """a couple lines
+of freeform text
+"""
+ test_groups = ["group", "group2", "group-with-dashes"]
+ test_probe_data = lxml.etree.Element("Probe", name="test")
+ test_probe_data.text = test_text
+ for group in test_groups:
+ test_probe_data.text += "group:%s\n" % group
+
client = Mock()
- client.hostname = "foo.example.com"
- probes.ReceiveData(client, datalist)
-
- cgroups = []
- cprobedata = ClientProbeDataSet()
- self.assertItemsEqual(mock_ReceiveDataItem.call_args_list,
- [call(client, "a", cgroups, cprobedata),
- call(client, "b", cgroups, cprobedata),
- call(client, "c", cgroups, cprobedata)])
- mock_write_data.assert_called_with(client)
- self.assertFalse(probes.core.metadata_cache.expire.called)
-
- # change the datalist, ensure that the cache is cleared
- probes.cgroups[client.hostname] = datalist
- probes.core.metadata_cache_mode = 'aggressive'
- probes.ReceiveData(client, ['a', 'b', 'd'])
-
- mock_write_data.assert_called_with(client)
- probes.core.metadata_cache.expire.assert_called_with(client.hostname)
-
- def test_ReceiveDataItem(self):
- probes = self.get_probes_object()
- for cname, cdata in self.get_test_probedata().items():
- client = Mock()
- client.hostname = cname
- cgroups = []
- cprobedata = ClientProbeDataSet()
- for pname, pdata in cdata.items():
- dataitem = lxml.etree.Element("Probe", name=pname)
- if pname == "text":
- # add some groups to the plaintext test to test
- # group parsing
- data = [pdata]
- for group in self.get_test_cgroups()[cname]:
- data.append("group:%s" % group)
- dataitem.text = "\n".join(data)
- else:
- dataitem.text = str(pdata)
-
- probes.ReceiveDataItem(client, dataitem, cgroups, cprobedata)
-
- probes.cgroups[client.hostname] = cgroups
- probes.probedata[client.hostname] = cprobedata
- self.assertIn(client.hostname, probes.probedata)
- self.assertIn(pname, probes.probedata[cname])
- self.assertEqual(pdata, probes.probedata[cname][pname])
- self.assertIn(client.hostname, probes.cgroups)
- self.assertEqual(probes.cgroups[cname],
- self.get_test_cgroups()[cname])
-
- # test again, with an explicit list of allowed groups
- probes.allowed_cgroups = [re.compile(r'^.*s$')]
- for cname, cdata in self.get_test_probedata().items():
- client = Mock()
- client.hostname = cname
- cgroups = []
- cprobedata = ClientProbeDataSet()
- for pname, pdata in cdata.items():
- dataitem = lxml.etree.Element("Probe", name=pname)
- if pname == "text":
- # add some groups to the plaintext test to test
- # group parsing
- data = [pdata]
- for group in self.get_test_cgroups()[cname]:
- data.append("group:%s" % group)
- dataitem.text = "\n".join(data)
- else:
- dataitem.text = str(pdata)
-
- probes.ReceiveDataItem(client, dataitem, cgroups, cprobedata)
-
- probes.cgroups[client.hostname] = cgroups
- probes.probedata[client.hostname] = cprobedata
- self.assertIn(client.hostname, probes.probedata)
- self.assertIn(pname, probes.probedata[cname])
- self.assertEqual(pdata, probes.probedata[cname][pname])
- self.assertIn(client.hostname, probes.cgroups)
- self.assertEqual(probes.cgroups[cname],
- [g for g in self.get_test_cgroups()[cname]
- if g.endswith("s")])
-
- def test_get_additional_groups(self):
- TestConnector.test_get_additional_groups(self)
-
- probes = self.get_probes_object()
- test_cgroups = self.get_test_cgroups()
- probes.cgroups = self.get_test_cgroups()
- for cname in test_cgroups.keys():
- metadata = Mock()
- metadata.hostname = cname
- self.assertEqual(test_cgroups[cname],
- probes.get_additional_groups(metadata))
- # test a non-existent client
- metadata = Mock()
- metadata.hostname = "nonexistent"
- self.assertEqual(probes.get_additional_groups(metadata),
- list())
-
- def test_get_additional_data(self):
- TestConnector.test_get_additional_data(self)
-
- probes = self.get_probes_object()
- test_probedata = self.get_test_probedata()
- probes.probedata = self.get_test_probedata()
- for cname in test_probedata.keys():
- metadata = Mock()
- metadata.hostname = cname
- self.assertEqual(test_probedata[cname],
- probes.get_additional_data(metadata))
- # test a non-existent client
- metadata = Mock()
- metadata.hostname = "nonexistent"
- self.assertEqual(probes.get_additional_data(metadata),
- ClientProbeDataSet())
+ groups, data = probes.ReceiveDataItem(client, test_probe_data)
+ self.assertItemsEqual(groups, test_groups)
+ self.assertEqual(data, test_text)
+
+ old_allowed_groups = Bcfg2.Options.setup.probes_allowed_groups
+ Bcfg2.Options.setup.probes_allowed_groups = [re.compile(r'^group.?$')]
+ groups, data = probes.ReceiveDataItem(client, test_probe_data)
+ self.assertItemsEqual(groups, ['group', 'group2'])
+ self.assertEqual(data, test_text)
+ Bcfg2.Options.setup.probes_allowed_groups = old_allowed_groups
+
+ def _perform_tests(self):
+ p = self.get_obj()
+
+ # first, sanity checks
+ foo_md = Mock(hostname="foo.example.com")
+ bar_md = Mock(hostname="bar.example.com")
+ self.assertItemsEqual(p.get_additional_groups(foo_md), [])
+ self.assertItemsEqual(p.get_additional_data(foo_md), dict())
+ self.assertItemsEqual(p.get_additional_groups(bar_md), [])
+ self.assertItemsEqual(p.get_additional_data(bar_md), dict())
+
+ # next, set some initial probe data
+ foo_datalist = []
+ for key in ['xml', 'text', 'multiline']:
+ pdata = lxml.etree.Element("Probe", name=key)
+ pdata.text = self.data[key]
+ foo_datalist.append(pdata)
+ foo_addl_data = dict(xml=self.test_xdoc,
+ text="freeform text",
+ multiline="""multiple
+lines
+of
+freeform
+text""")
+ bar_datalist = []
+ for key in ['empty', 'almost_empty', 'json', 'yaml']:
+ if key in self.data:
+ pdata = lxml.etree.Element("Probe", name=key)
+ pdata.text = self.data[key]
+ bar_datalist.append(pdata)
+ bar_addl_data = dict(empty="", almost_empty="")
+ if HAS_JSON:
+ bar_addl_data['json'] = self.data['json']
+ if HAS_YAML:
+ bar_addl_data['yaml'] = self.data['yaml']
+
+ p.ReceiveData(foo_md, foo_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+
+ p.ReceiveData(bar_md, bar_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # instantiate a new Probes object and clear Probes caches to
+ # imitate a server restart
+ p = self.get_obj()
+ Bcfg2.Server.Cache.expire("Probes")
+
+ self.assertItemsEqual(p.get_additional_groups(foo_md),
+ ["group", "group-with-dashes",
+ "group:with:colons"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # set new data (and groups) for foo
+ foo_datalist = []
+ pdata = lxml.etree.Element("Probe", name='xml')
+ pdata.text = self.data['xml']
+ foo_datalist.append(pdata)
+ foo_addl_data = dict(xml=self.test_xdoc)
+
+ p.ReceiveData(foo_md, foo_datalist)
+ self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
+
+ # instantiate a new Probes object and clear Probes caches to
+ # imitate a server restart
+ p = self.get_obj()
+ Bcfg2.Server.Cache.expire("Probes")
+
+ self.assertItemsEqual(p.get_additional_groups(foo_md), ["group"])
+ self.additionalDataEqual(p.get_additional_data(foo_md), foo_addl_data)
+ self.assertItemsEqual(p.get_additional_groups(bar_md), ['other_group'])
+ self.additionalDataEqual(p.get_additional_data(bar_md), bar_addl_data)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
index 896f5861e..159dc6e66 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestProperties.py
@@ -19,12 +19,6 @@ from TestPlugin import TestStructFile, TestFileBacked, TestConnector, \
TestPlugin, TestDirectoryBacked
try:
- from Bcfg2.Encryption import EVPError
- HAS_CRYPTO = True
-except:
- HAS_CRYPTO = False
-
-try:
import json
JSON = "json"
except ImportError:
@@ -36,12 +30,12 @@ class TestPropertyFile(Bcfg2TestCase):
path = os.path.join(datastore, "test")
def get_obj(self, path=None):
+ set_setup_default("writes_enabled", False)
if path is None:
path = self.path
return self.test_obj(path)
def test_write(self):
- Bcfg2.Server.Plugins.Properties.SETUP = MagicMock()
pf = self.get_obj()
pf.validate_data = Mock()
pf._write = Mock()
@@ -52,20 +46,16 @@ class TestPropertyFile(Bcfg2TestCase):
def reset():
pf.validate_data.reset_mock()
pf._write.reset_mock()
- Bcfg2.Server.Plugins.Properties.SETUP.reset_mock()
# test writes disabled
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False
+ Bcfg2.Options.setup.writes_enabled = False
self.assertRaises(PluginExecutionError, pf.write)
self.assertFalse(pf.validate_data.called)
self.assertFalse(pf._write.called)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties",
- "writes_enabled",
- default=True)
# test successful write
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True
+ Bcfg2.Options.setup.writes_enabled = True
self.assertEqual(pf.write(), pf._write.return_value)
pf.validate_data.assert_called_with()
pf._write.assert_called_with()
@@ -99,96 +89,95 @@ class TestPropertyFile(Bcfg2TestCase):
mock_copy.assert_called_with(pf)
-if can_skip or HAS_JSON:
- class TestJSONPropertyFile(TestFileBacked, TestPropertyFile):
- test_obj = JSONPropertyFile
-
- def get_obj(self, *args, **kwargs):
- return TestFileBacked.get_obj(self, *args, **kwargs)
-
- @skipUnless(HAS_JSON, "JSON libraries not found, skipping")
- def setUp(self):
- pass
-
- @patch("%s.loads" % JSON)
- def test_Index(self, mock_loads):
- pf = self.get_obj()
- pf.Index()
- mock_loads.assert_called_with(pf.data)
- self.assertEqual(pf.json, mock_loads.return_value)
-
- mock_loads.reset_mock()
- mock_loads.side_effect = ValueError
- self.assertRaises(PluginExecutionError, pf.Index)
- mock_loads.assert_called_with(pf.data)
-
- @patch("%s.dump" % JSON)
- @patch("%s.open" % builtins)
- def test__write(self, mock_open, mock_dump):
- pf = self.get_obj()
- self.assertTrue(pf._write())
- mock_open.assert_called_with(pf.name, 'wb')
- mock_dump.assert_called_with(pf.json, mock_open.return_value)
-
- @patch("%s.dumps" % JSON)
- def test_validate_data(self, mock_dumps):
- pf = self.get_obj()
- pf.validate_data()
- mock_dumps.assert_called_with(pf.json)
-
- mock_dumps.reset_mock()
- mock_dumps.side_effect = ValueError
- self.assertRaises(PluginExecutionError, pf.validate_data)
- mock_dumps.assert_called_with(pf.json)
-
-
-if can_skip or HAS_YAML:
- class TestYAMLPropertyFile(TestFileBacked, TestPropertyFile):
- test_obj = YAMLPropertyFile
-
- def get_obj(self, *args, **kwargs):
- return TestFileBacked.get_obj(self, *args, **kwargs)
-
- @skipUnless(HAS_YAML, "YAML libraries not found, skipping")
- def setUp(self):
- pass
-
- @patch("yaml.load")
- def test_Index(self, mock_load):
- pf = self.get_obj()
- pf.Index()
- mock_load.assert_called_with(pf.data)
- self.assertEqual(pf.yaml, mock_load.return_value)
-
- mock_load.reset_mock()
- mock_load.side_effect = yaml.YAMLError
- self.assertRaises(PluginExecutionError, pf.Index)
- mock_load.assert_called_with(pf.data)
-
- @patch("yaml.dump")
- @patch("%s.open" % builtins)
- def test__write(self, mock_open, mock_dump):
- pf = self.get_obj()
- self.assertTrue(pf._write())
- mock_open.assert_called_with(pf.name, 'wb')
- mock_dump.assert_called_with(pf.yaml, mock_open.return_value)
-
- @patch("yaml.dump")
- def test_validate_data(self, mock_dump):
- pf = self.get_obj()
- pf.validate_data()
- mock_dump.assert_called_with(pf.yaml)
-
- mock_dump.reset_mock()
- mock_dump.side_effect = yaml.YAMLError
- self.assertRaises(PluginExecutionError, pf.validate_data)
- mock_dump.assert_called_with(pf.yaml)
+class TestJSONPropertyFile(TestFileBacked, TestPropertyFile):
+ test_obj = JSONPropertyFile
+
+ @skipUnless(HAS_JSON, "JSON libraries not found, skipping")
+ def setUp(self):
+ TestFileBacked.setUp(self)
+ TestPropertyFile.setUp(self)
+
+ @patch("%s.loads" % JSON)
+ def test_Index(self, mock_loads):
+ pf = self.get_obj()
+ pf.Index()
+ mock_loads.assert_called_with(pf.data)
+ self.assertEqual(pf.json, mock_loads.return_value)
+
+ mock_loads.reset_mock()
+ mock_loads.side_effect = ValueError
+ self.assertRaises(PluginExecutionError, pf.Index)
+ mock_loads.assert_called_with(pf.data)
+
+ @patch("%s.dump" % JSON)
+ @patch("%s.open" % builtins)
+ def test__write(self, mock_open, mock_dump):
+ pf = self.get_obj()
+ self.assertTrue(pf._write())
+ mock_open.assert_called_with(pf.name, 'wb')
+ mock_dump.assert_called_with(pf.json, mock_open.return_value)
+
+ @patch("%s.dumps" % JSON)
+ def test_validate_data(self, mock_dumps):
+ pf = self.get_obj()
+ pf.validate_data()
+ mock_dumps.assert_called_with(pf.json)
+
+ mock_dumps.reset_mock()
+ mock_dumps.side_effect = ValueError
+ self.assertRaises(PluginExecutionError, pf.validate_data)
+ mock_dumps.assert_called_with(pf.json)
+
+
+class TestYAMLPropertyFile(TestFileBacked, TestPropertyFile):
+ test_obj = YAMLPropertyFile
+
+ @skipUnless(HAS_YAML, "YAML libraries not found, skipping")
+ def setUp(self):
+ TestFileBacked.setUp(self)
+ TestPropertyFile.setUp(self)
+
+ @patch("yaml.load")
+ def test_Index(self, mock_load):
+ pf = self.get_obj()
+ pf.Index()
+ mock_load.assert_called_with(pf.data)
+ self.assertEqual(pf.yaml, mock_load.return_value)
+
+ mock_load.reset_mock()
+ mock_load.side_effect = yaml.YAMLError
+ self.assertRaises(PluginExecutionError, pf.Index)
+ mock_load.assert_called_with(pf.data)
+
+ @patch("yaml.dump")
+ @patch("%s.open" % builtins)
+ def test__write(self, mock_open, mock_dump):
+ pf = self.get_obj()
+ self.assertTrue(pf._write())
+ mock_open.assert_called_with(pf.name, 'wb')
+ mock_dump.assert_called_with(pf.yaml, mock_open.return_value)
+
+ @patch("yaml.dump")
+ def test_validate_data(self, mock_dump):
+ pf = self.get_obj()
+ pf.validate_data()
+ mock_dump.assert_called_with(pf.yaml)
+
+ mock_dump.reset_mock()
+ mock_dump.side_effect = yaml.YAMLError
+ self.assertRaises(PluginExecutionError, pf.validate_data)
+ mock_dump.assert_called_with(pf.yaml)
class TestXMLPropertyFile(TestPropertyFile, TestStructFile):
test_obj = XMLPropertyFile
path = TestStructFile.path
+ def setUp(self):
+ TestPropertyFile.setUp(self)
+ TestStructFile.setUp(self)
+ set_setup_default("automatch", False)
+
def get_obj(self, *args, **kwargs):
return TestStructFile.get_obj(self, *args, **kwargs)
@@ -243,178 +232,47 @@ class TestXMLPropertyFile(TestPropertyFile, TestStructFile):
mock_exists.assert_called_with(schemafile)
mock_XMLSchema.assert_called_with(file=schemafile)
- def test_Index(self):
- TestStructFile.test_Index(self)
-
- pf = self.get_obj()
- pf.xdata = lxml.etree.Element("Properties")
- lxml.etree.SubElement(pf.xdata, "Crypted", encrypted="foo")
- pf.data = lxml.etree.tostring(pf.xdata)
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- def test_Index_crypto(self):
- pf = self.get_obj()
- pf._decrypt = Mock()
- pf._decrypt.return_value = 'plaintext'
- pf.data = '''
-<Properties decrypt="strict">
- <Crypted encrypted="foo">
- crypted
- <Plain foo="bar">plain</Plain>
- </Crypted>
- <Crypted encrypted="bar">crypted</Crypted>
- <Plain bar="baz">plain</Plain>
- <Plain>
- <Crypted encrypted="foo">crypted</Crypted>
- </Plain>
-</Properties>'''
-
- # test successful decryption
- pf.Index()
- self.assertItemsEqual(pf._decrypt.call_args_list,
- [call(el) for el in pf.xdata.xpath("//Crypted")])
- for el in pf.xdata.xpath("//Crypted"):
- self.assertEqual(el.text, pf._decrypt.return_value)
-
- # test failed decryption, strict
- pf._decrypt.reset_mock()
- pf._decrypt.side_effect = EVPError
- self.assertRaises(PluginExecutionError, pf.Index)
-
- # test failed decryption, lax
- pf.data = pf.data.replace("strict", "lax")
- pf._decrypt.reset_mock()
- pf.Index()
- self.assertItemsEqual(pf._decrypt.call_args_list,
- [call(el) for el in pf.xdata.xpath("//Crypted")])
-
- @skipUnless(HAS_CRYPTO, "No crypto libraries found, skipping")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.ssl_decrypt")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_algorithm")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.get_passphrases")
- @patchIf(HAS_CRYPTO, "Bcfg2.Encryption.bruteforce_decrypt")
- def test_decrypt(self, mock_bruteforce, mock_get_passphrases,
- mock_get_algorithm, mock_ssl):
- pf = self.get_obj()
- Bcfg2.Server.Plugins.Properties.SETUP = MagicMock()
-
- def reset():
- mock_bruteforce.reset_mock()
- mock_get_algorithm.reset_mock()
- mock_get_passphrases.reset_mock()
- mock_ssl.reset_mock()
-
- # test element without text contents
- self.assertIsNone(pf._decrypt(lxml.etree.Element("Test")))
- self.assertFalse(mock_bruteforce.called)
- self.assertFalse(mock_get_passphrases.called)
- self.assertFalse(mock_ssl.called)
-
- # test element with a passphrase in the config file
- reset()
- el = lxml.etree.Element("Test", encrypted="foo")
- el.text = "crypted"
- mock_get_passphrases.return_value = dict(foo="foopass",
- bar="barpass")
- mock_get_algorithm.return_value = "bf_cbc"
- mock_ssl.return_value = "decrypted with ssl"
- self.assertEqual(pf._decrypt(el), mock_ssl.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test failure to decrypt element with a passphrase in the config
- reset()
- mock_ssl.side_effect = EVPError
- self.assertRaises(EVPError, pf._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_ssl.assert_called_with(el.text, "foopass",
- algorithm="bf_cbc")
- self.assertFalse(mock_bruteforce.called)
-
- # test element without valid passphrase
- reset()
- el.set("encrypted", "true")
- mock_bruteforce.return_value = "decrypted with bruteforce"
- self.assertEqual(pf._decrypt(el), mock_bruteforce.return_value)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
- # test failure to decrypt element without valid passphrase
- reset()
- mock_bruteforce.side_effect = EVPError
- self.assertRaises(EVPError, pf._decrypt, el)
- mock_get_passphrases.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_get_algorithm.assert_called_with(
- Bcfg2.Server.Plugins.Properties.SETUP)
- mock_bruteforce.assert_called_with(el.text,
- passphrases=["foopass",
- "barpass"],
- algorithm="bf_cbc")
- self.assertFalse(mock_ssl.called)
-
@patch("copy.copy")
def test_get_additional_data(self, mock_copy):
- Bcfg2.Server.Plugins.Properties.SETUP = Mock()
pf = self.get_obj()
+ pf.setup = Mock()
pf.XMLMatch = Mock()
metadata = Mock()
def reset():
mock_copy.reset_mock()
pf.XMLMatch.reset_mock()
- Bcfg2.Server.Plugins.Properties.SETUP.reset_mock()
+ pf.setup.reset_mock()
pf.xdata = lxml.etree.Element("Properties", automatch="true")
- for automatch in [True, False]:
+ for Bcfg2.Options.setup.automatch in [True, False]:
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch
self.assertEqual(pf.get_additional_data(metadata),
pf.XMLMatch.return_value)
pf.XMLMatch.assert_called_with(metadata)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
self.assertFalse(mock_copy.called)
pf.xdata = lxml.etree.Element("Properties", automatch="false")
- for automatch in [True, False]:
+ for Bcfg2.Options.setup.automatch in [True, False]:
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = automatch
self.assertEqual(pf.get_additional_data(metadata),
mock_copy.return_value)
mock_copy.assert_called_with(pf)
self.assertFalse(pf.XMLMatch.called)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
pf.xdata = lxml.etree.Element("Properties")
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = False
+ Bcfg2.Options.setup.automatch = False
self.assertEqual(pf.get_additional_data(metadata),
mock_copy.return_value)
mock_copy.assert_called_with(pf)
self.assertFalse(pf.XMLMatch.called)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
reset()
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.return_value = True
+ Bcfg2.Options.setup.automatch = True
self.assertEqual(pf.get_additional_data(metadata),
pf.XMLMatch.return_value)
pf.XMLMatch.assert_called_with(metadata)
- Bcfg2.Server.Plugins.Properties.SETUP.cfp.getboolean.assert_called_with("properties", "automatch", default=False)
self.assertFalse(mock_copy.called)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py
index f018b45dc..45f3671e8 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestRules.py
@@ -1,9 +1,11 @@
import os
import sys
+import copy
import lxml.etree
-import Bcfg2.Server.Plugin
+import Bcfg2.Options
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugins.Rules import *
+from Bcfg2.Server.Plugin import PluginExecutionError
# add all parent testsuite directories to sys.path to allow (most)
# relative imports in python 2.4
@@ -15,116 +17,159 @@ while path != "/":
break
path = os.path.dirname(path)
from common import *
-from TestPlugin import TestPrioDir
+from TestPlugin.Testhelpers import TestPrioDir
class TestRules(TestPrioDir):
test_obj = Rules
- def test_HandlesEntry(self):
+ abstract = dict(
+ basic=lxml.etree.Element("Path", name="/etc/basic"),
+ unhandled=lxml.etree.Element("Path", name="/etc/unhandled"),
+ priority=lxml.etree.Element("Path", name="/etc/priority"),
+ content=lxml.etree.Element("Path", name="/etc/text-content"),
+ duplicate=lxml.etree.Element("SEBoolean", name="duplicate"),
+ group=lxml.etree.Element("SEPort", name="6789/tcp"),
+ children=lxml.etree.Element("Path", name="/etc/child-entries"),
+ regex=lxml.etree.Element("Package", name="regex"),
+ slash=lxml.etree.Element("Path", name="/etc/trailing/slash"),
+ no_slash=lxml.etree.Element("Path", name="/etc/no/trailing/slash/"))
+
+ concrete = dict(
+ basic=lxml.etree.Element("Path", name="/etc/basic", type="directory",
+ owner="root", group="root", mode="0600"),
+ priority=lxml.etree.Element("Path", name="/etc/priority",
+ type="directory", owner="root",
+ group="root", mode="0600"),
+ content=lxml.etree.Element("Path", name="/etc/text-content",
+ type="file", owner="bar", group="bar",
+ mode="0644"),
+ duplicate=lxml.etree.Element("SEBoolean", name="duplicate",
+ value="on"),
+ group=lxml.etree.Element("SEPort", name="6789/tcp",
+ selinuxtype="bcfg2_server_t"),
+ children=lxml.etree.Element("Path", name="/etc/child-entries",
+ type="directory", owner="root",
+ group="root", mode="0775"),
+ regex=lxml.etree.Element("Package", name="regex", type="yum",
+ version="any"),
+ slash=lxml.etree.Element("Path", name="/etc/trailing/slash",
+ type="directory", owner="root", group="root",
+ mode="0600"),
+ no_slash=lxml.etree.Element("Path", name="/etc/no/trailing/slash/",
+ type="directory", owner="root",
+ group="root", mode="0600"))
+
+ concrete['content'].text = "Text content"
+ lxml.etree.SubElement(concrete['children'],
+ "ACL", type="default", scope="user", user="foouser",
+ perms="rw")
+ lxml.etree.SubElement(concrete['children'],
+ "ACL", type="default", scope="group", group="users",
+ perms="rx")
+
+ in_file = copy.deepcopy(concrete)
+ in_file['regex'].set("name", ".*")
+ in_file['slash'].set("name", "/etc/trailing/slash/")
+ in_file['no_slash'].set("name", "/etc/no/trailing/slash")
+
+ rules1 = lxml.etree.Element("Rules", priority="10")
+ rules1.append(in_file['basic'])
+ lxml.etree.SubElement(rules1, "Path", name="/etc/priority",
+ type="directory", owner="foo", group="foo",
+ mode="0644")
+ foogroup = lxml.etree.SubElement(rules1, "Group", name="foogroup")
+ foogroup.append(in_file['group'])
+ rules1.append(in_file['content'])
+ rules1.append(copy.copy(in_file['duplicate']))
+
+ rules2 = lxml.etree.Element("Rules", priority="20")
+ rules2.append(in_file['priority'])
+ rules2.append(in_file['children'])
+ rules2.append(in_file['no_slash'])
+
+ rules3 = lxml.etree.Element("Rules", priority="10")
+ rules3.append(in_file['duplicate'])
+ rules3.append(in_file['regex'])
+ rules3.append(in_file['slash'])
+
+ rules = {"rules1.xml": rules1, "rules2.xml": rules2, "rules3.xml": rules3}
+
+ def setUp(self):
+ TestPrioDir.setUp(self)
+ set_setup_default("lax_decryption", True)
+ set_setup_default("rules_regex", False)
+
+ def get_child(self, name):
+ """ Turn one of the XML documents in `rules` into a child
+ object """
+ filename = os.path.join(datastore, self.test_obj.name, name)
+ rv = self.test_obj.__child__(filename)
+ rv.data = lxml.etree.tostring(self.rules[name])
+ rv.Index()
+ return rv
+
+ def get_obj(self, core=None):
+ r = TestPrioDir.get_obj(self, core=core)
+ r.entries = dict([(n, self.get_child(n)) for n in self.rules.keys()])
+ return r
+
+ def _do_test(self, name, groups=None):
+ if groups is None:
+ groups = []
r = self.get_obj()
- r.Entries = dict(Path={"/etc/foo.conf": Mock(),
- "/etc/bar.conf": Mock()})
- r._matches = Mock()
- metadata = Mock()
-
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- self.assertEqual(r.HandlesEntry(entry, metadata),
- r._matches.return_value)
- r._matches.assert_called_with(entry, metadata,
- r.Entries['Path'].keys())
-
- r._matches.reset_mock()
- entry = lxml.etree.Element("Path", name="/etc/baz.conf")
- self.assertEqual(r.HandlesEntry(entry, metadata),
- r._matches.return_value)
- r._matches.assert_called_with(entry, metadata,
- r.Entries['Path'].keys())
-
- r._matches.reset_mock()
- entry = lxml.etree.Element("Package", name="foo")
- self.assertFalse(r.HandlesEntry(entry, metadata))
-
- def test_BindEntry(self, method="BindEntry"):
+ metadata = Mock(groups=groups)
+ entry = copy.deepcopy(self.abstract[name])
+ self.assertTrue(r.HandlesEntry(entry, metadata))
+ r.HandleEntry(entry, metadata)
+ self.assertXMLEqual(entry, self.concrete[name])
+
+ def _do_test_failure(self, name, groups=None, handles=None):
+ if groups is None:
+ groups = []
r = self.get_obj()
- r.get_attrs = Mock()
- r.get_attrs.return_value = dict(overwrite="new", add="add",
- text="text")
- entry = lxml.etree.Element("Test", overwrite="old", keep="keep")
- metadata = Mock()
-
- getattr(r, method)(entry, metadata)
- r.get_attrs.assert_called_with(entry, metadata)
- self.assertItemsEqual(entry.attrib,
- dict(overwrite="old", add="add", keep="keep",
- text="text"))
-
- def test_HandleEntry(self):
- self.test_BindEntry(method="HandleEntry")
-
- @patch("Bcfg2.Server.Plugin.PrioDir._matches")
- def test__matches(self, mock_matches):
- """ test _matches() behavior regardless of state of _regex_enabled """
- r = self.get_obj()
- metadata = Mock()
-
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- rules = []
- mock_matches.return_value = True
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
-
- # test special Path cases -- adding and removing trailing slash
- mock_matches.reset_mock()
- mock_matches.return_value = False
- rules = ["/etc/foo/", "/etc/bar"]
- entry = lxml.etree.Element("Path", name="/etc/foo")
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
-
- mock_matches.reset_mock()
- entry = lxml.etree.Element("Path", name="/etc/bar/")
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
-
- @patch("Bcfg2.Server.Plugin.PrioDir._matches")
- def test__matches_regex_disabled(self, mock_matches):
- """ test failure to match with regex disabled """
- r = self.get_obj()
- self.set_regex_enabled(r, False)
- metadata = Mock()
- mock_matches.return_value = False
-
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- rules = []
- self.assertFalse(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
-
- @patch("Bcfg2.Server.Plugin.PrioDir._matches")
- def test__matches_regex_enabled(self, mock_matches):
- """ test match with regex enabled """
- r = self.get_obj()
- self.set_regex_enabled(r, True)
- metadata = Mock()
- mock_matches.return_value = False
-
- entry = lxml.etree.Element("Path", name="/etc/foo.conf")
- rules = ["/etc/.*\.conf", "/etc/bar"]
- self.assertTrue(r._matches(entry, metadata, rules))
- mock_matches.assert_called_with(r, entry, metadata, rules)
- self.assertIn("/etc/.*\.conf", r._regex_cache.keys())
-
- def set_regex_enabled(self, rules_obj, state):
- """ set the state of regex_enabled for this implementation of
- Rules """
- if not isinstance(rules_obj.core.setup, MagicMock):
- rules_obj.core.setup = MagicMock()
- rules_obj.core.setup.cfp.getboolean.return_value = state
-
- def test__regex_enabled(self):
- r = self.get_obj()
- r.core.setup = MagicMock()
- self.assertEqual(r._regex_enabled,
- r.core.setup.cfp.getboolean.return_value)
- r.core.setup.cfp.getboolean.assert_called_with("rules", "regex",
- default=False)
+ metadata = Mock(groups=groups)
+ entry = self.abstract[name]
+ if handles is not None:
+ self.assertEqual(handles, r.HandlesEntry(entry, metadata))
+ self.assertRaises(PluginExecutionError,
+ r.HandleEntry, entry, metadata)
+
+ def test_basic(self):
+ """ Test basic Rules usage """
+ self._do_test('basic')
+ self._do_test_failure('unhandled', handles=False)
+
+ def test_priority(self):
+ """ Test that Rules respects priority """
+ self._do_test('priority')
+
+ def test_duplicate(self):
+ """ Test that Rules raises exceptions for duplicate entries """
+ self._do_test_failure('duplicate')
+
+ def test_content(self):
+ """ Test that Rules copies text content from concrete entries """
+ self._do_test('content')
+
+ def test_group(self):
+ """ Test that Rules respects <Group/> tags """
+ self._do_test('group', groups=['foogroup'])
+ self._do_test_failure('group', groups=['bargroup'], handles=False)
+
+ def test_children(self):
+ """ Test that Rules copies child elements from concrete entries """
+ self._do_test('children')
+
+ def test_regex(self):
+ """ Test that Rules handles regular expressions properly """
+ Bcfg2.Options.setup.rules_regex = False
+ self._do_test_failure('regex', handles=False)
+ Bcfg2.Options.setup.rules_regex = True
+ self._do_test('regex')
+ Bcfg2.Options.setup.rules_regex = False
+
+ def test_slash(self):
+ """ Test that Rules handles trailing slashes on Path entries """
+ self._do_test('slash')
+ self._do_test('no_slash')
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
index bf9a3ced3..128d6cae5 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestTemplateHelper.py
@@ -25,7 +25,7 @@ class TestHelperModule(Bcfg2TestCase):
def get_obj(self, path=None):
if path is None:
path = self.path
- return self.test_obj(path, fam=Mock())
+ return self.test_obj(path)
def test__init(self):
hm = self.get_obj()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestStatistics.py b/testsuite/Testsrc/Testlib/TestServer/TestStatistics.py
new file mode 100644
index 000000000..bf918ef76
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestStatistics.py
@@ -0,0 +1,44 @@
+import os
+import sys
+from mock import Mock, MagicMock, patch
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import *
+
+from Bcfg2.Server.Statistics import *
+
+
+class TestStatistic(Bcfg2TestCase):
+ def test_stat(self):
+ stat = Statistic("test", 1)
+ self.assertEqual(stat.get_value(), ("test", (1.0, 1.0, 1.0, 1)))
+ stat.add_value(10)
+ self.assertEqual(stat.get_value(), ("test", (1.0, 10.0, 5.5, 2)))
+ stat.add_value(100)
+ self.assertEqual(stat.get_value(), ("test", (1.0, 100.0, 37.0, 3)))
+ stat.add_value(12.345)
+ self.assertEqual(stat.get_value(), ("test", (1.0, 100.0, 30.83625, 4)))
+ stat.add_value(0.655)
+ self.assertEqual(stat.get_value(), ("test", (0.655, 100.0, 24.8, 5)))
+
+
+class TestStatistics(Bcfg2TestCase):
+ def test_stats(self):
+ stats = Statistics()
+ self.assertEqual(stats.display(), dict())
+ stats.add_value("test1", 1)
+ self.assertEqual(stats.display(), dict(test1=(1.0, 1.0, 1.0, 1)))
+ stats.add_value("test2", 1.23)
+ self.assertEqual(stats.display(), dict(test1=(1.0, 1.0, 1.0, 1),
+ test2=(1.23, 1.23, 1.23, 1)))
+ stats.add_value("test1", 10)
+ self.assertEqual(stats.display(), dict(test1=(1.0, 10.0, 5.5, 2),
+ test2=(1.23, 1.23, 1.23, 1)))