Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def mock_iter_resp_lines(resp):
for name in ['test1', 'test2', 'test3']:
yield json.dumps({
'type': 'ADDED',
'object': models.V1DeploymentConfig(
metadata=k8s_models.V1ObjectMeta(
name=name,
resource_version=1
),
spec=models.V1DeploymentConfigSpec()
).to_dict()
})
oapi = client.OapiApi()
monkeypatch.setattr(oapi, 'list_deployment_config_for_all_namespaces', MockHTTPResponse)
monkeypatch.setattr(k8s_watch.watch, 'iter_resp_lines', mock_iter_resp_lines)
count = 2
names = ['test1', 'test2', 'test3']
w = watch.Watch(return_type="V1DeploymentConfig")
for event in w.stream(oapi.list_deployment_config_for_all_namespaces, _request_timeout=2):
assert event['object'].metadata.name in names
names.remove(event['object'].metadata.name)
count -= 1
if not count:
w.stop()
def delete_old_images(image_name):
# Let's ignore the registry prefix for now because sometimes our tag doesn't match the registry
registry, image_name = image_name.split('/', 1)
try:
oapi = openshift_client.OapiApi()
image_list = get_registry_images()
for image in image_list['items']:
image_fqn, image_sha = image['dockerImageReference'].split("@")
if image_name in image_fqn:
print("Found image: %s" % image_fqn)
if registry not in image_fqn:
# This warning will only get displayed if a user has used --registry-route
# This is because the route name gets collapsed into the service hostname
# when pushed to the registry.
print("Warning: Tagged image registry prefix doesn't match. Deleting anyway. Given: %s; Found: %s"
% (registry, image_fqn.split('/')[0]))
oapi.delete_image(name=image_sha, body={})
print("Successfully deleted %s" % image_sha)
except Exception as e:
print("Exception deleting old images: %s" % e)
def create_project(project):
print("Creating project {}".format(project))
try:
openshift_config.load_kube_config()
api = openshift_client.OapiApi()
api.create_project_request({
'apiVersion': 'v1',
'kind': 'ProjectRequest',
'metadata': {
'name': project
}
})
print("Created project")
# TODO: Evaluate the project request to get the actual project name
return project
except ApiException as e:
if e.status == 409:
print("Project {} already exists".format(project))
return project
else:
def create_cluster_role_binding(name, user_name, role="cluster-admin"):
print("Creating role binding of {} for {}".format(role, user_name))
try:
kubernetes_config.load_kube_config()
api = openshift_client.OapiApi()
# TODO: Use generateName when it doesn't throw an exception
api.create_cluster_role_binding(
{
'apiVersion': 'v1',
'kind': 'ClusterRoleBinding',
'metadata': {
'name': name,
},
'roleRef': {
'name': role,
},
'userNames': [user_name]
}
)
except ApiException as e:
raise e
def get_asb_route():
asb_route = None
route_list = None
suffix = None
possible_namespaces = ["ansible-service-broker", "openshift-ansible-service-broker",
"openshift-automation-service-broker"]
for namespace in possible_namespaces:
try:
openshift_config.load_kube_config()
oapi = openshift_client.OapiApi()
route_list = oapi.list_namespaced_route(namespace)
if route_list.items != []:
suffix = namespace
break
except ApiException as e:
print("Didn't find OpenShift Automation Broker route in namespace: %s.\
Reason: [%s]. Trying alternative namespaces." % namespace, e.reason)
if route_list.items == []:
print("No routes found in broker namespaces.")
return None
for route in route_list.items:
if 'asb' in route.metadata.name and 'etcd' not in route.metadata.name:
asb_route = route.spec.host