Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'workflow_team',
'2',
'{}')
self.assertIsInstance(credential_with_type_id, GenericCredential)
self.assertIsNone(self.tower.create_credential_in_organization_with_type_id('workflow',
'CredName2',
'CredDescription',
'workflow_admin',
'workflow_team',
'2',
'{}'))
with self.assertRaises(InvalidOrganization):
self.tower.delete_organization_credential_by_name('workflowBroken',
'CredName',
'Source Control')
with self.assertRaises(InvalidCredentialType):
self.tower.delete_organization_credential_by_name('workflow',
'CredName',
'Source ControlBroken')
with self.assertRaises(InvalidCredential):
self.tower.delete_organization_credential_by_name('workflow',
'CredNameBroken',
'Source Control')
self.assertTrue(self.tower.delete_organization_credential_by_name('workflow',
'CredName',
'Source Control'))
with self.assertRaises(InvalidOrganization):
self.tower.delete_organization_credential_by_name_with_type_id('workflowBroken',
'CredName2',
'2')
with self.assertRaises(InvalidCredential):
self.tower.delete_organization_credential_by_name_with_type_id('workflow',
def test_credential_type_lifecycle(self):
with self.recorder:
credential = self.tower.create_credential_type('Test Credential Type',
'This is the description',
'net',
'{}',
'{}')
self.assertIsInstance(credential, CredentialType)
duplicate_credential = self.tower.create_credential_type('Test Credential Type',
'This is the description',
'net',
'{}',
'{}')
self.assertIsNone(duplicate_credential)
self.assertTrue(self.tower.delete_credential_type('Test Credential Type'))
with self.assertRaises(InvalidCredentialType):
self.tower.create_credential_type('Test Credential Type',
'This is the description',
'garbage',
'{}',
'{}')
with self.assertRaises(InvalidVariables):
self.tower.create_credential_type('Test Credential Type',
'This is the description',
'net',
'agadgffdsagsdffg',
'{}')
with self.assertRaises(InvalidVariables):
self.tower.create_credential_type('Test Credential Type',
'This is the description',
'net',
'{}',
InvalidCredentialType: The credential type provided as argument does not exist.
InvalidVariables: The inputs provided as argument is not valid json.
"""
organization_ = self.get_organization_by_name(organization)
if not organization_:
raise InvalidOrganization(organization)
user_ = self.get_user_by_username(user)
if not user_:
raise InvalidUser(user)
team_ = organization_.get_team_by_name(team)
if not team_:
raise InvalidTeam(team)
credential_type_ = self.get_credential_type_by_name(credential_type)
if not credential_type_:
raise InvalidCredentialType(credential_type)
if not validate_json(inputs_):
raise InvalidVariables(inputs_)
return self.create_credential_with_credential_type_id(name,
credential_type_.id,
description=description,
user_id=user_.id,
team_id=team_.id,
organization_id=organization_.id,
inputs=inputs_
)
def delete_credential_type(self, name):
    """Remove the credential type with the given name from tower.

    Args:
        name: The name of the credential type to delete.

    Returns:
        The result of the matched credential type's ``delete()`` call
        (described by the original documentation as a bool: True on
        success, False otherwise).

    Raises:
        InvalidCredentialType: No credential type with that name was found.

    """
    # Look the type up by name first so an unknown name fails loudly
    # instead of silently doing nothing.
    matched_type = self.get_credential_type_by_name(name)
    if matched_type:
        return matched_type.delete()
    raise InvalidCredentialType(name)
organization: The organization that owns the credential.
name: The name of the credential(s) to retrieve.
credential_type: The type of the credential.
Returns:
Credential: A credential if found else None.
Raises:
InvalidCredentialType: The CredentialType given was not found.
InvalidOrganization: The Organization given was not found.
"""
# return self.credentials.filter({'name__iexact': name})
credential_type_ = self.get_credential_type_by_name(credential_type)
if not credential_type_:
raise InvalidCredentialType(credential_type)
organization_ = self.get_organization_by_name(organization)
if not organization_:
raise InvalidOrganization(organization)
return next(self.credentials.filter({'organization': organization_.id,
'name__iexact': name,
'credential_type': credential_type_.id}), None)
organization: The organization that owns the credential.
name: The name of the credential(s) to delete.
credential_type: The type of the credential.
Returns:
bool: True on success, False otherwise.
Raises:
InvalidCredentialType: The CredentialType given was not found.
InvalidOrganization: The Organization given was not found.
InvalidCredential: The credential was not found.
"""
credential_type_ = self.get_credential_type_by_name(credential_type)
if not credential_type_:
raise InvalidCredentialType(credential_type)
organization_ = self.get_organization_by_name(organization)
if not organization_:
raise InvalidOrganization(organization)
credential = next(self.credentials.filter({'organization': organization_.id,
'name__iexact': name,
'credential_type': credential_type_.id}), None)
if not credential:
raise InvalidCredential(name)
return credential.delete()
def credential_type(self, value):
    """Point the credential at the credential type with the given name.

    NOTE(review): presumably registered as a property setter; the decorator
    is not visible in this excerpt -- confirm against the full file.

    Args:
        value: The name of the credential type to assign.

    Returns:
        None.

    Raises:
        InvalidCredentialType: The tower lookup found no credential type
            with that name.

    """
    # Names are resolved through the tower object; only the resolved id
    # is handed to the update call.
    resolved_type = self._tower.get_credential_type_by_name(value)
    if resolved_type:
        self._update_values('credential_type', resolved_type.id)
    else:
        raise InvalidCredentialType(value)
# This is to 'use' the module(s), so lint doesn't complain.
# Each bare assert below references one name: a name that is not defined
# fails with NameError on its own line, and a falsy one with AssertionError.
# NOTE: assert statements are removed when Python runs with -O, in which
# case none of these checks execute.
assert __version__
# assert exceptions (one statement per exception class name)
assert AuthFailed
assert InvalidUserLevel
assert InvalidOrganization
assert InvalidVariables
assert InvalidInventory
assert InvalidUser
assert InvalidTeam
assert InvalidCredential
assert InvalidGroup
assert InvalidHost
assert InvalidProject
assert InvalidCredentialType
assert InvalidPlaybook
assert InvalidInstanceGroup
assert InvalidJobType
assert InvalidVerbosity
assert InvalidJobTemplate
assert PermissionNotFound
assert InvalidValue
assert InvalidRole
# assert objects (one statement per entity class name)
assert Tower
assert Organization
assert User
assert Role
assert Team
assert Project
"""Retrieves credential matching a certain name.
Args:
name: The name of the credential to retrieve.
credential_type: The type of credential.
Returns:
Credential: A credential if found else none.
Raises:
InvalidCredentialType: The credential type given as a parameter was not found.
"""
credential_type_ = self._tower.get_credential_type_by_name(credential_type)
if not credential_type_:
raise InvalidCredentialType(name)
return next(self.credentials.filter({'organization': self.id,
'name__iexact': name,
'credential_type': credential_type_.id}), None)
name: The name of the credential type.
description: The description of the credential type.
type_: The kind of credential type.Valid values (u'scm', u'ssh', u'vault', u'net', u'cloud', u'insights').
inputs_ (str): A json of the inputs to set to the credential type.
injectors (str): A json of the injectors to set to the credential type.
Returns:
CredentialType on success, None otherwise.
Raises:
InvalidCredentialTypeKind: The credential type kind provided as argument does not exist.
InvalidVariables: The inputs or injectors provided as argument is not valid json.
"""
if type_.lower() not in VALID_CREDENTIAL_TYPES:
raise InvalidCredentialType(type_)
payload = {'name': name,
'description': description,
'kind': type_.lower()}
if not validate_json(inputs_):
raise InvalidVariables(inputs_)
if not validate_json(injectors):
raise InvalidVariables(injectors)
payload['inputs'] = json.loads(inputs_)
payload['injectors'] = json.loads(injectors)
url = '{api}/credential_types/'.format(api=self.api)
response = self.session.post(url, json=payload)
if not response.ok:
self._logger.error('Error creating credential type "%s", response was: "%s"', type_, response.text)
return CredentialType(self, response.json()) if response.ok else None