Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# -*- coding: utf-8 -*-
"""Tests for nipyapi security module."""
from __future__ import absolute_import
import pytest
from tests import conftest
import nipyapi
# Tells pytest to skip this module if security testing is not enabled.
pytestmark = pytest.mark.skipif(
    not conftest.test_security,
    reason='test_security disabled in Conftest'
)

# Useful for manual testing: when security testing is enabled, register the
# HTTPS Registry and NiFi API URLs of the default host at import time.
if conftest.test_security:
    test_host = nipyapi.config.default_host
    # NOTE(review): the two positional True flags are passed straight through
    # to set_endpoint; confirm their meaning against nipyapi.utils.
    nipyapi.utils.set_endpoint(
        'https://' + test_host + ':18443/nifi-registry-api', True, True
    )
    nipyapi.utils.set_endpoint(
        'https://' + test_host + ':8443/nifi-api', True, True
    )
def test_list_service_users():
    """Placeholder test; intentionally performs no checks of its own."""
    # This test suite makes extensive use of this call in fixtures
    pass
def test_get_service_user():
    """Placeholder test; intentionally performs no checks of its own."""
    # This test suite makes extensive use of this call in fixtures
    pass
def test_create_service_user():
    """Expect create_service_user to raise AssertionError for bad arguments."""
    # NOTE(review): presumably rejected because 'bob' is not a recognised
    # service name; confirm against nipyapi.security.create_service_user.
    with pytest.raises(AssertionError):
        nipyapi.security.create_service_user(service='bob', identity='pie')
nipyapi.utils.start_docker_containers(
docker_containers=d_containers,
network_name=d_network_name
)
for reg_instance in [dev_reg_api_url, prod_reg_api_url]:
log.info("Waiting for NiFi Registries to be ready")
nipyapi.utils.set_endpoint(reg_instance)
nipyapi.utils.wait_to_complete(
test_function=nipyapi.utils.is_endpoint_up,
endpoint_url='-'.join(reg_instance.split('-')[:-1]),
nipyapi_delay=nipyapi.config.long_retry_delay,
nipyapi_max_wait=nipyapi.config.long_max_wait
)
for nifi_instance in [dev_nifi_api_url, prod_nifi_api_url]:
log.info("Waiting for NiFi instances to be ready")
nipyapi.utils.set_endpoint(nifi_instance)
nipyapi.utils.wait_to_complete(
test_function=nipyapi.utils.is_endpoint_up,
endpoint_url='-'.join(nifi_instance.split('-')[:-1]),
nipyapi_delay=nipyapi.config.long_retry_delay,
nipyapi_max_wait=nipyapi.config.long_max_wait
)
# Sleeping to wait for all startups to return before printing guide
sleep(1)
print("Your Docker containers should now be ready, please find them at the"
"following URLs:"
"\nnifi-dev ", dev_nifi_url,
"\nreg-dev ", dev_reg_url,
"\nreg-prod ", prod_reg_url,
"\nnifi-prod ", prod_nifi_url,
"\nPlease open each of these in a browser tab."
"\nPlease then call the function 'step_2_create_reg_clients()'\n")
def step_1_boot_demo_env():
    """Start the demo Docker containers and wait for each endpoint to be up.

    Starts the containers listed in ``d_containers`` on ``d_network_name``.
    Then, for the dev and prod Registry API URLs followed by the dev and prod
    NiFi API URLs, sets each one as the active endpoint and polls
    ``nipyapi.utils.is_endpoint_up`` through ``nipyapi.utils.wait_to_complete``
    using the long retry delay and long max wait from ``nipyapi.config``.
    """
    log.info("Starting Dev and Prod NiFi and NiFi-Registry Docker Containers"
             "\nPlease wait, this may take a few minutes to download the "
             "Docker images and then start them.")
    nipyapi.utils.start_docker_containers(
        docker_containers=d_containers,
        network_name=d_network_name
    )
    # Registries are waited on first, then the NiFi instances; each group
    # logs its own progress message once per endpoint.
    wait_groups = (
        ("Waiting for NiFi Registries to be ready",
         [dev_reg_api_url, prod_reg_api_url]),
        ("Waiting for NiFi instances to be ready",
         [dev_nifi_api_url, prod_nifi_api_url]),
    )
    for progress_message, api_urls in wait_groups:
        for api_url in api_urls:
            log.info(progress_message)
            nipyapi.utils.set_endpoint(api_url)
            nipyapi.utils.wait_to_complete(
                test_function=nipyapi.utils.is_endpoint_up,
                # Probe the URL with its last '-'-separated segment removed.
                endpoint_url='-'.join(api_url.split('-')[:-1]),
                nipyapi_delay=nipyapi.config.long_retry_delay,
                nipyapi_max_wait=nipyapi.config.long_max_wait
            )
    # Sleeping to wait for all startups to return before printing guide
    # NOTE(review): this excerpt of the function ends here; the sleep and
    # guide printout the comment above refers to are not part of this block.
