Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def list_all_templates(native=True):
    """
    Fetches the templates known to NiFi via ``FlowApi().get_templates()``.

    Args:
        native (bool): When True (the default) the object returned by the
            API call is handed back unchanged. When False only its
            ``templates`` attribute is returned.

    Returns:
        The unmodified API response when ``native`` is True. Otherwise the
        ``templates`` attribute of that response, or None if the response
        is falsy.
    """
    # REST failures are translated by the nipyapi exception wrapper
    with nipyapi.utils.rest_exceptions():
        response = nipyapi.nifi.FlowApi().get_templates()
    if native:
        return response
    # Non-native callers only want the inner listing, if there is one
    return response.templates if response else None
def get_root_pg_id():
    """
    Convenience lookup for the ID of the Root Process Group.

    Asks the Flow API for the status of the Process Group addressed as
    'root' and reads the ID from the returned status object.

    Returns:
        (str): The UUID of the root Process Group
    """
    flow_api = nipyapi.nifi.FlowApi()
    root_status = flow_api.get_process_group_status('root')
    return root_status.process_group_status.id
"""
Returns information about a Process Group and flow.
This surfaces the native implementation, for the recursed implementation
see 'recurse_flow'
Args:
pg_id (str): id of the Process Group to retrieve, defaults to the root
process group if not set
Returns:
(ProcessGroupFlowEntity): The Process Group object
"""
assert isinstance(pg_id, six.string_types), "pg_id should be a string"
with nipyapi.utils.rest_exceptions():
return nipyapi.nifi.FlowApi().get_flow(pg_id)
def list_all_controller_types():
    """
    Lists the Controller Service types reported by the Flow API.

    Returns:
        list(DocumentedTypeDTO): the ``controller_service_types`` attribute
        of the ``get_controller_service_types()`` response
    """
    type_listing = nipyapi.nifi.FlowApi().get_controller_service_types()
    return type_listing.controller_service_types
"""
assert isinstance(
get_process_group(pg_id, 'id'),
nipyapi.nifi.ProcessGroupEntity
)
assert isinstance(scheduled, bool)
assert components is None or isinstance(components, list)
target_state = 'RUNNING' if scheduled else 'STOPPED'
body = nipyapi.nifi.ScheduleComponentsEntity(
id=pg_id,
state=target_state
)
if components:
body.components = {i.id: i.revision for i in components}
with nipyapi.utils.rest_exceptions():
result = nipyapi.nifi.FlowApi().schedule_components(
id=pg_id,
body=body
)
if result.state == target_state:
return True
return False
Args:
bucket_id (str): UUID of the bucket holding the flow to be enumerated
flow_id (str): UUID of the flow in the bucket to be enumerated
registry_id (str): UUID of the registry client linking the bucket, only
required if requesting flows via NiFi instead of directly Registry
service (str): Accepts 'nifi' or 'registry', indicating which service
to query
Returns:
list(VersionedFlowSnapshotMetadata) or
(VersionedFlowSnapshotMetadataSetEntity)
"""
assert service in ['nifi', 'registry']
if service == 'nifi':
with nipyapi.utils.rest_exceptions():
return nipyapi.nifi.FlowApi().get_versions(
registry_id=registry_id,
bucket_id=bucket_id,
flow_id=flow_id
)
else:
with nipyapi.utils.rest_exceptions():
return nipyapi.registry.BucketFlowsApi().get_flow_versions(
bucket_id=bucket_id,
flow_id=flow_id
)