Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Fragment of a test body; the enclosing definition (which supplies `self`
# and `backend`) is not visible in this excerpt.
# NOTE(review): indentation was lost in this excerpt, so which statements are
# nested under the `if` lines below cannot be determined from here.
assert not has_to_compile_at_import()
# Each step is preceded by a print tagged with mpi.rank and flushed at once.
print(mpi.rank, "before if self.path_backend.exists()", flush=1)
if self.path_backend.exists():
print(mpi.rank, "before self.path_backend.unlink()", flush=1)
self.path_backend.unlink()
print(mpi.rank, "before make_backend_file(self.path_for_test)", flush=1)
if mpi.rank == 0:
backend.make_backend_file(self.path_for_test)
print(mpi.rank, "after make_backend_file(self.path_for_test)", flush=1)
mpi.barrier()
# Import and reload the test module, then check that the backend file now
# exists and that the module's `ts` object reports `is_transpiled`.
from _transonic_testing import for_test_init
importlib.reload(for_test_init)
assert self.path_backend.exists()
assert for_test_init.ts.is_transpiled
# Call the reloaded module's functions and its class check.
for_test_init.func(1, 3.14)
for_test_init.func1(1.1, 2.2)
for_test_init.check_class()
def test_not_transonified():
    """Call the ``for_test_init`` functions after removing the backend output.

    The ``__{backend_default}__`` directory next to the test module is
    deleted by rank 0 when it exists, every rank then passes a barrier, the
    module is reloaded, and ``func``, ``func1`` and ``check_class`` are called.
    """
    package_root = Path(__file__).parent.parent
    module_file = package_root / "_transonic_testing/for_test_init.py"
    output_dir = module_file.parent / f"__{backend_default}__"

    # Only rank 0 deletes the directory; all ranks meet at the barrier.
    if output_dir.exists() and mpi.rank == 0:
        rmtree(output_dir)
    mpi.barrier()

    from _transonic_testing import for_test_init

    importlib.reload(for_test_init)

    # Imported after the reload, as in the original statement order.
    from _transonic_testing.for_test_init import func, func1, check_class

    func(1, 3.14)
    func1(1.1, 2.2)
    check_class()
# Fragment: builds the path of a per-class Python file, (re)generates it when
# needed, imports it, and rebinds the class's methods to functions taken from
# the imported module. The enclosing definition (which supplies cls, cls_name,
# mod_name, module, path_jit_class, backend and jit_methods) is not visible in
# this excerpt.
# NOTE(review): indentation was lost in this excerpt, so the exact nesting of
# the statements under the two `if` lines cannot be determined from here.
python_path_dir = path_jit_class / mod_name.replace(".", os.path.sep)
python_path = python_path_dir / (cls_name + ".py")
# Regenerate only when mpi.has_to_build(python_path, module.__file__) is truthy.
if mpi.has_to_build(python_path, module.__file__):
from transonic.justintime import _get_module_jit
mod = _get_module_jit(backend_name=backend.name, index_frame=5)
if mpi.rank == 0:
python_path = mpi.PathSeq(python_path)
# Source text: the stored dependency code for this class, a newline, then the
# code returned by backend.jit.produce_code_class(cls).
python_code = (
mod.info_analysis["codes_dependance_classes"][cls_name] + "\n"
)
python_code += backend.jit.produce_code_class(cls)
write_if_has_to_write(python_path, python_code)
python_path = mpi.Path(python_path)
mpi.barrier()
# 2. import the file
python_mod_name = path_jit_class.name + "." + mod_name + "." + cls_name
module = import_from_path(python_path, python_mod_name)
# 3. replace the methods
# Each replacement is looked up in the imported module under the name
# "__new_method__<class name>__<method name>" and wrapped with
# functools.wraps(func) before being set on the class.
for name_method, method in jit_methods.items():
func = method.func
name_new_method = f"__new_method__{cls.__name__}__{name_method}"
new_method = getattr(module, name_new_method)
setattr(cls, name_method, functools.wraps(func)(new_method))
return cls
# Fragment; the enclosing definition (which supplies `self` and `parallel`)
# is not visible in this excerpt.
# The limit is self.limit_nb_processes when `parallel` is truthy, otherwise 1.
if mpi.rank == 0:
if parallel:
limit = self.limit_nb_processes
else:
limit = 1
# Sleep self.deltat between polls, keeping only the entries whose
# is_alive_root() is truthy, until fewer than `limit` entries remain.
while len(self.processes) >= limit:
time.sleep(self.deltat)
self.processes = [
process
for process in self.processes
if process.is_alive_root()
]
# NOTE(review): indentation was lost in this excerpt; presumably this barrier
# is reached by every rank rather than only rank 0 — confirm.
mpi.barrier(timeout=None)
)
return func
# Fragment; the enclosing definition (which supplies `self` and `func`) is not
# visible in this excerpt.
# Store this object in mod.jit_functions under the function's name.
func_name = func.__name__
backend = self.backend
mod = self.mod
mod.jit_functions[func_name] = self
module_name = mod.module_name
# Directory for this module: backend.jit.path_base joined with the module
# name, its dots replaced by the OS path separator.
path_jit = mpi.Path(backend.jit.path_base)
path_backend = path_jit / module_name.replace(".", os.path.sep)
if mpi.rank == 0:
path_backend.mkdir(parents=True, exist_ok=True)
mpi.barrier()
# From here on path_backend is the "<func_name>.py" file in that directory.
path_backend = (path_backend / func_name).with_suffix(".py")
# path_backend_header is the same path with backend.suffix_header as suffix,
# or False when backend.suffix_header is falsy.
if backend.suffix_header:
path_backend_header = path_backend.with_suffix(backend.suffix_header)
else:
path_backend_header = False
# has_to_write is True when the file does not exist, or when it exists,
# mod.is_dummy_file is falsy and has_to_build(path_backend, mod.filename)
# is truthy; otherwise it is False.
if path_backend.exists():
if not mod.is_dummy_file and has_to_build(path_backend, mod.filename):
has_to_write = True
else:
has_to_write = False
else:
has_to_write = True
src = None
def wait_for_all_extensions(self):
    """Wait until all compilation processes are done.

    On rank 0, ``self.processes`` is polled every ``self.deltat`` seconds and
    reduced to the entries whose ``is_alive_root()`` is truthy, until it is
    empty. Every rank then calls ``mpi.barrier(timeout=None)``.
    """
    if mpi.rank == 0:
        while self.processes:
            time.sleep(self.deltat)
            still_running = []
            for proc in self.processes:
                if proc.is_alive_root():
                    still_running.append(proc)
            self.processes = still_running

    mpi.barrier(timeout=None)
