# NOTE: Make sure that the outcome column is labeled 'target' in the data file
tpot_data = pd.read_csv('PATH/TO/DATA/FILE', sep='COLUMN_SEPARATOR', dtype=np.float64)
features = tpot_data.drop('target', axis=1)
training_features, testing_features, training_target, testing_target = \\
    train_test_split(features, tpot_data['target'], random_state=None)
exported_pipeline = make_pipeline(
    SelectFromModel(estimator=ExtraTreesRegressor(max_features=0.05, n_estimators=100), threshold=0.05),
    DecisionTreeRegressor(max_depth=8, min_samples_leaf=5, min_samples_split=5)
)
exported_pipeline.fit(training_features, training_target)
results = exported_pipeline.predict(testing_features)
"""
assert expected_code == export_pipeline(pipeline, tpot_obj_reg.operators, tpot_obj_reg._pset)
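# In the expected pipeline above, SelectFromModel fits the ExtraTreesRegressor and
# keeps only the features whose importance meets the threshold (0.05 here). A
# minimal standalone sketch of the same selector outside TPOT, on synthetic data
# (variable names are illustrative):
from sklearn.datasets import make_regression
from sklearn.ensemble import ExtraTreesRegressor
from sklearn.feature_selection import SelectFromModel

X_demo, y_demo = make_regression(n_samples=100, n_features=20, random_state=0)
selector = SelectFromModel(
    estimator=ExtraTreesRegressor(max_features=0.05, n_estimators=100),
    threshold=0.05,
)
X_selected = selector.fit_transform(X_demo, y_demo)  # low-importance columns dropped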
# insert a NaN into the training data so TPOT's automatic imputation is exercised
features_with_nan = np.copy(training_features)
features_with_nan[0][0] = float('nan')
tpot_obj.fit(features_with_nan, training_target)
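# When NaNs are present, TPOT imputes them with a median strategy before fitting,
# matching the SimpleImputer(strategy="median") lines in the expected code below.
# A minimal sketch of that behavior on a toy array:
import numpy as np
from sklearn.impute import SimpleImputer

X_nan = np.array([[np.nan, 2.0], [4.0, np.nan], [7.0, 6.0]])
median_imputer = SimpleImputer(strategy="median")
X_filled = median_imputer.fit_transform(X_nan)  # NaNs become per-column medians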
# use a fixed pipeline since random.seed() behaves differently in Python 2.x and 3.x
pipeline_string = (
    'KNeighborsClassifier('
    'input_matrix, '
    'KNeighborsClassifier__n_neighbors=10, '
    'KNeighborsClassifier__p=1, '
    'KNeighborsClassifier__weights=uniform'
    ')'
)
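# TPOT pipeline strings follow the grammar Operator(input_matrix,
# Operator__param=value, ...); the string above therefore corresponds to this
# plain scikit-learn estimator (a hand-built equivalent, shown for clarity):
from sklearn.neighbors import KNeighborsClassifier

knn_equivalent = KNeighborsClassifier(n_neighbors=10, p=1, weights="uniform")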
tpot_obj._optimized_pipeline = creator.Individual.from_string(pipeline_string, tpot_obj._pset)
export_code = export_pipeline(tpot_obj._optimized_pipeline, tpot_obj.operators, tpot_obj._pset, tpot_obj._imputed)
expected_code = """import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
from sklearn.impute import SimpleImputer
# NOTE: Make sure that the outcome column is labeled 'target' in the data file
tpot_data = pd.read_csv('PATH/TO/DATA/FILE', sep='COLUMN_SEPARATOR', dtype=np.float64)
features = tpot_data.drop('target', axis=1)
training_features, testing_features, training_target, testing_target = \\
    train_test_split(features, tpot_data['target'], random_state=None)
imputer = SimpleImputer(strategy="median")
imputer.fit(training_features)
training_features = imputer.transform(training_features)
testing_features = imputer.transform(testing_features)
exported_pipeline = make_pipeline(
    make_union(
        StackingEstimator(estimator=DecisionTreeClassifier(criterion="gini", max_depth=8, min_samples_leaf=5, min_samples_split=5)),
        FunctionTransformer(copy)
    ),
    KNeighborsClassifier(n_neighbors=10, p=1, weights="uniform")
)
exported_pipeline.fit(training_features, training_target)
results = exported_pipeline.predict(testing_features)
"""
assert expected_code == export_code
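# Why the expected pipeline above wraps the DecisionTreeClassifier in
# make_union(StackingEstimator(...), FunctionTransformer(copy)): StackingEstimator
# appends the inner estimator's predictions as synthetic features, while
# FunctionTransformer(copy) passes the original features through unchanged. A
# rough sketch of that idea (not TPOT's actual tpot.builtins implementation,
# which also appends class probabilities when available):
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin

class StackerSketch(BaseEstimator, TransformerMixin):  # hypothetical helper
    def __init__(self, estimator):
        self.estimator = estimator

    def fit(self, X, y=None):
        self.estimator.fit(X, y)
        return self

    def transform(self, X):
        # prepend the fitted estimator's predictions as one extra feature column
        preds = self.estimator.predict(X).reshape(-1, 1)
        return np.hstack([preds, X])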
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.neighbors import KNeighborsClassifier
# NOTE: Make sure that the outcome column is labeled 'target' in the data file
tpot_data = pd.read_csv('PATH/TO/DATA/FILE', sep='COLUMN_SEPARATOR', dtype=np.float64)
features = tpot_data.drop('target', axis=1)
training_features, testing_features, training_target, testing_target = \\
    train_test_split(features, tpot_data['target'], random_state=None)
exported_pipeline = KNeighborsClassifier(n_neighbors=10, p=1, weights="uniform")
exported_pipeline.fit(training_features, training_target)
results = exported_pipeline.predict(testing_features)
"""
assert expected_code == export_pipeline(pipeline, tpot_obj.operators, tpot_obj._pset)
# NOTE: Make sure that the outcome column is labeled 'target' in the data file
tpot_data = pd.read_csv('PATH/TO/DATA/FILE', sep='COLUMN_SEPARATOR', dtype=np.float64)
features = tpot_data.drop('target', axis=1)
training_features, testing_features, training_target, testing_target = \\
    train_test_split(features, tpot_data['target'], random_state=None)
exported_pipeline = make_pipeline(
    SelectPercentile(score_func=f_classif, percentile=20),
    DecisionTreeClassifier(criterion="gini", max_depth=8, min_samples_leaf=5, min_samples_split=5)
)
exported_pipeline.fit(training_features, training_target)
results = exported_pipeline.predict(testing_features)
"""
assert expected_code == export_pipeline(pipeline, tpot_obj.operators, tpot_obj._pset)
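# SelectPercentile(score_func=f_classif, percentile=20) in the expected code keeps
# the top 20% of features ranked by ANOVA F-value. Standalone sketch on synthetic
# data (names are illustrative):
from sklearn.datasets import make_classification
from sklearn.feature_selection import SelectPercentile, f_classif

X_clf, y_clf = make_classification(n_samples=100, n_features=20, random_state=0)
X_top = SelectPercentile(score_func=f_classif, percentile=20).fit_transform(X_clf, y_clf)
# roughly 20% of the 20 original columns survive (4 here)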
# NOTE: Make sure that the outcome column is labeled 'target' in the data file
tpot_data = pd.read_csv('PATH/TO/DATA/FILE', sep='COLUMN_SEPARATOR', dtype=np.float64)
features = tpot_data.drop('target', axis=1)
training_features, testing_features, training_target, testing_target = \\
    train_test_split(features, tpot_data['target'], random_state=None)
# Average CV score on the training set was: 0.929813743
exported_pipeline = make_pipeline(
    SelectPercentile(score_func=f_classif, percentile=20),
    DecisionTreeClassifier(criterion="gini", max_depth=8, min_samples_leaf=5, min_samples_split=5)
)
exported_pipeline.fit(training_features, training_target)
results = exported_pipeline.predict(testing_features)
"""
assert_equal(expected_code, export_pipeline(pipeline, tpot_obj.operators, tpot_obj._pset, pipeline_score=0.929813743))
# NOTE: Make sure that the outcome column is labeled 'target' in the data file
tpot_data = pd.read_csv('PATH/TO/DATA/FILE', sep='COLUMN_SEPARATOR', dtype=np.float64)
features = tpot_data.drop('target', axis=1)
training_features, testing_features, training_target, testing_target = \\
    train_test_split(features, tpot_data['target'], random_state=39)
exported_pipeline = BernoulliNB(alpha=1.0, fit_prior=False)
# Fix random state in exported estimator
if hasattr(exported_pipeline, 'random_state'):
    setattr(exported_pipeline, 'random_state', 39)
exported_pipeline.fit(training_features, training_target)
results = exported_pipeline.predict(testing_features)
"""
exported_code = export_pipeline(pipeline, tpot_obj.operators, tpot_obj._pset, random_state=tpot_obj.random_state)
assert expected_code == exported_code
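# TPOT makes exported pipelines reproducible by fixing random_state everywhere: a
# bare estimator, like the BernoulliNB above, gets the hasattr/setattr guard shown,
# while multi-step pipelines call set_param_recursive from tpot.export_utils, which
# walks every step. A rough sketch of that recursive helper (simplified; not
# TPOT's exact code, and the name is hypothetical):
def set_param_sketch(pipeline_steps, parameter, value):  # hypothetical name
    for _, obj in pipeline_steps:
        if hasattr(obj, 'steps'):  # nested Pipeline: recurse into its steps
            set_param_sketch(obj.steps, parameter, value)
        elif hasattr(obj, parameter):
            setattr(obj, parameter, value)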
Parameters
----------
output_file_name: string (default: '')
    String containing the path and file name of the desired output file. If left empty, writing to file is skipped.
data_file_path: string (default: '')
    By default, the path of the input dataset in the exported code is 'PATH/TO/DATA/FILE'.
    If data_file_path is a non-empty string, it replaces that default path.

Returns
-------
to_write: str
    The whole pipeline text as a string.
"""
if self._optimized_pipeline is None:
    raise RuntimeError('A pipeline has not yet been optimized. Please call fit() first.')

to_write = export_pipeline(self._optimized_pipeline,
                           self.operators, self._pset,
                           self._imputed, self._optimized_pipeline_score,
                           self.random_state,
                           data_file_path=data_file_path)

if output_file_name != '':
    with open(output_file_name, 'w') as output_file:
        output_file.write(to_write)
else:
    return to_write
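# Typical usage of export() from user code (a sketch; estimator settings and the
# file name are illustrative):
#
#     from tpot import TPOTClassifier
#     tpot = TPOTClassifier(generations=5, population_size=20, random_state=42)
#     tpot.fit(X_train, y_train)
#     tpot.export('tpot_exported_pipeline.py')  # write the pipeline to a file
#     code_str = tpot.export()                  # or return the source as a string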
def _save_periodic_pipeline(self, gen):
    try:
        self._create_periodic_checkpoint_folder()
        for pipeline, pipeline_scores in zip(self._pareto_front.items, reversed(self._pareto_front.keys)):
            idx = self._pareto_front.items.index(pipeline)
            pareto_front_pipeline_score = pipeline_scores.wvalues[1]
            sklearn_pipeline_str = generate_pipeline_code(expr_to_tree(pipeline, self._pset), self.operators)
            to_write = export_pipeline(pipeline,
                                       self.operators, self._pset,
                                       self._imputed, pareto_front_pipeline_score,
                                       self.random_state)
            # don't re-export a pipeline that was already saved
            if sklearn_pipeline_str in self._exported_pipeline_text:
                self._update_pbar(pbar_num=0, pbar_msg='Periodic pipeline was not saved, probably saved before...')
            else:
                filename = os.path.join(self.periodic_checkpoint_folder,
                                        'pipeline_gen_{}_idx_{}_{}.py'.format(gen,
                                                                              idx,
                                                                              datetime.now().strftime('%Y.%m.%d_%H-%M-%S')))
                self._update_pbar(pbar_num=0, pbar_msg='Saving periodic pipeline from pareto front to {}'.format(filename))
                with open(filename, 'w') as output_file:
                    output_file.write(to_write)
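# _save_periodic_pipeline runs only when a user enables checkpointing; a sketch of
# the configuration that triggers it (values are illustrative):
#
#     from tpot import TPOTClassifier
#     tpot = TPOTClassifier(generations=10,
#                           periodic_checkpoint_folder='tpot_checkpoints')
#     tpot.fit(X_train, y_train)  # Pareto-front pipelines saved periodically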