Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
outputs={"add": is_add, "mult": is_mult, "numbers": numbers},
)
async def parse_line(line: str):
    """
    Extract the operation flags and the integers found in a line of text.

    Returns a dict with three keys:

    - ``"add"``: True when the substring "add" occurs anywhere in ``line``
    - ``"mult"``: True when the substring "mult" occurs anywhere in ``line``
    - ``"numbers"``: every whitespace separated token of ``line`` that is made
      up solely of decimal digits, converted to ``int``, in order
    """
    return {
        "add": "add" in line,
        "mult": "mult" in line,
        # str.isdecimal() instead of str.isdigit(): isdigit() is also True for
        # characters such as superscript digits ("²"), which int() rejects
        # with a ValueError. Every isdecimal() token is accepted by int().
        "numbers": [int(item) for item in line.split() if item.isdecimal()],
    }
# Both helpers are handed this module's own module object.
# NOTE(review): presumably they collect the operation implementations and
# operations defined in this module — confirm against opimp_in / operation_in.
OPIMPS = opimp_in(sys.modules[__name__])
OPERATIONS = operation_in(sys.modules[__name__])
# Build the DataFlow from every entry of OPIMPS
DATAFLOW = DataFlow.auto(*OPIMPS)
class TestMemoryKeyValueStore(AsyncTestCase):
    """
    Exercise set() and get() on a MemoryKeyValueStore context.
    """

    def setUp(self):
        # Each test gets its own store built from a default BaseConfig
        self.kvStore = MemoryKeyValueStore(BaseConfig())

    async def test_get_set(self):
        """
        A value stored under a key is what get() returns for that key.
        """
        async with self.kvStore as kvstore, kvstore() as store_ctx:
            await store_ctx.set("feed", b"face")
            stored = await store_ctx.get("feed")
            self.assertEqual(stored, b"face")

    async def test_get_none(self):
        """
        get() on a key that was never set compares equal to None.
        """
        async with self.kvStore as kvstore, kvstore() as store_ctx:
            missing = await store_ctx.get("feed")
            self.assertEqual(missing, None)
class TestMemoryOperationImplementationNetwork(AsyncTestCase):
"operations",
[
("{import_name}", "definitions.py"),
("{import_name}", "operations.py"),
("tests", "test_operations.py"),
],
)
async def test_service(self):
    """
    Run generic_test for "service" with its two file pairs.
    """
    service_files = [
        ("{import_name}", "misc.py"),
        ("tests", "test_service.py"),
    ]
    await self.generic_test("service", service_files)
class TestDevelopSkelLink(AsyncTestCase):
skel = Skel()
async def test_run(self):
# Skip if not in development mode
if not is_develop("dffml"):
self.skipTest("dffml not installed in development mode")
await Develop.cli("skel", "link")
common_files = [
path.relative_to(self.skel.common)
for path in self.skel.common_files()
]
# At time of writing there are 4 plugins in skel/ change this as needed
plugins = self.skel.plugins()
self.assertGreater(len(plugins), 3)
for plugin in plugins:
return args
@classmethod
def config(cls, config, *above):
    """
    Build a KeyValueStoreWithArgumentsConfig whose filename is looked up
    with config_get under the "filename" key.
    """
    filename = cls.config_get(config, above, "filename")
    return KeyValueStoreWithArgumentsConfig(filename=filename)
def load_kvstore_with_args(loading=None):
    """
    Return the KeyValueStoreWithArguments class itself when ``loading`` is
    "withargs", otherwise a list containing only that class.
    """
    if loading != "withargs":
        return [KeyValueStoreWithArguments]
    return KeyValueStoreWithArguments
