debug(f'Number of tasks = {args.num_tasks}')
if args.dataset_type == 'bert_pretraining':
data.bert_init(args, logger)
# Split data
if args.dataset_type == 'regression_with_binning': # Note: for now, binning based on whole dataset, not just training set
data, bin_predictions, regression_data = data
args.bin_predictions = bin_predictions
debug(f'Splitting data with seed {args.seed}')
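# The binned data and the raw regression data are split with the same seed and
# sizes so the partitions align: train uses the binned targets, while val/test
# keep the original regression targets.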
train_data, _, _ = split_data(data=data, split_type=args.split_type, sizes=args.split_sizes, seed=args.seed, args=args, logger=logger)
_, val_data, test_data = split_data(regression_data, split_type=args.split_type, sizes=args.split_sizes, seed=args.seed, args=args, logger=logger)
else:
debug(f'Splitting data with seed {args.seed}')
if args.separate_test_set:
test_data = get_data(path=args.separate_test_set, args=args, features_path=args.separate_test_set_features, logger=logger)
if args.separate_val_set:
val_data = get_data(path=args.separate_val_set, args=args, features_path=args.separate_val_set_features, logger=logger)
train_data = data # nothing to split; we already got our test and val sets
else:
train_data, val_data, _ = split_data(data=data, split_type=args.split_type, sizes=(0.8, 0.2, 0.0), seed=args.seed, args=args, logger=logger)
else:
train_data, val_data, test_data = split_data(data=data, split_type=args.split_type, sizes=args.split_sizes, seed=args.seed, args=args, logger=logger)
# Optionally replace test data with train or val data
if args.test_split == 'train':
test_data = train_data
elif args.test_split == 'val':
test_data = val_data
if args.dataset_type == 'classification':
class_sizes = get_class_sizes(data)
debug('Class sizes')
for i, task_class_sizes in enumerate(class_sizes):
    debug(f'{args.task_names[i]} '
          f'{", ".join(f"{cls}: {size * 100:.2f}%" for cls, size in enumerate(task_class_sizes))}')
if logger is not None:
    debug, info = logger.debug, logger.info
else:
debug = info = print
# Set GPU
if args.gpu is not None:
torch.cuda.set_device(args.gpu)
# Print args
debug(pformat(vars(args)))
# Get data
debug('Loading data')
args.task_names = get_task_names(args.data_path)
desired_labels = get_desired_labels(args, args.task_names)
data = get_data(path=args.data_path, args=args, logger=logger)
args.num_tasks = data.num_tasks()
args.features_size = data.features_size()
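# When features are predicted as additional targets, they are counted in
# num_tasks; real_num_tasks below excludes those feature columns.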
args.real_num_tasks = args.num_tasks - args.features_size if args.predict_features else args.num_tasks
# Save model checkpoint if improved validation score, or always save it if unsupervised
if (args.minimize_score and avg_val_score < best_score) or \
        (not args.minimize_score and avg_val_score > best_score) or \
        args.dataset_type == 'unsupervised':
best_score, best_epoch = avg_val_score, epoch
save_checkpoint(os.path.join(save_dir, 'model.pt'), model, scaler, features_scaler, args)
if args.dataset_type == 'unsupervised':
return [0] # rest of this is meaningless when unsupervised
# Evaluate on test set using model with best validation score
info(f'Model {model_idx} best validation {args.metric} = {best_score:.6f} on epoch {best_epoch}')
model = load_checkpoint(os.path.join(save_dir, 'model.pt'), cuda=args.cuda, logger=logger)
if args.split_test_by_overlap_dataset is not None:
overlap_data = get_data(path=args.split_test_by_overlap_dataset, logger=logger)
overlap_smiles = set(overlap_data.smiles())
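# Partition the test molecules by whether their SMILES appear in the overlap
# dataset, then run prediction on each partition separately.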
test_data_intersect, test_data_nonintersect = [], []
for d in test_data.data:
if d.smiles in overlap_smiles:
test_data_intersect.append(d)
else:
test_data_nonintersect.append(d)
test_data_intersect, test_data_nonintersect = MoleculeDataset(test_data_intersect), MoleculeDataset(test_data_nonintersect)
for name, td in [('Intersect', test_data_intersect), ('Nonintersect', test_data_nonintersect)]:
test_preds = predict(
model=model,
data=td,
args=args,
scaler=scaler,
logger=logger
)
def plot_distribution(data_path: str, save_dir: str, bins: int):
"""
Plots the distribution of values of a dataset.
:param data_path: Path to data CSV file.
:param save_dir: Directory where plot PNGs will be saved.
:param bins: Number of bins in histogram.
"""
# Get values
task_names = get_task_names(data_path)
data = get_data(path=data_path)
targets = data.targets()
# Arrange values by task
data_size, num_tasks = len(targets), len(task_names)
values = [[targets[i][j] for i in range(data_size)] for j in range(num_tasks)]
# Plot distributions for each task
data_name = os.path.basename(data_path).replace('.csv', '')
for i in range(num_tasks):
plt.clf()
plt.hist(values[i], bins=bins)
# Save plot (one PNG per task in save_dir, per the docstring)
plt.title(f'{data_name} - {task_names[i]}')
plt.xlabel(task_names[i])
plt.ylabel('Frequency')
plt.savefig(os.path.join(save_dir, f'{data_name}_{task_names[i]}.png'))
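# Example usage (hypothetical paths): one histogram PNG per task.
#
#     plot_distribution(data_path='data.csv', save_dir='plots', bins=50)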
def generate_vocab(args: Namespace):
    """Builds a vocabulary by counting atoms/substructures across a dataset's SMILES."""
# Get smiles
data = get_data(path=args.data_path)
smiles = data.smiles()
vocab_func = partial(
atom_vocab,
vocab_func=args.vocab_func,
substructure_sizes=args.substructure_sizes
)
pairs = [(vocab_func, smile) for smile in smiles]
if args.sequential:
counter = sum([count_vocab(pair) for pair in tqdm(pairs, total=len(pairs))], Counter())
else:
with Pool() as pool:
counter = sum(pool.map(count_vocab, pairs), Counter())
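# count_vocab is applied to (vocab_func, smiles) pairs and its Counters are
# summed above; a minimal sketch of what it presumably does (an assumption,
# not necessarily the project's actual implementation):
#
#     def count_vocab(pair: Tuple[Callable, str]) -> Counter:
#         vocab_func, smiles = pair
#         return Counter(vocab_func(smiles))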
def visualize_attention(args: Namespace):
"""Visualizes attention weights."""
print('Loading data')
data = get_data(path=args.data_path)
smiles = data.smiles()
print(f'Data size = {len(smiles):,}')
print(f'Loading model from "{args.checkpoint_path}"')
model = load_checkpoint(args.checkpoint_path, cuda=args.cuda)
mpn = model[0]
for i in trange(0, len(smiles), args.batch_size):
smiles_batch = smiles[i:i + args.batch_size]
mpn.viz_attention(smiles_batch, viz_dir=args.viz_dir)
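# Example usage (hypothetical values): with args.data_path='data.csv',
# args.checkpoint_path='model.pt', args.viz_dir='attention_maps', and
# args.batch_size=50, each batch of SMILES is rendered into viz_dir by the
# message passing encoder's viz_attention.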
def visualize_encoding_property_space(args: Namespace):
# Load data
data = get_data(path=args.data_path)
# Sort according to similarity measure
if args.similarity_measure == 'property':
data.sort(key=lambda d: d.targets[args.task_index])
elif args.similarity_measure == 'random':
data.shuffle(args.seed)
else:
raise ValueError(f'similarity_measure "{args.similarity_measure}" not supported or not implemented yet.')
# Load model and scalers
model = load_checkpoint(args.checkpoint_path)
scaler, features_scaler = load_scalers(args.checkpoint_path)
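# Apply the feature scaler saved with the checkpoint so inputs are normalized
# exactly as they were at training time.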
data.normalize_features(features_scaler)
# Random seed
if args.seed is not None:
    random.seed(args.seed)  # assumed completion: seed Python's RNG (needs `import random`)
def run_random_forest(args: Namespace, logger: Logger = None) -> List[float]:
    """Trains a random forest on Morgan fingerprints and returns test-set scores."""
if logger is not None:
debug, info = logger.debug, logger.info
else:
debug = info = print
debug(pformat(vars(args)))
metric_func = get_metric_func(args.metric)
debug('Loading data')
data = get_data(path=args.data_path)
debug(f'Splitting data with seed {args.seed}')
# Need to have val set so that train and test sets are the same as when doing MPN
train_data, _, test_data = split_data(data=data, split_type=args.split_type, seed=args.seed)
debug(f'Total size = {len(data):,} | train size = {len(train_data):,} | test size = {len(test_data):,}')
debug('Computing morgan fingerprints')
for dataset in [train_data, test_data]:
for datapoint in tqdm(dataset, total=len(dataset)):
datapoint.set_features(morgan_fingerprint(smiles=datapoint.smiles, radius=args.radius, num_bits=args.num_bits))
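# For reference, morgan_fingerprint presumably wraps RDKit's Morgan/ECFP
# bit-vector computation; a minimal sketch under that assumption (the helper
# name and exact behavior are illustrative, not the project's actual code):
#
#     from rdkit import Chem
#     from rdkit.Chem import AllChem
#     import numpy as np
#
#     def morgan_fingerprint_sketch(smiles: str, radius: int = 2,
#                                   num_bits: int = 2048) -> np.ndarray:
#         mol = Chem.MolFromSmiles(smiles)
#         fp = AllChem.GetMorganFingerprintAsBitVect(mol, radius, nBits=num_bits)
#         return np.array(fp)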
debug('Training')
if args.single_task:
scores = single_task_random_forest(train_data, test_data, metric_func, args)
else: