Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_encoded_cache(self):
    """Check that encoded column data is cached on first access.

    A ``DataSource`` is built from the frame and config stored on the test
    case and its encoders are prepared. For each checked column the cache
    must hold no entry before the column is requested, and afterwards the
    cached entry must equal the value that was returned.
    """
    df, config = self.df, self.config

    ds = DataSource(df, config)
    # The remaining assertions are only meaningful with caching switched on.
    assert not ds.disable_cache

    ds.prepare_encoders()

    for column in ['x1', 'x2', 'y']:
        # Nothing may be cached before the column is asked for.
        assert column not in ds.encoded_cache
        encoded_column = ds.get_encoded_column_data(column)
        # Element-wise comparison, reduced to a single truth value by .all().
        assert (ds.encoded_cache[column] == encoded_column).all()
],
'output_features': [
{
'name': 'z',
'type': 'numeric',
# 'encoder_path': 'lightwood.encoders.categorical.categorical'
}
]
}
# Toy data set with ten rows: 'x' counts 0..9 and 'y' is a random integer
# drawn from [i, i + 20] (both ends inclusive) for each row index i.
data = {'x': [i for i in range(10)], 'y': [random.randint(i, i + 20) for i in range(10)]}
# Row-wise product of x and y; it is not referenced again in this fragment.
nums = [data['x'][i] * data['y'][i] for i in range(10)]
# Numeric target column: 0.5, 1.5, ..., 9.5.
data['z'] = [i + 0.5 for i in range(10)]
data_frame = pandas.DataFrame(data)
print(data_frame)
# One DataSource over every column (used for fitting) and one restricted to
# the two input columns (used for prediction).
ds = DataSource(data_frame, config)
input_ds_for_prediction = DataSource(data_frame[['x', 'y']], config)
mixer = SkLearnMixer(input_column_names=['x', 'y'], output_column_names=['z'])
# NOTE(review): indentation was lost in this copy, so the extent of the loop
# body cannot be read from the text. Presumably only the print() belongs to
# the loop and the prediction runs once afterwards; confirm against the
# original file.
for i in mixer.iter_fit(ds):
print('training')
# Ask only for the 'z' output column.
predictions = mixer.predict(input_ds_for_prediction, ['z'])
print(predictions)
#########################################
# Multiple Target variables #
# Test Case 3 #
#########################################
config = {
'name': 'test',
{
'name': 'z`',
'type': 'categorical'
}
]
}
# Run the config literal through the schema before anything consumes it.
config = predictor_config_schema.validate(config)

# Ten-row toy data set: two inputs, one numeric target ('z') and one
# categorical target ('z`') derived from the product of the inputs.
data = {
    'x': list(range(10)),
    'y': [random.randint(low, low + 20) for low in range(10)],
}
nums = [x_val * y_val for x_val, y_val in zip(data['x'], data['y'])]
data['z'] = [step + 0.5 for step in range(10)]
data['z`'] = ['high' if product >= 50 else 'low' for product in nums]
data_frame = pandas.DataFrame(data)

# Fit: the same DataSource is passed for both positional arguments of fit().
ds = DataSource(data_frame, config)
ds.prepare_encoders()
mixer = NnMixer({}, config)
mixer.fit(ds, ds, stop_training_after_seconds=50)

# Predict from the two input columns only.
predict_input_ds = DataSource(data_frame[['x', 'y']], config)
predict_input_ds.prepare_encoders()
predictions = mixer.predict(predict_input_ds)
print(predictions)
# 'encoder_path': 'lightwood.encoders.categorical.categorical'
}
]
}
##For Classification
# Ten-row toy data set whose target 'z' is a two-class label derived from
# the product of the two inputs.
data = {
    'x': list(range(10)),
    'y': [random.randint(low, low + 20) for low in range(10)],
}
nums = [x_val * y_val for x_val, y_val in zip(data['x'], data['y'])]
data['z'] = ['high' if product >= 50 else 'low' for product in nums]
data_frame = pandas.DataFrame(data)

# Full frame for fitting, input columns only for prediction.
ds = DataSource(data_frame, config)
predict_input_ds = DataSource(data_frame[['x', 'y']], config)

