Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
test_loader,
device,
metrics=metrics,
)
logging.info("... evaluation done!")
else:
raise ScriptError("Unknown mode: {}".format(args.mode))
if __name__ == "__main__":
    # Script entry point: build the CLI parser and read the arguments.
    parser = build_parser()
    args = parser.parse_args()

    # In "from_json" mode the parsed arguments are replaced by whatever
    # spk.utils.read_from_json returns for the given JSON path.
    if args.mode == "from_json":
        args = spk.utils.read_from_json(args.json_path)

    # NOTE(review): this import is kept where the original had it; it is not
    # used inside this block -- confirm whether other code in the module
    # relies on the name before moving or removing it.
    import argparse

    main(args)
def get_model(representation, output_modules, parallelize=False):
    """Assemble an atomistic model and log how many parameters it has.

    Args:
        representation: representation block passed as the first argument
            to ``AtomisticModel``.
        output_modules: output module(s) passed as the second argument to
            ``AtomisticModel``.
        parallelize (bool): if true, the model is wrapped in
            ``nn.DataParallel`` before being returned.

    Returns:
        The constructed ``AtomisticModel``, or its ``nn.DataParallel``
        wrapper when ``parallelize`` is set.
    """
    model = AtomisticModel(representation, output_modules)
    if parallelize:
        model = nn.DataParallel(model)

    # Pass the count as a logging argument instead of pre-formatting with
    # ``%`` so the message is only rendered if the record is actually emitted.
    logging.info(
        "The model you built has: %d parameters",
        schnetpack.utils.spk_utils.compute_params(model),
    )
    return model