class TestMemoryRedundancyChecker(AsyncTestCase):
@patch.object(BaseKeyValueStore, "load", load_kvstore_with_args)
def test_args(self):
self.assertEqual(
MemoryRedundancyChecker.args({}),
{
"rchecker": {
"arg": None,
"config": {
"memory": {
"arg": None,
"config": {
"kvstore": {
"arg": Arg(
type=BaseKeyValueStore.load,
default=MemoryKeyValueStore,
),
class FakeCMD(CMD):
sub_cmd = FakeSubCMD
parser = Parser()
with patch.object(parser, "add_subparsers") as mock_method:
parser.add_subs(FakeCMD)
mock_method.assert_called_once()
parser = Parser()
with patch.object(parser, "add_subparsers") as mock_method:
parser.add_subs(FakeSubCMD)
with self.assertRaisesRegex(AssertionError, "Called 0 times"):
mock_method.assert_called_once()
class TestListEntrypoint(AsyncTestCase):
    """
    Check what ListEntrypoint.display() passes to sys.stdout.write for a
    class with and without a docstring.
    """

    def test_display_no_docstring(self):
        class FakeClass(CMD):
            pass

        with patch.object(sys.stdout, "write") as fake_write:
            ListEntrypoint().display(FakeClass)
            # No write("docstring!") call is expected, so assert_any_call
            # itself has to fail with its "call not found" message
            with self.assertRaisesRegex(AssertionError, "call not found"):
                fake_write.assert_any_call("docstring!")

    def test_display_docstring(self):
        class FakeClass(CMD):
            "docstring!"

        with patch.object(sys.stdout, "write") as fake_write:
            ListEntrypoint().display(FakeClass)
            fake_write.assert_any_call("docstring!")
self.assertEqual(Feature.convert_dtype("float"), float)
def test_convert_dtype_invalid(self):
    """
    convert_dtype must raise a TypeError matching "Failed to convert" for
    this input.
    """
    bogus_dtype = "not a python data type"
    with self.assertRaisesRegex(TypeError, "Failed to convert"):
        Feature.convert_dtype(bogus_dtype)
class TestDefFeature(AsyncTestCase):
    """
    A DefFeature reports back the name, dtype and length it was created with.
    """

    def test_deffeature(self):
        name, dtype, length = "test", float, 10
        made = DefFeature(name, dtype, length)
        self.assertEqual(made.NAME, "test")
        self.assertEqual(made.dtype(), float)
        self.assertEqual(made.length(), 10)
class TestFeatures(AsyncTestCase):
    """
    Tests for a Features collection built from three tester features.
    """

    def setUp(self):
        self.one = OneFeatureTester()
        self.two = TwoFeatureTester()
        self.three = ThreeFeatureTester()
        members = (self.one, self.two, self.three)
        self.features = Features(*members)

    async def test_names(self):
        """
        names() contains "one", "two" and "three".
        """
        async with self.features:
            reported = self.features.names()
            for expected in ("one", "two", "three"):
                self.assertIn(expected, reported)

    async def test_applicable(self):
        """
        The first feature is among those applicable to "test".
        """
        async with self.features:
            usable = await self.features.applicable("test")
            self.assertIn(self.one, usable)
# SPDX-License-Identifier: MIT
# Copyright (c) 2019 Intel Corporation
from dffml.source.json import JSONSource, JSONSourceConfig
from dffml.util.testing.source import FileSourceTest
from dffml.util.asynctestcase import AsyncTestCase
class TestJSONSource(FileSourceTest, AsyncTestCase):
    """
    Supplies a JSONSource to the FileSourceTest base class through
    setUpSource.
    """

    async def setUpSource(self):
        # self.testfile is not assigned in this class
        # NOTE(review): presumably FileSourceTest provides it — confirm
        source_config = JSONSourceConfig(filename=self.testfile)
        return JSONSource(source_config)
def setUp(self):
    """
    Create the MemoryKeyValueStore under test from a default BaseConfig.
    """
    default_config = BaseConfig()
    self.kvStore = MemoryKeyValueStore(default_config)
async def test_get_set(self):
    """
    A value stored under a key is what get() returns for that key.
    """
    async with self.kvStore as kvstore, kvstore() as store_ctx:
        await store_ctx.set("feed", b"face")
        stored = await store_ctx.get("feed")
        self.assertEqual(stored, b"face")
async def test_get_none(self):
    """
    get() on a key that was never set compares equal to None.
    """
    async with self.kvStore as kvstore, kvstore() as store_ctx:
        missing = await store_ctx.get("feed")
        self.assertEqual(missing, None)
class TestMemoryOperationImplementationNetwork(AsyncTestCase):
async def setUp(self):
    """
    Create the operation implementation network from an empty config and
    enter it.
    """
    empty_config = {}
    self.operationsNetwork = MemoryOperationImplementationNetwork.withconfig(
        empty_config
    )
    # Entered by calling __aenter__ directly instead of using "async with",
    # so the entered object can be kept on the test case
    entered = await self.operationsNetwork.__aenter__()
    self.operationsNetworkCtx = entered
async def tearDown(self):
    """
    Exit the operation implementation network, passing no exception details.
    """
    no_exception = (None, None, None)
    await self.operationsNetwork.__aexit__(*no_exception)
async def test_contains_true(self):
    """
    After add.op has been instantiated, contains(add.op) is truthy.
    """
    async with self.operationsNetworkCtx() as network_ctx:
        await network_ctx.instantiate(add.op, BaseConfig(), opimp=add.imp)
        present = await network_ctx.contains(add.op)
        self.assertTrue(present)
async def test_contains_false(self):
async with self.operationsNetworkCtx() as ctx:
async def test_mklock_exists(self):
    """
    mklock() on a key that already has a lock leaves the key in locks.
    """
    key = "feed"
    self.data.locks[key] = asyncio.Lock()
    self.assertIn(key, self.data.locks)
    await self.data.mklock(key)
    self.assertIn(key, self.data.locks)
async def test_results(self):
    """
    With complete() replaced by a stub returning "face", result() leaves
    "face" in self.data.results.
    """

    async def complete_stub(*args):
        return "face"

    with patch.object(self.data, "complete", complete_stub):
        await self.data.result()
        self.assertEqual(self.data.results, "face")
class TestFeature(AsyncTestCase):
    """
    Checks the defaults of a plain Feature and a Feature made by load_def.
    """

    def setUp(self):
        self.feature = Feature()

    def test_default_dtype(self):
        default_dtype = self.feature.dtype()
        self.assertEqual(default_dtype, int)

    def test_default_length(self):
        default_length = self.feature.length()
        self.assertEqual(default_length, 1)

    async def test_default_applicable(self):
        is_applicable = await self.feature.applicable(Data("test"))
        self.assertEqual(is_applicable, True)

    def test_load_def(self):
        """
        load_def("test", "float", 10) gives a feature named "test" whose
        dtype is float.
        """
        loaded = Feature.load_def("test", "float", 10)
        self.assertEqual(loaded.NAME, "test")
        self.assertEqual(loaded.dtype(), float)
name.startswith("test_") or name in ["setUp", "tearDown"]
):
setattr(self, name, self.async_wrapper(method))
return super().run(result=result)
@contextlib.contextmanager
def non_existant_tempfile():
    """
    Yield a path inside a freshly created temporary directory at which no
    file has been created. The directory is removed when the context exits.
    """
    with tempfile.TemporaryDirectory() as scratch_dir:
        # A random float rendered as text serves as the file name
        unused_name = str(random.random())
        yield os.path.join(scratch_dir, unused_name)
class AsyncExitStackTestCase(AsyncTestCase):
async def setUp(self):
    """
    Run the parent setUp, then open a synchronous and an asynchronous exit
    stack and keep both on the test case.
    """
    super().setUp()
    sync_stack = contextlib.ExitStack()
    self._stack = sync_stack.__enter__()
    async_stack = contextlib.AsyncExitStack()
    self._astack = await async_stack.__aenter__()
async def tearDown(self):
    """
    Run the parent tearDown, then close the synchronous exit stack followed
    by the asynchronous one, passing no exception details to either.
    """
    super().tearDown()
    no_exception = (None, None, None)
    self._stack.__exit__(*no_exception)
    await self._astack.__aexit__(*no_exception)
def mktempfile(
self, suffix: Optional[str] = None, text: Optional[str] = None
):
filename = self._stack.enter_context(non_existant_tempfile())
if suffix:
filename = filename + suffix
itertools.starmap(
Path,
[
(".gitignore",),
(".coveragerc",),
("setup_common.py",),
("MANIFEST.in",),
("REPLACE_IMPORT_PACKAGE_NAME", "version.py"),
("REPLACE_IMPORT_PACKAGE_NAME", "__init__.py"),
("tests/__init__.py",),
],
)
)
class TestSkelUtil(AsyncTestCase):
    """
    Checks that every entry of COMMON_FILES is among the skel's common files.
    """

    skel = Skel()

    def test_all_common_files_accounted_for(self):
        # Paths are compared relative to the skel's common directory
        found = []
        for path in self.skel.common_files():
            found.append(path.relative_to(self.skel.common))
        for expected in COMMON_FILES:
            self.assertIn(expected, found)