Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
clearing it first.
Args:
- serialized (bytes): a serialized registry
- encryption_key (str): A key to use for decrypting the registry. If None
(the default), then the key in `prefect.config.registry.encryption_key`
is used. If that key is unavailable, deserialization will proceed without
decryption.
- dest_registry (dict): a registry to update with the serialized registry
(defaults to the global registry)
"""
if dest_registry is None:
dest_registry = REGISTRY
encryption_key = encryption_key or prefect.config.registry.encryption_key
if not encryption_key:
_warn(
"No encryption key provided and none found in "
"`prefect.config.registry.encryption_key`. The registry will attempt "
"unencrypted deserialization."
)
else:
serialized = Fernet(encryption_key).decrypt(serialized)
dest_registry.update(cloudpickle.loads(serialized))
output_filename : str, optional
Filename for output PDF file.
If not provided, this will be the name of the input notebook with the extension changed to '.pdf'
asciidoc_template : str, optional
Filename of a non-default template to use when exporting to asciidoc
(assumed to be in the inputs directory)
Returns
-------
str
Path to output PDF file
"""
prefect.context.logger.info(f"Converting notebook '{notebook_path}' to PDF.")
if output_filename is None:
output_filename = f"{Path(notebook_path).stem}.pdf"
output_path = str(Path(prefect.config.outputs.reports_dir) / output_filename)
if asciidoc_template is None:
if "asciidoc_template_path" in prefect.config:
asciidoc_template_path = prefect.config.asciidoc_template_path
else:
# If no template is provided, and no default template is set in the config,
# run nbconvert without specifying a template (i.e. use the default nbconvert asciidoc template).
asciidoc_template_path = None
else:
asciidoc_template_path = str(
Path(prefect.config.inputs.inputs_dir) / asciidoc_template
)
prefect.context.logger.debug(
f"Using template '{asciidoc_template_path}' to convert notebook to asciidoc."
)
aws_secret_access_key=self.aws_secret_access_key,
aws_session_token=self.aws_session_token,
region_name=self.region_name,
)
definition_exists = True
try:
boto3_c.describe_task_definition(
taskDefinition=self.task_definition_kwargs.get("family")
)
except ClientError:
definition_exists = False
if not definition_exists:
env_values = [
{"name": "PREFECT__CLOUD__GRAPHQL", "value": config.cloud.graphql},
{"name": "PREFECT__CLOUD__USE_LOCAL_SECRETS", "value": "false"},
{
"name": "PREFECT__ENGINE__FLOW_RUNNER__DEFAULT_CLASS",
"value": "prefect.engine.cloud.CloudFlowRunner",
},
{
"name": "PREFECT__ENGINE__TASK_RUNNER__DEFAULT_CLASS",
"value": "prefect.engine.cloud.CloudTaskRunner",
},
{"name": "PREFECT__LOGGING__LOG_TO_CLOUD", "value": "true"},
]
# create containerDefinitions if they do not exist
if not self.task_definition_kwargs.get("containerDefinitions"):
self.task_definition_kwargs["containerDefinitions"] = []
self.task_definition_kwargs["containerDefinitions"].append({})
Args:
- yaml_obj (dict): A dictionary representing the parsed yaml
Returns:
- dict: a dictionary with the yaml values replaced
"""
# set identifier labels
yaml_obj["metadata"]["labels"]["identifier"] = self.identifier_label
# set environment variables
env = yaml_obj["spec"]["containers"][0]["env"]
env[0]["value"] = prefect.config.cloud.graphql
env[1]["value"] = prefect.config.cloud.log
env[2]["value"] = prefect.config.cloud.result_handler
env[3]["value"] = prefect.config.cloud.auth_token
env[4]["value"] = prefect.context.get("flow_run_id", "")
# set image
yaml_obj["spec"]["containers"][0]["image"] = prefect.context.get(
"image", "daskdev/dask:latest"
)
return yaml_obj
def reset_default_flow():
    """Recreate the module-level ``DEFAULT_FLOW`` and publish it to the context.

    Looks up ``("flows", "global_default_flow")`` through
    ``prefect.config.get``. When that value is falsy or equal to the literal
    string ``"None"``, nothing is changed. Otherwise a new ``prefect.Flow``
    is bound to the global ``DEFAULT_FLOW`` and handed to
    ``prefect.context.Context.update`` under the ``flow`` keyword.
    """
    global DEFAULT_FLOW

    configured_name = prefect.config.get("flows", "global_default_flow")

    # Guard clause: an unset/empty value or the string "None" leaves the
    # current default flow untouched.
    if not configured_name or configured_name == "None":
        return

    # NOTE(review): the configured value only gates creation; the flow itself
    # is always named "Default Flow" -- confirm this is intended.
    DEFAULT_FLOW = prefect.Flow("Default Flow")
    prefect.context.Context.update(flow=DEFAULT_FLOW)
def __init__(self) -> None:
    """Initialise the instance, loading the Prefect client when configured.

    ``self.load_prefect_client()`` is called only when
    ``config.get("prefect_cloud", None)`` returns a truthy value.
    """
    cloud_setting = config.get("prefect_cloud", None)
    if cloud_setting:
        self.load_prefect_client()
Filename of a non-default template to use when exporting to asciidoc
(assumed to be in the inputs directory)
Returns
-------
str
Path to output PDF file
"""
prefect.context.logger.info(f"Converting notebook '{notebook_path}' to PDF.")
if output_filename is None:
output_filename = f"{Path(notebook_path).stem}.pdf"
output_path = str(Path(prefect.config.outputs.reports_dir) / output_filename)
if asciidoc_template is None:
if "asciidoc_template_path" in prefect.config:
asciidoc_template_path = prefect.config.asciidoc_template_path
else:
# If no template is provided, and no default template is set in the config,
# run nbconvert without specifying a template (i.e. use the default nbconvert asciidoc template).
asciidoc_template_path = None
else:
asciidoc_template_path = str(
Path(prefect.config.inputs.inputs_dir) / asciidoc_template
)
prefect.context.logger.debug(
f"Using template '{asciidoc_template_path}' to convert notebook to asciidoc."
)
body, resources = notebook_to_asciidoc(notebook_path, asciidoc_template_path)
prefect.context.logger.debug("Converted notebook to asciidoc.")
prefect.context.logger.debug("Converting asciidoc to PDF...")
output_filename : str, optional
Filename for output PDF file.
If not provided, this will be the name of the input notebook with the extension changed to '.pdf'
asciidoc_template : str, optional
Filename of a non-default template to use when exporting to asciidoc
(assumed to be in the inputs directory)
Returns
-------
str
Path to output PDF file
"""
prefect.context.logger.info(f"Converting notebook '{notebook_path}' to PDF.")
if output_filename is None:
output_filename = f"{Path(notebook_path).stem}.pdf"
output_path = str(Path(prefect.config.outputs.reports_dir) / output_filename)
if asciidoc_template is None:
if "asciidoc_template_path" in prefect.config:
asciidoc_template_path = prefect.config.asciidoc_template_path
else:
# If no template is provided, and no default template is set in the config,
# run nbconvert without specifying a template (i.e. use the default nbconvert asciidoc template).
asciidoc_template_path = None
else:
asciidoc_template_path = str(
Path(prefect.config.inputs.inputs_dir) / asciidoc_template
)
prefect.context.logger.debug(
f"Using template '{asciidoc_template_path}' to convert notebook to asciidoc."
)
# Use flow storage image for job
job["spec"]["template"]["spec"]["containers"][0]["image"] = (
StorageSchema().load(flow_run.flow.storage).name # type: ignore
)
self.logger.debug(
"Using image {} for job".format(
StorageSchema().load(flow_run.flow.storage).name # type: ignore
)
)
# Populate environment variables for flow run execution
env = job["spec"]["template"]["spec"]["containers"][0]["env"]
env[0]["value"] = config.cloud.api or "https://api.prefect.io"
env[1]["value"] = config.cloud.agent.auth_token
env[2]["value"] = flow_run.id # type: ignore
env[3]["value"] = os.getenv("NAMESPACE", "default")
env[4]["value"] = str(self.labels)
env[5]["value"] = str(self.log_to_cloud).lower()
# Use image pull secrets if provided
job["spec"]["template"]["spec"]["imagePullSecrets"][0]["name"] = os.getenv(
"IMAGE_PULL_SECRETS", ""
)
# Set resource requirements if provided
resources = job["spec"]["template"]["spec"]["containers"][0]["resources"]
if os.getenv("JOB_MEM_REQUEST"):
resources["requests"]["memory"] = os.getenv("JOB_MEM_REQUEST")
if os.getenv("JOB_MEM_LIMIT"):
resources["limits"]["memory"] = os.getenv("JOB_MEM_LIMIT")
job = json.load(job_file)
job["Job"]["ID"] = flow_run.id # type: ignore
job["Job"]["Name"] = "prefect-job-{}".format(str(uuid.uuid4())[:8])
job["Job"]["TaskGroups"][0]["Name"] = "prefect-job-{}".format(
flow_run.id # type: ignore
)
job["Job"]["TaskGroups"][0]["Tasks"][0]["Name"] = flow_run.id # type: ignore
job["Job"]["TaskGroups"][0]["Tasks"][0]["Config"]["image"] = (
StorageSchema().load(flow_run.flow.storage).name # type: ignore
)
env = job["Job"]["TaskGroups"][0]["Tasks"][0]["Env"]
env["PREFECT__CLOUD__API"] = config.cloud.api or "https://api.prefect.io"
env["PREFECT__CLOUD__AGENT__AUTH_TOKEN"] = config.cloud.agent.auth_token
env["PREFECT__CONTEXT__FLOW_RUN_ID"] = flow_run.id # type: ignore
env["PREFECT__CONTEXT__NAMESPACE"] = os.getenv("NAMESPACE", "default")
env["PREFECT__LOGGING__LOG_TO_CLOUD"] = str(self.log_to_cloud).lower()
return job