Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@then(u'there exists one {name} in {field_name}')
def step_impl(context, name, field_name):
    """Assert that every selected resource has a non-None value at a path.

    ``field_name`` is a dotted path. Its first component is passed to
    ``get_resources`` as the filter type; the remaining components are
    handed to ``utils.traverse`` for each returned resource. ``name`` is
    only used when building the failure message.

    Does nothing when ``StepDecider(context).should_run_test()`` is falsy.
    """
    if StepDecider(context).should_run_test():
        parts = field_name.split('.')
        filter_type, path = parts[0], parts[1:]
        payload = context.response.json()
        for resource in get_resources(payload, filter_type):
            value = utils.traverse(resource, path)
            # The message is only built when the assertion actually fails.
            assert value is not None, utils.bad_response_assert_with_resource(
                response=context.response,
                message=ERROR_REQUIRED,
                name=name,
                resource=resource,
            )
@then('I delete from Cassandra table "{tablename}" where "{criterion}", using "{conn_name}"')
@util.obtain_values
def then_i_delete_from_cassandra_table(ctx, tablename, conn_name, criterion=None):
    """Execute a DELETE statement on the connection stored under ``conn_name``.

    The statement is ``DELETE FROM <tablename> <criterion>``; ``criterion``
    is appended verbatim, and a missing or empty criterion becomes an empty
    string. The connection is looked up in ``ctx.zato.user_ctx``.
    """
    suffix = criterion or ''
    statement = "DELETE FROM %s %s" % (tablename, suffix)
    connection = ctx.zato.user_ctx[conn_name]
    connection.execute(statement)
@then("the asset info should match the expected asset info")
def asset_info_match(context):
    """Assert that each expected asset field matches the actual asset info.

    For every key in ``context.expected_asset_info`` the expected value must
    equal ``context.asset_info.get(key)``, or both values must be falsy
    (so e.g. a missing key matches an empty or zero expected value).
    """
    for key in context.expected_asset_info:
        wanted = context.expected_asset_info[key]
        actual = context.asset_info.get(key)
        # Equal values pass; otherwise both sides have to be falsy.
        assert wanted == actual or not (wanted or actual)
@when(r'I restore the backup named "{backup_name}" for "{fqtn}" table')
def _i_restore_the_backup_named_for_table(context, backup_name, fqtn):
    """Call ``medusa.restore_node.restore_node`` for ``backup_name``.

    The call uses ``context.medusa_config`` and ``Path("/tmp")`` as its
    first two positional arguments, requests an in-place restore, and
    passes ``fqtn`` as the only entry of ``tables``.
    """
    tmp_path = Path("/tmp")
    restore_options = {
        "in_place": True,
        "keep_auth": False,
        "seeds": None,
        "verify": None,
        "keyspaces": {},
        "tables": {fqtn},
        "use_sstableloader": False,
    }
    medusa.restore_node.restore_node(
        context.medusa_config,
        tmp_path,
        backup_name,
        **restore_options
    )
@when("I get the suggested params")
def suggested_params(context):
    """Store ``context.acl.suggested_params_as_object()`` as ``context.params``."""
    acl = context.acl
    params = acl.suggested_params_as_object()
    context.params = params
@when('I invoke send_diagnostics message')
def step_impl(context):
    """Send an empty diagnostics list for ``/sample`` and reset the counter.

    Calls ``context.langServer.send_diagnostics`` and afterwards sets
    ``context._diagCount`` to 0.
    """
    lang_server = context.langServer
    lang_server.send_diagnostics('/sample', [])
    context._diagCount = 0
@when("we make any LookupAccountByID call")
def lookup_account_any(context):
    """Call ``context.icl.account_info`` with fixed arguments.

    The result is stored on the context as ``context.response``.
    """
    address = "PNWOET7LLOWMBMLE4KOCELCX6X3D3Q4H2Q4QJASYIEOF7YIPPQBG3YQ5YI"
    context.response = context.icl.account_info(address, 12)
@when('we wait for the chronos job stored as "{job_name}" to appear in the job list')
def chronos_job_is_ready(context, job_name):
    """Wait for the job saved under ``job_name`` using ``chronos_tools.wait_for_job``.

    The job is looked up in ``context.jobs`` and identified to the helper
    by its ``"name"`` entry.
    """
    stored_job = context.jobs[job_name]
    chronos_tools.wait_for_job(context.chronos_client, stored_job["name"])
@given('JSON Pointer "{path}" in request is an integer "{value}"')
@util.obtain_values
def given_json_pointer_in_request_is_an_integer(ctx, path, value):
    """Convert ``value`` with ``int()`` and pass it to ``set_pointer`` for ``path``.

    ``int()`` raises ``ValueError`` when ``value`` is not a valid integer
    literal.
    """
    number = int(value)
    set_pointer(ctx, path, number)
@given("a fake HTTP server")
def make_fake_http_server(context):
    """Create a ``FakeHTTPServer``, keep it on the context, and start it."""
    server = FakeHTTPServer()
    # Attach before starting so the instance stays reachable from the
    # context even if start() raises.
    context.fake_http_server = server
    server.start()