How to use the nipyapi.utils.filter_obj function in nipyapi

To help you get started, we’ve selected a few nipyapi examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github Chaffelson / nipyapi / tests / test_utils.py View on Github external
def test_filter_obj(fix_pg):
    """Check utils.filter_obj for bad input, no/one/many matches."""
    first_pg = fix_pg.generate()
    # A list of plain strings with empty filter values raises ValueError
    with pytest.raises(ValueError):
        utils.filter_obj(['pie'], '', '')
    # 'pie' as the key to filter on raises ValueError as well
    with pytest.raises(ValueError):
        utils.filter_obj([first_pg], '', 'pie')
    # Exactly one match comes back as the bare entity
    unique_match = utils.filter_obj(
        [first_pg], conftest.test_pg_name, 'name'
    )
    assert isinstance(unique_match, nifi.ProcessGroupEntity)
    # A value that matches nothing comes back as None
    no_match = utils.filter_obj([first_pg], 'FakeNews', 'name')
    assert no_match is None
    second_pg = fix_pg.generate(suffix='2')
    both_pgs = [first_pg, second_pg]
    # Default (greedy) call: both groups match, so a list is returned
    greedy_match = utils.filter_obj(
        both_pgs, conftest.test_pg_name, 'name'
    )
    assert isinstance(greedy_match, list)
    # greedy=False: the same lookup yields a single entity
    strict_match = utils.filter_obj(
        both_pgs, conftest.test_pg_name, 'name', greedy=False
    )
    assert isinstance(strict_match, nifi.ProcessGroupEntity)
    # An empty input list yields None rather than raising
    empty_match = utils.filter_obj([], '', '')
    assert empty_match is None
github Chaffelson / nipyapi / tests / test_utils.py View on Github external
_ = utils.filter_obj(t_1, '', '')
    with pytest.raises(ValueError):
        _ = utils.filter_obj([f_pg], '', 'pie')
    r1 = utils.filter_obj([f_pg], conftest.test_pg_name, 'name')
    assert isinstance(r1, nifi.ProcessGroupEntity)
    r2 = utils.filter_obj([f_pg], 'FakeNews', 'name')
    assert r2 is None
    f_pg2 = fix_pg.generate(suffix='2')
    # Test greedy
    r3 = utils.filter_obj([f_pg, f_pg2], conftest.test_pg_name, 'name')
    assert isinstance(r3, list)
    # Test not greedy
    r4 = utils.filter_obj([f_pg, f_pg2], conftest.test_pg_name, 'name',
                          greedy=False)
    assert isinstance(r4, nifi.ProcessGroupEntity)
    r5 = utils.filter_obj([], '', '')
    assert r5 is None
github Chaffelson / nipyapi / nipyapi / versioning.py View on Github external
def get_flow_in_bucket(bucket_id, identifier, identifier_type='name'):
    """
    Looks up Flows within a single Bucket by matching a chosen field

    Args:
        bucket_id (str): UUID of the Bucket whose Flows are searched
        identifier (str): The value to match against
        identifier_type (str): The field to compare; defaults to 'name'

    Returns:
        None when nothing matches, the single Object when exactly one
        matches, or list(Objects) when several match
    """
    # Only the listing call runs inside rest_exceptions; the filtering
    # happens after the context manager has exited
    with nipyapi.utils.rest_exceptions():
        bucket_flows = list_flows_in_bucket(bucket_id)
    return nipyapi.utils.filter_obj(
        bucket_flows, identifier, identifier_type
    )
github Chaffelson / nipyapi / nipyapi / versioning.py View on Github external
def get_registry_bucket(identifier, identifier_type='name'):
    """
    Looks up Buckets from the Bucket list by matching a chosen field

    Args:
        identifier (str): The value to match against
        identifier_type (str): The field to compare; defaults to 'name'

    Returns:
        None when nothing matches, the single Object when exactly one
        matches, or list(Objects) when several match
    """
    # Only the listing call runs inside rest_exceptions; the filtering
    # happens after the context manager has exited
    with nipyapi.utils.rest_exceptions():
        all_buckets = list_registry_buckets()
    return nipyapi.utils.filter_obj(
        all_buckets, identifier, identifier_type
    )
github Chaffelson / nipyapi / nipyapi / security.py View on Github external
Filters the all users list for a given identifier and type

    Args:
        identifier (str): the string to search for
        identifier_type (str): the field to search in
        service (str): the name of the service

    Returns:
        None if no match, list of multiple matches, else single object

    """
    assert service in _valid_services
    assert isinstance(identifier, six.string_types)
    assert isinstance(identifier_type, six.string_types)
    obj = list_service_users(service)
    out = nipyapi.utils.filter_obj(obj, identifier, identifier_type)
    return out
github Chaffelson / nipyapi / nipyapi / canvas.py View on Github external
identifier (str): The String to filter against
        identifier_type (str): The field to apply the filter to. Set in
            config.py

    Returns:
        None for no matches, Single Object for unique match,
        list(Objects) for multiple matches
    """
    assert isinstance(identifier, six.string_types)
    assert identifier_type in ['name', 'id']
    with nipyapi.utils.rest_exceptions():
        if identifier_type == 'id':
            out = nipyapi.nifi.ProcessorsApi().get_processor(identifier)
        else:
            obj = list_all_processors()
            out = nipyapi.utils.filter_obj(obj, identifier, identifier_type)
    return out