Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if disassociate:
payload['disassociate'] = True
with suppress(exc.NoContent):
getattr(resource.related, result_attr).post(payload)
page.register_page([resources.notification_template,
(resources.notification_templates, 'post'),
(resources.notification_template_copy, 'post'),
resources.notification_template_any,
resources.notification_template_error,
resources.notification_template_success], NotificationTemplate)
class NotificationTemplates(page.PageList, NotificationTemplate):
pass
page.register_page([resources.notification_templates,
resources.notification_templates_any,
resources.notification_templates_error,
resources.notification_templates_success],
NotificationTemplates)
class NotificationTemplateTest(base.Base):
pass
payload = self.create_payload(name=name, description=description, organization=organization, **kwargs)
return self.update_identity(WorkflowJobTemplates(self.connection).post(payload))
def add_label(self, label):
if isinstance(label, page.Page):
label = label.json
with suppress(exc.NoContent):
self.related.labels.post(label)
page.register_page([resources.workflow_job_template,
(resources.workflow_job_templates, 'post'),
(resources.workflow_job_template_copy, 'post')], WorkflowJobTemplate)
class WorkflowJobTemplates(page.PageList, WorkflowJobTemplate):
pass
page.register_page([resources.workflow_job_templates], WorkflowJobTemplates)
class WorkflowJobTemplateLaunch(base.Base):
pass
page.register_page(resources.workflow_job_template_launch, WorkflowJobTemplateLaunch)
class WorkflowJobTemplateCopy(base.Base):
self.related.credentials.post(
dict(id=credential.id, disassociate=True))
def remove_all_credentials(self):
for cred in self.related.credentials.get().results:
with suppress(exc.NoContent):
self.related.credentials.post(
dict(id=cred.id, disassociate=True))
page.register_page([resources.job_template,
(resources.job_templates, 'post'),
(resources.job_template_copy, 'post')], JobTemplate)
class JobTemplates(page.PageList, JobTemplate):
pass
page.register_page([resources.job_templates,
resources.related_job_templates], JobTemplates)
class JobTemplateCallback(base.Base):
pass
page.register_page(resources.job_template_callback, JobTemplateCallback)
poll_until(self.job_exists, interval=interval, timeout=adjusted_timeout, **kw)
return self
def job_exists(self):
self.get()
try:
return self.job
except AttributeError:
return False
page.register_page(resources.workflow_job_node, WorkflowJobNode)
class WorkflowJobNodes(page.PageList, WorkflowJobNode):
pass
page.register_page([resources.workflow_job_nodes,
resources.workflow_job_workflow_nodes,
resources.workflow_job_node_always_nodes,
resources.workflow_job_node_failure_nodes,
resources.workflow_job_node_success_nodes], WorkflowJobNodes)
payload = self.payload(kind=kind, **kwargs)
payload.ds = DSAdapter(self.__class__.__name__, self._dependency_store)
return payload
def create(self, kind='cloud', **kwargs):
payload = self.create_payload(kind=kind, **kwargs)
return self.update_identity(
CredentialTypes(
self.connection).post(payload))
page.register_page([resources.credential_type,
(resources.credential_types, 'post')], CredentialType)
class CredentialTypes(page.PageList, CredentialType):
pass
page.register_page(resources.credential_types, CredentialTypes)
class Credential(HasCopy, HasCreate, base.Base):
dependencies = [CredentialType]
optional_dependencies = [Organization, User, Team]
def payload(
self,
credential_type,
user=None,
'ssh_key_unlock',
'vault_password'):
if getattr(self.inputs, field, None) == 'ASK':
if field == 'password':
passwords.append('ssh_password')
else:
passwords.append(field)
return passwords
page.register_page([resources.credential,
(resources.credentials, 'post'),
(resources.credential_copy, 'post')], Credential)
class Credentials(page.PageList, Credential):
pass
page.register_page([resources.credentials,
resources.related_credentials,
resources.job_extra_credentials,
resources.job_template_extra_credentials],
Credentials)
self,
**kwargs
):
if 'name' not in kwargs:
kwargs['name'] = 'approval node {}'.format(random_title())
self.related.create_approval_template.post(kwargs)
return self.get()
page.register_page([resources.workflow_job_template_node,
(resources.workflow_job_template_nodes,
'post')],
WorkflowJobTemplateNode)
class WorkflowJobTemplateNodes(page.PageList, WorkflowJobTemplateNode):
pass
page.register_page([resources.workflow_job_template_nodes,
resources.workflow_job_template_workflow_nodes,
resources.workflow_job_template_node_always_nodes,
resources.workflow_job_template_node_failure_nodes,
resources.workflow_job_template_node_success_nodes],
WorkflowJobTemplateNodes)
if not self.json.job_args:
return ""
for arg in yaml.safe_load(self.json.job_args):
try:
args.append(yaml.safe_load(arg))
except (yaml.parser.ParserError, yaml.scanner.ScannerError):
if arg[0] == '@': # extra var file reference
args.append(attempt_yaml_load(arg))
elif args[-1] == '-c': # this arg is likely sh arg string
args.extend([attempt_yaml_load(item) for item in args_string_to_list(arg)])
else:
raise
return args
class UnifiedJobs(page.PageList, UnifiedJob):
pass
page.register_page([resources.unified_jobs,
resources.instance_related_jobs,
resources.instance_group_related_jobs,
resources.schedules_jobs], UnifiedJobs)
username=username, password=password, **kwargs)
self.password = payload.password
self.update_identity(Users(self.connection).post(payload))
if organization:
organization.add_user(self)
return self
page.register_page([resources.user,
(resources.users, 'post')], User)
class Users(page.PageList, User):
pass
page.register_page([resources.users,
resources.organization_admins,
resources.related_users,
resources.user_admin_organizations], Users)
class Me(Users):
pass
page.register_page(resources.me, Me)
def __item_class__(self):
"""Returns the class representing a single 'Page' item
With an inheritence of OrgListSubClass -> OrgList -> PageList -> Org -> Base -> Page, the following
will return the parent class of the current object (e.g. 'Org').
Obtaining a page type by registered endpoint is highly recommended over using this method.
"""
mro = inspect.getmro(self.__class__)
bl_index = mro.index(PageList)
return mro[bl_index + 1]