def test_load_singleton_csv(self):
  # Write a one-row CSV, featurize it through CSVLoader, and check that the
  # resulting dataset contains exactly one entry.
  fin = tempfile.NamedTemporaryFile(mode='w', delete=False)
  fin.write("smiles,endpoint\nc1ccccc1,1")
  fin.close()
  print(fin.name)
  featurizer = dc.feat.CircularFingerprint(size=1024)
  tasks = ["endpoint"]
  loader = dc.data.CSVLoader(
      tasks=tasks, smiles_field="smiles", featurizer=featurizer)
  X = loader.featurize(fin.name)
  self.assertEqual(1, len(X))
  os.remove(fin.name)
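
# A minimal usage sketch (not part of the original test), assuming only the
# CSVLoader API used above: featurize() returns a deepchem Dataset whose X,
# y, w, and ids arrays can be inspected directly. The two-row CSV here is a
# made-up example.
import tempfile

import deepchem as dc

with tempfile.NamedTemporaryFile(mode='w', suffix='.csv', delete=False) as f:
  f.write("smiles,endpoint\nc1ccccc1,1\nCCO,0\n")
  csv_path = f.name

loader = dc.data.CSVLoader(
    tasks=["endpoint"],
    smiles_field="smiles",
    featurizer=dc.feat.CircularFingerprint(size=1024))
dataset = loader.featurize(csv_path)
print(dataset.X.shape, dataset.y.shape)  # expect (2, 1024) and (2, 1)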
      X_cache.append(X)
      y_cache.append(np.array(y).reshape((1,)))
      w_cache.append(np.array(1).reshape((1,)))
      ids_cache.append(row_idx)
      row_idx += 1
      groups.append(group_idx)
      group_idx += 1
    # flush once more at the end
    if len(X_cache) > 0:
      yield (np.array(X_cache), np.array(y_cache), np.array(w_cache),
             np.array(ids_cache))

  tasks = ["ani"]
  dataset = dc.data.DiskDataset.create_dataset(
      shard_generator(), tasks=tasks, data_dir=all_dir)
  print("Number of groups", np.amax(groups))
  splitter = dc.splits.RandomGroupSplitter(groups)
  train_dataset, test_dataset = splitter.train_test_split(
      dataset, train_dir=fold_dir, test_dir=test_dir, frac_train=.8)
  return train_dataset, test_dataset, groups
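
# The fragment above begins mid-loop, so here is a minimal self-contained
# sketch of the same shard-generator pattern, assuming a hypothetical input
# `samples` (an iterable of (features, label) pairs) and a shard size of
# 1000. DiskDataset.create_dataset only requires that the generator yield
# (X, y, w, ids) tuples, one tuple per shard.
import numpy as np
import deepchem as dc


def make_disk_dataset(samples, data_dir, shard_size=1000):
  def shard_generator():
    X_cache, y_cache, w_cache, ids_cache = [], [], [], []
    for row_idx, (x, y) in enumerate(samples):
      X_cache.append(x)
      y_cache.append(np.array(y).reshape((1,)))
      w_cache.append(np.array(1.0).reshape((1,)))
      ids_cache.append(row_idx)
      if len(X_cache) == shard_size:
        yield (np.array(X_cache), np.array(y_cache), np.array(w_cache),
               np.array(ids_cache))
        X_cache, y_cache, w_cache, ids_cache = [], [], [], []
    # Flush the final partial shard.
    if len(X_cache) > 0:
      yield (np.array(X_cache), np.array(y_cache), np.array(w_cache),
             np.array(ids_cache))

  return dc.data.DiskDataset.create_dataset(
      shard_generator(), tasks=["ani"], data_dir=data_dir)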
  image_names = [
      p for p in os.listdir(images_path)
      if p.startswith('cut_') and p.endswith('.png')
  ]
  # Build a name -> label mapping from the two-column label CSV.
  all_labels = dict(zip(*np.transpose(np.array(pd.read_csv(label_path)))))
  print("Number of images: %d" % len(image_names))
  # Strip the 'cut_' prefix and the extension to look up each image's label.
  labels = np.array(
      [all_labels[os.path.splitext(n)[0][4:]] for n in image_names]).reshape(
          (-1, 1))
  image_full_paths = [os.path.join(images_path, n) for n in image_names]
  # Weight each class inversely to its frequency so minority classes are not
  # swamped during training.
  classes, cts = np.unique(list(all_labels.values()), return_counts=True)
  weight_ratio = dict(zip(classes, np.max(cts) / cts.astype(float)))
  weights = np.array([weight_ratio[l[0]] for l in labels]).reshape((-1, 1))
  loader = deepchem.data.ImageLoader()
  dat = loader.featurize(image_full_paths, labels=labels, weights=weights)
  if split is None:
    return dat
  splitters = {
      'index': deepchem.splits.IndexSplitter(),
      'random': deepchem.splits.RandomSplitter()
  }
  if seed is not None:
    np.random.seed(seed)
  splitter = splitters[split]
  train, valid, test = splitter.train_valid_test_split(dat)
  all_dataset = (train, valid, test)
  return all_dataset
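
# A small, self-contained sketch of the ImageLoader pattern used above. The
# file paths and labels below are placeholders, not part of the original
# snippet; the featurize/split calls mirror the ones above.
import numpy as np
import deepchem

image_paths = ['cell_images/cut_001.png', 'cell_images/cut_002.png']  # placeholder files
labels = np.array([[1], [0]])
weights = np.array([[1.0], [1.0]])

loader = deepchem.data.ImageLoader()
dataset = loader.featurize(image_paths, labels=labels, weights=weights)
splitter = deepchem.splits.RandomSplitter()
train, valid, test = splitter.train_valid_test_split(dataset)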
def load_hiv(featurizer='ECFP', split='index'):
  """Load the HIV dataset, featurize it, and split it into train/valid/test."""
  # Featurize the HIV dataset.
  print("About to featurize hiv dataset.")
  current_dir = os.path.dirname(os.path.realpath(__file__))
  dataset_file = os.path.join(current_dir, "./HIV.csv")
  hiv_tasks = ["HIV_active"]
  if featurizer == 'ECFP':
    featurizer_func = dc.feat.CircularFingerprint(size=1024)
  elif featurizer == 'GraphConv':
    featurizer_func = dc.feat.ConvMolFeaturizer()
  loader = dc.data.CSVLoader(
      tasks=hiv_tasks, smiles_field="smiles", featurizer=featurizer_func)
  dataset = loader.featurize(dataset_file, shard_size=8192)
  # Initialize transformers: reweight examples to balance the active class.
  transformers = [
      dc.trans.BalancingTransformer(transform_w=True, dataset=dataset)
  ]
  print("About to transform data")
  for transformer in transformers:
    dataset = transformer.transform(dataset)
  splitters = {
      'index': dc.splits.IndexSplitter(),
      'random': dc.splits.RandomSplitter(),
      'scaffold': dc.splits.ScaffoldSplitter(),
      'butina': dc.splits.ButinaSplitter()
  }
  # Assumed completion (the original snippet ends mid-dict): select the
  # requested splitter and split, mirroring the pattern used above.
  splitter = splitters[split]
  train, valid, test = splitter.train_valid_test_split(dataset)
  return hiv_tasks, (train, valid, test), transformers
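
# Hedged usage of load_hiv as completed above: unpack the tasks, the three
# splits, and the transformer list, then check the split sizes.
hiv_tasks, (train, valid, test), transformers = load_hiv(
    featurizer='ECFP', split='scaffold')
print(len(train), len(valid), len(test))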
  loaded: bool
    Whether the load succeeded
  all_dataset: (dc.data.Dataset, dc.data.Dataset, dc.data.Dataset)
    The train, valid, test datasets
  transformers: list of dc.trans.Transformer
    The transformers used for this dataset
  """
  train_dir = os.path.join(save_dir, "train_dir")
  valid_dir = os.path.join(save_dir, "valid_dir")
  test_dir = os.path.join(save_dir, "test_dir")
  if not os.path.exists(train_dir) or not os.path.exists(
      valid_dir) or not os.path.exists(test_dir):
    return False, None, list()
  loaded = True
  train = deepchem.data.DiskDataset(train_dir)
  valid = deepchem.data.DiskDataset(valid_dir)
  test = deepchem.data.DiskDataset(test_dir)
  all_dataset = (train, valid, test)
  with open(os.path.join(save_dir, "transformers.pkl"), 'rb') as f:
    transformers = pickle.load(f)
  return loaded, all_dataset, transformers
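
# A hedged sketch of the matching save step for the loader above: the
# directory layout and pickle file name mirror what the loader expects. The
# helper name is an assumption, and DiskDataset.move is used under the
# assumption that each split is already a DiskDataset.
import os
import pickle


def save_dataset_to_disk(save_dir, train, valid, test, transformers):
  for name, dataset in (("train_dir", train), ("valid_dir", valid),
                        ("test_dir", test)):
    # Relocate each split's shards into the directory the loader reads from.
    dataset.move(os.path.join(save_dir, name))
  with open(os.path.join(save_dir, "transformers.pkl"), 'wb') as f:
    pickle.dump(transformers, f)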
metric = [
    dc.metrics.Metric(dc.metrics.mean_absolute_error, mode="regression"),
    dc.metrics.Metric(dc.metrics.pearson_r2_score, mode="regression")
]
for split in splits:
  data_dir = os.path.join(base_dir, "datasets")
  train_dir = os.path.join(data_dir, "%s_train" % split)
  valid_dir = os.path.join(data_dir, "%s_valid" % split)
  test_dir = os.path.join(data_dir, "%s_test" % split)
  train_dataset = dc.data.DiskDataset(train_dir)
  valid_dataset = dc.data.DiskDataset(valid_dir)
  test_dataset = dc.data.DiskDataset(test_dir)
  pdbbind_tasks = ["-logKd/Ki"]
  transformers = []
  # Rescale the -logKd/Ki labels: 2.479 kJ/mol is roughly RT at 298 K and
  # 4.184 kJ = 1 kcal, so this maps the labels onto a free-energy-like scale
  # in kcal/mol with the sign flipped (interpretation inferred from the
  # constants).
  y_train = train_dataset.y
  y_train *= -1 * 2.479 / 4.184
  train_dataset = dc.data.DiskDataset.from_numpy(
      train_dataset.X,
      y_train,
      train_dataset.w,
      train_dataset.ids,
      tasks=pdbbind_tasks)
  y_test = test_dataset.y
  y_test *= -1 * 2.479 / 4.184
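  # Assumed continuation (the snippet is truncated here): rebuild the test
  # split the same way as the train split above so the rescaled labels are
  # actually stored.
  test_dataset = dc.data.DiskDataset.from_numpy(
      test_dataset.X,
      y_test,
      test_dataset.w,
      test_dataset.ids,
      tasks=pdbbind_tasks)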
  dset_cols.append(params.date_col)
  # Include the SMILES column from the dataset in the columns to be merged,
  # unless the descriptor table already has a column of the same name.
  if params.smiles_col not in self.precomp_descr_table.columns.values:
    dset_cols.append(params.smiles_col)
  if model_dataset.contains_responses:
    dset_cols += params.response_cols
  merged_dset_df = dset_df[dset_cols].merge(
      self.precomp_descr_table, how='inner', left_on=params.id_col,
      right_on=self.desc_id_col)
  model_dataset.save_featurized_data(merged_dset_df)
  # Featurize the merged table using the precomputed descriptor columns.
  user_specified_features = self.get_feature_columns()
  featurizer_obj = dc.feat.UserDefinedFeaturizer(user_specified_features)
  features = dc.data.data_loader.get_user_specified_features(
      merged_dset_df, featurizer=featurizer_obj, verbose=False)
  if features is None:
    raise Exception("Featurization failed for dataset")
  ids = merged_dset_df[params.id_col]
  nrows = len(ids)
  ncols = len(params.response_cols)
  if model_dataset.contains_responses:
    vals = merged_dset_df[params.response_cols].values
  else:
    # No measured responses: use zero placeholders with the expected shape.
    vals = np.zeros((nrows, ncols))
  attr = attr.loc[ids]
  return features, ids, vals, attr, None
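
# A small, self-contained sketch of the UserDefinedFeaturizer pattern used
# above, applied to a toy DataFrame. The column names are placeholders; the
# featurizer and get_user_specified_features calls mirror the ones above.
import numpy as np
import pandas as pd
import deepchem as dc

df = pd.DataFrame({
    'compound_id': ['c1', 'c2'],
    'desc_a': [0.1, 0.4],
    'desc_b': [1.2, 0.7],
})
featurizer_obj = dc.feat.UserDefinedFeaturizer(['desc_a', 'desc_b'])
features = dc.data.data_loader.get_user_specified_features(
    df, featurizer=featurizer_obj, verbose=False)
print(np.asarray(features).shape)  # expect (2, 2)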