Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_missing_app_async(method_name, kwargs):
with pytest.raises(exceptions.SyncConnectorConfigurationError):
# Some of this methods are not async but they'll raise
# before the await is reached.
await getattr(connector_module.BaseConnector(), method_name)(**kwargs)
def test_load_from_path_wrong_type():
with pytest.raises(exceptions.LoadFromPathError):
utils.load_from_path("json.loads", int)
"""
if "." not in path:
raise exceptions.LoadFromPathError(f"{path} is not a valid path")
module_path, name = path.rsplit(".", 1)
try:
module = importlib.import_module(module_path)
except ImportError as exc:
raise exceptions.LoadFromPathError(str(exc)) from exc
try:
imported = getattr(module, name)
except AttributeError as exc:
raise exceptions.LoadFromPathError(str(exc)) from exc
if not isinstance(imported, allowed_type):
raise exceptions.LoadFromPathError(
f"Object at {path} is not of type {allowed_type.__name__} "
f"but {type(imported).__name__}"
)
return imported
async def process_job(self, job: jobs.Job, worker_id: int = 0) -> None:
context = self.context_for_worker(worker_id=worker_id, job=job)
self.logger.debug(
f"Loaded job info, about to start job {job.call_string}",
extra=context.log_extra(action="loaded_job_info"),
)
status = jobs.Status.FAILED
next_attempt_scheduled_at = None
try:
await self.run_job(job=job, worker_id=worker_id)
status = jobs.Status.SUCCEEDED
except exceptions.JobRetry as e:
status = jobs.Status.TODO
next_attempt_scheduled_at = e.scheduled_at
except exceptions.JobError:
pass
except exceptions.TaskNotFound as exc:
self.logger.exception(
f"Task was not found: {exc}",
extra=context.log_extra(action="task_not_found", exception=str(exc)),
)
finally:
await self.job_store.finish_job(
job=job, status=status, scheduled_at=next_attempt_scheduled_at
)
self.logger.debug(
f"Acknowledged job completion {job.call_string}",
context = self.context_for_worker(worker_id=worker_id, job=job)
self.logger.debug(
f"Loaded job info, about to start job {job.call_string}",
extra=context.log_extra(action="loaded_job_info"),
)
status = jobs.Status.FAILED
next_attempt_scheduled_at = None
try:
await self.run_job(job=job, worker_id=worker_id)
status = jobs.Status.SUCCEEDED
except exceptions.JobRetry as e:
status = jobs.Status.TODO
next_attempt_scheduled_at = e.scheduled_at
except exceptions.JobError:
pass
except exceptions.TaskNotFound as exc:
self.logger.exception(
f"Task was not found: {exc}",
extra=context.log_extra(action="task_not_found", exception=str(exc)),
)
finally:
await self.job_store.finish_job(
job=job, status=status, scheduled_at=next_attempt_scheduled_at
)
self.logger.debug(
f"Acknowledged job completion {job.call_string}",
extra=context.log_extra(action="finish_task", status=status),
)
# Remove job information from the current context