def test_client_recursion_limit(fix_pg, fix_funnel, target=450):
    # https://github.com/Chaffelson/nipyapi/issues/147
    parent_pg = canvas.get_process_group('root')
    for i in range(0, target):
        parent_pg = fix_pg.generate(parent_pg, str(i))
        fix_funnel.generate(parent_pg)
    start = time.time()
    r1 = canvas.list_all_process_groups(canvas.get_root_pg_id())
    end = time.time()
    assert len(r1) == target + 1  # +1 to allow for root PG
    print("Len {0} Set {1}".format(len(r1), len(set([x.id for x in r1]))))
    print("Elapsed r1: {0}".format(end - start))

def test_get_component_connections(regress_nifi, fix_proc):
    f_p1 = fix_proc.generate()
    f_p2 = fix_proc.generate()
    f_p3 = canvas.create_processor(
        parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id'),
        processor=canvas.get_processor_type('AttributesToJSON'),
        location=(400.0, 425.0),
        name=conftest.test_processor_name + '_inbound'
    )
    canvas.create_connection(f_p1, f_p3, name=conftest.test_basename)
    canvas.create_connection(f_p2, f_p3, name=conftest.test_basename)
    r1 = canvas.get_component_connections(f_p1)
    assert len(r1) == 1
    assert r1[0].source_id == f_p1.id
    r2 = canvas.get_component_connections(f_p3)
    assert len(r2) == 2
    assert r2[0].destination_id == f_p3.id
    assert r2[1].source_id in [f_p1.id, f_p2.id]

def test_create_process_group(regress_nifi):
    r = canvas.create_process_group(
        parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id'),
        new_pg_name=conftest.test_pg_name,
        location=(400.0, 400.0),
        comment='some comment'
    )
    assert r.component.name == conftest.test_pg_name
    assert r.position.x == r.position.y == 400
    assert r.component.parent_group_id == canvas.get_root_pg_id()
    assert isinstance(r, nifi.ProcessGroupEntity)
    # Test process group creation under a non-root process group.
    s = canvas.create_process_group(
        parent_pg=canvas.get_process_group(conftest.test_pg_name),
        new_pg_name=conftest.test_another_pg_name,
        location=(200.0, 200.0)
    )
    assert s.component.name == conftest.test_another_pg_name
    assert s.position.x == s.position.y == 200
    assert s.component.parent_group_id == canvas.get_process_group(
        conftest.test_pg_name, 'name').id
    assert isinstance(s, nifi.ProcessGroupEntity)
    with pytest.raises(ValueError):
        parent_pg = canvas.get_process_group('NiFi Flow')
        parent_pg.id = 'invalid'
        _ = canvas.create_process_group(
            parent_pg,
            'irrelevant',
            (0, 0)
        )

def create_pg_snippet(pg_id):
    """
    Creates a snippet of the targeted process group, and returns the object
    ready to be turned into a Template

    Args:
        pg_id: UUID of the Process Group to snippet

    Returns:
        (SnippetEntity): The Snippet object
    """
    target_pg = nipyapi.canvas.get_process_group(pg_id, 'id')
    new_snippet_req = nipyapi.nifi.SnippetEntity(
        snippet={
            'processGroups': {
                target_pg.id: target_pg.revision
            },
            'parentGroupId':
                target_pg.component.parent_group_id
        }
    )
    with nipyapi.utils.rest_exceptions():
        snippet_resp = nipyapi.nifi.SnippetsApi().create_snippet(
            new_snippet_req
        )
    return snippet_resp
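
# Illustrative usage sketch (not part of the original module): create a
# snippet of an existing Process Group and inspect the response. The group
# name 'my_pg' is hypothetical.
import nipyapi

pg = nipyapi.canvas.get_process_group('my_pg')
snip = create_pg_snippet(pg.id)
assert isinstance(snip, nipyapi.nifi.SnippetEntity)
print(snip.snippet.id, snip.snippet.parent_group_id)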

log = logging.getLogger(__name__)
log.setLevel(logging.INFO)
log.info("Setting up NiPyApi Demo Console")

log.info("Cleaning up old NiPyApi Console Process Groups")
process_group_0 = nipyapi.canvas.get_process_group(_pg0)
if process_group_0 is not None:
    nipyapi.canvas.delete_process_group(
        process_group_0,
        force=True,
        refresh=True
    )

log.info("Creating process_group_0 as an empty process group named %s", _pg0)
process_group_0 = nipyapi.canvas.create_process_group(
    nipyapi.canvas.get_process_group(nipyapi.canvas.get_root_pg_id(), 'id'),
    _pg0,
    location=(400.0, 400.0)
)

log.info("Cleaning up old NiPyApi Console Processors")
processor_0 = nipyapi.canvas.get_processor(_proc0)
if processor_0 is not None:
    # Delete the stale processor that was just looked up, not the new group
    nipyapi.canvas.delete_processor(processor_0, True)

log.info("Creating processor_0 as a new GenerateFlowFile named %s in the "
         "previous ProcessGroup", _proc0)
processor_0 = nipyapi.canvas.create_processor(
    parent_pg=process_group_0,
    processor=nipyapi.canvas.get_processor_type('GenerateFlowFile'),
    location=(400.0, 400.0),
    name=_proc0,
    config=nipyapi.nifi.ProcessorConfigDTO(
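
# The demo snippet above is truncated at the config argument. A minimal,
# hypothetical completion (not the demo's actual values) could set the run
# schedule and auto-terminate GenerateFlowFile's 'success' relationship:
import nipyapi

example_config = nipyapi.nifi.ProcessorConfigDTO(
    scheduling_period='1s',
    auto_terminated_relationships=['success']
)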

def upload_template(pg_id, template_file):
    """
    Uploads a given template xml from the file system to the given
    Process Group

    Args:
        pg_id (str): The UUID of the Process Group to upload to
        template_file (str): The path, including filename, to the template file

    Returns:
        (TemplateEntity): The new Template object
    """
    with nipyapi.utils.rest_exceptions():
        this_pg = nipyapi.canvas.get_process_group(pg_id, 'id')
    assert isinstance(this_pg, nipyapi.nifi.ProcessGroupEntity)
    log.info("Called upload_template against endpoint %s with args %s",
             nipyapi.config.nifi_config.api_client.host, locals())
    # Ensure we are receiving a valid file
    assert isfile(template_file) and access(template_file, R_OK), \
        SystemError("File {0} invalid or unreadable".format(template_file))
    # Test for expected Template XML elements
    tree = etree.parse(template_file)
    root_tag = tree.getroot().tag
    if root_tag != 'template':
        raise TypeError(
            "Expected 'template' as xml root element, got {0} instead. "
            "Are you sure this is a Template?"
            .format(root_tag)
        )
    t_name = tree.find('name').text
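
# Illustrative usage sketch (not part of the original module): upload a
# template XML file into the root Process Group. The file path below is
# hypothetical.
import nipyapi

root_id = nipyapi.canvas.get_root_pg_id()
new_template = upload_template(root_id, '/tmp/my_flow_template.xml')
print(new_template.template.name)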

        bucket (Bucket): the Bucket on the NiFi Registry to save to
        flow_name (str): A name for the VersionedFlow in the Bucket.
            Note you need either a name for a new VersionedFlow, or the ID of
            an existing one to save a new version
        flow_id (Optional [str]): Identifier of an existing VersionedFlow in
            the bucket, if saving a new version to an existing flow
        comment (str): A comment for the version commit
        desc (str): A description of the VersionedFlow
        refresh (bool): Whether to refresh the object revisions before action
        force (bool): Whether to Force Commit, or just regular Commit

    Returns:
        (VersionControlInformationEntity)
    """
    if refresh:
        target_pg = nipyapi.canvas.get_process_group(process_group.id, 'id')
    else:
        target_pg = process_group
    if nipyapi.utils.check_version('1.10.0') <= 0:
        body = nipyapi.nifi.StartVersionControlRequestEntity(
            process_group_revision=target_pg.revision,
            versioned_flow=nipyapi.nifi.VersionedFlowDTO(
                bucket_id=bucket.identifier,
                comments=comment,
                description=desc,
                flow_name=flow_name,
                flow_id=flow_id,
                registry_id=registry_client.id,
                action='FORCE_COMMIT' if force else 'COMMIT'
            )
        )
    else:
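
# Illustrative sketch (not from the original source) of the docstring's other
# calling pattern: committing a new version to an existing VersionedFlow by
# flow_id rather than flow_name. The client, bucket, flow, and group names
# below are hypothetical.
import nipyapi

reg_client = nipyapi.versioning.get_registry_client('my_registry_client')
dev_bucket = nipyapi.versioning.get_registry_bucket('my_bucket')
existing_flow = nipyapi.versioning.get_flow_in_bucket(
    dev_bucket.identifier, identifier='my_flow', identifier_type='name'
)
nipyapi.versioning.save_flow_ver(
    process_group=nipyapi.canvas.get_process_group('my_pg'),
    registry_client=reg_client,
    bucket=dev_bucket,
    flow_id=existing_flow.identifier,
    comment='second version of the flow'
)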

def step_5_save_flow_to_bucket():
    """Saving the flow to the bucket as a new versioned flow"""
    log.info("Saving %s to %s", dev_pg_name, dev_bucket_name)
    process_group = nipyapi.canvas.get_process_group(dev_pg_name)
    bucket = nipyapi.versioning.get_registry_bucket(dev_bucket_name)
    registry_client = nipyapi.versioning.get_registry_client(
        dev_reg_client_name)
    nipyapi.versioning.save_flow_ver(
        process_group=process_group,
        registry_client=registry_client,
        bucket=bucket,
        flow_name=dev_ver_flow_name,
        desc='A Versioned Flow',
        comment='A Versioned Flow'
    )
    print("We have now saved the Dev Process Group to the Dev Registry bucket "
          "as a new Versioned Flow. Return to the Dev Registry tab in your "