Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_create_controller(regress_nifi, fix_cont):
root_pg = canvas.get_process_group(canvas.get_root_pg_id(), 'id')
cont_type = canvas.list_all_controller_types()[0]
r1 = canvas.create_controller(
parent_pg=root_pg,
controller=cont_type
)
assert isinstance(r1, nifi.ControllerServiceEntity)
with pytest.raises(AssertionError):
_ = canvas.create_controller('pie', cont_type)
with pytest.raises(AssertionError):
_ = canvas.create_controller(root_pg, 'pie')
def test_create_process_group(regress_nifi):
r = canvas.create_process_group(
parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id'),
new_pg_name=conftest.test_pg_name,
location=(400.0, 400.0),
comment='some comment'
)
assert r.component.name == conftest.test_pg_name
assert r.position.x == r.position.y == 400
assert r.component.parent_group_id == canvas.get_root_pg_id()
assert isinstance(r, nifi.ProcessGroupEntity)
# Test process group creation on other than root process group.
s = canvas.create_process_group(parent_pg=canvas.get_process_group(conftest.test_pg_name), location=(200.0, 200.0),
new_pg_name=conftest.test_another_pg_name)
assert s.component.name == conftest.test_another_pg_name
assert s.position.x == s.position.y == 200
assert s.component.parent_group_id == canvas.get_process_group(conftest.test_pg_name, "name").id
assert isinstance(s, nifi.ProcessGroupEntity)
def test_deploy_flow_version(regress_flow_reg, fix_ver_flow):
r1 = versioning.deploy_flow_version(
parent_id=canvas.get_root_pg_id(),
location=(0,0),
bucket_id=fix_ver_flow.bucket.identifier,
flow_id=fix_ver_flow.flow.identifier,
reg_client_id=fix_ver_flow.client.id,
version=1
)
assert isinstance(r1, nifi.ProcessGroupEntity)
r2 = versioning.deploy_flow_version(
parent_id=canvas.get_root_pg_id(),
location=(0, 0),
bucket_id=fix_ver_flow.bucket.identifier,
flow_id=fix_ver_flow.flow.identifier,
reg_client_id=fix_ver_flow.client.id,
version=None
)
assert isinstance(r2, nifi.ProcessGroupEntity)
def test_get_root_pg_id():
r = canvas.get_root_pg_id()
assert isinstance(r, str)
def test_list_nested_processors(regress_nifi, fix_pg, fix_proc):
pg_1 = fix_pg.generate(
parent_pg=canvas.get_process_group(canvas.get_root_pg_id(), 'id')
)
pg_2 = fix_pg.generate(parent_pg=pg_1)
root_proc_1 = fix_proc.generate()
pg_1_proc_1 = fix_proc.generate(parent_pg=pg_1)
pg_1_proc_2 = fix_proc.generate(parent_pg=pg_1)
pg_2_proc_1 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_2 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_3 = fix_proc.generate(parent_pg=pg_2)
pg_2_proc_4 = fix_proc.generate(parent_pg=pg_2)
r1 = [x for x in canvas.list_all_processors('root')
if conftest.test_basename in x.status.name]
assert len(r1) == 7
r2 = [x for x in canvas.list_all_processors(pg_2.id)
if conftest.test_basename in x.status.name]
assert len(r2) == 4
r3 = [x for x in canvas.list_all_processors(pg_1.id)
def test_input_output_ports(regress_nifi, fix_pg):
root_input_port = canvas.create_port(
pg_id=canvas.get_root_pg_id(),
port_type='INPUT_PORT',
name=conftest.test_basename + 'input_port',
state='STOPPED'
)
assert isinstance(root_input_port, nifi.PortEntity)
root_output_port = canvas.create_port(
pg_id=canvas.get_root_pg_id(),
port_type='OUTPUT_PORT',
name=conftest.test_basename + 'output_port',
state='STOPPED'
)
assert isinstance(root_output_port, nifi.PortEntity)
input_ports = [x for x in canvas.list_all_by_kind('input_ports')
if conftest.test_basename in x.status.name]
assert len(input_ports) == 1
output_ports = [x for x in canvas.list_all_by_kind('output_ports')
if conftest.test_basename in x.status.name]
assert len(output_ports) == 1
f_pg = fix_pg.generate()
f_pg_input_port = canvas.create_port(
pg_id=f_pg.id,
port_type='INPUT_PORT',
name=conftest.test_basename + 'input_port',
group_identity=None):
"""
Creates a default security context within NiFi or Nifi-Registry
Args:
service (str): 'nifi' or 'registry' to indicate which service
user_identity: a service user to establish in the security context
group_identity: a service group to establish in the security context
Returns:
None
"""
assert service in _valid_services
if 'nifi' in service:
rpg_id = nipyapi.canvas.get_root_pg_id()
if user_identity is None and group_identity is None:
nifi_user_identity = nipyapi.security.get_service_user(
nipyapi.config.default_nifi_username,
service='nifi'
)
else:
nifi_user_identity = user_identity
access_policies = [
('write', 'process-groups', rpg_id),
('read', 'process-groups', rpg_id),
('write', 'data/process-groups', rpg_id),
('read', 'data/process-groups', rpg_id),
('read', 'system', None),
]
for pol in access_policies:
ap = nipyapi.security.get_access_policy_for_resource(
def step_9_deploy_prod_flow_to_nifi():
"""Deploying the flow to the Prod environment"""
log.info("Deploying promoted flow from Prod Registry to Prod Nifi")
bucket = nipyapi.versioning.get_registry_bucket(prod_bucket_name)
flow = nipyapi.versioning.get_flow_in_bucket(
bucket_id=bucket.identifier,
identifier=prod_ver_flow_name
)
reg_client = nipyapi.versioning.get_registry_client(prod_reg_client_name)
nipyapi.versioning.deploy_flow_version(
parent_id=nipyapi.canvas.get_root_pg_id(),
location=(0, 0),
bucket_id=bucket.identifier,
flow_id=flow.identifier,
reg_client_id=reg_client.id,
version=None
)
print("The Promoted Flow has now been deployed to the Prod NiFi, please "
"refresh the Prod NiFi tab and note that the Process Group has the "
def bootstrap_nifi_access_policies(user='nobel'):
"""
Grant the current nifi user access to the root process group
Note: Not sure not work with the current LDAP-configured Docker image.
It may need to be tweaked to configure the ldap-user-group-provider.
"""
rpg_id = nipyapi.canvas.get_root_pg_id()
nifi_user_identity = nipyapi.security.get_service_user(user)
access_policies = [
('write', 'process-groups', rpg_id),
('read', 'process-groups', rpg_id)
]
for pol in access_policies:
ap = nipyapi.security.create_access_policy(
action=pol[0],
resource=pol[1],
r_id=pol[2],
service='nifi'
)
nipyapi.security.add_user_to_access_policy(
nifi_user_identity,
policy=ap,