Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'KNeighborsClassifier__n_neighbors=10, '
'KNeighborsClassifier__p=1, '
'KNeighborsClassifier__weights=uniform'
')',
tpot_obj._pset
)
# Second crossover parent: a KNeighborsClassifier stacked on a BernoulliNB,
# parsed from its string form against the TPOT primitive set.
ind2 = creator.Individual.from_string(
'KNeighborsClassifier('
'BernoulliNB(input_matrix, BernoulliNB__alpha=10.0, BernoulliNB__fit_prior=True),'
'KNeighborsClassifier__n_neighbors=10, '
'KNeighborsClassifier__p=2, '
'KNeighborsClassifier__weights=uniform'
')',
tpot_obj._pset
)
# Mark the root node of each parent as returning Output_Array before crossover.
ind1[0].ret = Output_Array
ind2[0].ret = Output_Array
# Cross over clones, so ind1/ind2 themselves are never handed to cxOnePoint.
ind1_copy, ind2_copy = tpot_obj._toolbox.clone(ind1),tpot_obj._toolbox.clone(ind2)
offspring1, offspring2 = cxOnePoint(ind1_copy, ind2_copy)
# The root node of each offspring must still be typed as Output_Array.
assert offspring1[0].ret == Output_Array
assert offspring2[0].ret == Output_Array
# Mutate a clone of `pipeline`; the mutated individual is read from index 0 of
# the result (presumably a one-element tuple -- confirm against mutNodeReplacement).
mut_ind = mutNodeReplacement(tpot_obj._toolbox.clone(pipeline), pset=tpot_obj._pset)
# Return type of every node, and the nodes with non-zero arity, after mutation.
new_ret_type_list = [node.ret for node in mut_ind[0]]
new_prims_list = [node for node in mut_ind[0] if node.arity != 0]
# Same non-terminal nodes as before: only a terminal changed, so the sequence
# of return types must be unchanged.
if new_prims_list == old_prims_list: # Terminal mutated
assert new_ret_type_list == old_ret_type_list
else: # Primitive mutated
# Count gp.Primitive nodes in the mutated individual.
Primitive_Count = 0
for node in mut_ind[0]:
if isinstance(node, gp.Primitive):
Primitive_Count += 1
# NOTE(review): 4 is presumably the primitive count of the original
# `pipeline` -- confirm where `pipeline` is built.
assert Primitive_Count == 4
# Symmetric difference between the new and old non-terminal node lists.
diff_prims = [x for x in new_prims_list if x not in old_prims_list]
diff_prims += [x for x in old_prims_list if x not in new_prims_list]
if len(diff_prims) > 1: # Sometimes mutation randomly picks an operator that is already in the pipeline
# The first two differing nodes must share a return type.
assert diff_prims[0].ret == diff_prims[1].ret
# The root node must still be typed as Output_Array.
assert mut_ind[0][0].ret == Output_Array
'KNeighborsClassifier__p=1, '
'KNeighborsClassifier__weights=uniform'
')',
tpot_obj._pset
)
# Second crossover parent: a KNeighborsClassifier stacked on a BernoulliNB,
# parsed from its string form against the TPOT primitive set.
ind2 = creator.Individual.from_string(
'KNeighborsClassifier('
'BernoulliNB(input_matrix, BernoulliNB__alpha=10.0, BernoulliNB__fit_prior=True),'
'KNeighborsClassifier__n_neighbors=10, '
'KNeighborsClassifier__p=2, '
'KNeighborsClassifier__weights=uniform'
')',
tpot_obj._pset
)
# Mark the root node of each parent as returning Output_Array before crossover.
ind1[0].ret = Output_Array
ind2[0].ret = Output_Array
# Cross over clones, so ind1/ind2 themselves are never handed to cxOnePoint.
ind1_copy, ind2_copy = tpot_obj._toolbox.clone(ind1),tpot_obj._toolbox.clone(ind2)
offspring1, offspring2 = cxOnePoint(ind1_copy, ind2_copy)
# The root node of each offspring must still be typed as Output_Array.
assert offspring1[0].ret == Output_Array
assert offspring2[0].ret == Output_Array
def _setup_pset(self):
    """Build the strongly typed primitive set and register the operators.

    When ``self.random_state`` is not None, both Python's ``random`` module
    and NumPy's global RNG are seeded with it first. ``self._pset`` is then
    created as a ``gp.PrimitiveSetTyped`` named ``'MAIN'`` that takes a single
    ``np.ndarray`` argument (renamed to ``input_matrix``) and returns
    ``Output_Array``, and ``self._add_operators()`` is called to populate it.

    When ``self.verbosity`` is greater than 2, the number of entries in
    ``self.operators`` is printed.
    """
    if self.random_state is not None:
        # Seed both generators from the same value.
        random.seed(self.random_state)
        np.random.seed(self.random_state)

    self._pset = gp.PrimitiveSetTyped('MAIN', [np.ndarray], Output_Array)
    # Replace the default argument name ARG0 with 'input_matrix'.
    self._pset.renameArguments(ARG0='input_matrix')
    self._add_operators()

    if self.verbosity > 2:
        print('{} operators have been imported by TPOT.'.format(len(self.operators)))
