Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUpClass(cls):
    """Compute the file paths used by the tests and store them on the class.

    Three class attributes are set:

    - ``path_for_test``: ``_transonic_testing/for_test_init.py`` under the
      parent of the directory containing this file (asserted to exist);
    - ``path_backend``: a file with the same name inside the sibling
      directory ``__<backend_default>__``;
    - ``path_ext``: ``path_backend`` with its file name replaced by the
      result of ``backend.name_ext_from_path_backend(path_backend)``.
    """
    base_dir = mpi.Path(__file__).parent.parent
    source_file = base_dir / "_transonic_testing/for_test_init.py"
    cls.path_for_test = source_file
    assert source_file.exists()

    # Same file name, one directory deeper, in the backend-specific folder.
    backend_dir = source_file.parent / f"__{backend_default}__"
    backend_file = backend_dir / source_file.name
    cls.path_backend = backend_file

    ext_name = backend.name_ext_from_path_backend(backend_file)
    cls.path_ext = backend_file.with_name(ext_name)
jitted_dicts,
codes_dependance,
codes_dependance_classes,
code_ext,
special,
) = analysis_jit(self.get_source(), self.filename, backend_name)
self.info_analysis = {
"jitted_dicts": jitted_dicts,
"codes_dependance": codes_dependance,
"codes_dependance_classes": codes_dependance_classes,
"special": special,
}
self.backend = backend = backends[backend_name]
path_jit = mpi.Path(backend.jit.path_base)
path_jit_class = mpi.Path(backend.jit.path_class)
# TODO: check if these files have to be written here...
# Write exterior code for functions
for file_name, code in code_ext["function"].items():
path_ext = path_jit / self.module_name.replace(".", os.path.sep)
path_ext_file = path_ext / (file_name + ".py")
write_if_has_to_write(path_ext_file, format_str(code), logger.info)
# Write exterior code for classes
for file_name, code in code_ext["class"].items():
path_ext = path_jit_class / self.module_name.replace(".", os.path.sep)
path_ext_file = path_ext / (file_name + ".py")
write_if_has_to_write(path_ext_file, format_str(code), logger.info)
1. create a Python file with @jit functions and methods
2. import the file
3. replace the methods
"""
if not has_to_replace:
return cls
cls_name = cls.__name__
mod_name = cls.__module__
module = sys.modules[mod_name]
if mod_name == "__main__":
mod_name = find_module_name_from_path(module.__file__)
path_jit_class = mpi.Path(backend.jit.path_class)
# 1. create a Python file with @jit functions and methods
python_path_dir = path_jit_class / mod_name.replace(".", os.path.sep)
python_path = python_path_dir / (cls_name + ".py")
if mpi.has_to_build(python_path, module.__file__):
from transonic.justintime import _get_module_jit
mod = _get_module_jit(backend_name=backend.name, index_frame=5)
if mpi.rank == 0:
python_path = mpi.PathSeq(python_path)
python_code = (
mod.info_analysis["codes_dependance_classes"][cls_name] + "\n"
)
python_code += backend.jit.produce_code_class(cls)
write_if_has_to_write(python_path, python_code)
# 1. create a Python file with @jit functions and methods
python_path_dir = path_jit_class / mod_name.replace(".", os.path.sep)
python_path = python_path_dir / (cls_name + ".py")
if mpi.has_to_build(python_path, module.__file__):
from transonic.justintime import _get_module_jit
mod = _get_module_jit(backend_name=backend.name, index_frame=5)
if mpi.rank == 0:
python_path = mpi.PathSeq(python_path)
python_code = (
mod.info_analysis["codes_dependance_classes"][cls_name] + "\n"
)
python_code += backend.jit.produce_code_class(cls)
write_if_has_to_write(python_path, python_code)
python_path = mpi.Path(python_path)
mpi.barrier()
# 2. import the file
python_mod_name = path_jit_class.name + "." + mod_name + "." + cls_name
module = import_from_path(python_path, python_mod_name)
# 3. replace the methods
for name_method, method in jit_methods.items():
func = method.func
name_new_method = f"__new_method__{cls.__name__}__{name_method}"
new_method = getattr(module, name_new_method)
setattr(cls, name_method, functools.wraps(func)(new_method))
return cls
if not can_import_accelerator(self.backend.name):
logger.warning(
"Cannot accelerate a jitted function because "
f"{self.backend.name_capitalized} is not importable."
)
return func
func_name = func.__name__
backend = self.backend
mod = self.mod
mod.jit_functions[func_name] = self
module_name = mod.module_name
path_jit = mpi.Path(backend.jit.path_base)
path_backend = path_jit / module_name.replace(".", os.path.sep)
if mpi.rank == 0:
path_backend.mkdir(parents=True, exist_ok=True)
mpi.barrier()
path_backend = (path_backend / func_name).with_suffix(".py")
if backend.suffix_header:
path_backend_header = path_backend.with_suffix(backend.suffix_header)
else:
path_backend_header = False
if path_backend.exists():
if not mod.is_dummy_file and has_to_build(path_backend, mod.filename):
has_to_write = True
else:
def clear_cached_extensions(module_name: str, force: bool, backend: str):
    """Delete the cached extensions related to a module

    Parameters
    ----------
    module_name : str
        Dotted module name or path of the module. A trailing ``.py`` is
        dropped; if the name contains no path separator, its dots are
        replaced by ``os.path.sep``.
    force : bool
        Not read by the statements visible in this block.
    backend : str
        Key used to look up the backend object in
        ``transonic.backends.backends``.

    NOTE(review): the statements visible here only compute the paths of the
    cached files; no deletion is performed in this block -- confirm against
    the complete function.
    """
    from transonic.backends import backends
    from transonic import mpi

    # From here on ``backend`` is the backend object, not its name.
    backend = backends[backend]
    path_jit = mpi.Path(backend.jit.path_base)

    if module_name.endswith(".py"):
        module_name = module_name[:-3]

    if os.path.sep not in module_name:
        relative_path = module_name.replace(".", os.path.sep)
    else:
        relative_path = module_name

    path_pythran_dir_jit = path_jit / relative_path

    relative_path = Path(relative_path)

    # This has to be an f-string: without the prefix the directory would be
    # literally named "__{backend.name}__" instead of using the backend name.
    path_pythran = relative_path.parent / (
        f"__{backend.name}__/" + relative_path.name + ".py"
    )
def compile_extension(
    path: Union[Path, str],
    name_ext_file: Optional[str] = None,
    native=False,
    xsimd=False,
    openmp=False,
):
    """Forward a compilation request to ``scheduler.compile_extension``.

    ``path`` is converted to a ``Path`` when it is not one already. The
    other arguments are passed through unchanged, and the value returned by
    the scheduler is returned to the caller.
    """
    source_path = path if isinstance(path, Path) else Path(path)
    flags = {"native": native, "xsimd": xsimd, "openmp": openmp}
    # return the process
    return scheduler.compile_extension(source_path, name_ext_file, **flags)
def compile_extension(
    path: Union[Path, str],
    backend: str,
    name_ext_file: str,
    native=False,
    xsimd=False,
    openmp=False,
    str_accelerator_flags: Optional[str] = None,
    parallel=False,
    force=False,
):
    """Forward a compilation request to ``scheduler.compile_extension``.

    Prints a short message, converts ``path`` to a ``Path`` when it is not
    one already, then passes every argument through unchanged. The value
    returned by the scheduler is returned to the caller.
    """
    print("compile extension")
    source_path = path if isinstance(path, Path) else Path(path)
    options = {
        "native": native,
        "xsimd": xsimd,
        "openmp": openmp,
        "str_accelerator_flags": str_accelerator_flags,
        "parallel": parallel,
        "force": force,
    }
    # return the process
    return scheduler.compile_extension(
        source_path, backend, name_ext_file, **options
    )