mixer = NnMixer(
    input_column_names=['x', 'y'],
    output_column_names=['z'],
)
data_encoded = mixer.fit(ds)
predictions = mixer.predict(predict_input_ds)
print(predictions)
##For Regression
# GENERATE DATA
###############
config = {
'input_features': [
def predict(self, when_data=None, when=None):
    """
    Run the mixer on new data and return its predictions.

    :param when_data: a dataframe with the rows to predict for
    :param when: a dictionary of column name -> single value; when given,
        it is turned into a one-row dataframe that replaces ``when_data``
    :return: the main mixer's predictions, in which an output column may
        have been replaced by the matching helper mixer's prediction
    """
    if when is not None:
        # Wrap every value in a list so the mapping describes exactly one row.
        single_row = {}
        for key in when:
            single_row[key] = [when[key]]
        when_data = pandas.DataFrame(single_row)

    when_data_ds = DataSource(when_data, self.config)
    # Reuse the encoders held by the mixer for this new DataSource.
    when_data_ds.encoders = self._mixer.encoders

    main_mixer_predictions = self._mixer.predict(when_data_ds)

    # Helper mixers are only consulted when enabled globally and available.
    if not (CONFIG.HELPER_MIXERS and self.has_boosting_mixer):
        return main_mixer_predictions

    for output_column in main_mixer_predictions:
        if self._helper_mixers is None or output_column not in self._helper_mixers:
            continue

        helper = self._helper_mixers[output_column]
        # Prefer the helper when its recorded accuracy beats the main
        # mixer's training accuracy, or when helpers are forced by config.
        if (helper['accuracy'] > 1.00 * self.train_accuracy[output_column]['value']) or CONFIG.FORCE_HELPER_MIXERS:
            helper_predictions = helper['model'].predict(when_data_ds, [output_column])
            main_mixer_predictions[output_column] = helper_predictions[output_column]

    return main_mixer_predictions
def calculate_accuracy(self, from_data):
"""
Score the mixer's predictions against the original values of every
output column.

:param from_data: a dataframe, or a DataSource (which is then used as is)
:return accuracies: dictionaries of accuracies

NOTE(review): this copy of the method stops inside the loop. Nothing is
stored in ``accuracies`` and no final return is visible here; presumably
the remainder was cut off. Confirm against the full file.
"""
if self._mixer is None:
# No mixer to evaluate: log the problem and return None.
logging.error("Please train the model before calculating accuracy")
return
# Wrap raw input in a DataSource unless the caller already supplied one.
ds = from_data if isinstance(from_data, DataSource) else DataSource(from_data, self.config)
predictions = self._mixer.predict(ds, include_extra_data=True)
accuracies = {}
for output_column in self._output_columns:
# Both the real and the predicted values are converted to lists of str
# before being handed to the accuracy function.
real = list(map(str,ds.get_column_original_data(output_column)))
predicted = list(map(str,predictions[output_column]['predictions']))
# An optional 'weights' entry in the column config is forwarded as-is.
weight_map = None
if 'weights' in ds.get_column_config(output_column):
weight_map = ds.get_column_config(output_column)['weights']
accuracy = self.apply_accuracy_function(ds.get_column_config(output_column)['type'], real, predicted,weight_map=weight_map)
# NOTE(review): `(COLUMN_DATA_TYPES.NUMERIC)` is a parenthesised expression,
# not a one-element tuple, so `in` tests membership in that value itself
# (a substring test if it is a string). Likely meant `==` or `(X,)`; confirm.
if ds.get_column_config(output_column)['type'] in (COLUMN_DATA_TYPES.NUMERIC):
ds.encoders[output_column].decode_log = True
def extractRandomSubset(self, percentage):
    """
    Split a pseudo-random share of the rows off into a new DataSource.

    The drawn rows are removed from this object's ``data_frame`` and the
    caches are cleared via ``_clear_cache()``. The returned DataSource is
    given this object's ``encoders`` and ``transformer``.

    :param percentage: chance for each row to be split off (0.1 moves
        about 10% of the rows)
    :return: a DataSource built from the rows that were split off
    """
    # NOTE: this reseeds NumPy's global generator. The seed depends only on
    # `percentage`, so equal arguments produce the same draws.
    seed = int(round(percentage * 100000))
    np.random.seed(seed)

    draws = np.random.rand(len(self.data_frame))
    keep_rows = draws < (1 - percentage)

    held_out = self.data_frame[~keep_rows]
    self.data_frame = self.data_frame[keep_rows]

    # Cached data refers to the rows before the split, so drop it.
    self._clear_cache()

    subset = DataSource(held_out, self.configuration)
    subset.encoders = self.encoders
    subset.transformer = self.transformer
    return subset
'output_features': [{'name': col, 'type': type_map(col)} for col in self._output_columns]
}
self.config = predictor_config_schema.validate(self.config)
logging.info('Automatically generated a configuration')
logging.info(self.config)
else:
self._output_columns = [col['name'] for col in self.config['output_features']]
self._input_columns = [col['name'] for col in self.config['input_features']]
# Default training budget when none is given: shape[0] * shape[1] / 5 of the
# input data, rounded.
if stop_training_after_seconds is None:
stop_training_after_seconds = round(from_data.shape[0] * from_data.shape[1] / 5)
# Default model-building budget: three times the training budget.
if stop_model_building_after_seconds is None:
stop_model_building_after_seconds = stop_training_after_seconds * 3
from_data_ds = DataSource(from_data, self.config)
# Use the supplied test data when there is some; otherwise split a subset off
# the training DataSource (argument 0.1).
if test_data is not None:
test_data_ds = DataSource(test_data, self.config)
else:
test_data_ds = from_data_ds.extractRandomSubset(0.1)
# NOTE(review): indentation was lost in this copy; presumably this flag is
# set unconditionally rather than only in the else branch; confirm.
from_data_ds.training = True
# NnMixer with no extra parameters is the default. A 'mixer' entry in the
# config may override the class ('class') and/or its parameters ('attrs').
mixer_class = NnMixer
mixer_params = {}
if 'mixer' in self.config:
if 'class' in self.config['mixer']:
mixer_class = self.config['mixer']['class']
if 'attrs' in self.config['mixer']:
mixer_params = self.config['mixer']['attrs']