flow_id=dev_flow.identifier,
comment='A Flow update with a Complex Processor'
)
dev_ver_flow = nipyapi.versioning.get_flow_in_bucket(
dev_bucket.identifier,
identifier=dev_ver_flow_name
)
log.info("Exporting the Dev flow to Yaml")
dev_export = nipyapi.versioning.export_flow_version(
bucket_id=dev_bucket.identifier,
flow_id=dev_ver_flow.identifier,
mode='yaml'
)
log.info("Connecting to Prod Environment")
nipyapi.utils.set_endpoint(prod_nifi_api_url)
nipyapi.utils.set_endpoint(prod_reg_api_url)
log.info("Importing the Updated Dev Yaml to the Prod Bucket Flow")
prod_bucket = nipyapi.versioning.get_registry_bucket(prod_bucket_name)
prod_flow = nipyapi.versioning.get_flow_in_bucket(
bucket_id=prod_bucket.identifier,
identifier=prod_ver_flow_name
)
nipyapi.versioning.import_flow_version(
bucket_id=prod_bucket.identifier,
encoded_flow=dev_export,
flow_id=prod_flow.identifier
)
log.info("Pushing the new Version into the Prod Flow")
prod_pg = nipyapi.canvas.get_process_group(dev_pg_name)
nipyapi.versioning.update_flow_ver(
process_group=prod_pg,
target_version=None
def step_c_promote_change_to_prod_reg():
"""Promoting the committed change across to the prod environment"""
log.info("Exporting updated Dev Flow Version")
dev_bucket = nipyapi.versioning.get_registry_bucket(dev_bucket_name)
dev_ver_flow = nipyapi.versioning.get_flow_in_bucket(
dev_bucket.identifier,
identifier=dev_ver_flow_name
)
dev_export = nipyapi.versioning.export_flow_version(
bucket_id=dev_bucket.identifier,
flow_id=dev_ver_flow.identifier,
mode='yaml'
)
log.info("Connecting to Prod Environment")
nipyapi.utils.set_endpoint(prod_nifi_api_url)
nipyapi.utils.set_endpoint(prod_reg_api_url)
log.info("Pushing updated version into Prod Registry Flow")
prod_bucket = nipyapi.versioning.get_registry_bucket(prod_bucket_name)
prod_flow = nipyapi.versioning.get_flow_in_bucket(
bucket_id=prod_bucket.identifier,
identifier=prod_ver_flow_name
)
nipyapi.versioning.import_flow_version(
bucket_id=prod_bucket.identifier,
encoded_flow=dev_export,
flow_id=prod_flow.identifier
)
print("We have promoted the change from our Dev Registry to Prod, please "
"refresh your Prod Registry Tab to see the new version is present, "
def step_7_create_prod_ver_bucket():
"""Connecting to the Prod environment and creating a new bucket"""
log.info("Connecting to Prod Environment")
nipyapi.utils.set_endpoint(prod_nifi_api_url)
nipyapi.utils.set_endpoint(prod_reg_api_url)
log.info("Creating %s as a new Registry Bucket", prod_bucket_name)
nipyapi.versioning.create_registry_bucket(prod_bucket_name)
print("We have now created a bucket in the Prod Registry to promote our "
"Dev flow into. Go to the Prod Registry tab and click the arrow next"
nipyapi.security.add_user_to_access_policy(
nifi_user_identity,
policy=ap,
service='nifi'
)
# connection test disabled as it is not configured with the correct SSLContext
log.info("Starting Secured NiFi and NiFi-Registry Docker Containers")
nipyapi.utils.start_docker_containers(
docker_containers=d_containers,
network_name=d_network_name
)
log.info("Creating Registry security context")
nipyapi.utils.set_endpoint(secured_registry_url)
log.info("Using demo certs from %s", host_certs_path)
nipyapi.security.set_service_ssl_context(
service='registry',
ca_file=path.join(host_certs_path, 'localhost-ts.pem'),
client_cert_file=path.join(host_certs_path, 'client-cert.pem'),
client_key_file=path.join(host_certs_path, 'client-key.pem'),
client_key_password='clientPassword'
)
log.info("Waiting for Registry to be ready for login")
registry_user = nipyapi.utils.wait_to_complete(
test_function=nipyapi.security.get_service_access_status,
service='registry',
bool_response=True,
nipyapi_delay=nipyapi.config.long_retry_delay,
nipyapi_max_wait=nipyapi.config.long_max_wait
)
client_cert_file=path.join(host_certs_path, 'client-cert.pem'),
client_key_file=path.join(host_certs_path, 'client-key.pem'),
client_key_password='clientPassword'
)
log.info("Waiting for Registry to be ready for login")
registry_user = nipyapi.utils.wait_to_complete(
test_function=nipyapi.security.get_service_access_status,
service='registry',
bool_response=True,
nipyapi_delay=nipyapi.config.long_retry_delay,
nipyapi_max_wait=nipyapi.config.long_max_wait
)
pprint('nipyapi_secured_registry CurrentUser: ' + registry_user.identity)
log.info("Creating NiFi security context")
nipyapi.utils.set_endpoint(secured_nifi_url)
nipyapi.security.set_service_ssl_context(
service='nifi',
ca_file=host_certs_path + '/localhost-ts.pem'
)
log.info("Waiting for NiFi to be ready for login")
nipyapi.utils.wait_to_complete(
test_function=nipyapi.security.service_login,
service='nifi',
username='nobel',
password='password',
bool_response=True,
nipyapi_delay=nipyapi.config.long_retry_delay,
nipyapi_max_wait=nipyapi.config.long_max_wait
)
nifi_user = nipyapi.security.get_service_access_status(service='nifi')
pprint(
def step_e_check_sensitive_processors():
"""Create and test for Sensitive Properties to be set in the Canvas"""
log.info("Connecting to Dev Environment")
nipyapi.utils.set_endpoint(dev_nifi_api_url)
nipyapi.utils.set_endpoint(dev_reg_api_url)
log.info("Creating additional complex Processor")
nipyapi.canvas.create_processor(
parent_pg=nipyapi.canvas.get_process_group(dev_pg_name),
processor=nipyapi.canvas.get_processor_type('GetTwitter'),
location=(400.0, 600.0),
name=dev_proc2_name,
)
s_proc = nipyapi.canvas.list_sensitive_processors(summary=True)
print("We have created a new Processor {0} which has security protected"
"properties, these will need to be completed in each environment "
"that this flow is used in. These properties are discoverable using "
"the API calls list 'canvas.list_sensitive_processors()'"
"\nFunction 'nipyapi.canvas.update_processor' as used in step_a is"
" intended for this purpose"
"\nPlease no call 'step_f_set_sensitive_values()'\n"