How to use the awxkit.awxkit.api.pages.page.PageList class in awxkit

To help you get started, we’ve selected a few awxkit examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github ansible / awx / awxkit / awxkit / api / pages / notification_templates.py View on Github external
if disassociate:
            payload['disassociate'] = True

        with suppress(exc.NoContent):
            getattr(resource.related, result_attr).post(payload)


page.register_page([resources.notification_template,
                    (resources.notification_templates, 'post'),
                    (resources.notification_template_copy, 'post'),
                    resources.notification_template_any,
                    resources.notification_template_error,
                    resources.notification_template_success], NotificationTemplate)


class NotificationTemplates(page.PageList, NotificationTemplate):
    """Combine ``page.PageList`` with ``NotificationTemplate``.

    The class adds no attributes or methods of its own.
    """


page.register_page(
    [
        resources.notification_templates,
        resources.notification_templates_any,
        resources.notification_templates_error,
        resources.notification_templates_success,
    ],
    NotificationTemplates,
)


class NotificationTemplateTest(base.Base):
    """Subclass of ``base.Base`` that adds no behaviour of its own."""
github ansible / awx / awxkit / awxkit / api / pages / workflow_job_templates.py View on Github external
payload = self.create_payload(name=name, description=description, organization=organization, **kwargs)
        return self.update_identity(WorkflowJobTemplates(self.connection).post(payload))

    def add_label(self, label):
        """Post ``label`` to this resource's related ``labels`` endpoint.

        When ``label`` is a ``page.Page`` its ``json`` attribute is posted;
        any other value is posted unchanged. An ``exc.NoContent`` raised by
        the post is suppressed.
        """
        body = label.json if isinstance(label, page.Page) else label
        with suppress(exc.NoContent):
            self.related.labels.post(body)


page.register_page([resources.workflow_job_template,
                    (resources.workflow_job_templates, 'post'),
                    (resources.workflow_job_template_copy, 'post')], WorkflowJobTemplate)


class WorkflowJobTemplates(page.PageList, WorkflowJobTemplate):
    """Combine ``page.PageList`` with ``WorkflowJobTemplate``.

    The class adds no attributes or methods of its own.
    """


page.register_page(
    [resources.workflow_job_templates],
    WorkflowJobTemplates,
)


class WorkflowJobTemplateLaunch(base.Base):
    """Subclass of ``base.Base`` that adds no behaviour of its own."""


page.register_page(
    resources.workflow_job_template_launch,
    WorkflowJobTemplateLaunch,
)


class WorkflowJobTemplateCopy(base.Base):
github ansible / awx / awxkit / awxkit / api / pages / job_templates.py View on Github external
self.related.credentials.post(
                dict(id=credential.id, disassociate=True))

    def remove_all_credentials(self):
        """Post a disassociate request for each related credential.

        The credential list is fetched once from the related ``credentials``
        endpoint. Each request is posted separately, and an
        ``exc.NoContent`` raised by a post is suppressed so the loop
        continues with the next credential.
        """
        attached = self.related.credentials.get().results
        for credential in attached:
            with suppress(exc.NoContent):
                self.related.credentials.post(
                    {'id': credential.id, 'disassociate': True})


page.register_page([resources.job_template,
                    (resources.job_templates, 'post'),
                    (resources.job_template_copy, 'post')], JobTemplate)


class JobTemplates(page.PageList, JobTemplate):
    """Combine ``page.PageList`` with ``JobTemplate``.

    The class adds no attributes or methods of its own.
    """


page.register_page(
    [
        resources.job_templates,
        resources.related_job_templates,
    ],
    JobTemplates,
)


class JobTemplateCallback(base.Base):
    """Subclass of ``base.Base`` that adds no behaviour of its own."""


page.register_page(
    resources.job_template_callback,
    JobTemplateCallback,
)
github ansible / awx / awxkit / awxkit / api / pages / workflow_job_nodes.py View on Github external
poll_until(self.job_exists, interval=interval, timeout=adjusted_timeout, **kw)

        return self

    def job_exists(self):
        """Refresh this page and report its ``job`` attribute.

        Calls ``self.get()`` first, then returns ``self.job``. If reading
        ``job`` raises ``AttributeError``, ``False`` is returned instead.
        """
        self.get()
        # getattr with a default swallows AttributeError from the lookup.
        return getattr(self, 'job', False)


page.register_page(resources.workflow_job_node, WorkflowJobNode)


class WorkflowJobNodes(page.PageList, WorkflowJobNode):
    """Combine ``page.PageList`` with ``WorkflowJobNode``.

    The class adds no attributes or methods of its own.
    """


page.register_page(
    [
        resources.workflow_job_nodes,
        resources.workflow_job_workflow_nodes,
        resources.workflow_job_node_always_nodes,
        resources.workflow_job_node_failure_nodes,
        resources.workflow_job_node_success_nodes,
    ],
    WorkflowJobNodes,
)
github ansible / awx / awxkit / awxkit / api / pages / credentials.py View on Github external
payload = self.payload(kind=kind, **kwargs)
        payload.ds = DSAdapter(self.__class__.__name__, self._dependency_store)
        return payload

    def create(self, kind='cloud', **kwargs):
        """Post a new payload through ``CredentialTypes``.

        The payload comes from ``self.create_payload(kind=kind, **kwargs)``
        and is posted via ``CredentialTypes(self.connection)``. The value
        returned is whatever ``self.update_identity`` returns for the post
        result.
        """
        request_body = self.create_payload(kind=kind, **kwargs)
        collection = CredentialTypes(self.connection)
        return self.update_identity(collection.post(request_body))


