Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _(job):
return job_context.JobContext(app=app, worker_name="worker", job=job)
def context():
return job_context.JobContext()
def test_context_for_worker_kwargs(app):
test_worker = worker.Worker(app=app, name="foo")
expected = job_context.JobContext(app=app, worker_id=3, worker_name="bar")
context = test_worker.context_for_worker(worker_id=3, worker_name="bar")
assert context == expected
def test_log_extra_job(job_factory):
job = job_factory()
context = job_context.JobContext(
worker_name="a", job=job, additional_context={"ha": "ho"},
)
assert context.log_extra(action="foo", bar="baz") == {
"action": "foo",
"bar": "baz",
"ha": "ho",
"job": job.log_context(),
"worker": {"name": "a", "queues": None},
}
def test_context_for_worker_reset(app):
test_worker = worker.Worker(app=app, name="foo")
expected = job_context.JobContext(app=app, worker_id=3, worker_name="foo")
test_worker.context_for_worker(worker_id=3, worker_name="bar")
context = test_worker.context_for_worker(worker_id=3, reset=True)
assert context == expected
def test_evolve():
context = job_context.JobContext(worker_name="a")
assert context.evolve(worker_name="b").worker_name == "b"
def test_context_for_worker_value_kept(app):
test_worker = worker.Worker(app=app, name="foo")
expected = job_context.JobContext(app=app, worker_id=3, worker_name="bar")
test_worker.context_for_worker(worker_id=3, worker_name="bar")
context = test_worker.context_for_worker(worker_id=3)
assert context == expected
def test_context_for_worker(app):
test_worker = worker.Worker(app=app, name="foo")
expected = job_context.JobContext(app=app, worker_id=3, worker_name="foo")
context = test_worker.context_for_worker(worker_id=3)
assert context == expected
def test_log_extra():
context = job_context.JobContext(worker_name="a", additional_context={"ha": "ho"})
assert context.log_extra(action="foo", bar="baz") == {
"action": "foo",
"bar": "baz",
"ha": "ho",
"worker": {"name": "a", "queues": None},
}