def obtain_df_splits(data_csv):
"""Split input data csv file in to train, validation and test dataframes.
:param data_csv: Input data CSV file.
:return test_df, train_df, val_df: Test, train and validation dataframe
splits
"""
data_df = read_csv(data_csv)
# Obtain data split array mapping data rows to split type
# 0-train, 1-validation, 2-test
data_split = get_split(data_df)
train_split, test_split, val_split = split_dataset_tvt(data_df, data_split)
# Splits are Python dictionaries, not dataframes; they need to be converted.
test_df = pd.DataFrame(test_split)
train_df = pd.DataFrame(train_split)
val_df = pd.DataFrame(val_split)
return test_df, train_df, val_df
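# Hedged usage sketch (not part of the original tests): shows how the helper
# above might be called. 'example.csv' is a hypothetical file that Ludwig has
# already preprocessed, so get_split() can recover the 0/1/2 split mapping.
import os
if os.path.isfile('example.csv'):
    test_df, train_df, val_df = obtain_df_splits('example.csv')
    print(len(train_df), len(val_df), len(test_df))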
]
output_features = [category_feature(vocab_size=2, reduce_input='sum')]
# Generate test data
rel_path = generate_data(input_features, output_features, csv_filename)
input_features[0]['encoder'] = 'parallel_cnn'
exp_dir_name = run_experiment(
input_features,
output_features,
data_csv=rel_path
)
output_feature_name = get_output_feature_name(exp_dir_name)
experiment_source_data_name = csv_filename.split('.')[0]
ground_truth = experiment_source_data_name + '.hdf5'
ground_truth_train_split = load_from_file(ground_truth, output_feature_name,
ground_truth_split=0)
ground_truth_val_split = load_from_file(ground_truth, output_feature_name,
ground_truth_split=1)
ground_truth_test_split = load_from_file(ground_truth, output_feature_name)
test_df, train_df, val_df = obtain_df_splits(csv_filename)
target_predictions_from_train = train_df[output_feature_name]
target_predictions_from_val = val_df[output_feature_name]
target_predictions_from_test = test_df[output_feature_name]
gtm_name = experiment_source_data_name + '.json'
ground_truth_metadata = load_json(gtm_name)
ground_truth_loaded_train_split = np.asarray([
ground_truth_metadata[output_feature_name]['str2idx'][train_row]
for train_row in target_predictions_from_train
])
ground_truth_loaded_val_split = np.asarray([
ground_truth_metadata[output_feature_name]['str2idx'][val_row]
for val_row in target_predictions_from_val
])
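# Hedged illustration with hypothetical values (not from the original
# metadata file): 'str2idx' in the Ludwig training metadata JSON maps each
# category string to the integer id used internally, which is what the
# comprehensions above rely on to turn raw dataframe values into arrays.
import numpy as np
example_str2idx = {'<UNK>': 0, 'cat_a': 1, 'cat_b': 2}
example_rows = ['cat_b', 'cat_a', 'cat_b']
example_ids = np.asarray([example_str2idx[row] for row in example_rows])  # -> [2, 1, 2]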
def delete_temporary_data(csv_path):
"""
Helper method to delete temporary data created for running tests. Deletes
the csv and hdf5/json data (if any)
:param csv_path: path to the csv data file
:return: None
"""
if os.path.isfile(csv_path):
os.remove(csv_path)
json_path = replace_file_extension(csv_path, 'json')
if os.path.isfile(json_path):
os.remove(json_path)
hdf5_path = replace_file_extension(csv_path, 'hdf5')
if os.path.isfile(hdf5_path):
os.remove(hdf5_path)
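# Hedged usage sketch (hypothetical path, not from the original tests): after
# an experiment has run against 'example.csv', this removes the CSV plus the
# example.json / example.hdf5 files Ludwig writes next to it; each removal is
# guarded by an existence check, so the call is a no-op if nothing is there.
delete_temporary_data('example.csv')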
'width': 8,
'num_channels': 3,
'num_processes': 5
},
fc_size=8,
num_filters=8
),
text_feature(encoder='embed', min_len=1),
numerical_feature(normalization='minmax')
]
output_features = [binary_feature(), numerical_feature()]
rel_path = generate_data(
input_features, output_features, csv_filename, num_examples=50
)
df1 = read_csv(rel_path)
input_features[0]['preprocessing']['num_channels'] = 1
rel_path = generate_data(
input_features, output_features, csv_filename, num_examples=50
)
df2 = read_csv(rel_path)
df = concatenate_df(df1, df2, None)
df.to_csv(rel_path, index=False)
# Here the user specifies the number of channels. Exception shouldn't be thrown
run_experiment(input_features, output_features, data_csv=rel_path)
del input_features[0]['preprocessing']['num_channels']
# User now doesn't specify num channels. Should throw exception
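# Assumed continuation (hedged sketch, not shown in the original snippet):
# the comment above implies the next run is expected to fail, so a broad
# pytest.raises guard is used here purely for illustration; the exact
# exception type raised by Ludwig is not confirmed by the source.
import pytest
with pytest.raises(Exception):
    run_experiment(input_features, output_features, data_csv=rel_path)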
def train_model(input_features, output_features, data_csv):
"""
Helper method to avoid code repetition in running an experiment
:param input_features: input schema
:param output_features: output schema
:param data_csv: path to data
:return: the trained LudwigModel instance
"""
model_definition = {
'input_features': input_features,
'output_features': output_features,
'combiner': {'type': 'concat', 'fc_size': 14},
'training': {'epochs': 2}
}
model = LudwigModel(model_definition)
# Training with csv
model.train(
data_csv=data_csv,
skip_save_processed_input=True,
skip_save_progress=True,
skip_save_unprocessed_output=True
)
model.predict(data_csv=data_csv)
# Remove results/intermediate data saved to disk
shutil.rmtree(model.exp_dir_name, ignore_errors=True)
# Training with dataframe
data_df = read_csv(data_csv)
model.train(
data_df=data_df,
skip_save_processed_input=True,
skip_save_progress=True,
skip_save_unprocessed_output=True
)
model.predict(data_df=data_df)
return model
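# Hedged usage sketch (not from the original tests): builds a tiny synthetic
# dataset with generate_data, as done elsewhere in these tests, and trains on
# it; 'sketch.csv' and the feature choices are assumptions for illustration.
example_inputs = [text_feature(encoder='embed', min_len=1)]
example_outputs = [binary_feature()]
example_csv = generate_data(example_inputs, example_outputs, 'sketch.csv', num_examples=50)
trained = train_model(example_inputs, example_outputs, data_csv=example_csv)
delete_temporary_data(example_csv)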
def run_api_experiment(input_features, output_features):
"""
Helper method to avoid code repetition in running an experiment
:param input_features: input schema
:param output_features: output schema
:return: the initialized LudwigModel instance
"""
model_definition = {
'input_features': input_features,
'output_features': output_features,
'combiner': {'type': 'concat', 'fc_size': 14},
'training': {'epochs': 2}
}
model = LudwigModel(model_definition)
return model
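# Hedged usage sketch (hypothetical features, not from the original tests):
# the helper above only constructs the LudwigModel; training and prediction
# are left to the caller.
api_model = run_api_experiment(
[text_feature(encoder='embed', min_len=1)],
[binary_feature()]
)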
def setup_model(self):
"""Configure and setup test model"""
model_definition = {
'input_features': self.input_features,
'output_features': self.output_features,
'combiner': {'type': 'concat', 'fc_size': 14},
'training': {'epochs': 2}
}
self.model = LudwigModel(model_definition)
'-l',
'--logging_level',
default='info',
help='the level of logging to use',
choices=['critical', 'error', 'warning', 'info', 'debug', 'notset']
)
args = parser.parse_args(sys_argv)
args.evaluate_performance = True
logging.getLogger('ludwig').setLevel(
logging_level_registry[args.logging_level]
)
set_on_master(args.use_horovod)
if is_on_master():
print_ludwig('Test', LUDWIG_VERSION)
full_predict(**vars(args))
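# Hedged sketch (assumption, not Ludwig's actual implementation):
# logging_level_registry presumably maps the choice strings above to the
# stdlib logging constants, roughly equivalent to:
import logging
example_registry = {
'critical': logging.CRITICAL,
'error': logging.ERROR,
'warning': logging.WARNING,
'info': logging.INFO,
'debug': logging.DEBUG,
'notset': logging.NOTSET
}
logging.getLogger('ludwig').setLevel(example_registry['info'])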