Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def add_notification_template(self, notification_template, endpoint="notification_templates_success"):
from awxkit.api.pages.workflow_job_templates import WorkflowJobTemplate
supported_endpoints = wfjt_notification_endpoints if isinstance(self, WorkflowJobTemplate) \
else notification_endpoints
if endpoint not in supported_endpoints:
raise ValueError('Unsupported notification endpoint "{0}". Please use one of {1}.'
.format(endpoint, notification_endpoints))
with suppress(exc.NoContent):
self.related[endpoint].post(dict(id=notification_template.id))
if job_result not in job_results:
raise ValueError(
'Unsupported job_result type "{0}". Please use one of {1}.' .format(
job_result, job_results))
result_attr = 'notification_templates_{0}'.format(job_result)
if result_attr not in resource.related:
raise ValueError(
'Unsupported resource "{0}". Does not have a related {1} field.' .format(
resource, result_attr))
payload = dict(id=self.id)
if disassociate:
payload['disassociate'] = True
with suppress(exc.NoContent):
getattr(resource.related, result_attr).post(payload)
def add_label(self, label):
if isinstance(label, page.Page):
label = label.json
with suppress(exc.NoContent):
self.related.labels.post(label)
def remove_notification_template(self, notification_template, endpoint="notification_templates_success"):
from awxkit.api.pages.workflow_job_templates import WorkflowJobTemplate
supported_endpoints = wfjt_notification_endpoints if isinstance(self, WorkflowJobTemplate) \
else notification_endpoints
if endpoint not in supported_endpoints:
raise ValueError('Unsupported notification endpoint "{0}". Please use one of {1}.'
.format(endpoint, notification_endpoints))
with suppress(exc.NoContent):
self.related[endpoint].post(dict(id=notification_template.id, disassociate=notification_template.id))
def add_credential(self, credential):
with suppress(exc.NoContent):
self.related.credentials.post(
dict(id=credential.id, associate=True))
def add_label(self, label):
if isinstance(label, page.Page):
label = label.json
with suppress(exc.NoContent):
self.related.labels.post(label)
target = '/api/v2/{}/{}'.format(resource, value)
detail = self.page.__class__(
target,
self.page.connection
).get()
object_roles = detail['summary_fields']['object_roles']
actual_role = object_roles[role + '_role']
params = {'id': actual_role['id']}
if self.action == 'grant':
params['associate'] = True
if self.action == 'revoke':
params['disassociate'] = True
try:
self.page.get().related.roles.post(params)
except NoContent:
# we expect to enter this block because these endpoints return
# HTTP 204 on success
pass
def delete(self):
r = self.connection.delete(self.endpoint)
with suppress(exc.NoContent):
return self.page_identity(r)
def add_extra_credential(self, credential):
with suppress(exc.NoContent):
self.related.extra_credentials.post(
dict(id=credential.id, associate=True))
"""If a payload is supplied, PUT the payload. If not, submit our existing page JSON as our payload."""
json = self.json if json is None else json
r = self.connection.put(self.endpoint, json=json)
return self.page_identity(r, request_json=json)
def get_related(self, related_name, **kwargs):
assert related_name in self.json.get('related', [])
endpoint = self.json['related'][related_name]
return self.walk(endpoint, **kwargs)
def walk(self, endpoint, **kw):
page_cls = get_registered_page(endpoint)
return page_cls(self.connection, endpoint=endpoint).get(**kw)
_exception_map = {http.NO_CONTENT: exc.NoContent,
http.NOT_FOUND: exc.NotFound,
http.INTERNAL_SERVER_ERROR: exc.InternalServerError,
http.BAD_GATEWAY: exc.BadGateway,
http.METHOD_NOT_ALLOWED: exc.MethodNotAllowed,
http.UNAUTHORIZED: exc.Unauthorized,
http.PAYMENT_REQUIRED: exc.PaymentRequired,
http.CONFLICT: exc.Conflict}
def exception_from_status_code(status_code):
return _exception_map.get(status_code, None)
class PageList(object):
@property