Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Test (excerpt): exercises create_jks with mocked command execution and
# checks what the module reports through exit_json.
# NOTE(review): the snippet is cut off inside the final assertion, so the
# full set of expected exit_json keyword arguments is not visible here.
def test_create_jks_success(self):
# Arguments handed to set_module_args before the AnsibleModule is built.
set_module_args(dict(
certificate='cert-foo',
private_key='private-foo',
dest='/path/to/keystore.jks',
name='foo',
password='changeit'
))
# Build the module from the argument spec / check-mode flag held on self.spec.
module = AnsibleModule(
argument_spec=self.spec.argument_spec,
supports_check_mode=self.spec.supports_check_mode
)
# Swap exit_json for a Mock so its call arguments can be asserted below.
module.exit_json = Mock()
# os.remove is patched here; indentation is lost in this snippet, so the
# exact extent of the patched block cannot be determined from it.
with patch('os.remove', return_value=True):
# Every call to the run_commands mock returns (0, '', '') -- presumably
# (rc, stdout, stderr); confirm against the helper being mocked.
self.run_commands.side_effect = lambda args, kwargs: (0, '', '')
create_jks(module, "test", "openssl", "keytool", "/path/to/keystore.jks", "changeit")
# exit_json must have been called exactly once, reporting changed=True and
# the keytool -importkeystore command line assembled below.
module.exit_json.assert_called_once_with(
changed=True,
cmd="keytool -importkeystore "
"-destkeystore '/path/to/keystore.jks' "
"-srckeystore '/tmp/keystore.p12' -srcstoretype pkcs12 -alias 'test' "
"-deststorepass 'changeit' -srcstorepass 'changeit' -noprompt",
msg='',
def main():
    """Entry point (excerpt): build the module and the flavor filters.

    Declares the ``name``, ``ram``, ``vcpus``, ``limit`` and ``ephemeral``
    options, with ``name`` mutually exclusive with ``ram``, ``vcpus`` and
    ``ephemeral``.  When the module runs under the name ``os_flavor_facts``
    a deprecation notice is emitted.  Truthy ``vcpus`` and ``ram`` values
    are then collected into ``filters``.
    """
    argument_spec = openstack_full_argument_spec(
        name={'required': False, 'default': None},
        ram={'required': False, 'default': None},
        vcpus={'required': False, 'default': None},
        limit={'required': False, 'default': None, 'type': 'int'},
        ephemeral={'required': False, 'default': None},
    )
    exclusive_pairs = [
        ['name', 'ram'],
        ['name', 'vcpus'],
        ['name', 'ephemeral'],
    ]
    module_kwargs = openstack_module_kwargs(mutually_exclusive=exclusive_pairs)
    module = AnsibleModule(argument_spec, **module_kwargs)

    # Remember whether we were invoked through the old module name.
    is_old_facts = module._name == 'os_flavor_facts'
    if is_old_facts:
        module.deprecate(
            "The 'os_flavor_facts' module has been renamed to 'os_flavor_info', "
            "and the renamed one no longer returns ansible_facts",
            version='2.13'
        )

    params = module.params
    name, vcpus, ram = params['name'], params['vcpus'], params['ram']
    ephemeral, limit = params['ephemeral'], params['limit']

    # Only options that were actually supplied (truthy) become filters.
    filters = {
        key: value
        for key, value in (('vcpus', vcpus), ('ram', ram))
        if value
    }
"name": {"required": True, "type": "str"},
"negotiate_ntlm": {"required": False, "type": "str",
"choices": ["enable", "disable"]},
"require_tfa": {"required": False, "type": "str",
"choices": ["enable", "disable"]},
"ssh_ca": {"required": False, "type": "str"},
"user_database": {"required": False, "type": "list",
"options": {
"name": {"required": True, "type": "str"}
}}
}
}
}
# NOTE(review): excerpt -- the enclosing function header and the start of the
# `fields` argument spec lie outside this snippet.
# Build the module from `fields`; check mode is not supported.
module = AnsibleModule(argument_spec=fields,
supports_check_mode=False)
# legacy_mode refers to using fortiosapi instead of HTTPAPI
# It is True only when 'host', 'username' and 'password' are all present in
# module.params and none of them is None.
legacy_mode = 'host' in module.params and module.params['host'] is not None and \
'username' in module.params and module.params['username'] is not None and \
'password' in module.params and module.params['password'] is not None
# Non-legacy path: requires module._socket_path to build the Connection that
# backs the FortiOSHandler.
if not legacy_mode:
if module._socket_path:
connection = Connection(module._socket_path)
fos = FortiOSHandler(connection)
is_error, has_changed, result = fortios_authentication(module.params, fos)
else:
# No socket path available: fail with the FAIL_SOCKET_MSG keyword arguments.
module.fail_json(**FAIL_SOCKET_MSG)
# NOTE(review): the body of the legacy-mode branch is cut off in this snippet.
else:
# Module entry point (excerpt): declares the argument spec for the module.
# NOTE(review): the snippet ends before the AnsibleModule(...) call is closed,
# so any further keyword arguments and the function body are not visible.
def main():
module = AnsibleModule(argument_spec=dict(
source_volume_id=dict(type='str'),
destination_volume_id=dict(type='str'),
copy_priority=dict(required=False, default=0, type='int'),
# ssid and api_url are the only connection options marked required.
ssid=dict(required=True, type='str'),
api_url=dict(required=True),
api_username=dict(required=False),
# no_log=True keeps the password value out of module logging.
api_password=dict(required=False, no_log=True),
# NOTE(review): validate_certs declares no type here, unlike the other
# True/False options below which use type='bool' -- confirm intended.
validate_certs=dict(required=False, default=True),
targetWriteProtected=dict(required=False, default=True, type='bool'),
onlineCopy=dict(required=False, default=False, type='bool'),
volume_copy_pair_id=dict(type='str'),
# status is mandatory and limited to 'present' / 'absent'.
status=dict(required=True, choices=['present', 'absent'], type='str'),
create_copy_pair_if_does_not_exist=dict(required=False, default=True, type='bool'),
# Optional; limited to 'start' / 'stop'.
start_stop_copy=dict(required=False, choices=['start', 'stop'], type='str'),
search_volume_id=dict(type='str'),
),
def main():
    """Module entry point: dispatch on ``state`` and report the outcome.

    Accepts a required ``app`` string and an optional ``state`` that is
    either ``present`` (the default) or ``absent``.  The handler matching
    the state is called with the module parameters and returns an
    ``(is_error, has_changed, result)`` triple.  On error the module fails
    with ``result["error"]`` as the message; otherwise it exits with the
    changed flag.  In both cases ``result`` is returned as ``meta``.
    """
    fields = {
        "app": dict(required=True, type="str"),
        "state": dict(
            required=False,
            default="present",
            choices=["present", "absent"],
            type="str",
        ),
    }
    # One handler per supported state value.
    choice_map = dict(present=dokku_app_present, absent=dokku_app_absent)

    module = AnsibleModule(argument_spec=fields, supports_check_mode=False)

    handler = choice_map.get(module.params["state"])
    is_error, has_changed, result = handler(module.params)

    if is_error:
        module.fail_json(msg=result["error"], meta=result)
    module.exit_json(changed=has_changed, meta=result)
def main():
    """Module entry point: list groups, optionally filtered by name prefix.

    Extends the spec from ``ecs_argument_spec()`` with an optional
    ``name_prefix`` string.  The module fails immediately when
    ``HAS_FOOTMARK`` is ``False``.  Otherwise every group returned by
    ``ram_connect(module).list_groups()`` whose name starts with the prefix
    is read and returned under ``groups`` with ``changed=False``.  With no
    prefix given, every group is returned.  Any ``Exception`` raised while
    listing is reported through ``fail_json``.
    """
    argument_spec = ecs_argument_spec()
    argument_spec.update({'name_prefix': {'type': 'str'}})
    module = AnsibleModule(argument_spec=argument_spec)

    if HAS_FOOTMARK is False:
        module.fail_json(msg="Package 'footmark' required for this module.")

    name_prefix = module.params['name_prefix']
    try:
        # Keep a group when no prefix was given or when its name matches it.
        groups = [
            group.read()
            for group in ram_connect(module).list_groups()
            if not name_prefix or group.name.startswith(name_prefix)
        ]
        module.exit_json(changed=False, groups=groups)
    except Exception as e:
        failure = "Unable to list groups, error:{0}".format(e)
        module.fail_json(msg=str(failure))
# Module entry point (excerpt): validates the target directory of 'path' and
# reads the requested crypto backend.
# NOTE(review): the snippet ends inside the try block; the matching
# except/finally clause and the rest of the logic are not visible here.
def main():
module = AnsibleModule(
argument_spec=dict(
# Mandatory option of Ansible type 'path'.
path=dict(type='path', required=True),
# Backend selector; defaults to 'auto'.
select_crypto_backend=dict(type='str', default='auto', choices=['auto', 'cryptography', 'pyopenssl']),
),
supports_check_mode=True,
)
try:
# Directory component of 'path'; '.' is used when dirname returns ''.
base_dir = os.path.dirname(module.params['path']) or '.'
# Fail when that directory is missing or is not a directory.
if not os.path.isdir(base_dir):
module.fail_json(
name=base_dir,
msg='The directory %s does not exist or the file is not a directory' % base_dir
)
backend = module.params['select_crypto_backend']
def run_module():
    """Build the module and unpack its parameters (excerpt).

    Options: ``state`` (``present`` by default, or ``absent``), required
    ``resource`` and ``node_name``, optional ``score`` (default
    ``"INFINITY"``) and optional ``cib_file``.  Check mode is supported.
    """
    arg_spec = {
        'state': {'default': "present", 'choices': ['present', 'absent']},
        'resource': {'required': True},
        'node_name': {'required': True},
        'score': {'required': False, 'default': "INFINITY"},
        'cib_file': {'required': False},
    }
    module = AnsibleModule(argument_spec=arg_spec, supports_check_mode=True)

    # Pull every option into a local of the same name.
    option_names = ('state', 'resource', 'node_name', 'score', 'cib_file')
    state, resource, node_name, score, cib_file = (
        module.params[option] for option in option_names
    )
# NOTE(review): excerpt -- the argument_spec dict is not closed within this
# snippet, so further options and the function body are not visible.
def main():
"""
Module usage
"""
module = AnsibleModule(
argument_spec=dict(
# resource managed, more to come (acl,broker)
# Limited to 'topic' or 'acl'; defaults to 'topic'.
resource=dict(choices=['topic', 'acl'], default='topic'),
# resource name
name=dict(type='str', required=True),
# Both integer options default to 0.
partitions=dict(type='int', required=False, default=0),
replica_factor=dict(type='int', required=False, default=0),
# Defaults to 'topic'; restricted to the choices listed below.
acl_resource_type=dict(choices=['topic', 'broker',
'delegation_token', 'group',
'transactional_id'],
default='topic'),
def main():
    """Module entry point: ensure a firewall rule is present or absent.

    Extends the spec from ``vultr_argument_spec()`` with the rule options
    (``group`` is required; ``protocol`` defaults to ``tcp``,
    ``ip_version`` to ``v4`` and ``state`` to ``present``).  Depending on
    ``state`` it calls ``absent_firewall_rule`` or ``present_firewall_rule``
    on ``AnsibleVultrFirewallRule``, then exits with the value produced by
    ``get_result``.  Check mode is supported.
    """
    argument_spec = vultr_argument_spec()
    argument_spec.update({
        'group': {'required': True},
        'start_port': {'type': 'int', 'aliases': ['port']},
        'end_port': {'type': 'int'},
        'protocol': {'choices': ['tcp', 'udp', 'gre', 'icmp'], 'default': 'tcp'},
        'cidr': {},
        'ip_version': {'choices': ['v4', 'v6'], 'default': 'v4', 'aliases': ['ip_type']},
        'state': {'choices': ['present', 'absent'], 'default': 'present'},
    })

    module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)
    vultr_firewall_rule = AnsibleVultrFirewallRule(module)

    # Pick the operation matching the requested state, then run it.
    wants_removal = module.params.get('state') == "absent"
    if wants_removal:
        apply_state = vultr_firewall_rule.absent_firewall_rule
    else:
        apply_state = vultr_firewall_rule.present_firewall_rule

    result = vultr_firewall_rule.get_result(apply_state())
    module.exit_json(**result)