summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2012-10-02 15:00:03 -0400
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2012-10-02 15:00:03 -0400
commitadf037aa31031be164e68b1a4817a7cada936c90 (patch)
treee4913b33fbed2bb480a2090b419a7fb3fddc67a7
parent414f1c017f5a1e0f0549bcb27175983b04e3312c (diff)
downloadbcfg2-adf037aa31031be164e68b1a4817a7cada936c90.tar.gz
bcfg2-adf037aa31031be164e68b1a4817a7cada936c90.tar.bz2
bcfg2-adf037aa31031be164e68b1a4817a7cada936c90.zip
testsuite: added unit tests for Cfg handlers
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/CfgCatFilter.py1
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py12
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/CfgExternalCommandVerifier.py24
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py6
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/CfgInfoXML.py6
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/CfgPlaintextGenerator.py1
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py1
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py10
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py41
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py50
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py76
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py35
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py89
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py133
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py75
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPlaintextGenerator.py18
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py76
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/__init__.py0
18 files changed, 591 insertions, 63 deletions
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgCatFilter.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgCatFilter.py
index a2e86b3db..49a5a85b3 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgCatFilter.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgCatFilter.py
@@ -3,6 +3,7 @@ plaintext files """
from Bcfg2.Server.Plugins.Cfg import CfgFilter
+
class CfgCatFilter(CfgFilter):
""" CfgCatFilter appends lines to and remove lines from plaintext
:ref:`server-plugins-generators-Cfg` files"""
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py
index f8d08b394..dc4bab9f6 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgEncryptedGenerator.py
@@ -30,18 +30,12 @@ class CfgEncryptedGenerator(CfgGenerator):
__init__.__doc__ = CfgGenerator.__init__.__doc__
def handle_event(self, event):
- if event.code2str() == 'deleted':
- return
- try:
- crypted = open(self.name).read()
- except UnicodeDecodeError:
- crypted = open(self.name, mode='rb').read()
- except:
- LOGGER.error("Failed to read %s" % self.name)
+ CfgGenerator.handle_event(self, event)
+ if self.data is None:
return
# todo: let the user specify a passphrase by name
try:
- self.data = bruteforce_decrypt(crypted, setup=SETUP,
+ self.data = bruteforce_decrypt(self.data, setup=SETUP,
algorithm=get_algorithm(SETUP))
except EVPError:
msg = "Failed to decrypt %s" % self.name
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgExternalCommandVerifier.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgExternalCommandVerifier.py
index fb66ca8bf..023af7d4e 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgExternalCommandVerifier.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgExternalCommandVerifier.py
@@ -1,6 +1,7 @@
""" Invoke an external command to verify file contents """
import os
+import sys
import shlex
import logging
import Bcfg2.Server.Plugin
@@ -23,23 +24,30 @@ class CfgExternalCommandVerifier(CfgVerifier):
__init__.__doc__ = CfgVerifier.__init__.__doc__
def verify_entry(self, entry, metadata, data):
- proc = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
- err = proc.communicate(input=data)[1]
- rv = proc.wait()
- if rv != 0:
- raise CfgVerificationError(err)
+ try:
+ proc = Popen(self.cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
+ err = proc.communicate(input=data)[1]
+ rv = proc.wait()
+ if rv != 0:
+ raise CfgVerificationError(err)
+ except:
+ err = sys.exc_info()[1]
+ raise CfgVerificationError("Error running external command "
+ "verifier: %s" % err)
verify_entry.__doc__ = CfgVerifier.verify_entry.__doc__
def handle_event(self, event):
- if event.code2str() == 'deleted':
+ CfgVerifier.handle_event(self, event)
+ if not self.data:
return
self.cmd = []
if not os.access(self.name, os.X_OK):
- bangpath = open(self.name).readline().strip()
+ bangpath = self.data.splitlines()[0].strip()
if bangpath.startswith("#!"):
self.cmd.extend(shlex.split(bangpath[2:].strip()))
else:
- msg = "Cannot execute %s" % self.name
+ msg = "%s: Cannot execute %s" % (self.__class__.__name__,
+ self.name)
LOGGER.error(msg)
raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
self.cmd.append(self.name)
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py
index 21662a984..2f59d74f2 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py
@@ -142,13 +142,13 @@ class CfgGenshiGenerator(CfgGenerator):
raise
def handle_event(self, event):
- if event.code2str() == 'deleted':
- return
CfgGenerator.handle_event(self, event)
+ if self.data is None:
+ return
try:
self.template = self.loader.load(self.name, cls=NewTextTemplate,
encoding=self.encoding)
- except Exception:
+ except:
msg = "Cfg: Could not load template %s: %s" % (self.name,
sys.exc_info()[1])
LOGGER.error(msg)
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgInfoXML.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgInfoXML.py
index 2396d6eb6..e5ba0a51b 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgInfoXML.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgInfoXML.py
@@ -23,9 +23,9 @@ class CfgInfoXML(CfgInfo):
mdata = dict()
self.infoxml.pnode.Match(metadata, mdata, entry=entry)
if 'Info' not in mdata:
- LOGGER.error("Failed to set metadata for file %s" %
- entry.get('name'))
- raise Bcfg2.Server.Plugin.PluginExecutionError
+ msg = "Failed to set metadata for file %s" % entry.get('name')
+ LOGGER.error(msg)
+ raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
self._set_info(entry, mdata['Info'][None])
bind_info_to_entry.__doc__ = CfgInfo.bind_info_to_entry.__doc__
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPlaintextGenerator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPlaintextGenerator.py
index 333e2f670..92fb06ee9 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPlaintextGenerator.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgPlaintextGenerator.py
@@ -4,6 +4,7 @@
from Bcfg2.Server.Plugins.Cfg import CfgGenerator
+
class CfgPlaintextGenerator(CfgGenerator):
""" CfgPlaintextGenerator is a
:class:`Bcfg2.Server.Plugins.Cfg.CfgGenerator` that handles plain
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
index c4958ceef..0ab7fd98e 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
@@ -673,7 +673,6 @@ class Cfg(Bcfg2.Server.Plugin.GroupSpool,
Bcfg2.Server.Plugin.PullTarget.__init__(self)
SETUP = core.setup
- print "SETUP=%s" % SETUP
if 'validate' not in SETUP:
SETUP.add_option('validate', Bcfg2.Options.CFG_VALIDATION)
SETUP.reparse()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
index f03ac2fd7..27d726fbb 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
@@ -49,7 +49,7 @@ class TestFunctions(Bcfg2TestCase):
bind_info,
entry, metadata, infoxml=infoxml)
infoxml.pnode.Match.assert_called_with(metadata, dict(), entry=entry)
-
+
# test with valid infoxml
entry = lxml.etree.Element("Path", name="/test")
infoxml.reset_mock()
@@ -1343,7 +1343,7 @@ class TestSpecificData(Bcfg2TestCase):
if specific is None:
specific = Mock()
return self.test_obj(name, specific, encoding)
-
+
def test__init(self):
pass
@@ -1358,7 +1358,7 @@ class TestSpecificData(Bcfg2TestCase):
self.assertIsNone(sd.data)
else:
self.assertFalse(hasattr(sd, 'data'))
-
+
event = Mock()
mock_open.return_value.read.return_value = "test"
sd.handle_event(event)
@@ -1761,14 +1761,14 @@ class TestEntrySet(TestDebuggable):
class TestGroupSpool(TestPlugin, TestGenerator):
test_obj = GroupSpool
-
+
def get_obj(self, core=None):
@patch("%s.%s.AddDirectoryMonitor" % (self.test_obj.__module__,
self.test_obj.__name__),
Mock())
def inner():
return TestPlugin.get_obj(self, core=core)
-
+
return inner()
def test__init(self):
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
new file mode 100644
index 000000000..2577b29df
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgCheetahGenerator.py
@@ -0,0 +1,41 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore, re_type
+from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
+
+
+if HAS_CHEETAH or can_skip:
+ class TestCfgCheetahGenerator(TestCfgGenerator):
+ test_obj = CfgCheetahGenerator
+
+ @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
+ def setUp(self):
+ pass
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgCheetahGenerator.Template")
+ def test_get_data(self, mock_Template):
+ ccg = self.get_obj(encoding='UTF-8')
+ ccg.data = "data"
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+
+ self.assertEqual(ccg.get_data(entry, metadata),
+ mock_Template.return_value.respond.return_value)
+ mock_Template.assert_called_with("data".decode(ccg.encoding),
+ compilerSettings=ccg.settings)
+ mock_Template.return_value.respond.assert_called_with()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
new file mode 100644
index 000000000..0522b9206
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedCheetahGenerator.py
@@ -0,0 +1,50 @@
+import os
+import sys
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg.CfgEncryptedCheetahGenerator import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore, re_type
+
+try:
+ from TestServer.TestPlugins.TestCfg.TestCfgCheetahGenerator import \
+ TestCfgCheetahGenerator
+ HAS_CHEETAH = True
+except ImportError:
+ TestCfgCheetahGenerator = object
+ HAS_CHEETAH = False
+
+try:
+ from TestServer.TestPlugins.TestCfg.TestCfgEncryptedGenerator import \
+ TestCfgEncryptedGenerator
+ HAS_CRYPTO = True
+except ImportError:
+ TestCfgEncryptedGenerator = object
+ HAS_CRYPTO = False
+
+
+if can_skip or (HAS_CRYPTO and HAS_CHEETAH):
+ class TestCfgEncryptedCheetahGenerator(TestCfgCheetahGenerator,
+ TestCfgEncryptedGenerator):
+ test_obj = CfgEncryptedCheetahGenerator
+
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ @skipUnless(HAS_CHEETAH, "Cheetah libraries not found, skipping")
+ def setUp(self):
+ pass
+
+ def test_handle_event(self):
+ TestCfgEncryptedGenerator.test_handle_event(self)
+
+ def test_get_data(self):
+ TestCfgCheetahGenerator.test_get_data(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
new file mode 100644
index 000000000..d8819b11c
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenerator.py
@@ -0,0 +1,76 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator import *
+from Bcfg2.Server.Plugin import PluginExecutionError
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore, re_type
+from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
+
+
+if can_skip or HAS_CRYPTO:
+ class TestCfgEncryptedGenerator(TestCfgGenerator):
+ test_obj = CfgEncryptedGenerator
+
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ def setUp(self):
+ pass
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.get_algorithm")
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenerator.bruteforce_decrypt")
+ def test_handle_event(self, mock_decrypt, mock_get_algorithm):
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
+ def inner(mock_handle_event):
+ def reset():
+ mock_decrypt.reset_mock()
+ mock_get_algorithm.reset_mock()
+ mock_handle_event.reset_mock()
+
+ def get_event_data(obj, event):
+ obj.data = "encrypted"
+
+ mock_handle_event.side_effect = get_event_data
+ mock_decrypt.side_effect = lambda d, **kw: "plaintext"
+ event = Mock()
+ ceg = self.get_obj()
+ ceg.handle_event(event)
+ mock_handle_event.assert_called_with(ceg, event)
+ mock_decrypt.assert_called_with("encrypted",
+ setup=SETUP,
+ algorithm=mock_get_algorithm.return_value)
+ self.assertEqual(ceg.data, "plaintext")
+
+ reset()
+ mock_decrypt.side_effect = EVPError
+ self.assertRaises(PluginExecutionError,
+ ceg.handle_event, event)
+ inner()
+
+ # to perform the tests from the parent test object, we
+ # make bruteforce_decrypt just return whatever data was
+ # given to it
+ mock_decrypt.side_effect = lambda d, **kw: d
+ TestCfgGenerator.test_handle_event(self)
+
+ def test_get_data(self):
+ ceg = self.get_obj()
+ ceg.data = None
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+
+ self.assertRaises(PluginExecutionError,
+ ceg.get_data, entry, metadata)
+
+ TestCfgGenerator.test_get_data(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
new file mode 100644
index 000000000..0375b8928
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
@@ -0,0 +1,35 @@
+import os
+import sys
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg.CfgEncryptedGenshiGenerator import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore, re_type
+
+try:
+ from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
+ TestCfgGenshiGenerator
+ HAS_GENSHI = True
+except ImportError:
+ TestCfgGenshiGenerator = object
+ HAS_GENSHI = False
+
+
+if can_skip or (HAS_CRYPTO and HAS_GENSHI):
+ class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator):
+ test_obj = CfgEncryptedGenshiGenerator
+
+ @skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
+ @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
+ def setUp(self):
+ pass
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
new file mode 100644
index 000000000..fde7ed722
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgExternalCommandVerifier.py
@@ -0,0 +1,89 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier import *
+from Bcfg2.Server.Plugin import PluginExecutionError
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore, re_type
+from TestServer.TestPlugins.TestCfg.Test_init import TestCfgVerifier
+
+
+class TestCfgExternalCommandVerifier(TestCfgVerifier):
+ test_obj = CfgExternalCommandVerifier
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgExternalCommandVerifier.Popen")
+ def test_verify_entry(self, mock_Popen):
+ proc = Mock()
+ mock_Popen.return_value = proc
+ proc.wait.return_value = 0
+ proc.communicate.return_value = MagicMock()
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+
+ ecv = self.get_obj()
+ ecv.cmd = ["/bin/bash", "-x", "foo"]
+ ecv.verify_entry(entry, metadata, "data")
+ self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
+ proc.communicate.assert_called_with(input="data")
+ proc.wait.assert_called_with()
+
+ mock_Popen.reset_mock()
+ proc.wait.return_value = 13
+ self.assertRaises(CfgVerificationError,
+ ecv.verify_entry, entry, metadata, "data")
+ self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
+ proc.communicate.assert_called_with(input="data")
+ proc.wait.assert_called_with()
+
+ mock_Popen.reset_mock()
+ mock_Popen.side_effect = OSError
+ self.assertRaises(CfgVerificationError,
+ ecv.verify_entry, entry, metadata, "data")
+ self.assertEqual(mock_Popen.call_args[0], (ecv.cmd,))
+
+ @patch("os.access")
+ def test_handle_event(self, mock_access):
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgVerifier.handle_event")
+ def inner(mock_handle_event):
+ ecv = self.get_obj()
+ event = Mock()
+ mock_access.return_value = False
+ ecv.data = "data"
+ self.assertRaises(PluginExecutionError,
+ ecv.handle_event, event)
+ mock_handle_event.assert_called_with(ecv, event)
+ mock_access.assert_called_with(ecv.name, os.X_OK)
+ self.assertItemsEqual(ecv.cmd, [])
+
+ mock_access.reset_mock()
+ mock_handle_event.reset_mock()
+ ecv.data = "#! /bin/bash -x\ntrue"
+ ecv.handle_event(event)
+ mock_handle_event.assert_called_with(ecv, event)
+ mock_access.assert_called_with(ecv.name, os.X_OK)
+ self.assertEqual(ecv.cmd, ["/bin/bash", "-x", ecv.name])
+
+ mock_access.reset_mock()
+ mock_handle_event.reset_mock()
+ mock_access.return_value = True
+ ecv.data = "true"
+ ecv.handle_event(event)
+ mock_handle_event.assert_called_with(ecv, event)
+ mock_access.assert_called_with(ecv.name, os.X_OK)
+ self.assertItemsEqual(ecv.cmd, [ecv.name])
+
+ inner()
+ mock_access.return_value = True
+ TestCfgVerifier.test_handle_event(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
new file mode 100644
index 000000000..2dd647352
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
@@ -0,0 +1,133 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg.CfgGenshiGenerator import *
+from Bcfg2.Server.Plugin import PluginExecutionError
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore, re_type
+from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
+
+
+if can_skip or HAS_GENSHI:
+ class TestCfgGenshiGenerator(TestCfgGenerator):
+ test_obj = CfgGenshiGenerator
+
+ @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
+ def setUp(self):
+ pass
+
+ def test_removecomment(self):
+ data = [(None, "test", 1),
+ (None, "test2", 2)]
+ stream = [(genshi.core.COMMENT, "test", 0),
+ data[0],
+ (genshi.core.COMMENT, "test3", 0),
+ data[1]]
+ self.assertItemsEqual(list(removecomment(stream)), data)
+
+ def test__init(self):
+ TestCfgGenerator.test__init(self)
+ cgg = self.get_obj()
+ self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
+
+ def test_get_data(self):
+ cgg = self.get_obj()
+ cgg._handle_genshi_exception = Mock()
+ cgg.template = Mock()
+ fltr = Mock()
+ cgg.template.generate.return_value = fltr
+ stream = Mock()
+ fltr.filter.return_value = stream
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+
+ def reset():
+ cgg.template.reset_mock()
+ cgg._handle_genshi_exception.reset_mock()
+
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(name=entry.get("name"),
+ metadata=metadata,
+ path=cgg.name)
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+
+ reset()
+ def render(fmt, **kwargs):
+ stream.render.side_effect = None
+ raise TypeError
+ stream.render.side_effect = render
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(name=entry.get("name"),
+ metadata=metadata,
+ path=cgg.name)
+ fltr.filter.assert_called_with(removecomment)
+ self.assertEqual(stream.render.call_args_list,
+ [call("text", encoding=cgg.encoding,
+ strip_whitespace=False),
+ call("text", encoding=cgg.encoding)])
+
+ reset()
+ stream.render.side_effect = UndefinedError("test")
+ self.assertRaises(UndefinedError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(name=entry.get("name"),
+ metadata=metadata,
+ path=cgg.name)
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+
+ reset()
+ stream.render.side_effect = ValueError
+ cgg._handle_genshi_exception.side_effect = ValueError
+ self.assertRaises(ValueError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(name=entry.get("name"),
+ metadata=metadata,
+ path=cgg.name)
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+ self.assertTrue(cgg._handle_genshi_exception.called)
+
+ def test_handle_event(self):
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgGenerator.handle_event")
+ def inner(mock_handle_event):
+ cgg = self.get_obj()
+ cgg.loader = Mock()
+ cgg.data = "template data"
+ event = Mock()
+ cgg.handle_event(event)
+ cgg.loader.load.assert_called_with(cgg.name,
+ cls=NewTextTemplate,
+ encoding=cgg.encoding)
+
+ cgg.loader.reset_mock()
+ cgg.loader.load.side_effect = OSError
+ self.assertRaises(PluginExecutionError,
+ cgg.handle_event, event)
+ cgg.loader.load.assert_called_with(cgg.name,
+ cls=NewTextTemplate,
+ encoding=cgg.encoding)
+
+ inner()
+ loader_cls = self.test_obj.__loader_cls__
+ self.test_obj.__loader_cls__ = Mock
+ TestCfgGenerator.test_handle_event(self)
+ self.test_obj.__loader_cls__ = loader_cls
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
new file mode 100644
index 000000000..88dd1f18f
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgInfoXML.py
@@ -0,0 +1,75 @@
+import os
+import sys
+import lxml.etree
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg.CfgInfoXML import *
+from Bcfg2.Server.Plugin import InfoXML, PluginExecutionError
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore, re_type
+from TestServer.TestPlugins.TestCfg.Test_init import TestCfgInfo
+
+
+class TestCfgInfoXML(TestCfgInfo):
+ test_obj = CfgInfoXML
+
+ def test__init(self):
+ TestCfgInfo.test__init(self)
+ ci = self.get_obj()
+ self.assertIsInstance(ci.infoxml, InfoXML)
+
+ def test_bind_info_to_entry(self):
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+ ci = self.get_obj()
+ ci.infoxml = Mock()
+ ci._set_info = Mock()
+
+ self.assertRaises(PluginExecutionError,
+ ci.bind_info_to_entry, entry, metadata)
+ ci.infoxml.pnode.Match.assert_called_with(metadata, dict(),
+ entry=entry)
+ self.assertFalse(ci._set_info.called)
+
+ ci.infoxml.reset_mock()
+ ci._set_info.reset_mock()
+ mdata_value = Mock()
+ def set_mdata(metadata, mdata, entry=None):
+ mdata['Info'] = {None: mdata_value}
+
+ ci.infoxml.pnode.Match.side_effect = set_mdata
+ ci.bind_info_to_entry(entry, metadata)
+ ci.infoxml.pnode.Match.assert_called_with(metadata,
+ dict(Info={None: mdata_value}),
+ entry=entry)
+ ci._set_info.assert_called_with(entry, mdata_value)
+
+ def test_handle_event(self):
+ ci = self.get_obj()
+ ci.infoxml = Mock()
+ ci.handle_event(Mock)
+ ci.infoxml.HandleEvent.assert_called_with()
+
+ def test__set_info(self):
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo._set_info")
+ def inner(mock_set_info):
+ ci = self.get_obj()
+ entry = Mock()
+ info = {"foo": "foo",
+ "__children__": ["one", "two"]}
+ ci._set_info(entry, info)
+ self.assertItemsEqual(entry.append.call_args_list,
+ [call(c) for c in info['__children__']])
+
+ inner()
+ TestCfgInfo.test__set_info(self)
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPlaintextGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPlaintextGenerator.py
new file mode 100644
index 000000000..5c6767a59
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgPlaintextGenerator.py
@@ -0,0 +1,18 @@
+import os
+import sys
+from Bcfg2.Server.Plugins.Cfg.CfgPlaintextGenerator import *
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
+
+
+class TestCfgPlaintextGenerator(TestCfgGenerator):
+ test_obj = CfgPlaintextGenerator
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
index 32db90304..fb1a28bca 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
@@ -1,6 +1,5 @@
import os
import sys
-import copy
import lxml.etree
import Bcfg2.Options
from Bcfg2.Compat import walk_packages
@@ -44,29 +43,36 @@ class TestCfgBaseFileMatcher(TestSpecificData):
for basename in basenames:
for extension in extensions:
def test_match(spec):
- return regex.match(basename + "." + spec + extension)
-
- regex = self.test_obj.get_regex(basename=basename)
- self.assertTrue(regex.match(basename))
- self.assertFalse(regex.match("bogus"))
- if self.test_obj.__specific__:
- self.assertTrue(test_match("G20_foo"))
- self.assertTrue(test_match("G1_foo"))
- self.assertTrue(test_match("G32768_foo"))
- # a group named '_'
- self.assertTrue(test_match("G10__"))
- self.assertTrue(test_match("H_hostname"))
- self.assertTrue(test_match("H_fqdn.subdomain.example.com"))
- self.assertTrue(test_match("G20_group_with_underscores"))
-
- self.assertFalse(test_match("G20_group with spaces"))
- self.assertFalse(test_match("G_foo"))
- self.assertFalse(test_match("G_"))
- self.assertFalse(test_match("G20_"))
- self.assertFalse(test_match("H_"))
- else:
- self.assertFalse(test_match("G20_foo"))
- self.assertFalse(test_match("H_hostname"))
+ mstr = basename
+ if spec:
+ mstr += "." + spec
+ if extension:
+ mstr += "." + extension
+ return regex.match(mstr)
+
+ regex = self.test_obj.get_regex(basename=basename)
+ self.assertTrue(test_match(''))
+ self.assertFalse(regex.match("bogus"))
+ if self.test_obj.__specific__:
+ if extension:
+ self.assertFalse(regex.match("bogus." + extension))
+ self.assertTrue(test_match("G20_foo"))
+ self.assertTrue(test_match("G1_foo"))
+ self.assertTrue(test_match("G32768_foo"))
+ # a group named '_'
+ self.assertTrue(test_match("G10__"))
+ self.assertTrue(test_match("H_hostname"))
+ self.assertTrue(test_match("H_fqdn.subdomain.example.com"))
+ self.assertTrue(test_match("G20_group_with_underscores"))
+
+ self.assertFalse(test_match("G20_group with spaces"))
+ self.assertFalse(test_match("G_foo"))
+ self.assertFalse(test_match("G_"))
+ self.assertFalse(test_match("G20_"))
+ self.assertFalse(test_match("H_"))
+ else:
+ self.assertFalse(test_match("G20_foo"))
+ self.assertFalse(test_match("H_hostname"))
@patch("Bcfg2.Server.Plugins.Cfg.CfgBaseFileMatcher.get_regex")
def test_handles(self, mock_get_regex):
@@ -125,7 +131,7 @@ class TestCfgBaseFileMatcher(TestSpecificData):
self.assertFalse(mock_get_regex.called)
elif self.test_obj.__basenames__:
match.return_value = False
- self.assertFalse(self.test_obj.handles(evt))
+ self.assertFalse(self.test_obj.ignore(evt))
self.assertItemsEqual(mock_get_regex.call_args_list,
[call(basename=b,
extensions=self.test_obj.__ignore__)
@@ -137,12 +143,13 @@ class TestCfgBaseFileMatcher(TestSpecificData):
mock_get_regex.reset_mock()
match.reset_mock()
match.return_value = True
- self.assertTrue(self.test_obj.handles(evt))
+ self.assertTrue(self.test_obj.ignore(evt))
match.assert_called_with(evt.filename)
else:
match.return_value = False
- self.assertFalse(self.test_obj.handles(evt,
- basename=os.path.basename(self.path)))
+ self.assertFalse(self.test_obj.ignore(
+ evt,
+ basename=os.path.basename(self.path)))
mock_get_regex.assert_called_with(
basename=os.path.basename(self.path),
extensions=self.test_obj.__ignore__)
@@ -151,11 +158,12 @@ class TestCfgBaseFileMatcher(TestSpecificData):
mock_get_regex.reset_mock()
match.reset_mock()
match.return_value = True
- self.assertTrue(self.test_obj.handles(evt,
- basename=os.path.basename(self.path),
- extensions=self.test_obj.__ignore__))
+ self.assertTrue(self.test_obj.ignore(
+ evt,
+ basename=os.path.basename(self.path)))
mock_get_regex.assert_called_with(
- basename=os.path.basename(self.path))
+ basename=os.path.basename(self.path),
+ extensions=self.test_obj.__ignore__)
match.assert_called_with(evt.filename)
@@ -496,7 +504,7 @@ class TestCfgEntrySet(TestEntrySet):
# test failed validation
entry = reset()
eset._validate_data.side_effect = CfgVerificationError
- self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError,
+ self.assertRaises(PluginExecutionError,
eset.bind_entry, entry, metadata)
eset.bind_info_to_entry.assert_called_with(entry, metadata)
eset._generate_data.assert_called_with(entry, metadata)
@@ -634,7 +642,7 @@ class TestCfgEntrySet(TestEntrySet):
# test failure to generate data
reset()
generator.get_data.side_effect = OSError
- self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError,
+ self.assertRaises(PluginExecutionError,
eset._generate_data, entry, metadata)
def test_validate_data(self):
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/__init__.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/__init__.py
new file mode 100644
index 000000000..e69de29bb
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/__init__.py