def get_output_module(args, representation, mean, stddev, atomref):
derivative = spk.utils.get_derivative(args)
negative_dr = spk.utils.get_negative_dr(args)
contributions = spk.utils.get_contributions(args)
stress = spk.utils.get_stress(args)
if args.dataset == "md17" and not args.ignore_forces:
derivative = spk.datasets.MD17.forces
output_module_str = spk.utils.get_module_str(args)
if output_module_str == "dipole_moment":
return spk.atomistic.output_modules.DipoleMoment(
args.features,
predict_magnitude=True,
mean=mean[args.property],
stddev=stddev[args.property],
property=args.property,
contributions=contributions,
)
elif output_module_str == "electronic_spatial_extent":
stddev=stddev[args.property],
property=args.property,
contributions=contributions,
)
elif output_module_str == "electronic_spatial_extent":
return spk.atomistic.output_modules.ElectronicSpatialExtent(
args.features,
mean=mean[args.property],
stddev=stddev[args.property],
property=args.property,
contributions=contributions,
)
elif output_module_str == "atomwise":
return spk.atomistic.output_modules.Atomwise(
args.features,
aggregation_mode=spk.utils.get_pooling_mode(args),
mean=mean[args.property],
stddev=stddev[args.property],
atomref=atomref[args.property],
property=args.property,
derivative=derivative,
negative_dr=negative_dr,
contributions=contributions,
stress=stress,
)
elif output_module_str == "polarizability":
return spk.atomistic.output_modules.Polarizability(
args.features,
aggregation_mode=spk.utils.get_pooling_mode(args),
property=args.property,
)
elif output_module_str == "isotropic_polarizability":
def get_output_module(args, representation, mean, stddev, atomref):
derivative = spk.utils.get_derivative(args)
negative_dr = spk.utils.get_negative_dr(args)
contributions = spk.utils.get_contributions(args)
stress = spk.utils.get_stress(args)
if args.dataset == "md17" and not args.ignore_forces:
derivative = spk.datasets.MD17.forces
output_module_str = spk.utils.get_module_str(args)
if output_module_str == "dipole_moment":
return spk.atomistic.output_modules.DipoleMoment(
args.features,
predict_magnitude=True,
mean=mean[args.property],
stddev=stddev[args.property],
property=args.property,
contributions=contributions,
)
elif output_module_str == "electronic_spatial_extent":
return spk.atomistic.output_modules.ElectronicSpatialExtent(
args.features,
mean=mean[args.property],
stddev=stddev[args.property],
property=args.property,
contributions=contributions,
def get_output_module(args, representation, mean, stddev, atomref):
derivative = spk.utils.get_derivative(args)
negative_dr = spk.utils.get_negative_dr(args)
contributions = spk.utils.get_contributions(args)
stress = spk.utils.get_stress(args)
if args.dataset == "md17" and not args.ignore_forces:
derivative = spk.datasets.MD17.forces
output_module_str = spk.utils.get_module_str(args)
if output_module_str == "dipole_moment":
return spk.atomistic.output_modules.DipoleMoment(
args.features,
predict_magnitude=True,
mean=mean[args.property],
stddev=stddev[args.property],
property=args.property,
contributions=contributions,
)
elif output_module_str == "electronic_spatial_extent":
return spk.atomistic.output_modules.ElectronicSpatialExtent(
os.remove(evaluation_fp)
else:
raise ScriptError(
"The evaluation file does already exist at {}! Add overwrite flag"
" to remove.".format(evaluation_fp)
)
# load model
logging.info("loading trained model...")
model = spk.utils.load_model(
os.path.join(args.modelpath, "best_model"), map_location=device
)
# run evaluation
logging.info("evaluating...")
if spk.utils.get_derivative(train_args) is None:
with torch.no_grad():
evaluate(
args,
model,
train_loader,
val_loader,
test_loader,
device,
metrics=metrics,
)
else:
evaluate(
args,
model,
train_loader,
val_loader,
elif args.mode == "eval":
# remove old evaluation files
evaluation_fp = os.path.join(args.modelpath, "evaluation.txt")
if os.path.exists(evaluation_fp):
if args.overwrite:
os.remove(evaluation_fp)
else:
raise ScriptError(
"The evaluation file does already exist at {}! Add overwrite flag"
" to remove.".format(evaluation_fp)
)
# load model
logging.info("loading trained model...")
model = spk.utils.load_model(
os.path.join(args.modelpath, "best_model"), map_location=device
)
# run evaluation
logging.info("evaluating...")
if spk.utils.get_derivative(train_args) is None:
with torch.no_grad():
evaluate(
args,
model,
train_loader,
val_loader,
test_loader,
device,
metrics=metrics,
)
if __name__ == "__main__":
parser = get_parser()
args = parser.parse_args()
argparse_dict = vars(args)
jsonpath = os.path.join(args.simulation_dir, "args.json")
device = torch.device(args.device)
# Set up directory
if not os.path.exists(args.simulation_dir):
os.makedirs(args.simulation_dir)
# Store command line args
spk.utils.to_json(jsonpath, argparse_dict)
# Load the model
ml_model = spk.utils.load_model(args.model_path)
logging.info("Loaded model.")
logging.info(
"The model you built has: {:d} parameters".format(
spk.utils.count_params(ml_model)
)
)
# Initialize the ML ase interface
ml_calculator = spk.interfaces.AseInterface(
args.molecule_path,
ml_model,
args.simulation_dir,
logging.info(
"The model you built has: {:d} parameters".format(
spk.utils.count_params(ml_model)
)
)
# Initialize the ML ase interface
ml_calculator = spk.interfaces.AseInterface(
args.molecule_path,
ml_model,
args.simulation_dir,
args.device,
args.energy,
args.forces,
environment_provider=spk.utils.get_environment_provider(args, device),
)
logging.info("Initialized ase driver")
# Perform the requested simulations
if args.single_point:
logging.info("Single point prediction...")
ml_calculator.calculate_single_point()
if args.optimize:
logging.info("Optimizing geometry...")
ml_calculator.optimize(steps=args.optimize)
if args.normal_modes:
if not args.optimize:
logging.warning(