Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
config (Optional [ProcessorConfigDTO]): A configuration object for the
new processor
Returns:
(ProcessorEntity): The new Processor
"""
if name is None:
processor_name = processor.type.split('.')[-1]
else:
processor_name = name
if config is None:
target_config = nipyapi.nifi.ProcessorConfigDTO()
else:
target_config = config
with nipyapi.utils.rest_exceptions():
return nipyapi.nifi.ProcessGroupsApi().create_processor(
id=parent_pg.id,
body=nipyapi.nifi.ProcessorEntity(
revision={'version': 0},
component=nipyapi.nifi.ProcessorDTO(
position=nipyapi.nifi.PositionDTO(
x=float(location[0]),
y=float(location[1])
),
type=processor.type,
name=processor_name,
config=target_config
)
"Source Relationships [{1}]" \
.format(str(relationships), str(source_rels))
else:
# if no relationships supplied, we connect them all
relationships = source_rels
if source_type == 'OUTPUT_PORT':
# the hosting process group for an Output port connection to another
# process group is the common parent process group
parent_pg = get_process_group(source.component.parent_group_id, 'id')
if parent_pg.id == get_root_pg_id():
parent_id = parent_pg.id
else:
parent_id = parent_pg.component.parent_group_id
else:
parent_id = source.component.parent_group_id
with nipyapi.utils.rest_exceptions():
return nipyapi.nifi.ProcessGroupsApi().create_connection(
id=parent_id,
body=nipyapi.nifi.ConnectionEntity(
revision=nipyapi.nifi.RevisionDTO(
version=0
),
source_type=source_type,
destination_type=target_type,
component=nipyapi.nifi.ConnectionDTO(
source=nipyapi.nifi.ConnectableDTO(
id=source.id,
group_id=source.component.parent_group_id,
type=source_type
),
name=name,
destination=nipyapi.nifi.ConnectableDTO(
def revert_flow_ver(process_group):
"""
Attempts to roll back uncommitted changes to a Process Group to the last
committed version
Args:
process_group (ProcessGroupEntity): the ProcessGroup to work with
Returns:
(VersionedFlowUpdateRequestEntity)
"""
# ToDo: Add handling for flows with live data
assert isinstance(process_group, nipyapi.nifi.ProcessGroupEntity)
with nipyapi.utils.rest_exceptions():
return nipyapi.nifi.VersionsApi().initiate_revert_flow_version(
id=process_group.id,
body=nipyapi.nifi.VersionsApi().get_version_information(
process_group.id
)
def list_registry_buckets():
"""
Lists all available Buckets in the NiFi Registry
Returns:
(list[Bucket]) objects
"""
with nipyapi.utils.rest_exceptions():
return nipyapi.registry.BucketsApi().get_buckets()
def get_cluster():
"""
EXPERIMENTAL
Returns the contents of the NiFi cluster
Returns (json):
"""
with nipyapi.utils.rest_exceptions():
return nipyapi.nifi.ControllerApi().get_cluster()
r_id (optional[str]): if NiFi, the resource ID of the resource
service (str): the service to target
Returns:
An access policy object for that service
"""
assert isinstance(resource, six.string_types)
assert action in _valid_actions
assert r_id is None or isinstance(r_id, six.string_types)
assert service in _valid_services
if resource[0] != '/':
r = '/' + resource
else:
r = resource
with nipyapi.utils.rest_exceptions():
if service == 'nifi':
return nipyapi.nifi.PoliciesApi().create_access_policy(
body=nipyapi.nifi.AccessPolicyEntity(
revision=nipyapi.nifi.RevisionDTO(version=0),
component=nipyapi.nifi.AccessPolicyDTO(
action=action,
resource='/'.join([r, r_id]) if r_id else r
)
)
)
# elif service == 'registry':
return nipyapi.registry.PoliciesApi().create_access_policy(
body=nipyapi.registry.AccessPolicy(
action=action,
resource=r
)
Args:
parent_pg (ProcessGroupEntity): The parent Process Group to create the
new process group in
new_pg_name (str): The name of the new Process Group
location (tuple[x, y]): the x,y coordinates to place the new Process
Group under the parent
comment (str): Entry for the Comments field
Returns:
(ProcessGroupEntity): The new Process Group
"""
assert isinstance(parent_pg, nipyapi.nifi.ProcessGroupEntity)
assert isinstance(new_pg_name, six.string_types)
assert isinstance(location, tuple)
with nipyapi.utils.rest_exceptions():
return nipyapi.nifi.ProcessGroupsApi().create_process_group(
id=parent_pg.id,
body=nipyapi.nifi.ProcessGroupEntity(
revision={'version': 0},
component=nipyapi.nifi.ProcessGroupDTO(
name=new_pg_name,
position=nipyapi.nifi.PositionDTO(
x=float(location[0]),
y=float(location[1])
),
comments=comment
)
name (str): optional, Name to assign to the port
state (str): One of RUNNING, STOPPED, DISABLED
position (tuple): optional, tuple of ints like (400, 400)
Returns:
(PortEntity) of the created port
"""
assert state in ["RUNNING", "STOPPED", "DISABLED"]
assert port_type in ["INPUT_PORT", "OUTPUT_PORT"]
assert isinstance(pg_id, six.string_types)
position = position if position else (400, 400)
assert isinstance(position, tuple)
handle = nipyapi.nifi.ProcessGroupsApi()
port_generator = getattr(handle, 'create_' + port_type.lower())
with nipyapi.utils.rest_exceptions():
return port_generator(
id=pg_id,
body=nipyapi.nifi.PortEntity(
revision=nipyapi.nifi.RevisionDTO(version=0),
component=nipyapi.nifi.PortDTO(
parent_group_id=pg_id,
position=nipyapi.nifi.PositionDTO(
x=float(position[0]),
y=float(position[1])
),
name=name
)
# TODO: Reimplement to batched instead of single threaded
def _autumn_leaves(con_id_, drop_request_):
test_obj = nipyapi.nifi.FlowfileQueuesApi().get_drop_request(
con_id_,
drop_request_.drop_request.id
).drop_request
if not test_obj.finished:
return False
if test_obj.failure_reason:
raise ValueError(
"Unable to complete drop request{0}, error was {1}"
.format(test_obj, test_obj.drop_request.failure_reason)
)
return True
with nipyapi.utils.rest_exceptions():
drop_req = nipyapi.nifi.FlowfileQueuesApi().create_drop_request(con_id)
assert isinstance(drop_req, nipyapi.nifi.DropRequestEntity)
return nipyapi.utils.wait_to_complete(_autumn_leaves, con_id, drop_req)