def wait_for_all_extensions(self):
    """Wait until all compilation processes are done.

    On rank 0, ``self.processes`` is polled every ``self.deltat`` seconds and
    reduced to the entries whose ``is_alive_root()`` is truthy, until it is
    empty. Every rank then calls ``mpi.barrier(timeout=None)``.
    """
    if mpi.rank == 0:
        while self.processes:
            time.sleep(self.deltat)
            still_running = []
            for proc in self.processes:
                if proc.is_alive_root():
                    still_running.append(proc)
            self.processes = still_running

    mpi.barrier(timeout=None)
def write_new_header(self, path_backend_header, header, arg_types):
    """Write ``header`` to ``path_backend_header`` from rank 0.

    All ranks first call ``mpi.barrier()``. Rank 0 then logs a debug message
    naming the file and ``arg_types``, and overwrites the file with ``header``.

    Parameters
    ----------
    path_backend_header
        Path of the file to open in ``"w"`` mode.
    header
        Text written to the file.
    arg_types
        Only used in the debug log message.
    """
    mpi.barrier()
    if mpi.rank == 0:
        message = (
            f"write {self.name_capitalized} signature in file "
            f"{path_backend_header} with types\n{arg_types}"
        )
        logger.debug(message)
        with open(path_backend_header, "w") as stream:
            stream.write(header)
            stream.flush()