Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
injectors (str): A json of the injectors to set to the credential type.
Returns:
CredentialType on success, None otherwise.
Raises:
InvalidCredentialTypeKind: The credential type kind provided as argument does not exist.
InvalidVariables: The inputs or injectors provided as argument is not valid json.
"""
if type_.lower() not in VALID_CREDENTIAL_TYPES:
raise InvalidCredentialType(type_)
payload = {'name': name,
'description': description,
'kind': type_.lower()}
if not validate_json(inputs_):
raise InvalidVariables(inputs_)
if not validate_json(injectors):
raise InvalidVariables(injectors)
payload['inputs'] = json.loads(inputs_)
payload['injectors'] = json.loads(injectors)
url = '{api}/credential_types/'.format(api=self.api)
response = self.session.post(url, json=payload)
if not response.ok:
self._logger.error('Error creating credential type "%s", response was: "%s"', type_, response.text)
return CredentialType(self, response.json()) if response.ok else None
Returns:
CredentialType on success, None otherwise.
Raises:
InvalidCredentialTypeKind: The credential type kind provided as argument does not exist.
InvalidVariables: The inputs or injectors provided as argument is not valid json.
"""
if type_.lower() not in VALID_CREDENTIAL_TYPES:
raise InvalidCredentialType(type_)
payload = {'name': name,
'description': description,
'kind': type_.lower()}
if not validate_json(inputs_):
raise InvalidVariables(inputs_)
if not validate_json(injectors):
raise InvalidVariables(injectors)
payload['inputs'] = json.loads(inputs_)
payload['injectors'] = json.loads(injectors)
url = '{api}/credential_types/'.format(api=self.api)
response = self.session.post(url, json=payload)
if not response.ok:
self._logger.error('Error creating credential type "%s", response was: "%s"', type_, response.text)
return CredentialType(self, response.json()) if response.ok else None
def variables(self, value):
    """Set the variables of the team.

    Args:
        value: The new variables; must be accepted by validate_json.

    Returns:
        None:

    Raises:
        InvalidValue: The value provided is not valid json.

    """
    # Reject bad input up front so the update call is the only happy path.
    if not validate_json(value):
        raise InvalidValue('Value is not valid json received: {value}'.format(value=value))
    self._update_values('variables', value)
organization_ = self.get_organization_by_name(organization)
if not organization_:
raise InvalidOrganization(organization)
user_ = self.get_user_by_username(user)
if not user_:
raise InvalidUser(user)
team_ = organization_.get_team_by_name(team)
if not team_:
raise InvalidTeam(team)
payload = {'name': name,
'description': description,
'organization': organization_.id,
'user': user_.id,
'team': team_.id,
'credential_type': credential_type_id}
if not validate_json(inputs_):
raise InvalidVariables(inputs_)
payload['inputs'] = json.loads(inputs_)
url = '{api}/credentials/'.format(api=self.api)
response = self.session.post(url, json=payload)
if not response.ok:
self._logger.error('Error creating credential "%s", response was: "%s"', name, response.text)
return Credential(self, response.json()) if response.ok else None
def variables(self, value):
    """Set the variables of the group.

    Args:
        value: The new variables; must be accepted by validate_json.

    Returns:
        None:

    Raises:
        InvalidValue: The value provided is not valid json.

    """
    # Single precondition: the value has to be valid json.
    if not validate_json(value):
        raise InvalidValue('{value} is not valid json.'.format(value=value))
    self._update_values('variables', value)
def variables(self, value):
    """Set the variables on the host.

    Args:
        value: The new variables; must be accepted by validate_json.

    Returns:
        None:

    Raises:
        InvalidValue: The value provided is not valid json.

    """
    # Single precondition: the value has to be valid json.
    if not validate_json(value):
        raise InvalidValue('{value} is not valid json.'.format(value=value))
    self._update_values('variables', value)
def create_group(self, name, description, variables='{}'):
    """Creates a group under this inventory.

    Args:
        name: The name of the group to create.
        description: The description of the group.
        variables: A json with the variables that will be set on the created group.

    Returns:
        Group: The created group on success, None otherwise.

    Raises:
        InvalidVariables: The variables provided as argument is not valid json.

    """
    if not validate_json(variables):
        raise InvalidVariables(variables)
    endpoint = '{api}/groups/'.format(api=self._tower.api)
    group_data = {
        'name': name,
        'description': description,
        'inventory': self.id,
        'variables': variables,
    }
    response = self._tower.session.post(endpoint, json=group_data)
    if response.ok:
        return Group(self._tower, response.json())
    # Failure is reported through the log and a None result, not an exception.
    self._logger.error('Error creating group "%s", response was "%s"', name, response.text)
    return None
InvalidVariables: The inputs provided as argument is not valid json.
"""
organization_ = self.get_organization_by_name(organization)
if not organization_:
raise InvalidOrganization(organization)
user_ = self.get_user_by_username(user)
if not user_:
raise InvalidUser(user)
team_ = organization_.get_team_by_name(team)
if not team_:
raise InvalidTeam(team)
credential_type_ = self.get_credential_type_by_name(credential_type)
if not credential_type_:
raise InvalidCredentialType(credential_type)
if not validate_json(inputs_):
raise InvalidVariables(inputs_)
return self.create_credential_with_credential_type_id(name,
credential_type_.id,
description=description,
user_id=user_.id,
team_id=team_.id,
organization_id=organization_.id,
inputs=inputs_
)
def create_host(self, name, description, variables='{}'):
    """Creates a host under this inventory.

    The host is posted as enabled and with an empty instance id.

    Args:
        name: The name of the host to create.
        description: The description of the host.
        variables: A json with the variables that will be set on the created host.

    Returns:
        Host: The created host on success, None otherwise.

    Raises:
        InvalidVariables: The variables provided as argument is not valid json.

    """
    if not validate_json(variables):
        raise InvalidVariables(variables)
    endpoint = '{api}/hosts/'.format(api=self._tower.api)
    host_data = {
        'name': name,
        'description': description,
        'inventory': self.id,
        'enabled': True,
        'instance_id': '',
        'variables': variables,
    }
    response = self._tower.session.post(endpoint, json=host_data)
    if response.ok:
        return Host(self._tower, response.json())
    # Failure is reported through the log and a None result, not an exception.
    self._logger.error('Error creating host "%s", response was "%s"', name, response.text)
    return None
def create_inventory(self, name, description, variables='{}'):
    """Creates an inventory under this organization.

    Args:
        name: The name of the inventory to create.
        description: The description of the inventory.
        variables: A json with the initial variables set on the inventory.

    Returns:
        Inventory: The created Inventory object on success, None otherwise.

    Raises:
        InvalidVariables: The variables provided as argument is not valid json.

    """
    if not validate_json(variables):
        raise InvalidVariables(variables)
    inventory_data = {
        'name': name,
        'description': description,
        'organization': self.id,
        'variables': variables,
    }
    endpoint = '{api}/inventories/'.format(api=self._tower.api)
    response = self._tower.session.post(endpoint, json=inventory_data)
    if response.ok:
        return Inventory(self._tower, response.json())
    # Failure is reported through the log and a None result, not an exception.
    self._logger.error('Error creating inventory "%s", response was "%s"', name, response.text)
    return None