Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
traceback=output.traceback,
)
break
if error:
# Write notebook back out with the Error Message at the top of the Notebook.
error_msg = ERROR_MESSAGE_TEMPLATE % str(error.exec_count)
error_msg_cell = nbformat.v4.new_code_cell(
source="%%html\n" + error_msg,
outputs=[
nbformat.v4.new_output(output_type="display_data", data={"text/html": error_msg})
],
metadata={"inputHidden": True, "hide_input": True},
)
nb.cells = [error_msg_cell] + nb.cells
write_ipynb(nb, output_path)
raise error
temp_path = os.path.join(
output_notebook_dir, '{prefix}-out.ipynb'.format(prefix=str(uuid.uuid4()))
)
with safe_tempfile_path() as output_log_path:
# Scaffold the registration here
nb = load_notebook_node(notebook_path)
nb_no_parameters = replace_parameters(
system_compute_context,
nb,
get_papermill_parameters(system_compute_context, inputs, output_log_path),
)
intermediate_path = os.path.join(
output_notebook_dir, '{prefix}-inter.ipynb'.format(prefix=str(uuid.uuid4()))
)
write_ipynb(nb_no_parameters, intermediate_path)
with user_code_error_boundary(
DagstermillExecutionError,
lambda: (
'Error occurred during the execution of Dagstermill solid '
'{solid_name}: {notebook_path}'.format(
solid_name=name, notebook_path=notebook_path
)
),
):
try:
papermill_engines.register('dagstermill', DagstermillNBConvertEngine)
papermill.execute_notebook(
intermediate_path, temp_path, engine_name='dagstermill', log_output=True
)
except Exception as exc:
def save(self, **kwargs):
    """
    Persist the wrapped notebook to ``self.output_path``, if one is set.

    When ``self.output_path`` is falsy this is a no-op; otherwise the
    current ``self.nb`` is handed to ``write_ipynb`` together with that path.

    Besides being called around cell state changes, this may be invoked
    on its own while a long execution is still in progress, so the
    notebook on disk stays in sync -- e.g. saving every 10 minutes during
    a 5 hour cell execution to capture output messages in the notebook.

    ``**kwargs`` is accepted but not used by this implementation.
    """
    # Nothing to sync when no destination was configured.
    if not self.output_path:
        return
    write_ipynb(self.nb, self.output_path)
input_path=input_path,
output_path=output_path if request_save_on_cell_execute else None,
kernel_name=kernel_name,
progress_bar=progress_bar,
log_output=log_output,
start_timeout=start_timeout,
stdout_file=stdout_file,
stderr_file=stderr_file,
**engine_kwargs
)
# Check for errors first (it saves on error before raising)
raise_for_execution_errors(nb, output_path)
# Write final output in case the engine didn't write it on cell completion.
write_ipynb(nb, output_path)
return nb
def execute_retroactive_scaffold(notebook_path):
    """
    Prepend dagstermill scaffolding cells to an existing notebook.

    The notebook at ``notebook_path`` is loaded, two new code cells are
    placed ahead of its existing cells, and the result is written back to
    the same ``notebook_path``:

    1. a cell containing ``import dagstermill``
    2. a cell containing ``context = dagstermill.get_context()``, whose
       metadata carries the tag ``parameters``

    Args:
        notebook_path: Location of the notebook to read and then overwrite.
    """
    source_nb = load_notebook_node(notebook_path)
    scaffolded_nb = copy.deepcopy(source_nb)

    bootstrap_cell = nbformat.v4.new_code_cell(source='import dagstermill')
    context_cell = nbformat.v4.new_code_cell(
        source='context = dagstermill.get_context()'
    )
    context_cell.metadata['tags'] = ['parameters']

    # The scaffold cells go first, followed by the cells of the loaded notebook.
    scaffold_cells = [bootstrap_cell, context_cell]
    scaffolded_nb.cells = scaffold_cells + source_nb.cells

    write_ipynb(scaffolded_nb, notebook_path)