summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/lib/Bcfg2/Server/Plugin/helpers.py3
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py224
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py14
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py720
-rw-r--r--testsuite/common.py7
5 files changed, 859 insertions, 109 deletions
diff --git a/src/lib/Bcfg2/Server/Plugin/helpers.py b/src/lib/Bcfg2/Server/Plugin/helpers.py
index 96d661b57..74f4dad8b 100644
--- a/src/lib/Bcfg2/Server/Plugin/helpers.py
+++ b/src/lib/Bcfg2/Server/Plugin/helpers.py
@@ -1292,7 +1292,8 @@ class EntrySet(Debuggable):
""" Shortcut to call :func:`bind_info` with the base
info/info.xml for this EntrySet.
- :param entry: The abstract entry to bind the info to
+ :param entry: The abstract entry to bind the info to. This
+ will be modified in place
:type entry: lxml.etree._Element
:param metadata: The client metadata to get info for
:type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
index c34cad30e..952230080 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/__init__.py
@@ -10,7 +10,7 @@ import Bcfg2.Options
import Bcfg2.Server.Plugin
import Bcfg2.Server.Lint
# pylint: disable=W0622
-from Bcfg2.Compat import u_str, unicode, b64encode, walk_packages
+from Bcfg2.Compat import u_str, unicode, b64encode, walk_packages, any
# pylint: enable=W0622
LOGGER = logging.getLogger(__name__)
@@ -63,7 +63,7 @@ class CfgBaseFileMatcher(Bcfg2.Server.Plugin.SpecificData):
Bcfg2.Server.Plugin.SpecificData.__init__(self, name, specific,
encoding)
self.encoding = encoding
- self.regex = self.__class__.get_regex(name)
+ self.regex = self.__class__.get_regex(basename=name)
__init__.__doc__ = Bcfg2.Server.Plugin.SpecificData.__init__.__doc__ + \
"""
.. -----
@@ -126,15 +126,10 @@ class CfgBaseFileMatcher(Bcfg2.Server.Plugin.SpecificData):
else:
basenames = [basename]
- # do simple non-regex matching first
- match = False
- for bname in basenames:
- if event.filename.startswith(os.path.basename(bname)):
- match = True
- break
- return (match and
- cls.get_regex(
- basename=os.path.basename(basename)).match(event.filename))
+ return any(
+ cls.get_regex(
+ basename=os.path.basename(bname)).match(event.filename)
+ for bname in basenames)
@classmethod
def ignore(cls, event, basename=None):
@@ -162,15 +157,10 @@ class CfgBaseFileMatcher(Bcfg2.Server.Plugin.SpecificData):
else:
basenames = [basename]
- # do simple non-regex matching first
- match = False
- for bname in basenames:
- if event.filename.startswith(os.path.basename(bname)):
- match = True
- break
- return (match and
- cls.get_regex(basename=os.path.basename(basename),
- extensions=cls.__ignore__).match(event.filename))
+ return any(
+ cls.get_regex(basename=os.path.basename(bname),
+ extensions=cls.__ignore__).match(event.filename)
+ for bname in basenames)
def __str__(self):
return "%s(%s)" % (self.__class__.__name__, self.name)
@@ -282,7 +272,7 @@ class CfgInfo(CfgBaseFileMatcher):
"""
for key, value in list(info.items()):
if not key.startswith("__"):
- entry.attrib.__setitem__(key, value)
+ entry.attrib[key] = value
class CfgVerifier(CfgBaseFileMatcher):
@@ -413,9 +403,7 @@ class CfgEntrySet(Bcfg2.Server.Plugin.EntrySet):
(action, event.filename))
def get_matching(self, metadata):
- return [item for item in list(self.entries.values())
- if (isinstance(item, CfgGenerator) and
- item.specific.matches(metadata))]
+ return self.get_handlers(metadata, CfgGenerator)
get_matching.__doc__ = Bcfg2.Server.Plugin.EntrySet.get_matching.__doc__
def entry_init(self, event, hdlr): # pylint: disable=W0221
@@ -444,76 +432,22 @@ class CfgEntrySet(Bcfg2.Server.Plugin.EntrySet):
self.entries[event.filename] = hdlr(fpath)
self.entries[event.filename].handle_event(event)
- def bind_entry(self, entry, metadata): # pylint: disable=R0912,R0915
- info_handlers = []
- generators = []
- filters = []
- verifiers = []
- for ent in self.entries.values():
- if ent.__specific__ and not ent.specific.matches(metadata):
- continue
- if isinstance(ent, CfgInfo):
- info_handlers.append(ent)
- if isinstance(ent, CfgGenerator):
- generators.append(ent)
- if isinstance(ent, CfgFilter):
- filters.append(ent)
- if isinstance(ent, CfgVerifier):
- verifiers.append(ent)
- if ent.deprecated:
- if ent.__basenames__:
- fdesc = "/".join(ent.__basenames__)
- elif ent.__extensions__:
- fdesc = "." + "/.".join(ent.__extensions__)
- LOGGER.warning("Cfg: %s: Use of %s files is deprecated" %
- (ent.name, fdesc))
-
- DEFAULT_INFO.bind_info_to_entry(entry, metadata)
- if len(info_handlers) > 1:
- LOGGER.error("More than one info supplier found for %s: %s" %
- (entry.get("name"), info_handlers))
- if len(info_handlers):
- info_handlers[0].bind_info_to_entry(entry, metadata)
- if entry.tag == 'Path':
- entry.set('type', 'file')
-
- generator = self.best_matching(metadata, generators)
- if entry.get('perms').lower() == 'inherit':
- # use on-disk permissions
- fname = os.path.join(self.path, generator.name)
- entry.set('perms',
- str(oct(stat.S_IMODE(os.stat(fname).st_mode))))
- try:
- data = generator.get_data(entry, metadata)
- except:
- msg = "Cfg: exception rendering %s with %s: %s" % \
- (entry.get("name"), generator, sys.exc_info()[1])
- LOGGER.error(msg)
- raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
+ def bind_entry(self, entry, metadata):
+ self.bind_info_to_entry(entry, metadata)
+ data = self._generate_data(entry, metadata)
- for fltr in filters:
+ for fltr in self.get_handlers(metadata, CfgFilter):
data = fltr.modify_data(entry, metadata, data)
if SETUP['validate']:
- # we can have multiple verifiers, but we only want to use the
- # best matching verifier of each class
- verifiers_by_class = dict()
- for verifier in verifiers:
- cls = verifier.__class__.__name__
- if cls not in verifiers_by_class:
- verifiers_by_class[cls] = [verifier]
- else:
- verifiers_by_class[cls].append(verifier)
- for verifiers in verifiers_by_class.values():
- verifier = self.best_matching(metadata, verifiers)
- try:
- verifier.verify_entry(entry, metadata, data)
- except CfgVerificationError:
- msg = "Data for %s for %s failed to verify: %s" % \
- (entry.get('name'), metadata.hostname,
- sys.exc_info()[1])
- LOGGER.error(msg)
- raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
+ try:
+ self._validate_data(entry, metadata, data)
+ except CfgVerificationError:
+ msg = "Data for %s for %s failed to verify: %s" % \
+ (entry.get('name'), metadata.hostname,
+ sys.exc_info()[1])
+ LOGGER.error(msg)
+ raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
if entry.get('encoding') == 'base64':
data = b64encode(data)
@@ -531,7 +465,7 @@ class CfgEntrySet(Bcfg2.Server.Plugin.EntrySet):
msg = "Error in specification for %s: %s" % (entry.get('name'),
sys.exc_info()[1])
LOGGER.error(msg)
- LOGGER.error("You need to specify base64 encoding for %s." %
+ LOGGER.error("You need to specify base64 encoding for %s" %
entry.get('name'))
raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
except TypeError:
@@ -546,6 +480,100 @@ class CfgEntrySet(Bcfg2.Server.Plugin.EntrySet):
return entry
bind_entry.__doc__ = Bcfg2.Server.Plugin.EntrySet.bind_entry.__doc__
+ def get_handlers(self, metadata, handler_type):
+ """ Get all handlers of the given type for the given metadata.
+
+ :param metadata: The metadata to get all handlers for.
+ :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
+ :param handler_type: The type of Cfg handler to get
+ :type handler_type: type
+ :returns: list of Cfg handler classes
+ """
+ rv = []
+ for ent in self.entries.values():
+ if (isinstance(ent, handler_type) and
+ (not ent.__specific__ or ent.specific.matches(metadata))):
+ rv.append(ent)
+ if ent.deprecated:
+ if ent.__basenames__:
+ fdesc = "/".join(ent.__basenames__)
+ elif ent.__extensions__:
+ fdesc = "." + "/.".join(ent.__extensions__)
+ LOGGER.warning("Cfg: %s: Use of %s files is deprecated" %
+ (ent.name, fdesc))
+ return rv
+
+ def bind_info_to_entry(self, entry, metadata):
+ """ Bind entry metadata to the entry with the best CfgInfo
+ handler
+
+ :param entry: The abstract entry to bind the info to. This
+ will be modified in place
+ :type entry: lxml.etree._Element
+ :param metadata: The client metadata to get info for
+ :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
+ :returns: None
+ """
+ info_handlers = self.get_handlers(metadata, CfgInfo)
+ DEFAULT_INFO.bind_info_to_entry(entry, metadata)
+ if len(info_handlers) > 1:
+ LOGGER.error("More than one info supplier found for %s: %s" %
+ (entry.get("name"), info_handlers))
+ if len(info_handlers):
+ info_handlers[0].bind_info_to_entry(entry, metadata)
+ if entry.tag == 'Path':
+ entry.set('type', 'file')
+
+ def _generate_data(self, entry, metadata):
+ """ Generate data for the given entry on the given client
+
+ :param entry: The abstract entry to generate data for. This
+ will not be modified
+ :type entry: lxml.etree._Element
+ :param metadata: The client metadata generate data for
+ :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
+ :returns: string - the data for the entry
+ """
+ generator = self.best_matching(metadata,
+ self.get_handlers(metadata,
+ CfgGenerator))
+ if entry.get('perms').lower() == 'inherit':
+ # use on-disk permissions
+ fname = os.path.join(self.path, generator.name)
+ entry.set('perms',
+ str(oct(stat.S_IMODE(os.stat(fname).st_mode))))
+ try:
+ return generator.get_data(entry, metadata)
+ except:
+ msg = "Cfg: exception rendering %s with %s: %s" % \
+ (entry.get("name"), generator, sys.exc_info()[1])
+ LOGGER.error(msg)
+ raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
+
+ def _validate_data(self, entry, metadata, data):
+ """ Validate data for the given entry on the given client
+
+ :param entry: The abstract entry to validate data for
+ :type entry: lxml.etree._Element
+ :param metadata: The client metadata validate data for
+ :type metadata: Bcfg2.Server.Plugins.Metadata.ClientMetadata
+ :returns: None
+ :raises: :exc:`Bcfg2.Server.Plugins.Cfg.CfgVerificationError`
+ """
+ verifiers = self.get_handlers(metadata, CfgVerifier)
+ # we can have multiple verifiers, but we only want to use the
+ # best matching verifier of each class
+ verifiers_by_class = dict()
+ for verifier in verifiers:
+ cls = verifier.__class__.__name__
+ if cls not in verifiers_by_class:
+ verifiers_by_class[cls] = [verifier]
+ else:
+ verifiers_by_class[cls].append(verifier)
+ for verifiers in verifiers_by_class.values():
+ verifier = self.best_matching(metadata, verifiers)
+ verifier.verify_entry(entry, metadata, data)
+
def list_accept_choices(self, entry, metadata):
'''return a list of candidate pull locations'''
generators = [ent for ent in list(self.entries.values())
@@ -643,6 +671,7 @@ class Cfg(Bcfg2.Server.Plugin.GroupSpool,
Bcfg2.Server.Plugin.PullTarget.__init__(self)
SETUP = core.setup
+ print "SETUP=%s" % SETUP
if 'validate' not in SETUP:
SETUP.add_option('validate', Bcfg2.Options.CFG_VALIDATION)
SETUP.reparse()
@@ -664,12 +693,8 @@ class Cfg(Bcfg2.Server.Plugin.GroupSpool,
if entry.get('name') not in self.entries:
return False
- for ent in self.entries[entry.get('name')].entries.values():
- if ent.__specific__ and not ent.specific.matches(metadata):
- continue
- if isinstance(ent, CfgGenerator):
- return True
- return False
+ return bool(self.entries[entry.get('name')].get_handlers(metadata,
+ CfgGenerator))
def AcceptChoices(self, entry, metadata):
return self.entries[entry.get('name')].list_accept_choices(entry,
@@ -692,11 +717,10 @@ class CfgLint(Bcfg2.Server.Lint.ServerPlugin):
for basename, entry in list(self.core.plugins['Cfg'].entries.items()):
self.check_entry(basename, entry)
-
@classmethod
def Errors(cls):
- return {"cat-file-used":"warning",
- "diff-file-used":"warning"}
+ return {"cat-file-used": "warning",
+ "diff-file-used": "warning"}
def check_entry(self, basename, entry):
""" check that no .cat or .diff files are in use """
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
index db8ee0cca..f03ac2fd7 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
@@ -17,16 +17,11 @@ while path != '/':
if os.path.basename(path) == "testsuite":
break
path = os.path.dirname(path)
-from common import XI_NAMESPACE, XI, call, builtins, skip, skipIf, skipUnless, \
- Bcfg2TestCase, patchIf, datastore, inPy3k, can_skip
+from common import XI_NAMESPACE, XI, call, builtins, skip, skipIf, \
+ skipUnless, Bcfg2TestCase, patchIf, datastore, inPy3k, can_skip, re_type
from TestServer.TestPlugin.Testbase import TestPlugin, TestDebuggable
from TestServer.TestPlugin.Testinterfaces import TestGenerator
-try:
- re_type = re._pattern_type
-except AttributeError:
- re_type = type(re.compile(""))
-
def tostring(el):
return lxml.etree.tostring(el, xml_declaration=False).decode('UTF-8')
@@ -1349,6 +1344,9 @@ class TestSpecificData(Bcfg2TestCase):
specific = Mock()
return self.test_obj(name, specific, encoding)
+ def test__init(self):
+ pass
+
@patch("%s.open" % builtins)
def test_handle_event(self, mock_open):
event = Mock()
@@ -1783,7 +1781,7 @@ class TestGroupSpool(TestPlugin, TestGenerator):
self.assertItemsEqual(gs.Entries, {gs.entry_type: {}})
inner()
-
+
@patch("os.path.isdir")
@patch("os.path.isfile")
def test_add_entry(self, mock_isfile, mock_isdir):
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
new file mode 100644
index 000000000..32db90304
--- /dev/null
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/Test_init.py
@@ -0,0 +1,720 @@
+import os
+import sys
+import copy
+import lxml.etree
+import Bcfg2.Options
+from Bcfg2.Compat import walk_packages
+from mock import Mock, MagicMock, patch
+from Bcfg2.Server.Plugins.Cfg import *
+from Bcfg2.Server.Plugin import PluginExecutionError
+
+# add all parent testsuite directories to sys.path to allow (most)
+# relative imports in python 2.4
+path = os.path.dirname(__file__)
+while path != "/":
+ if os.path.basename(path).lower().startswith("test"):
+ sys.path.append(path)
+ if os.path.basename(path) == "testsuite":
+ break
+ path = os.path.dirname(path)
+from common import XI_NAMESPACE, XI, inPy3k, call, builtins, u, can_skip, \
+ skip, skipIf, skipUnless, Bcfg2TestCase, DBModelTestCase, syncdb, \
+ patchIf, datastore, re_type
+from TestPlugin import TestSpecificData, TestEntrySet, TestGroupSpool, \
+ TestPullTarget
+
+
+class TestCfgBaseFileMatcher(TestSpecificData):
+ test_obj = CfgBaseFileMatcher
+
+ def test__init(self):
+ TestSpecificData.test__init(self)
+ bfm = self.get_obj()
+ self.assertIsInstance(bfm.regex, re_type)
+
+ def test_get_regex(self):
+ if self.test_obj.__basenames__:
+ basenames = self.test_obj.__basenames__
+ else:
+ basenames = [os.path.basename(self.path)]
+ if self.test_obj.__extensions__:
+ extensions = self.test_obj.__extensions__
+ else:
+ extensions = ['']
+ for basename in basenames:
+ for extension in extensions:
+ def test_match(spec):
+ return regex.match(basename + "." + spec + extension)
+
+ regex = self.test_obj.get_regex(basename=basename)
+ self.assertTrue(regex.match(basename))
+ self.assertFalse(regex.match("bogus"))
+ if self.test_obj.__specific__:
+ self.assertTrue(test_match("G20_foo"))
+ self.assertTrue(test_match("G1_foo"))
+ self.assertTrue(test_match("G32768_foo"))
+ # a group named '_'
+ self.assertTrue(test_match("G10__"))
+ self.assertTrue(test_match("H_hostname"))
+ self.assertTrue(test_match("H_fqdn.subdomain.example.com"))
+ self.assertTrue(test_match("G20_group_with_underscores"))
+
+ self.assertFalse(test_match("G20_group with spaces"))
+ self.assertFalse(test_match("G_foo"))
+ self.assertFalse(test_match("G_"))
+ self.assertFalse(test_match("G20_"))
+ self.assertFalse(test_match("H_"))
+ else:
+ self.assertFalse(test_match("G20_foo"))
+ self.assertFalse(test_match("H_hostname"))
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgBaseFileMatcher.get_regex")
+ def test_handles(self, mock_get_regex):
+ match = Mock()
+ mock_get_regex.return_value = Mock()
+ mock_get_regex.return_value.match = match
+
+ evt = Mock()
+ evt.filename = "event.txt"
+
+ if self.test_obj.__basenames__:
+ match.return_value = False
+ self.assertFalse(self.test_obj.handles(evt))
+ self.assertItemsEqual(mock_get_regex.call_args_list,
+ [call(basename=b)
+ for b in self.test_obj.__basenames__])
+ self.assertItemsEqual(match.call_args_list,
+ [call(evt.filename)
+ for b in self.test_obj.__basenames__])
+
+ mock_get_regex.reset_mock()
+ match.reset_mock()
+ match.return_value = True
+ self.assertTrue(self.test_obj.handles(evt))
+ match.assert_called_with(evt.filename)
+ else:
+ match.return_value = False
+ self.assertFalse(self.test_obj.handles(evt,
+ basename=os.path.basename(self.path)))
+ mock_get_regex.assert_called_with(
+ basename=os.path.basename(self.path))
+ match.assert_called_with(evt.filename)
+
+ mock_get_regex.reset_mock()
+ match.reset_mock()
+ match.return_value = True
+ self.assertTrue(self.test_obj.handles(evt,
+ basename=os.path.basename(self.path)))
+ mock_get_regex.assert_called_with(
+ basename=os.path.basename(self.path))
+ match.assert_called_with(evt.filename)
+
+ @patch("%s.%s.get_regex" % (test_obj.__module__, test_obj.__name__))
+ def test_ignore(self, mock_get_regex):
+ match = Mock()
+ mock_get_regex.return_value = Mock()
+ mock_get_regex.return_value.match = match
+
+ evt = Mock()
+ evt.filename = "event.txt"
+
+ if not self.test_obj.__ignore__:
+ self.assertFalse(self.test_obj.ignore(evt))
+ self.assertFalse(mock_get_regex.called)
+ self.assertFalse(self.test_obj.ignore(evt, basename=os.path.basename(self.path)))
+ self.assertFalse(mock_get_regex.called)
+ elif self.test_obj.__basenames__:
+ match.return_value = False
+ self.assertFalse(self.test_obj.handles(evt))
+ self.assertItemsEqual(mock_get_regex.call_args_list,
+ [call(basename=b,
+ extensions=self.test_obj.__ignore__)
+ for b in self.test_obj.__basenames__])
+ self.assertItemsEqual(match.call_args_list,
+ [call(evt.filename)
+ for b in self.test_obj.__basenames__])
+
+ mock_get_regex.reset_mock()
+ match.reset_mock()
+ match.return_value = True
+ self.assertTrue(self.test_obj.handles(evt))
+ match.assert_called_with(evt.filename)
+ else:
+ match.return_value = False
+ self.assertFalse(self.test_obj.handles(evt,
+ basename=os.path.basename(self.path)))
+ mock_get_regex.assert_called_with(
+ basename=os.path.basename(self.path),
+ extensions=self.test_obj.__ignore__)
+ match.assert_called_with(evt.filename)
+
+ mock_get_regex.reset_mock()
+ match.reset_mock()
+ match.return_value = True
+ self.assertTrue(self.test_obj.handles(evt,
+ basename=os.path.basename(self.path),
+ extensions=self.test_obj.__ignore__))
+ mock_get_regex.assert_called_with(
+ basename=os.path.basename(self.path))
+ match.assert_called_with(evt.filename)
+
+
+class TestCfgGenerator(TestCfgBaseFileMatcher):
+ test_obj = CfgGenerator
+
+ def test_get_data(self):
+ cg = self.get_obj()
+ cg.data = "foo bar baz"
+ self.assertEqual(cg.data, cg.get_data(Mock(), Mock()))
+
+
+class TestCfgFilter(TestCfgBaseFileMatcher):
+ test_obj = CfgFilter
+
+ def test_modify_data(self):
+ cf = self.get_obj()
+ self.assertRaises(NotImplementedError,
+ cf.modify_data, Mock(), Mock(), Mock())
+
+
+class TestCfgInfo(TestCfgBaseFileMatcher):
+ test_obj = CfgInfo
+
+ def get_obj(self, name=None):
+ if name is None:
+ name = self.path
+ return self.test_obj(name)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgBaseFileMatcher.__init__")
+ def test__init(self, mock__init):
+ ci = self.get_obj("test.txt")
+ mock__init.assert_called_with(ci, "test.txt", None, None)
+
+ def test_bind_info_to_entry(self):
+ ci = self.get_obj()
+ self.assertRaises(NotImplementedError,
+ ci.bind_info_to_entry, Mock(), Mock())
+
+ def test__set_info(self):
+ ci = self.get_obj()
+ entry = Mock()
+ entry.attrib = dict()
+
+ info = {"foo": "foo",
+ "_bar": "bar",
+ "bar:baz=quux": "quux",
+ "baz__": "baz",
+ "__quux": "quux"}
+ ci._set_info(entry, info)
+ self.assertItemsEqual(entry.attrib,
+ dict([(k, v) for k, v in info.items()
+ if not k.startswith("__")]))
+
+
+class TestCfgVerifier(TestCfgBaseFileMatcher):
+ test_obj = CfgVerifier
+
+ def test_verify_entry(self):
+ cf = self.get_obj()
+ self.assertRaises(NotImplementedError,
+ cf.verify_entry, Mock(), Mock(), Mock())
+
+
+class TestCfgDefaultInfo(TestCfgInfo):
+ test_obj = CfgDefaultInfo
+
+ def get_obj(self, defaults=None):
+ if defaults is None:
+ defaults = dict()
+ return self.test_obj(defaults)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.CfgInfo.__init__")
+ def test__init(self, mock__init):
+ defaults = Mock()
+ cdi = self.get_obj(defaults=defaults)
+ mock__init.assert_called_with(cdi, '')
+ self.assertEqual(defaults, cdi.defaults)
+
+ def test_handle_event(self):
+ # this CfgInfo handler doesn't handle any events -- it's not
+ # file-driven, but based on the built-in defaults
+ pass
+
+ def test_bind_info_to_entry(self):
+ cdi = self.get_obj()
+ cdi._set_info = Mock()
+ entry = Mock()
+ cdi.bind_info_to_entry(entry, Mock())
+ cdi._set_info.assert_called_with(entry, cdi.defaults)
+
+
+class TestCfgEntrySet(TestEntrySet):
+ test_obj = CfgEntrySet
+
+ def test__init(self):
+ pass
+
+ def test_handlers(self):
+ # this is really really difficult to mock out, so we just get
+ # a list of handlers and make sure that it roughly matches
+ # what's on the filesystem
+ expected = []
+ for submodule in walk_packages(path=Bcfg2.Server.Plugins.Cfg.__path__,
+ prefix="Bcfg2.Server.Plugins.Cfg."):
+ expected.append(submodule[1].rsplit('.', 1)[-1])
+ eset = self.get_obj()
+ self.assertItemsEqual(expected,
+ [h.__name__ for h in eset.handlers])
+
+ def test_handle_event(self):
+ eset = self.get_obj()
+ eset.entry_init = Mock()
+ eset._handlers = [Mock(), Mock(), Mock()]
+ for hdlr in eset.handlers:
+ hdlr.__name__ = "handler"
+ eset.entries = dict()
+
+ def reset():
+ eset.entry_init.reset_mock()
+ for hdlr in eset.handlers:
+ hdlr.reset_mock()
+
+ # test that a bogus deleted event is discarded
+ evt = Mock()
+ evt.code2str.return_value = "deleted"
+ evt.filename = os.path.join(datastore, "test.txt")
+ eset.handle_event(evt)
+ self.assertFalse(eset.entry_init.called)
+ self.assertItemsEqual(eset.entries, dict())
+ for hdlr in eset.handlers:
+ self.assertFalse(hdlr.handles.called)
+ self.assertFalse(hdlr.ignore.called)
+
+ # test creation of a new file
+ for action in ["exists", "created", "changed"]:
+ evt = Mock()
+ evt.code2str.return_value = action
+ evt.filename = os.path.join(datastore, "test.txt")
+
+ # test with no handler that handles
+ for hdlr in eset.handlers:
+ hdlr.handles.return_value = False
+ hdlr.ignore.return_value = False
+
+ reset()
+ eset.handle_event(evt)
+ self.assertFalse(eset.entry_init.called)
+ self.assertItemsEqual(eset.entries, dict())
+ for hdlr in eset.handlers:
+ hdlr.handles.assert_called_with(evt, basename=eset.path)
+ hdlr.ignore.assert_called_with(evt, basename=eset.path)
+
+ # test with a handler that handles the entry
+ reset()
+ eset.handlers[-1].handles.return_value = True
+ eset.handle_event(evt)
+ eset.entry_init.assert_called_with(evt, eset.handlers[-1])
+ for hdlr in eset.handlers:
+ hdlr.handles.assert_called_with(evt, basename=eset.path)
+ if not hdlr.return_value:
+ hdlr.ignore.assert_called_with(evt, basename=eset.path)
+
+ # test with a handler that ignores the entry before one
+ # that handles it
+ reset()
+ eset.handlers[0].ignore.return_value = True
+ eset.handle_event(evt)
+ self.assertFalse(eset.entry_init.called)
+ eset.handlers[0].handles.assert_called_with(evt,
+ basename=eset.path)
+ eset.handlers[0].ignore.assert_called_with(evt,
+ basename=eset.path)
+ for hdlr in eset.handlers[1:]:
+ self.assertFalse(hdlr.handles.called)
+ self.assertFalse(hdlr.ignore.called)
+
+ # test changed event with an entry that already exists
+ reset()
+ evt = Mock()
+ evt.code2str.return_value = "changed"
+ evt.filename = os.path.join(datastore, "test.txt")
+ eset.entries[evt.filename] = Mock()
+ eset.handle_event(evt)
+ self.assertFalse(eset.entry_init.called)
+ for hdlr in eset.handlers:
+ self.assertFalse(hdlr.handles.called)
+ self.assertFalse(hdlr.ignore.called)
+ eset.entries[evt.filename].handle_event.assert_called_with(evt)
+
+ # test deleted event with an entry that already exists
+ reset()
+ evt.code2str.return_value = "deleted"
+ eset.handle_event(evt)
+ self.assertFalse(eset.entry_init.called)
+ for hdlr in eset.handlers:
+ self.assertFalse(hdlr.handles.called)
+ self.assertFalse(hdlr.ignore.called)
+ self.assertItemsEqual(eset.entries, dict())
+
+ def test_get_matching(self):
+ eset = self.get_obj()
+ eset.get_handlers = Mock()
+ metadata = Mock()
+ self.assertEqual(eset.get_matching(metadata),
+ eset.get_handlers.return_value)
+ eset.get_handlers.assert_called_with(metadata, CfgGenerator)
+
+ @patch("Bcfg2.Server.Plugin.EntrySet.entry_init")
+ def test_entry_init(self, mock_entry_init):
+ eset = self.get_obj()
+ eset.entries = dict()
+ evt = Mock()
+ evt.filename = "test.txt"
+ handler = Mock()
+ handler.__specific__ = True
+
+ # test handling an event with the parent entry_init
+ eset.entry_init(evt, handler)
+ mock_entry_init.assert_called_with(eset, evt, entry_type=handler,
+ specific=handler.get_regex.return_value)
+ self.assertItemsEqual(eset.entries, dict())
+
+ # test handling the event with a Cfg handler
+ handler.__specific__ = False
+ eset.entry_init(evt, handler)
+ handler.assert_called_with(os.path.join(eset.path, evt.filename))
+ self.assertItemsEqual(eset.entries,
+ {evt.filename: handler.return_value})
+ handler.return_value.handle_event.assert_called_with(evt)
+
+ # test handling an event for an entry that already exists with
+ # a Cfg handler
+ handler.reset_mock()
+ eset.entry_init(evt, handler)
+ self.assertFalse(handler.called)
+ self.assertItemsEqual(eset.entries,
+ {evt.filename: handler.return_value})
+ eset.entries[evt.filename].handle_event.assert_called_with(evt)
+
+ @patch("Bcfg2.Server.Plugins.Cfg.u_str")
+ @patch("Bcfg2.Server.Plugins.Cfg.b64encode")
+ def test_bind_entry(self, mock_b64encode, mock_u_str):
+ Bcfg2.Server.Plugins.Cfg.SETUP = dict(validate=False)
+
+ mock_u_str.side_effect = lambda x: x
+
+ eset = self.get_obj()
+ eset.bind_info_to_entry = Mock()
+ eset._generate_data = Mock()
+ eset.get_handlers = Mock()
+ eset._validate_data = Mock()
+
+ def reset():
+ mock_b64encode.reset_mock()
+ mock_u_str.reset_mock()
+ eset.bind_info_to_entry.reset_mock()
+ eset._generate_data.reset_mock()
+ eset.get_handlers.reset_mock()
+ eset._validate_data.reset_mock()
+ return lxml.etree.Element("Path", name="/test.txt")
+
+ entry = reset()
+ metadata = Mock()
+
+ # test basic entry, no validation, no filters, etc.
+ eset._generate_data.return_value = "data"
+ eset.get_handlers.return_value = []
+ bound = eset.bind_entry(entry, metadata)
+ eset.bind_info_to_entry.assert_called_with(entry, metadata)
+ eset._generate_data.assert_called_with(entry, metadata)
+ self.assertFalse(eset._validate_data.called)
+ expected = lxml.etree.Element("Path", name="/text.txt")
+ expected.text = "data"
+ self.assertXMLEqual(bound, expected)
+ self.assertEqual(bound, entry)
+
+ # test empty entry
+ entry = reset()
+ eset._generate_data.return_value = ""
+ bound = eset.bind_entry(entry, metadata)
+ eset.bind_info_to_entry.assert_called_with(entry, metadata)
+ eset._generate_data.assert_called_with(entry, metadata)
+ self.assertFalse(eset._validate_data.called)
+ expected = lxml.etree.Element("Path", name="/text.txt", empty="true")
+ self.assertXMLEqual(bound, expected)
+ self.assertEqual(bound, entry)
+
+ # test filters
+ entry = reset()
+ eset._generate_data.return_value = "initial data"
+ filters = [Mock(), Mock()]
+ filters[0].modify_data.return_value = "modified data"
+ filters[1].modify_data.return_value = "final data"
+ eset.get_handlers.return_value = filters
+ bound = eset.bind_entry(entry, metadata)
+ eset.bind_info_to_entry.assert_called_with(entry, metadata)
+ eset._generate_data.assert_called_with(entry, metadata)
+ filters[0].modify_data.assert_called_with(entry, metadata,
+ "initial data")
+ filters[1].modify_data.assert_called_with(entry, metadata,
+ "modified data")
+ self.assertFalse(eset._validate_data.called)
+ expected = lxml.etree.Element("Path", name="/text.txt")
+ expected.text = "final data"
+ self.assertXMLEqual(bound, expected)
+
+ # test base64 encoding
+ entry = reset()
+ entry.set("encoding", "base64")
+ mock_b64encode.return_value = "base64 data"
+ eset.get_handlers.return_value = []
+ eset._generate_data.return_value = "data"
+ bound = eset.bind_entry(entry, metadata)
+ eset.bind_info_to_entry.assert_called_with(entry, metadata)
+ eset._generate_data.assert_called_with(entry, metadata)
+ self.assertFalse(eset._validate_data.called)
+ mock_b64encode.assert_called_with("data")
+ self.assertFalse(mock_u_str.called)
+ expected = lxml.etree.Element("Path", name="/text.txt",
+ encoding="base64")
+ expected.text = "base64 data"
+ self.assertXMLEqual(bound, expected)
+ self.assertEqual(bound, entry)
+
+ # test successful validation
+ entry = reset()
+ Bcfg2.Server.Plugins.Cfg.SETUP['validate'] = True
+ bound = eset.bind_entry(entry, metadata)
+ eset.bind_info_to_entry.assert_called_with(entry, metadata)
+ eset._generate_data.assert_called_with(entry, metadata)
+ eset._validate_data.assert_called_with(entry, metadata, "data")
+ expected = lxml.etree.Element("Path", name="/text.txt")
+ expected.text = "data"
+ self.assertXMLEqual(bound, expected)
+ self.assertEqual(bound, entry)
+
+ # test failed validation
+ entry = reset()
+ eset._validate_data.side_effect = CfgVerificationError
+ self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError,
+ eset.bind_entry, entry, metadata)
+ eset.bind_info_to_entry.assert_called_with(entry, metadata)
+ eset._generate_data.assert_called_with(entry, metadata)
+ eset._validate_data.assert_called_with(entry, metadata, "data")
+
+ def test_get_handlers(self):
+ eset = self.get_obj()
+ eset.entries['test1.txt'] = CfgInfo("test1.txt")
+ eset.entries['test2.txt'] = CfgGenerator("test2.txt", Mock(), None)
+ eset.entries['test2.txt'].specific.matches.return_value = True
+ eset.entries['test3.txt'] = CfgInfo("test3.txt")
+ eset.entries['test4.txt'] = CfgGenerator("test4.txt", Mock(), None)
+ eset.entries['test4.txt'].specific.matches.return_value = False
+ eset.entries['test5.txt'] = CfgGenerator("test5.txt", Mock(), None)
+ eset.entries['test5.txt'].specific.matches.return_value = True
+ eset.entries['test6.txt'] = CfgVerifier("test6.txt", Mock(), None)
+ eset.entries['test6.txt'].specific.matches.return_value = True
+ eset.entries['test7.txt'] = CfgFilter("test7.txt", Mock(), None)
+ eset.entries['test7.txt'].specific.matches.return_value = False
+
+ def reset():
+ for e in eset.entries.values():
+ if e.specific is not None:
+ e.specific.reset_mock()
+
+ metadata = Mock()
+ self.assertItemsEqual(eset.get_handlers(metadata, CfgGenerator),
+ [eset.entries['test2.txt'],
+ eset.entries['test5.txt']])
+ for ename in ['test2.txt', 'test4.txt', 'test5.txt']:
+ eset.entries[ename].specific.matches.assert_called_with(metadata)
+ for ename in ['test6.txt', 'test7.txt']:
+ self.assertFalse(eset.entries[ename].specific.matches.called)
+
+ reset()
+ self.assertItemsEqual(eset.get_handlers(metadata, CfgInfo),
+ [eset.entries['test1.txt'],
+ eset.entries['test3.txt']])
+ for entry in eset.entries.values():
+ if entry.specific is not None:
+ self.assertFalse(entry.specific.matches.called)
+
+ reset()
+ self.assertItemsEqual(eset.get_handlers(metadata, CfgVerifier),
+ [eset.entries['test6.txt']])
+ eset.entries['test6.txt'].specific.matches.assert_called_with(metadata)
+ for ename, entry in eset.entries.items():
+ if ename != 'test6.txt' and entry.specific is not None:
+ self.assertFalse(entry.specific.matches.called)
+
+ reset()
+ self.assertItemsEqual(eset.get_handlers(metadata, CfgFilter), [])
+ eset.entries['test7.txt'].specific.matches.assert_called_with(metadata)
+ for ename, entry in eset.entries.items():
+ if ename != 'test7.txt' and entry.specific is not None:
+ self.assertFalse(entry.specific.matches.called)
+
+ reset()
+ self.assertItemsEqual(eset.get_handlers(metadata, Mock), [])
+ for ename, entry in eset.entries.items():
+ if entry.specific is not None:
+ self.assertFalse(entry.specific.matches.called)
+
+ def test_bind_info_to_entry(self):
+ default_info = Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO
+ eset = self.get_obj()
+ eset.get_handlers = Mock()
+ eset.get_handlers.return_value = []
+ Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = Mock()
+ metadata = Mock()
+
+ def reset():
+ eset.get_handlers.reset_mock()
+ Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.reset_mock()
+ return lxml.etree.Element("Path", name="/test.txt")
+
+ # test with no info handlers
+ entry = reset()
+ eset.bind_info_to_entry(entry, metadata)
+ eset.get_handlers.assert_called_with(metadata, CfgInfo)
+ Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ self.assertEqual(entry.get("type"), "file")
+
+ # test with one info handler
+ entry = reset()
+ handler = Mock()
+ eset.get_handlers.return_value = [handler]
+ eset.bind_info_to_entry(entry, metadata)
+ eset.get_handlers.assert_called_with(metadata, CfgInfo)
+ Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ handler.bind_info_to_entry.assert_called_with(entry, metadata)
+ self.assertEqual(entry.get("type"), "file")
+
+ # test with more than one info handler
+ entry = reset()
+ handlers = [Mock(), Mock()]
+ eset.get_handlers.return_value = handlers
+ eset.bind_info_to_entry(entry, metadata)
+ eset.get_handlers.assert_called_with(metadata, CfgInfo)
+ Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO.bind_info_to_entry.assert_called_with(entry, metadata)
+ # we don't care which handler gets called as long as exactly
+ # one of them does
+ called = 0
+ for handler in handlers:
+ if handler.bind_info_to_entry.called:
+ handler.bind_info_to_entry.assert_called_with(entry, metadata)
+ called += 1
+ self.assertEqual(called, 1)
+ self.assertEqual(entry.get("type"), "file")
+
+ Bcfg2.Server.Plugins.Cfg.DEFAULT_INFO = default_info
+
+ def test_generate_data(self):
+ eset = self.get_obj()
+ eset.best_matching = Mock()
+ generator = Mock()
+ generator.get_data.return_value = "data"
+ eset.best_matching.return_value = generator
+ eset.get_handlers = Mock()
+ entry = lxml.etree.Element("Path", name="/test.txt", perms="0640")
+ metadata = Mock()
+
+ def reset():
+ eset.best_matching.reset_mock()
+ eset.get_handlers.reset_mock()
+
+
+ # test success
+ self.assertEqual(eset._generate_data(entry, metadata),
+ "data")
+ eset.get_handlers.assert_called_with(metadata, CfgGenerator)
+ eset.best_matching.assert_called_with(metadata,
+ eset.get_handlers.return_value)
+
+ # test failure to generate data
+ reset()
+ generator.get_data.side_effect = OSError
+ self.assertRaises(Bcfg2.Server.Plugin.PluginExecutionError,
+ eset._generate_data, entry, metadata)
+
+ def test_validate_data(self):
+ class MockChild1(Mock):
+ pass
+
+ class MockChild2(Mock):
+ pass
+
+ eset = self.get_obj()
+ eset.get_handlers = Mock()
+ handlers1 = [MockChild1(), MockChild1()]
+ handlers2 = [MockChild2()]
+ eset.get_handlers.return_value = [handlers1[0], handlers2[0],
+ handlers1[1]]
+ eset.best_matching = Mock()
+ eset.best_matching.side_effect = lambda m, v: v[0]
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+ data = "data"
+
+ eset._validate_data(entry, metadata, data)
+ eset.get_handlers.assert_called_with(metadata, CfgVerifier)
+ self.assertItemsEqual(eset.best_matching.call_args_list,
+ [call(metadata, handlers1),
+ call(metadata, handlers2)])
+ handlers1[0].verify_entry.assert_called_with(entry, metadata, data)
+ handlers2[0].verify_entry.assert_called_with(entry, metadata, data)
+
+ def test_specificity_from_filename(self):
+ pass
+
+
+class TestCfg(TestGroupSpool, TestPullTarget):
+ test_obj = Cfg
+
+ def get_obj(self, core=None):
+ if core is None:
+ core = Mock()
+ core.setup = MagicMock()
+ return TestGroupSpool.get_obj(self, core=core)
+
+ @patch("Bcfg2.Server.Plugin.GroupSpool.__init__")
+ @patch("Bcfg2.Server.Plugin.PullTarget.__init__")
+ def test__init(self, mock_pulltarget_init, mock_groupspool_init):
+ core = Mock()
+ core.setup = MagicMock()
+ cfg = self.test_obj(core, datastore)
+ mock_pulltarget_init.assert_called_with(cfg)
+ mock_groupspool_init.assert_called_with(cfg, core, datastore)
+ core.setup.add_option.assert_called_with("validate",
+ Bcfg2.Options.CFG_VALIDATION)
+ core.setup.reparse.assert_called_with()
+
+ core.reset_mock()
+ core.setup.reset_mock()
+ mock_pulltarget_init.reset_mock()
+ mock_groupspool_init.reset_mock()
+ core.setup.__contains__.return_value = True
+ cfg = self.test_obj(core, datastore)
+ mock_pulltarget_init.assert_called_with(cfg)
+ mock_groupspool_init.assert_called_with(cfg, core, datastore)
+ self.assertFalse(core.setup.add_option.called)
+ self.assertFalse(core.setup.reparse.called)
+
+ def test_has_generator(self):
+ cfg = self.get_obj()
+ cfg.entries = dict()
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+
+ self.assertFalse(cfg.has_generator(entry, metadata))
+
+ eset = Mock()
+ eset.get_handlers.return_value = []
+ cfg.entries[entry.get("name")] = eset
+ self.assertFalse(cfg.has_generator(entry, metadata))
+ eset.get_handlers.assert_called_with(metadata, CfgGenerator)
+
+ eset.get_handlers.reset_mock()
+ eset.get_handlers.return_value = [Mock()]
+ self.assertTrue(cfg.has_generator(entry, metadata))
+ eset.get_handlers.assert_called_with(metadata, CfgGenerator)
diff --git a/testsuite/common.py b/testsuite/common.py
index 3aeeaf5f2..5d8157e55 100644
--- a/testsuite/common.py
+++ b/testsuite/common.py
@@ -284,3 +284,10 @@ class patchIf(object):
# 10 args -- mocksignature has been removed
args.pop(5)
return _noop_patch(*args)(func)
+
+
+try:
+ re_type = re._pattern_type
+except AttributeError:
+ re_type = type(re.compile(""))
+