train_file = 'train_data.csv'
test_file = 'test_data.csv'
seed_val = 0 # random seed
EPS = 1e-10
# Information about each dataset in the benchmark is stored in a dict.
# performance_val = expected performance on this dataset (lower = better); should be updated based on previously run benchmarks.
binary_dataset = {'url': 'https://autogluon.s3-us-west-2.amazonaws.com/datasets/AdultIncomeBinaryClassification.zip',
                  'name': 'AdultIncomeBinaryClassification',
                  'problem_type': BINARY,
                  'label_column': 'class',
                  'performance_val': 0.129}  # Mixed types of features.

multi_dataset = {'url': 'https://autogluon.s3-us-west-2.amazonaws.com/datasets/CoverTypeMulticlassClassification.zip',
                 'name': 'CoverTypeMulticlassClassification',
                 'problem_type': MULTICLASS,
                 'label_column': 'Cover_Type',
                 'performance_val': 0.032}  # Big dataset with 7 classes; all features are numeric. Runs SLOW.

regression_dataset = {'url': 'https://autogluon.s3-us-west-2.amazonaws.com/datasets/AmesHousingPriceRegression.zip',
                      'name': 'AmesHousingPriceRegression',
                      'problem_type': REGRESSION,
                      'label_column': 'SalePrice',
                      'performance_val': 0.076}  # Regression with mixed feature-types, skewed Y-values.

toyregres_dataset = {'url': 'https://autogluon.s3-us-west-2.amazonaws.com/datasets/toyRegression.zip',
                     'name': 'toyRegression',
                     'problem_type': REGRESSION,
                     'label_column': 'y',
                     'performance_val': 0.183}  # 1-D toy deterministic regression task with heavy label+feature missingness and an extra distraction column in the test data.
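# A minimal sketch of how one of these dataset dicts might be consumed when
# running the benchmark. `download_and_unzip` is a hypothetical helper standing
# in for whatever fetch logic the benchmark script actually uses; only the dict
# keys and the train_file/test_file names come from the code above.
import os
import pandas as pd

def load_benchmark_dataset(dataset_info, data_dir='datasets'):
    dataset_dir = os.path.join(data_dir, dataset_info['name'])
    if not os.path.exists(dataset_dir):
        download_and_unzip(dataset_info['url'], dataset_dir)  # hypothetical helper
    train_data = pd.read_csv(os.path.join(dataset_dir, train_file))
    test_data = pd.read_csv(os.path.join(dataset_dir, test_file))
    return train_data, test_data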
reason = "dtype of label-column == float and many unique label-values observed"
elif unique_vals.dtype == 'object':
problem_type = MULTICLASS
reason = "dtype of label-column == object"
elif unique_vals.dtype == 'int':
unique_ratio = len(unique_vals)/float(len(y))
if (unique_ratio <= REGRESS_THRESHOLD) and (unique_count <= MULTICLASS_LIMIT):
problem_type = MULTICLASS # TODO: Check if integers are from 0 to n-1 for n unique values, if they have a wide spread, it could still be regression
reason = "dtype of label-column == int, but few unique label-values observed"
else:
problem_type = REGRESSION
reason = "dtype of label-column == int and many unique label-values observed"
else:
raise NotImplementedError('label dtype', unique_vals.dtype, 'not supported!')
logger.log(25, "AutoGluon infers your prediction problem is: %s (because %s)" % (problem_type, reason))
logger.log(25, "If this is wrong, please specify `problem_type` argument in fit() instead (You may specify problem_type as one of: ['%s', '%s', '%s'])\n" % (BINARY, MULTICLASS, REGRESSION))
return problem_type
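# A self-contained sketch of the branching above, handy for experimenting with
# the heuristic outside AutoGluon. The threshold defaults are illustrative
# assumptions; the real REGRESS_THRESHOLD / MULTICLASS_LIMIT constants are
# defined elsewhere in the module.
import pandas as pd

def infer_problem_type_sketch(y, regress_threshold=0.05, multiclass_limit=1000):
    unique_vals = y.unique()
    if len(unique_vals) == 2:
        return 'binary'          # two unique labels -> binary classification
    elif unique_vals.dtype == 'float':
        return 'regression'      # float labels -> regression
    elif unique_vals.dtype == 'object':
        return 'multiclass'      # string labels -> multiclass classification
    elif unique_vals.dtype == 'int':
        unique_ratio = len(unique_vals) / float(len(y))
        if (unique_ratio <= regress_threshold) and (len(unique_vals) <= multiclass_limit):
            return 'multiclass'  # few distinct ints -> treat as classes
        return 'regression'      # many distinct ints -> treat as continuous
    raise NotImplementedError('label dtype %s not supported!' % unique_vals.dtype)

assert infer_problem_type_sketch(pd.Series([0, 1] * 50)) == 'binary'
assert infer_problem_type_sketch(pd.Series([1.5, 2.7, 3.1])) == 'regression'
assert infer_problem_type_sketch(pd.Series(['cat', 'dog', 'bird', 'cat'])) == 'multiclass'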
def fit(self, X, y, k_fold=5, random_state=0, compute_base_preds=False, time_limit=None, **kwargs):
    start_time = time.time()
    self.model_base.feature_types_metadata = self.feature_types_metadata  # TODO: Don't pass this here
    stratified = self.problem_type != REGRESSION  # stratify folds for classification problems
    # TODO: Preprocess data here instead of repeatedly
    kfolds = generate_kfold(X=X, y=y, n_splits=k_fold, stratified=stratified, random_state=random_state, n_repeats=self.n_repeats)
    if self.problem_type == MULTICLASS:
        oof_pred_proba = np.zeros(shape=(len(X), len(y.unique())))  # one probability column per class
    else:
        oof_pred_proba = np.zeros(shape=len(X))
    models = []
    num_folds = len(kfolds)
    time_limit_fold = None
    for i, fold in enumerate(kfolds):
        if time_limit:
            time_elapsed = time.time() - start_time
            time_left = time_limit - time_elapsed
            required_time_per_fold = time_left / (num_folds - i)
            time_limit_fold = required_time_per_fold * 0.8  # reserve a 20% safety buffer per fold
            if i > 0:
                expected_time_required = time_elapsed * (num_folds / i)  # extrapolate total runtime from folds finished so far
                expected_remaining_time_required = expected_time_required / (num_folds / (num_folds - i))
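# The per-fold time budgeting above, in isolation: each remaining fold gets an
# equal share of the time left, scaled by 0.8 as a safety buffer, and once at
# least one fold has finished the elapsed time is extrapolated to check whether
# the remaining folds can fit at all. A minimal sketch of that arithmetic
# (returning None on a projected overrun is an assumption; the original's
# handling of that case is not shown in this snippet):
def fold_time_budget(time_limit, time_elapsed, num_folds, folds_finished):
    time_left = time_limit - time_elapsed
    time_limit_fold = 0.8 * time_left / (num_folds - folds_finished)
    if folds_finished > 0:
        expected_total = time_elapsed * num_folds / folds_finished  # extrapolate from finished folds
        expected_remaining = expected_total * (num_folds - folds_finished) / num_folds
        if expected_remaining > time_left:
            return None  # projected to exceed the overall time limit
    return time_limit_fold

# e.g. 2 of 5 folds done in 40s of a 120s budget:
# expected_remaining = 40 * 5/2 * 3/5 = 60s <= 80s left, so the next fold gets 0.8 * 80/3 ~= 21s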
def __init__(self, path: str, problem_type: str, scheduler_options=None, objective_func=None,
             num_classes=None, low_memory=False, feature_types_metadata=None, kfolds=0,
             stack_ensemble_levels=0, time_limit=None, verbosity=2):
    self.path = path
    self.problem_type = problem_type
    self.feature_types_metadata = feature_types_metadata if feature_types_metadata is not None else {}  # avoid mutable default argument
    self.verbosity = verbosity
    if objective_func is not None:
        self.objective_func = objective_func
    elif self.problem_type in [BINARY, MULTICLASS]:
        self.objective_func = accuracy
    else:
        self.objective_func = root_mean_squared_error
    self.objective_func_expects_y_pred = scorer_expects_y_pred(scorer=self.objective_func)
    logger.log(25, "AutoGluon will gauge predictive performance using evaluation metric: %s" % self.objective_func.name)
    if not self.objective_func_expects_y_pred:
        logger.log(25, "This metric expects predicted probabilities rather than predicted class labels, so you'll need to use predict_proba() instead of predict()")
    logger.log(20, "To change this, specify the eval_metric argument of fit()")
    self.num_classes = num_classes
    self.feature_prune = False  # will be set to True if feature-pruning is turned on
    self.low_memory = low_memory
    self.bagged_mode = kfolds >= 2  # bagging requires at least 2 folds
    if self.bagged_mode:
        self.kfolds = kfolds  # number of folds used for model bagging (< 2 means disabled)
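# The metric defaulting above, factored into a standalone helper for clarity:
# accuracy for binary/multiclass, RMSE for regression, and an explicit metric
# always wins. `accuracy` and `root_mean_squared_error` are the scorer objects
# already referenced above; the helper itself is illustrative, not from the source.
def default_objective_func(problem_type, objective_func=None):
    if objective_func is not None:
        return objective_func  # user-specified metric takes precedence
    if problem_type in (BINARY, MULTICLASS):
        return accuracy
    return root_mean_squared_error  # regression default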
def set_net_defaults(self, train_dataset):
    """ Sets dataset-adaptive default values to use for our neural network """
    if self.problem_type == MULTICLASS:
        self.num_classes = train_dataset.num_classes
        self.num_net_outputs = self.num_classes
    elif self.problem_type == REGRESSION:
        self.num_net_outputs = 1
        if self.params['y_range'] is None:  # Infer default y-range
            y_vals = train_dataset.dataset._data[train_dataset.label_index].asnumpy()
            min_y = float(min(y_vals))
            max_y = float(max(y_vals))
            std_y = np.std(y_vals)
            y_ext = self.params['y_range_extend'] * std_y
            if min_y >= 0:  # infer y must be nonnegative
                min_y = max(0, min_y - y_ext)
            else:
                min_y = min_y - y_ext
            if max_y <= 0:  # infer y must be non-positive
                max_y = min(0, max_y + y_ext)
            else:
                max_y = max_y + y_ext
            self.params['y_range'] = (min_y, max_y)
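# The y-range inference above as a standalone helper: extend the observed label
# range by y_range_extend * std(y), without crossing zero when all labels share
# a sign. The function name and the 0.05 default are illustrative, not from the source.
import numpy as np

def infer_y_range(y_vals, y_range_extend=0.05):
    min_y, max_y = float(np.min(y_vals)), float(np.max(y_vals))
    y_ext = y_range_extend * np.std(y_vals)
    min_y = max(0, min_y - y_ext) if min_y >= 0 else min_y - y_ext
    max_y = min(0, max_y + y_ext) if max_y <= 0 else max_y + y_ext
    return (min_y, max_y)

print(infer_y_range(np.array([1.0, 2.0, 3.0])))  # (~0.959, ~3.041): lower bound stays nonnegative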
for feature in feature_type_map:
    if feature_type_map[feature] == 'language':
        feature_colinds = feature_arraycol_map[feature]
        data_list.append(mx.nd.array(processed_array[:, feature_colinds], dtype='int32'))  # array of ints with data for this language feature
        self.data_desc.append("language")
        self.feature_dataindex_map[feature] = len(data_list) - 1

if labels is not None:
    labels = np.array(labels)
    if self.problem_type == REGRESSION and labels.dtype != np.float32:
        labels = labels.astype('float32')  # Convert to proper float-type if not already
    data_list.append(mx.nd.array(labels.reshape(len(labels), 1)))
    self.data_desc.append("label")
    self.label_index = len(data_list) - 1  # To access data labels, use: self.dataset._data[self.label_index]
    self.num_classes = None
    if self.problem_type in [BINARY, MULTICLASS]:
        self.num_classes = len(set(labels))

self.embed_indices = [i for i in range(len(self.data_desc)) if 'embed' in self.data_desc[i]]  # list of indices of embedding features in self.dataset; order matters!
self.language_indices = [i for i in range(len(self.data_desc)) if 'language' in self.data_desc[i]]  # list of indices of language features in self.dataset; order matters!
self.num_categories_per_embed_feature = None
self.dataset = mx.gluon.data.dataset.ArrayDataset(*data_list)  # Access ith embedding-feature via: self.dataset._data[self.data_desc.index('embed_' + str(i))].asnumpy()
self.dataloader = mx.gluon.data.DataLoader(self.dataset, self.batch_size, shuffle=not is_test,
                                           last_batch='keep' if is_test else 'rollover',
                                           num_workers=self.params['num_dataloading_workers'])  # no need to shuffle test data
if not is_test:
    self.num_categories_per_embedfeature = self.getNumCategoriesEmbeddings()
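# The ArrayDataset/DataLoader pattern above in miniature: one NDArray per
# feature group plus one for the labels, with shuffling only at training time
# and the last incomplete batch kept only at test time. A minimal sketch with
# fake data (shapes, batch size, and worker count here are illustrative):
import mxnet as mx
import numpy as np

features = mx.nd.array(np.random.rand(100, 5))                   # one feature group
labels = mx.nd.array(np.random.rand(100, 1).astype('float32'))   # regression-style labels
dataset = mx.gluon.data.dataset.ArrayDataset(features, labels)
is_test = False
loader = mx.gluon.data.DataLoader(dataset, batch_size=32,
                                  shuffle=not is_test,
                                  last_batch='keep' if is_test else 'rollover',
                                  num_workers=0)
for batch_features, batch_labels in loader:
    print(batch_features.shape, batch_labels.shape)  # (32, 5) (32, 1)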
def get_default_param(problem_type, num_classes=None):
    if problem_type == BINARY:
        return get_param_binary()
    elif problem_type == MULTICLASS:
        return get_param_multiclass(num_classes=num_classes)
    elif problem_type == REGRESSION:
        return get_param_regression()
    else:
        return get_param_binary()  # fall back to binary defaults for unknown problem types
def construct(problem_type: str, label: str, threshold: int):
    if problem_type == BINARY:
        return CleanerDummy()
    elif problem_type == MULTICLASS:
        return CleanerMulticlass(label=label, threshold=threshold)
    elif problem_type == REGRESSION:
        return CleanerDummy()
    else:
        raise NotImplementedError
def construct_custom_catboost_metric(metric, is_higher_better, needs_pred_proba, problem_type):
    if (metric.name == 'log_loss') and (problem_type == MULTICLASS) and needs_pred_proba:
        return 'MultiClass'  # CatBoost's built-in multiclass log-loss
    if metric.name == 'accuracy':
        return 'Accuracy'  # CatBoost's built-in accuracy metric
    metric_class = metric_classes_dict[problem_type]
    return metric_class(metric=metric, is_higher_better=is_higher_better, needs_pred_proba=needs_pred_proba)
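# What the metric_class instances above ultimately plug into: CatBoost accepts a
# user-defined eval_metric object exposing is_max_optimal / evaluate /
# get_final_error. A minimal sketch of that interface for a binary
# accuracy-style metric (this class is illustrative, not the one built above):
class AccuracyMetricSketch(object):
    def is_max_optimal(self):
        return True  # higher accuracy is better

    def evaluate(self, approxes, target, weight):
        # approxes holds one list of raw scores for binary problems; threshold at 0
        preds = [1 if score > 0 else 0 for score in approxes[0]]
        correct = sum(1 for p, t in zip(preds, target) if p == t)
        return correct, len(target)  # (error_sum, weight_sum)

    def get_final_error(self, error, weight):
        return error / weight

# e.g. CatBoostClassifier(eval_metric=AccuracyMetricSketch())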
def class_labels(self):
    if self.problem_type == MULTICLASS:
        return self.label_cleaner.ordered_class_labels
    else:
        return None