page.register_page([resources.credential_type,
                    (resources.credential_types, 'post')], CredentialType)


class CredentialTypes(page.PageList, CredentialType):
    """Combine ``page.PageList`` with ``CredentialType``.

    The class adds no attributes or methods of its own.
    """


page.register_page(
    resources.credential_types,
    CredentialTypes,
)


class Credential(HasCopy, HasCreate, base.Base):

    dependencies = [CredentialType]
    optional_dependencies = [Organization, User, Team]

    def payload(
            self,
            credential_type,
            user=None,
github ansible / awx / awxkit / awxkit / api / pages / credentials.py View on Github external
'ssh_key_unlock',
                'vault_password'):
            if getattr(self.inputs, field, None) == 'ASK':
                if field == 'password':
                    passwords.append('ssh_password')
                else:
                    passwords.append(field)
        return passwords


page.register_page([resources.credential,
                    (resources.credentials, 'post'),
                    (resources.credential_copy, 'post')], Credential)


class Credentials(page.PageList, Credential):
    """Combine ``page.PageList`` with ``Credential``.

    The class adds no attributes or methods of its own.
    """


page.register_page(
    [
        resources.credentials,
        resources.related_credentials,
        resources.job_extra_credentials,
        resources.job_template_extra_credentials,
    ],
    Credentials,
)
github ansible / awx / awxkit / awxkit / api / pages / workflow_job_template_nodes.py View on Github external
self,
        **kwargs
    ):
        if 'name' not in kwargs:
            kwargs['name'] = 'approval node {}'.format(random_title())
        self.related.create_approval_template.post(kwargs)
        return self.get()


page.register_page([resources.workflow_job_template_node,
                    (resources.workflow_job_template_nodes,
                     'post')],
                   WorkflowJobTemplateNode)


class WorkflowJobTemplateNodes(page.PageList, WorkflowJobTemplateNode):
    """Combine ``page.PageList`` with ``WorkflowJobTemplateNode``.

    The class adds no attributes or methods of its own.
    """


page.register_page(
    [
        resources.workflow_job_template_nodes,
        resources.workflow_job_template_workflow_nodes,
        resources.workflow_job_template_node_always_nodes,
        resources.workflow_job_template_node_failure_nodes,
        resources.workflow_job_template_node_success_nodes,
    ],
    WorkflowJobTemplateNodes,
)
github ansible / awx / awxkit / awxkit / api / pages / unified_jobs.py View on Github external
if not self.json.job_args:
            return ""
        for arg in yaml.safe_load(self.json.job_args):
            try:
                args.append(yaml.safe_load(arg))
            except (yaml.parser.ParserError, yaml.scanner.ScannerError):
                if arg[0] == '@':  # extra var file reference
                    args.append(attempt_yaml_load(arg))
                elif args[-1] == '-c':  # this arg is likely sh arg string
                    args.extend([attempt_yaml_load(item) for item in args_string_to_list(arg)])
                else:
                    raise
        return args


class UnifiedJobs(page.PageList, UnifiedJob):
    """Combine ``page.PageList`` with ``UnifiedJob``.

    The class adds no attributes or methods of its own.
    """


page.register_page(
    [
        resources.unified_jobs,
        resources.instance_related_jobs,
        resources.instance_group_related_jobs,
        resources.schedules_jobs,
    ],
    UnifiedJobs,
)
github ansible / awx / awxkit / awxkit / api / pages / users.py View on Github external
username=username, password=password, **kwargs)
        self.password = payload.password

        self.update_identity(Users(self.connection).post(payload))

        if organization:
            organization.add_user(self)

        return self


page.register_page([resources.user,
                    (resources.users, 'post')], User)


class Users(page.PageList, User):
    """Combine ``page.PageList`` with ``User``.

    The class adds no attributes or methods of its own.
    """


page.register_page(
    [
        resources.users,
        resources.organization_admins,
        resources.related_users,
        resources.user_admin_organizations,
    ],
    Users,
)


class Me(Users):
    """Subclass of ``Users`` that adds no behaviour of its own."""


page.register_page(
    resources.me,
    Me,
)
github ansible / awx / awxkit / awxkit / api / pages / page.py View on Github external
def __item_class__(self):
        """Return the class that represents a single item of this list.

        The class is the entry that directly follows ``PageList`` in the
        method resolution order of ``self.__class__``. For an inheritance
        chain of OrgListSubClass -> OrgList -> PageList -> Org -> Base -> Page
        that entry is ``Org``.

        Obtaining a page type by registered endpoint is highly recommended
        over using this method.
        """
        lineage = inspect.getmro(self.__class__)
        # The item class sits immediately after PageList in the MRO.
        return lineage[lineage.index(PageList) + 1]