How to use the bzt.engine.Configuration function in bzt

To help you get started, we’ve selected a few bzt examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github Blazemeter / taurus / tests / json-to-yaml.py View on Github external
import os
import sys
import tempfile

from bzt.engine import Configuration


fp, filename = tempfile.mkstemp()
os.write(fp, sys.stdin.read())

conf = Configuration()
conf.load([filename])
conf.dump(filename, Configuration.YAML)

sys.stdout.write(open(filename).read())
github Blazemeter / taurus / tests / bza_client_example.py View on Github external
def get_token():
    a = Configuration()
    a.load([os.path.expanduser("~/.bzt-rc")])
    return a['modules']['blazemeter']['token']
github Blazemeter / taurus / tests / test_configuration.py View on Github external
def test_filtering(self):
        obj = Configuration()
        obj.merge({
            "drop": "me",
            "also-drop": {"this": "drop"},
            "and-also-drop": ["thelist"],
            "but-keep": "value",
            "and-also-keep": {
                "nested": "value",
                "while-dropping": "some"
            },
            "filter-subitems": {
                "keep": "value",
                "drop": "some"
            }

        })
        rules = {
github Blazemeter / taurus / tests / test_configuration.py View on Github external
def test_merge(self):
        obj = Configuration()
        configs = [
            RESOURCES_DIR + "yaml/test.yml",
            RESOURCES_DIR + "json/merge1.json",
            RESOURCES_DIR + "json/merge2.json",
        ]
        obj.load(configs)
        fname = temp_file()
        obj.dump(fname, Configuration.JSON)
        with open(fname) as fh:
            ROOT_LOGGER.debug("JSON:\n%s", fh.read())
        jmeter = obj['modules']['jmeter']
        classval = jmeter['class']
        self.assertEquals("bzt.modules.jmeter.JMeterExecutor", classval)
        self.assertEquals("value", obj['key'])
        self.assertEquals(6, len(obj["list-append"]))
        self.assertEquals(2, len(obj["list-replace"]))
github Blazemeter / taurus / tests / test_configuration.py View on Github external
def test_elementwise_merge(self):
        obj = Configuration()
        obj.merge({
            "execution": [{
                "executor": "jmeter",
                "iterations": 10,
            }],
        })
        obj.merge({
            "$execution": [{"iterations": 20}],
        })
        self.assertEqual(obj["execution"][0]["iterations"], 20)
github Blazemeter / taurus / tests / test_configuration.py View on Github external
def test_save(self):
        obj = Configuration()
        obj.merge({
            "str": "text",
            "uc": six.u("ucstring")
        })
        fname = temp_file()
        obj.dump(fname, Configuration.YAML)
        with open(fname) as fh:
            written = fh.read()
            ROOT_LOGGER.debug("YAML:\n%s", written)
            self.assertNotIn("unicode", written)
github Blazemeter / taurus / tests / test_configuration.py View on Github external
def test_elementwise_merge_right_is_bigger(self):
        obj = Configuration()
        obj.merge({
            "execution": [{
                "executor": "jmeter",
                "iterations": 10,
            }],
        })
        obj.merge({
            "$execution": [{"iterations": 20}, {"iterations": 30}],
        })
        self.assertEqual(obj["execution"][0]["iterations"], 20)
        self.assertEqual(obj["execution"][1]["iterations"], 30)
github Blazemeter / taurus / bzt / cli.py View on Github external
:type configs: list
        :return: list
        """
        jmxes = []
        for filename in configs[:]:
            if filename.lower().endswith(".jmx"):
                jmxes.append(filename)
                configs.remove(filename)

        if jmxes:
            self.log.debug("Adding JMX shorthand config for: %s", jmxes)
            fds = NamedTemporaryFile(prefix="jmx_", suffix=".json")
            fname = fds.name
            fds.close()

            config = Configuration()

            for jmx_file in jmxes:
                piece = BetterDict.from_dict({"executor": "jmeter", "scenario": {"script": jmx_file}})
                config.get(EXEC, [], force_set=True).append(piece)  # Does it brake single execution?

            config.dump(fname, Configuration.JSON)

            return [fname]
        else:
            return []
github Blazemeter / taurus / bzt / cli.py View on Github external
:type configs: list
        :return: list
        """
        jtls = []
        for filename in configs[:]:
            if filename.lower().endswith(".jtl"):
                jtls.append(filename)
                configs.remove(filename)

        if jtls:
            self.log.debug("Adding JTL shorthand config for: %s", jtls)
            fds = NamedTemporaryFile(prefix="jtl_", suffix=".json")
            fname = fds.name
            fds.close()

            config = Configuration()

            for jtl in jtls:
                piece = BetterDict.from_dict({"executor": "external-results-loader", "data-file": jtl})
                config.get(EXEC, [], force_set=True).append(piece)

            config.dump(fname, Configuration.JSON)

            return [fname]
        else:
            return []