Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def assert_conversion_same_as_mirror(nb_file, fmt, mirror_name, compare_notebook=False):
dirname, basename = os.path.split(nb_file)
file_name, org_ext = os.path.splitext(basename)
fmt = long_form_one_format(fmt)
notebook = jupytext.read(nb_file, fmt=fmt)
check_auto_ext(fmt, notebook.metadata, '')
ext = fmt['extension']
mirror_file = os.path.join(dirname, '..', 'mirror', mirror_name, full_path(file_name, fmt))
# it's better not to have Jupytext metadata in test notebooks:
if fmt == 'ipynb' and 'jupytext' in notebook.metadata: # pragma: no cover
notebook.metadata.pop('jupytext')
jupytext.write(nb_file, fmt=fmt)
create_mirror_file_if_missing(mirror_file, notebook, fmt)
# Compare the text representation of the two notebooks
if compare_notebook:
nb_mirror = jupytext.read(mirror_file)
compare(nb_mirror, notebook)
def test_base_path():
fmt = long_form_one_format('dir/prefix_/ipynb')
assert base_path('dir/prefix_NAME.ipynb', fmt) == 'NAME'
with pytest.raises(InconsistentPath):
base_path('dir/incorrect_prefix_NAME.ipynb', fmt)
def pipe_notebook(notebook, command, fmt='py:percent', update=True, prefix=None):
"""Pipe the notebook, in the desired representation, to the given command. Update the notebook
with the returned content if desired."""
if command in ['black', 'flake8', 'autopep8']:
command = command + ' -'
elif command in ['pytest', 'unittest']:
command = command + ' {}'
fmt = long_form_one_format(fmt, notebook.metadata, auto_ext_requires_language_info=False)
check_auto_ext(fmt, notebook.metadata, '--pipe-fmt')
text = writes(notebook, fmt)
command = command.split(' ')
if '{}' in command:
if prefix is not None:
prefix = prefix + (' ' if ' ' in prefix else '_')
tmp_file_args = dict(mode='w+',
encoding='utf8',
prefix=prefix,
suffix=fmt['extension'],
delete=False)
try:
tmp = NamedTemporaryFile(**tmp_file_args)
except TypeError:
# NamedTemporaryFile does not have an 'encoding' argument on pypy
text = sys.stdin.read()
return reads(text, fmt)
if not hasattr(fp, 'read'):
# Treat fp as a file name
fp = str(fp)
_, ext = os.path.splitext(fp)
fmt = copy(fmt or {})
if not isinstance(fmt, dict):
fmt = long_form_one_format(fmt)
fmt.update({'extension': ext})
with io.open(fp, encoding='utf-8') as stream:
return read(stream, as_version=as_version, fmt=fmt, **kwargs)
if fmt is not None:
fmt = long_form_one_format(fmt)
if fmt['extension'] == '.ipynb':
notebook = nbformat.read(fp, as_version, **kwargs)
rearrange_jupytext_metadata(notebook.metadata)
return notebook
return reads(fp.read(), fmt, **kwargs)
def compare_notebooks(notebook_actual, notebook_expected, fmt=None, allow_expected_differences=True,
raise_on_first_difference=True, compare_outputs=False):
"""Compare the two notebooks, and raise with a meaningful message
that explains the differences, if any"""
fmt = long_form_one_format(fmt)
format_name = fmt.get('format_name')
# Expected differences
allow_filtered_cell_metadata = allow_expected_differences
allow_missing_code_cell_metadata = allow_expected_differences and format_name == 'sphinx'
allow_missing_markdown_cell_metadata = allow_expected_differences and format_name in ['sphinx', 'spin']
allow_removed_final_blank_line = allow_expected_differences
cell_metadata_filter = notebook_actual.get('jupytext', {}).get('cell_metadata_filter')
if format_name == 'sphinx' and notebook_actual.cells and notebook_actual.cells[0].source == '%matplotlib inline':
notebook_actual.cells = notebook_actual.cells[1:]
# Compare cells type and content
test_cell_iter = iter(notebook_actual.cells)
modified_cells = set()
if fp == '-':
# Use sys.stdout.buffer when possible, and explicit utf-8 encoding, cf. #331
content = writes(nb, version=version, fmt=fmt, **kwargs)
try:
# Python 3
sys.stdout.buffer.write(content.encode('utf-8'))
except AttributeError:
sys.stdout.write(content.encode('utf-8'))
return
if not hasattr(fp, 'write'):
# Treat fp as a file name
fp = str(fp)
_, ext = os.path.splitext(fp)
fmt = copy(fmt or {})
fmt = long_form_one_format(fmt, update={'extension': ext})
create_prefix_dir(fp, fmt)
with io.open(fp, 'w', encoding='utf-8') as stream:
write(nb, stream, version=version, fmt=fmt, **kwargs)
return
else:
assert fmt is not None, "'fmt' argument in jupytext.write is mandatory unless fp is a file name"
content = writes(nb, version=version, fmt=fmt, **kwargs)
if isinstance(content, bytes):
content = content.decode('utf8')
fp.write(content)
if not content.endswith(u'\n'):
fp.write(u'\n')
def combine_inputs_with_outputs(nb_source, nb_outputs, fmt=None):
"""Copy outputs of the second notebook into
the first one, for cells that have matching inputs"""
output_code_cells = [cell for cell in nb_outputs.cells if cell.cell_type == 'code']
output_other_cells = [cell for cell in nb_outputs.cells if cell.cell_type != 'code']
fmt = long_form_one_format(fmt)
text_repr = nb_source.metadata.get('jupytext', {}).get('text_representation', {})
ext = fmt.get('extension') or text_repr.get('extension')
format_name = fmt.get('format_name') or text_repr.get('format_name')
nb_outputs_filtered_metadata = copy(nb_outputs.metadata)
filter_metadata(nb_outputs_filtered_metadata,
nb_source.metadata.get('jupytext', {}).get('notebook_metadata_filter'),
_DEFAULT_NOTEBOOK_METADATA)
for key in nb_outputs.metadata:
if key not in nb_outputs_filtered_metadata:
nb_source.metadata[key] = nb_outputs.metadata[key]
source_is_md_version_one = ext in ['.md', '.markdown', '.Rmd'] and text_repr.get('format_version') == '1.0'
if nb_source.metadata.get('jupytext', {}).get('formats') or ext in ['.md', '.markdown', '.Rmd']:
nb_source.metadata.get('jupytext', {}).pop('text_representation', None)
def reads(text, fmt, as_version=nbformat.NO_CONVERT, **kwargs):
"""
Read a notebook from a string
:param text: the text representation of the notebook
:param fmt: (optional) the jupytext format like `md`, `py:percent`, ...
:param as_version: see nbformat.reads
:param kwargs: (not used) additional parameters for nbformat.reads
:return: the notebook
"""
fmt = copy(fmt) if fmt else divine_format(text)
fmt = long_form_one_format(fmt)
ext = fmt['extension']
if ext == '.ipynb':
return nbformat.reads(text, as_version, **kwargs)
format_name = read_format_from_metadata(text, ext) or fmt.get('format_name')
if format_name:
format_options = {}
else:
format_name, format_options = guess_format(text, ext)
if format_name:
fmt['format_name'] = format_name
fmt.update(format_options)