def _add_operators(self):
    """Register the operators in ``self.operators`` as primitives of ``self._pset``.

    Default structure (``self.template`` is None):
        Every operator is added as a primitive that takes an ``np.ndarray``
        (plus its own parameter types) and returns an ``np.ndarray``.
        Operators whose ``root`` attribute is truthy are additionally added
        with an ``Output_Array`` return type so they can be the tree root.
        ``CombineDFs`` is added once, after all operators.

    Template structure (``self.template`` is not None):
        ``self._template_comp`` is walked in order. Each step other than
        ``'CombineDFs'`` and other than the last gets a freshly created empty
        class ``Ret_<idx>`` as its return type, which becomes the input type
        of the following step; the last step returns ``Output_Array``.
        A step may be ``'CombineDFs'``, one of the main types
        (Classifier / Regressor / Selector / Transformer) or the
        ``__name__`` of a single operator.

    Side effects
    ------------
    Resets ``self.op_list`` to an empty list and sets ``self.ret_types`` to
    ``[np.ndarray, Output_Array]`` followed by the per-step return types.

    Raises
    ------
    ValueError
        If a template step is not ``'CombineDFs'``, not a main type, and does
        not match the ``__name__`` of any operator in ``self.operators``.
    """
    main_type = ["Classifier", "Regressor", "Selector", "Transformer"]
    ret_types = []
    self.op_list = []

    if self.template is None:  # default pipeline structure
        step_in_type = np.ndarray
        step_ret_type = Output_Array
        for operator in self.operators:
            arg_types = operator.parameter_types()[0][1:]
            if operator.root:
                # We need to add rooted primitives twice so that they can
                # return both an Output_Array (and thus be the root of the tree),
                # and return a np.ndarray so they can exist elsewhere in the tree.
                p_types = ([step_in_type] + arg_types, step_ret_type)
                self._pset.addPrimitive(operator, *p_types)
            # Every operator, rooted or not, is registered with the
            # np.ndarray return type.
            tree_p_types = ([step_in_type] + arg_types, step_in_type)
            self._pset.addPrimitive(operator, *tree_p_types)
            self._import_hash_and_add_terminals(operator, arg_types)
        self._pset.addPrimitive(CombineDFs(), [step_in_type, step_in_type], step_in_type)
    else:
        last_idx = len(self._template_comp) - 1
        for idx, step in enumerate(self._template_comp):
            # Input type of this step: the previous step's return type, or
            # np.ndarray for the first step.
            if idx:
                step_in_type = ret_types[-1]
            else:
                step_in_type = np.ndarray

            if step != 'CombineDFs':
                if idx < last_idx:
                    # Create an empty placeholder class to act as this step's
                    # return type for strongly typed GP.
                    step_ret_type_name = 'Ret_{}'.format(idx)
                    step_ret_type = type(step_ret_type_name, (object,), {})
                    ret_types.append(step_ret_type)
                else:
                    step_ret_type = Output_Array

            if step == 'CombineDFs':
                self._pset.addPrimitive(CombineDFs(), [step_in_type, step_in_type], step_in_type)
            elif step in main_type:  # the step names a main operator type
                ops = [op for op in self.operators if op.type() == step]
                for operator in ops:
                    arg_types = operator.parameter_types()[0][1:]
                    p_types = ([step_in_type] + arg_types, step_ret_type)
                    self._pset.addPrimitive(operator, *p_types)
                    self._import_hash_and_add_terminals(operator, arg_types)
            else:  # the step is a specific operator name, or a wrong input
                try:
                    operator = next(op for op in self.operators if op.__name__ == step)
                except StopIteration:
                    # No operator carries this name.
                    raise ValueError(
                        'An error occured while attempting to read the specified '
                        'template. Please check a step named {}'.format(step)
                    )
                arg_types = operator.parameter_types()[0][1:]
                p_types = ([step_in_type] + arg_types, step_ret_type)
                self._pset.addPrimitive(operator, *p_types)
                self._import_hash_and_add_terminals(operator, arg_types)

    self.ret_types = [np.ndarray, Output_Array] + ret_types