Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_filter_obj(fix_pg):
    """
    Exercises utils.filter_obj against Process Groups produced by the
    fix_pg fixture

    Args:
        fix_pg: fixture exposing generate(), used here to create the
            Process Groups that are filtered
    """
    pg_one = fix_pg.generate()

    # A list holding a plain string is expected to be rejected
    with pytest.raises(ValueError):
        utils.filter_obj(['pie'], '', '')

    # 'pie' as the identifier type is expected to be rejected as well
    with pytest.raises(ValueError):
        utils.filter_obj([pg_one], '', 'pie')

    # One candidate matching on name: the object itself comes back
    assert isinstance(
        utils.filter_obj([pg_one], conftest.test_pg_name, 'name'),
        nifi.ProcessGroupEntity
    )

    # A name that matches nothing gives None
    assert utils.filter_obj([pg_one], 'FakeNews', 'name') is None

    pg_two = fix_pg.generate(suffix='2')

    # Greedy (no greedy argument passed): both candidates come back as a list
    greedy_result = utils.filter_obj(
        [pg_one, pg_two], conftest.test_pg_name, 'name'
    )
    assert isinstance(greedy_result, list)

    # Not greedy: the same two candidates resolve to a single object
    strict_result = utils.filter_obj(
        [pg_one, pg_two], conftest.test_pg_name, 'name', greedy=False
    )
    assert isinstance(strict_result, nifi.ProcessGroupEntity)

    # An empty candidate list gives None
    assert utils.filter_obj([], '', '') is None
_ = utils.filter_obj(t_1, '', '')
with pytest.raises(ValueError):
_ = utils.filter_obj([f_pg], '', 'pie')
r1 = utils.filter_obj([f_pg], conftest.test_pg_name, 'name')
assert isinstance(r1, nifi.ProcessGroupEntity)
r2 = utils.filter_obj([f_pg], 'FakeNews', 'name')
assert r2 is None
f_pg2 = fix_pg.generate(suffix='2')
# Test greedy
r3 = utils.filter_obj([f_pg, f_pg2], conftest.test_pg_name, 'name')
assert isinstance(r3, list)
# Test not greedy
r4 = utils.filter_obj([f_pg, f_pg2], conftest.test_pg_name, 'name',
greedy=False)
assert isinstance(r4, nifi.ProcessGroupEntity)
r5 = utils.filter_obj([], '', '')
assert r5 is None
def get_flow_in_bucket(bucket_id, identifier, identifier_type='name'):
    """
    Looks up Flows within a single Bucket by matching an identifier

    The Flow listing is fetched inside the nipyapi.utils.rest_exceptions()
    context, then handed to nipyapi.utils.filter_obj for matching.

    Args:
        bucket_id (str): UUID of the Bucket whose Flows are searched
        identifier (str): The string to match against
        identifier_type (str): The param of each Flow to compare with

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches

    """
    with nipyapi.utils.rest_exceptions():
        bucket_flows = list_flows_in_bucket(bucket_id)
    matched = nipyapi.utils.filter_obj(
        bucket_flows, identifier, identifier_type
    )
    return matched
def get_registry_bucket(identifier, identifier_type='name'):
    """
    Looks up Buckets by matching an identifier against the Bucket list

    The Bucket listing is fetched inside the
    nipyapi.utils.rest_exceptions() context, then handed to
    nipyapi.utils.filter_obj for matching.

    Args:
        identifier (str): The string to match against
        identifier_type (str): The param of each Bucket to compare with

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches

    """
    with nipyapi.utils.rest_exceptions():
        all_buckets = list_registry_buckets()
    matched = nipyapi.utils.filter_obj(
        all_buckets, identifier, identifier_type
    )
    return matched
Filters the all users list for a given identifier and type
Args:
identifier (str): the string to search for
identifier_type (str): the field to search in
service (str): the name of the service
Returns:
None if no match, list of multiple matches, else single object
"""
assert service in _valid_services
assert isinstance(identifier, six.string_types)
assert isinstance(identifier_type, six.string_types)
obj = list_service_users(service)
out = nipyapi.utils.filter_obj(obj, identifier, identifier_type)
return out
identifier (str): The String to filter against
identifier_type (str): The field to apply the filter to. Set in
config.py
Returns:
None for no matches, Single Object for unique match,
list(Objects) for multiple matches
"""
assert isinstance(identifier, six.string_types)
assert identifier_type in ['name', 'id']
with nipyapi.utils.rest_exceptions():
if identifier_type == 'id':
out = nipyapi.nifi.ProcessorsApi().get_processor(identifier)
else:
obj = list_all_processors()
out = nipyapi.utils.filter_obj(obj, identifier, identifier_type)
return out