summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorChris St. Pierre <chris.a.st.pierre@gmail.com>2013-01-22 11:16:19 -0500
committerChris St. Pierre <chris.a.st.pierre@gmail.com>2013-01-22 11:16:19 -0500
commit22029e107420ff21cf9f1811bf4bb6dc2aba1dde (patch)
treebeec8e930b7204e64e01198e5d57b59f72ffef20
parent1b001ee4a4d5cacab04c1e99ad4cc4ce4ca93894 (diff)
downloadbcfg2-22029e107420ff21cf9f1811bf4bb6dc2aba1dde.tar.gz
bcfg2-22029e107420ff21cf9f1811bf4bb6dc2aba1dde.tar.bz2
bcfg2-22029e107420ff21cf9f1811bf4bb6dc2aba1dde.zip
made genshi a requirement
-rwxr-xr-xsetup.py2
-rw-r--r--src/lib/Bcfg2/Server/Lint/GroupNames.py9
-rw-r--r--src/lib/Bcfg2/Server/Lint/RequiredAttrs.py9
-rw-r--r--src/lib/Bcfg2/Server/Plugin/helpers.py16
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Bundler.py149
-rw-r--r--src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py88
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py10
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py12
-rw-r--r--testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py202
9 files changed, 221 insertions, 276 deletions
diff --git a/setup.py b/setup.py
index b003d6440..df6adba74 100755
--- a/setup.py
+++ b/setup.py
@@ -18,7 +18,7 @@ version = sys.version_info[:2]
if version < (2, 6):
need_m2crypto = True
-inst_reqs = ['lxml']
+inst_reqs = ['lxml', 'genshi']
if need_m2crypto:
inst_reqs.append('M2Crypto')
diff --git a/src/lib/Bcfg2/Server/Lint/GroupNames.py b/src/lib/Bcfg2/Server/Lint/GroupNames.py
index 52e42aa7b..4ce12eae7 100644
--- a/src/lib/Bcfg2/Server/Lint/GroupNames.py
+++ b/src/lib/Bcfg2/Server/Lint/GroupNames.py
@@ -3,11 +3,7 @@
import os
import re
import Bcfg2.Server.Lint
-try:
- from Bcfg2.Server.Plugins.Bundler import BundleTemplateFile
- HAS_GENSHI = True
-except ImportError:
- HAS_GENSHI = False
+from Bcfg2.Server.Plugins.Bundler import BundleTemplateFile
class GroupNames(Bcfg2.Server.Lint.ServerPlugin):
@@ -43,8 +39,7 @@ class GroupNames(Bcfg2.Server.Lint.ServerPlugin):
""" Check groups used in the Bundler plugin for validity """
for bundle in self.core.plugins['Bundler'].entries.values():
if (self.HandlesFile(bundle.name) and
- (not HAS_GENSHI or
- not isinstance(bundle, BundleTemplateFile))):
+ not isinstance(bundle, BundleTemplateFile)):
self.check_entries(bundle.xdata.xpath("//Group"),
bundle.name)
diff --git a/src/lib/Bcfg2/Server/Lint/RequiredAttrs.py b/src/lib/Bcfg2/Server/Lint/RequiredAttrs.py
index 61b737a82..bf72d26d0 100644
--- a/src/lib/Bcfg2/Server/Lint/RequiredAttrs.py
+++ b/src/lib/Bcfg2/Server/Lint/RequiredAttrs.py
@@ -8,11 +8,7 @@ import Bcfg2.Server.Lint
import Bcfg2.Client.Tools.POSIX
import Bcfg2.Client.Tools.VCS
from Bcfg2.Server.Plugins.Packages import Apt, Yum
-try:
- from Bcfg2.Server.Plugins.Bundler import BundleTemplateFile
- HAS_GENSHI = True
-except ImportError:
- HAS_GENSHI = False
+from Bcfg2.Server.Plugins.Bundler import BundleTemplateFile
# format verifying functions
@@ -183,8 +179,7 @@ class RequiredAttrs(Bcfg2.Server.Lint.ServerPlugin):
for bundle in self.core.plugins['Bundler'].entries.values():
if (self.HandlesFile(bundle.name) and
- (not HAS_GENSHI or
- not isinstance(bundle, BundleTemplateFile))):
+ not isinstance(bundle, BundleTemplateFile)):
try:
xdata = lxml.etree.XML(bundle.data)
except (lxml.etree.XMLSyntaxError, AttributeError):
diff --git a/src/lib/Bcfg2/Server/Plugin/helpers.py b/src/lib/Bcfg2/Server/Plugin/helpers.py
index 59796a556..c85253be6 100644
--- a/src/lib/Bcfg2/Server/Plugin/helpers.py
+++ b/src/lib/Bcfg2/Server/Plugin/helpers.py
@@ -17,6 +17,7 @@ from Bcfg2.Server.Plugin.base import Debuggable, Plugin
from Bcfg2.Server.Plugin.interfaces import Generator
from Bcfg2.Server.Plugin.exceptions import SpecificityError, \
PluginExecutionError
+import genshi.core
try:
import Bcfg2.Encryption
@@ -33,6 +34,21 @@ except ImportError:
LOGGER = logging.getLogger(__name__)
+def removecomment(stream):
+ """ A Genshi filter that removes comments from the stream. This
+ function is a generator.
+
+ :param stream: The Genshi stream to remove comments from
+ :type stream: genshi.core.Stream
+ :returns: tuple of ``(kind, data, pos)``, as when iterating
+ through a Genshi stream
+ """
+ for kind, data, pos in stream:
+ if kind is genshi.core.COMMENT:
+ continue
+ yield kind, data, pos
+
+
def default_path_metadata():
""" Get the default Path entry metadata from the config.
diff --git a/src/lib/Bcfg2/Server/Plugins/Bundler.py b/src/lib/Bcfg2/Server/Plugins/Bundler.py
index fa993cd85..3907794ae 100644
--- a/src/lib/Bcfg2/Server/Plugins/Bundler.py
+++ b/src/lib/Bcfg2/Server/Plugins/Bundler.py
@@ -11,28 +11,8 @@ import Bcfg2.Server.Plugin
import Bcfg2.Server.Lint
from Bcfg2.Options import get_option_parser
-try:
- import genshi.core
- import genshi.input
- from genshi.template import TemplateLoader, MarkupTemplate, TemplateError
- HAS_GENSHI = True
-except ImportError:
- HAS_GENSHI = False
-
-
-def removecomment(stream):
- """ A Genshi filter that removes comments from the stream. This
- function is a generator.
-
- :param stream: The Genshi stream to remove comments from
- :type stream: genshi.core.Stream
- :returns: tuple of ``(kind, data, pos)``, as when iterating
- through a Genshi stream
- """
- for kind, data, pos in stream:
- if kind is genshi.core.COMMENT:
- continue
- yield kind, data, pos
+import genshi.input
+from genshi.template import TemplateLoader, MarkupTemplate, TemplateError
class BundleFile(Bcfg2.Server.Plugin.StructFile):
@@ -46,64 +26,64 @@ class BundleFile(Bcfg2.Server.Plugin.StructFile):
return bundle
-if HAS_GENSHI:
- class BundleTemplateFile(Bcfg2.Server.Plugin.StructFile):
- """ Representation of a Genshi-templated bundle XML file """
+class BundleTemplateFile(Bcfg2.Server.Plugin.StructFile):
+ """ Representation of a Genshi-templated bundle XML file """
- def __init__(self, name, encoding):
- Bcfg2.Server.Plugin.StructFile.__init__(self, name)
- self.encoding = encoding
- self.logger = logging.getLogger(name)
- self.template = None
+ def __init__(self, name, encoding):
+ Bcfg2.Server.Plugin.StructFile.__init__(self, name)
+ self.encoding = encoding
+ self.logger = logging.getLogger(name)
+ self.template = None
- def HandleEvent(self, event=None):
- """Handle all fs events for this template."""
- if event and event.code2str() == 'deleted':
- return
- try:
- loader = TemplateLoader()
- self.template = loader.load(self.name, cls=MarkupTemplate,
- encoding=self.encoding)
- except LookupError:
- err = sys.exc_info()[1]
- self.logger.error('Genshi lookup error in %s: %s' %
- (self.name, err))
- except TemplateError:
- err = sys.exc_info()[1]
- self.logger.error('Genshi template error in %s: %s' %
- (self.name, err))
- except genshi.input.ParseError:
- err = sys.exc_info()[1]
- self.logger.error('Genshi parse error in %s: %s' %
- (self.name, err))
-
- def get_xml_value(self, metadata):
- """ get the rendered XML data that applies to the given
- client """
- if not hasattr(self, 'template'):
- msg = "No parsed template information for %s" % self.name
- self.logger.error(msg)
- raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
- stream = self.template.generate(
- metadata=metadata,
- repo=get_option_parser()['repo']).filter(removecomment)
- data = lxml.etree.XML(stream.render('xml',
- strip_whitespace=False),
- parser=Bcfg2.Server.XMLParser)
- bundlename = os.path.splitext(os.path.basename(self.name))[0]
- bundle = lxml.etree.Element('Bundle', name=bundlename)
- for item in self.Match(metadata, data):
- bundle.append(copy.deepcopy(item))
- return bundle
-
- def Match(self, metadata, xdata): # pylint: disable=W0221
- """Return matching fragments of parsed template."""
- rv = []
- for child in xdata.getchildren():
- rv.extend(self._match(child, metadata))
- self.logger.debug("File %s got %d match(es)" % (self.name,
- len(rv)))
- return rv
+ def HandleEvent(self, event=None):
+ """Handle all fs events for this template."""
+ if event and event.code2str() == 'deleted':
+ return
+ try:
+ loader = TemplateLoader()
+ self.template = loader.load(self.name, cls=MarkupTemplate,
+ encoding=self.encoding)
+ except LookupError:
+ err = sys.exc_info()[1]
+ self.logger.error('Genshi lookup error in %s: %s' %
+ (self.name, err))
+ except TemplateError:
+ err = sys.exc_info()[1]
+ self.logger.error('Genshi template error in %s: %s' %
+ (self.name, err))
+ except genshi.input.ParseError:
+ err = sys.exc_info()[1]
+ self.logger.error('Genshi parse error in %s: %s' %
+ (self.name, err))
+
+ def get_xml_value(self, metadata):
+ """ get the rendered XML data that applies to the given
+ client """
+ if not hasattr(self, 'template'):
+ msg = "No parsed template information for %s" % self.name
+ self.logger.error(msg)
+ raise Bcfg2.Server.Plugin.PluginExecutionError(msg)
+ stream = self.template.generate(
+ metadata=metadata,
+ repo=get_option_parser()['repo']
+ ).filter(Bcfg2.Server.Plugin.removecomment)
+ data = lxml.etree.XML(stream.render('xml',
+ strip_whitespace=False),
+ parser=Bcfg2.Server.XMLParser)
+ bundlename = os.path.splitext(os.path.basename(self.name))[0]
+ bundle = lxml.etree.Element('Bundle', name=bundlename)
+ for item in self.Match(metadata, data):
+ bundle.append(copy.deepcopy(item))
+ return bundle
+
+ def Match(self, metadata, xdata): # pylint: disable=W0221
+ """Return matching fragments of parsed template."""
+ rv = []
+ for child in xdata.getchildren():
+ rv.extend(self._match(child, metadata))
+ self.logger.debug("File %s got %d match(es)" % (self.name,
+ len(rv)))
+ return rv
class Bundler(Bcfg2.Server.Plugin.Plugin,
@@ -135,14 +115,8 @@ class Bundler(Bcfg2.Server.Plugin.Plugin,
parser=Bcfg2.Server.XMLParser)
nsmap = bundle.getroot().nsmap
if (name.endswith('.genshi') or
- ('py' in nsmap and
- nsmap['py'] == 'http://genshi.edgewall.org/')):
- if HAS_GENSHI:
- return BundleTemplateFile(name, self.encoding)
- else:
- raise Bcfg2.Server.Plugin.PluginExecutionError("Genshi not "
- "available: %s"
- % name)
+ ('py' in nsmap and nsmap['py'] == 'http://genshi.edgewall.org/')):
+ return BundleTemplateFile(name, self.encoding)
else:
return BundleFile(name)
@@ -183,8 +157,7 @@ class BundlerLint(Bcfg2.Server.Lint.ServerPlugin):
self.missing_bundles()
for bundle in self.core.plugins['Bundler'].entries.values():
if (self.HandlesFile(bundle.name) and
- (not HAS_GENSHI or
- not isinstance(bundle, BundleTemplateFile))):
+ not isinstance(bundle, BundleTemplateFile)):
self.bundle_names(bundle)
@classmethod
diff --git a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py
index b58349fe0..c11939bd5 100644
--- a/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py
+++ b/src/lib/Bcfg2/Server/Plugins/Cfg/CfgGenshiGenerator.py
@@ -5,63 +5,41 @@
import re
import sys
import traceback
-from Bcfg2.Server.Plugin import PluginExecutionError
+from Bcfg2.Server.Plugin import PluginExecutionError, removecomment
from Bcfg2.Server.Plugins.Cfg import CfgGenerator
-try:
- import genshi.core
- from genshi.template import TemplateLoader, NewTextTemplate
- from genshi.template.eval import UndefinedError, Suite
- #: True if Genshi libraries are available
- HAS_GENSHI = True
-
- def _genshi_removes_blank_lines():
- """ Genshi 0.5 uses the Python :mod:`compiler` package to
- compile genshi snippets to AST. Genshi 0.6 uses some bespoke
- magic, because compiler has been deprecated.
- :func:`compiler.parse` produces an AST that removes all excess
- whitespace (e.g., blank lines), while
- :func:`genshi.template.astutil.parse` does not. In order to
- determine which actual line of code an error occurs on, we
- need to know which is in use and how it treats blank lines.
- I've beat my head against this for hours and the best/only way
- I can find is to compile some genshi code with an error and
- see which line it's on."""
- code = """d = dict()
-
+from genshi.template import TemplateLoader, NewTextTemplate
+from genshi.template.eval import UndefinedError, Suite
+
+
+def _genshi_removes_blank_lines():
+ """ Genshi 0.5 uses the Python :mod:`compiler` package to
+ compile genshi snippets to AST. Genshi 0.6 uses some bespoke
+ magic, because compiler has been deprecated.
+ :func:`compiler.parse` produces an AST that removes all excess
+ whitespace (e.g., blank lines), while
+ :func:`genshi.template.astutil.parse` does not. In order to
+ determine which actual line of code an error occurs on, we
+ need to know which is in use and how it treats blank lines.
+ I've beat my head against this for hours and the best/only way
+ I can find is to compile some genshi code with an error and
+ see which line it's on."""
+ code = """d = dict()
d['a']"""
- try:
- Suite(code).execute(dict())
- except KeyError:
- line = traceback.extract_tb(sys.exc_info()[2])[-1][1]
- if line == 2:
- return True
- else:
- return False
-
- #: True if Genshi removes all blank lines from a code block before
- #: executing it; False indicates that Genshi only removes leading
- #: and trailing blank lines. See
- #: :func:`_genshi_removes_blank_lines` for an explanation of this.
- GENSHI_REMOVES_BLANK_LINES = _genshi_removes_blank_lines()
-except ImportError:
- TemplateLoader = None # pylint: disable=C0103
- HAS_GENSHI = False
-
-
-def removecomment(stream):
- """ A Genshi filter that removes comments from the stream. This
- function is a generator.
-
- :param stream: The Genshi stream to remove comments from
- :type stream: genshi.core.Stream
- :returns: tuple of ``(kind, data, pos)``, as when iterating
- through a Genshi stream
- """
- for kind, data, pos in stream:
- if kind is genshi.core.COMMENT:
- continue
- yield kind, data, pos
+ try:
+ Suite(code).execute(dict())
+ except KeyError:
+ line = traceback.extract_tb(sys.exc_info()[2])[-1][1]
+ if line == 2:
+ return True
+ else:
+ return False
+
+#: True if Genshi removes all blank lines from a code block before
+#: executing it; False indicates that Genshi only removes leading
+#: and trailing blank lines. See
+#: :func:`_genshi_removes_blank_lines` for an explanation of this.
+GENSHI_REMOVES_BLANK_LINES = _genshi_removes_blank_lines()
class CfgGenshiGenerator(CfgGenerator):
@@ -94,8 +72,6 @@ class CfgGenshiGenerator(CfgGenerator):
def __init__(self, fname, spec, encoding):
CfgGenerator.__init__(self, fname, spec, encoding)
- if not HAS_GENSHI:
- raise PluginExecutionError("Genshi is not available")
self.template = None
self.loader = self.__loader_cls__(max_cache_size=0)
__init__.__doc__ = CfgGenerator.__init__.__doc__
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
index fc9b5610c..0b6f3fd87 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugin/Testhelpers.py
@@ -3,6 +3,7 @@ import sys
import copy
import lxml.etree
import Bcfg2.Server
+import genshi.core
from Bcfg2.Compat import reduce
from mock import Mock, MagicMock, patch
from Bcfg2.Server.Plugin.helpers import *
@@ -36,6 +37,15 @@ class FakeElementTree(lxml.etree._ElementTree):
class TestFunctions(Bcfg2TestCase):
+ def test_removecomment(self):
+ data = [(None, "test", 1),
+ (None, "test2", 2)]
+ stream = [(genshi.core.COMMENT, "test", 0),
+ data[0],
+ (genshi.core.COMMENT, "test3", 0),
+ data[1]]
+ self.assertItemsEqual(list(removecomment(stream)), data)
+
def test_bind_info(self):
entry = lxml.etree.Element("Path", name="/test")
metadata = Mock()
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
index b447a9bb8..330779c99 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgEncryptedGenshiGenerator.py
@@ -14,20 +14,14 @@ while path != "/":
path = os.path.dirname(path)
from common import *
-try:
- from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
- TestCfgGenshiGenerator
- HAS_GENSHI = True
-except ImportError:
- TestCfgGenshiGenerator = object
- HAS_GENSHI = False
+from TestServer.TestPlugins.TestCfg.TestCfgGenshiGenerator import \
+ TestCfgGenshiGenerator
-if can_skip or (HAS_CRYPTO and HAS_GENSHI):
+if can_skip or HAS_CRYPTO:
class TestCfgEncryptedGenshiGenerator(TestCfgGenshiGenerator):
test_obj = CfgEncryptedGenshiGenerator
@skipUnless(HAS_CRYPTO, "Encryption libraries not found, skipping")
- @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
def setUp(self):
pass
diff --git a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
index 88f3cf3f7..154d6a8db 100644
--- a/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
+++ b/testsuite/Testsrc/Testlib/TestServer/TestPlugins/TestCfg/TestCfgGenshiGenerator.py
@@ -18,111 +18,97 @@ from common import *
from TestServer.TestPlugins.TestCfg.Test_init import TestCfgGenerator
-if can_skip or HAS_GENSHI:
- class TestCfgGenshiGenerator(TestCfgGenerator):
- test_obj = CfgGenshiGenerator
-
- @skipUnless(HAS_GENSHI, "Genshi libraries not found, skipping")
- def setUp(self):
- pass
-
- def test_removecomment(self):
- data = [(None, "test", 1),
- (None, "test2", 2)]
- stream = [(genshi.core.COMMENT, "test", 0),
- data[0],
- (genshi.core.COMMENT, "test3", 0),
- data[1]]
- self.assertItemsEqual(list(removecomment(stream)), data)
-
- def test__init(self):
- TestCfgGenerator.test__init(self)
- cgg = self.get_obj()
- self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
-
- def test_get_data(self):
- cgg = self.get_obj()
- cgg._handle_genshi_exception = Mock()
- cgg.setup = MagicMock()
- cgg.template = Mock()
- fltr = Mock()
- cgg.template.generate.return_value = fltr
- stream = Mock()
- fltr.filter.return_value = stream
- entry = lxml.etree.Element("Path", name="/test.txt")
- metadata = Mock()
-
-
- def reset():
- cgg.template.reset_mock()
- cgg._handle_genshi_exception.reset_mock()
- cgg.setup.reset_mock()
-
- template_vars = dict(
- name=entry.get("name"),
- metadata=metadata,
- path=cgg.name,
- source_path=cgg.name,
- repo=cgg.setup.__getitem__.return_value)
-
- self.assertEqual(cgg.get_data(entry, metadata),
- stream.render.return_value)
- cgg.template.generate.assert_called_with(**template_vars)
- cgg.setup.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
-
- reset()
- def render(fmt, **kwargs):
- stream.render.side_effect = None
- raise TypeError
- stream.render.side_effect = render
- self.assertEqual(cgg.get_data(entry, metadata),
- stream.render.return_value)
- cgg.template.generate.assert_called_with(**template_vars)
- cgg.setup.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- self.assertEqual(stream.render.call_args_list,
- [call("text", encoding=cgg.encoding,
- strip_whitespace=False),
- call("text", encoding=cgg.encoding)])
-
- reset()
- stream.render.side_effect = UndefinedError("test")
- self.assertRaises(UndefinedError,
- cgg.get_data, entry, metadata)
- cgg.template.generate.assert_called_with(**template_vars)
- cgg.setup.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
-
- reset()
- stream.render.side_effect = ValueError
- cgg._handle_genshi_exception.side_effect = ValueError
- self.assertRaises(ValueError,
- cgg.get_data, entry, metadata)
- cgg.template.generate.assert_called_with(**template_vars)
- cgg.setup.__getitem__.assert_called_with("repo")
- fltr.filter.assert_called_with(removecomment)
- stream.render.assert_called_with("text", encoding=cgg.encoding,
- strip_whitespace=False)
- self.assertTrue(cgg._handle_genshi_exception.called)
-
- def test_handle_event(self):
- cgg = self.get_obj()
- cgg.loader = Mock()
- event = Mock()
- cgg.handle_event(event)
- cgg.loader.load.assert_called_with(cgg.name,
- cls=NewTextTemplate,
- encoding=cgg.encoding)
-
- cgg.loader.reset_mock()
- cgg.loader.load.side_effect = OSError
- self.assertRaises(PluginExecutionError,
- cgg.handle_event, event)
- cgg.loader.load.assert_called_with(cgg.name,
- cls=NewTextTemplate,
- encoding=cgg.encoding)
+class TestCfgGenshiGenerator(TestCfgGenerator):
+ test_obj = CfgGenshiGenerator
+
+ def test__init(self):
+ TestCfgGenerator.test__init(self)
+ cgg = self.get_obj()
+ self.assertIsInstance(cgg.loader, cgg.__loader_cls__)
+
+ def test_get_data(self):
+ cgg = self.get_obj()
+ cgg._handle_genshi_exception = Mock()
+ cgg.setup = MagicMock()
+ cgg.template = Mock()
+ fltr = Mock()
+ cgg.template.generate.return_value = fltr
+ stream = Mock()
+ fltr.filter.return_value = stream
+ entry = lxml.etree.Element("Path", name="/test.txt")
+ metadata = Mock()
+
+
+ def reset():
+ cgg.template.reset_mock()
+ cgg._handle_genshi_exception.reset_mock()
+ cgg.setup.reset_mock()
+
+ template_vars = dict(
+ name=entry.get("name"),
+ metadata=metadata,
+ path=cgg.name,
+ source_path=cgg.name,
+ repo=cgg.setup.__getitem__.return_value)
+
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+
+ reset()
+ def render(fmt, **kwargs):
+ stream.render.side_effect = None
+ raise TypeError
+ stream.render.side_effect = render
+ self.assertEqual(cgg.get_data(entry, metadata),
+ stream.render.return_value)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ self.assertEqual(stream.render.call_args_list,
+ [call("text", encoding=cgg.encoding,
+ strip_whitespace=False),
+ call("text", encoding=cgg.encoding)])
+
+ reset()
+ stream.render.side_effect = UndefinedError("test")
+ self.assertRaises(UndefinedError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+
+ reset()
+ stream.render.side_effect = ValueError
+ cgg._handle_genshi_exception.side_effect = ValueError
+ self.assertRaises(ValueError,
+ cgg.get_data, entry, metadata)
+ cgg.template.generate.assert_called_with(**template_vars)
+ cgg.setup.__getitem__.assert_called_with("repo")
+ fltr.filter.assert_called_with(removecomment)
+ stream.render.assert_called_with("text", encoding=cgg.encoding,
+ strip_whitespace=False)
+ self.assertTrue(cgg._handle_genshi_exception.called)
+
+ def test_handle_event(self):
+ cgg = self.get_obj()
+ cgg.loader = Mock()
+ event = Mock()
+ cgg.handle_event(event)
+ cgg.loader.load.assert_called_with(cgg.name,
+ cls=NewTextTemplate,
+ encoding=cgg.encoding)
+
+ cgg.loader.reset_mock()
+ cgg.loader.load.side_effect = OSError
+ self.assertRaises(PluginExecutionError,
+ cgg.handle_event, event)
+ cgg.loader.load.assert_called_with(cgg.name,
+ cls=NewTextTemplate,
+ encoding=cgg.encoding)