Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_creds_are_pulled_from_secret_at_runtime(self, monkeypatch):
    """
    Run `LoadTweetReplies` with credentials supplied only through the
    `TWITTER_API_CREDENTIALS` secret in context, and check that the API
    key and secret from that secret are the positional arguments given
    to `tweepy.OAuthHandler`.
    """
    loader = LoadTweetReplies()

    # Replace the tweepy module referenced by the task with a mock so the
    # arguments passed to `OAuthHandler` can be inspected afterwards.
    fake_tweepy = MagicMock()
    monkeypatch.setattr("prefect.tasks.twitter.twitter.tweepy", fake_tweepy)

    config_override = {"cloud.use_local_secrets": True}
    secrets = {
        "TWITTER_API_CREDENTIALS": {
            "api_key": "a",
            "api_secret": "b",
            "access_token": "c",
            "access_token_secret": "d",
        }
    }

    with set_temporary_config(config_override), prefect.context(secrets=secrets):
        loader.run(user="")

    positional_args = fake_tweepy.OAuthHandler.call_args[0]
    assert positional_args == ("a", "b")
def grab_key():
    """Return the value stored under the "THE_ANSWER" key of `prefect.context`."""
    answer = prefect.context["THE_ANSWER"]
    return answer
boto3_client.register_task_definition.return_value = {}
boto3_client.run_task.return_value = {}
monkeypatch.setattr("boto3.client", MagicMock(return_value=boto3_client))
flow_runner = MagicMock()
monkeypatch.setattr(
"prefect.engine.get_default_flow_runner_class",
MagicMock(return_value=flow_runner),
)
monkeypatch.setenv("AWS_ACCESS_KEY_ID", "id")
monkeypatch.setenv("AWS_SECRET_ACCESS_KEY", "secret")
monkeypatch.setenv("AWS_SESSION_TOKEN", "session")
monkeypatch.setenv("REGION_NAME", "region")
with prefect.context({"flow_run_id": "id"}):
storage = Docker(registry_url="test", image_name="image", image_tag="tag")
environment = FargateTaskEnvironment(
containerDefinitions=[
{
"name": "flow-container",
"image": "image",
"command": [],
"environment": [],
"essential": True,
}
],
cluster="test",
family="test",
taskDefinition="test",
Pending,
Finished,
Failed,
TriggerFailed,
Cached,
Scheduled,
Retrying,
Success,
Skipped,
]
ok = MagicMock(ok=True)
monkeypatch.setattr(prefect.utilities.notifications.requests, "post", ok)
for state in all_states:
s = state()
with set_temporary_config({"cloud.use_local_secrets": True}):
with prefect.context(secrets=dict(SLACK_WEBHOOK_URL="")):
returned = slack_notifier(Task(), "", s, ignore_states=[State])
assert returned is s
assert ok.called is False
def test_setting_context_with_dict():
    """
    Entering the context with a dictionary makes its keys readable as
    attributes of the context.
    """
    initial_values = {"x": 1}
    with context(initial_values):
        assert context.x == 1
def test_task_runner_puts_tags_in_context(self):
    """
    `TaskRunner.initialize_run` returns a context containing `task_tags`:
    an empty set for a task built without tags, and the set of the task's
    tags otherwise.
    """
    # (keyword arguments for Task, expected `task_tags` in the result)
    scenarios = (
        ({}, set()),
        ({"tags": ["foo", "bar"]}, {"foo", "bar"}),
    )
    for task_kwargs, expected_tags in scenarios:
        # Each scenario gets its own context, which must not already
        # carry `task_tags` before the runner is initialized.
        with prefect.context() as ctx:
            assert "task_tags" not in ctx
            runner = TaskRunner(Task(**task_kwargs))
            result = runner.initialize_run(state=None, context=ctx)
            assert result.context.task_tags == expected_tags
def deserialize_result(self, result, context=None):
    """
    Decode a serialized task result while a Prefect context is active.

    Args:
        result: the task's serialized result, or None
        context: a Prefect context dictionary to activate while decoding;
            a falsy value (including the default None) is treated as an
            empty dictionary

    Returns:
        None when `result` is None; otherwise the value returned by
        `self.serializer.decode(result)`
    """
    if result is None:
        return None
    # `context` defaults to None. Substitute an empty dict so the default
    # never reaches `prefect.context` as a positional None, which a
    # mapping-style update is not guaranteed to accept.
    with prefect.context(context or {}):
        return self.serializer.decode(result)
self.logger.info('Flow {}: {}'.format(type(s).__name__, s))
state.succeed()
except prefect.signals.SKIP as s:
self.logger.info('Flow {}: {}'.format(type(s).__name__, s))
state.skip()
except prefect.signals.SHUTDOWN as s:
self.logger.info('Flow {}: {}'.format(type(s).__name__, s))
state.shutdown()
except prefect.signals.DONTRUN as s:
self.logger.info('Flow {}: {}'.format(type(s).__name__, s))
except prefect.signals.FAIL as s:
self.logger.info(
'Flow {}: {}'.format(type(s).__name__, s), exc_info=True)
state.fail()
except Exception:
if prefect.context.get('debug'):
raise
self.logger.error(
'Flow: An unexpected error occurred', exc_info=True)
state.fail()
def task_context(self, context, state):
with prefect.context(context):
try:
yield
except signals.SUCCESS as s:
self.logger.info('Task {}: {}'.format(type(s).__name__, s))
state.succeed()
except signals.SKIP as s:
self.logger.info('Task {}: {}'.format(type(s).__name__, s))
state.skip()
except signals.RETRY as s:
self.logger.info('Task {}: {}'.format(type(s).__name__, s))
state.fail()
except signals.SHUTDOWN as s:
self.logger.info('Task {}: {}'.format(type(s).__name__, s))
state.shutdown()
except signals.DONTRUN as s:
def run(self, flow: prefect.Flow, parameters: dict = None) -> State: # type: ignore
    """
    Run `flow` with a flow runner and return the resulting state.

    The runner class is `self.flow_runner_class` when that is truthy,
    otherwise the class returned by
    `prefect.engine.get_default_flow_runner_class()`.

    Args:
        flow: the flow handed to the runner class and whose
            `reference_tasks()` are requested as `return_tasks`
        parameters: forwarded to the runner as `parameters`; a falsy
            value (including the default None) becomes an empty dict

    Returns:
        the value returned by the runner's `run` call
    """
    chosen_runner_cls = self.flow_runner_class
    if not chosen_runner_cls:
        chosen_runner_cls = prefect.engine.get_default_flow_runner_class()
    runner = chosen_runner_cls(flow=flow)

    # Use this object's executor when set, else the one stored in context.
    executor = self.executor
    if not executor:
        executor = prefect.context.get("executor")

    return runner.run(
        executor=executor,
        return_tasks=flow.reference_tasks(),
        parameters=parameters if parameters else {},
    )