def test_keep_numbers(self):
from lale.datasets.data_schemas import to_schema
from lale.lib.lale import Project
train_X, train_y = self._creditG['X'], self._creditG['y']
trainable = Project(columns={'type': 'number'})
trained = trainable.fit(train_X)
transformed = trained.transform(train_X)
transformed_schema = to_schema(transformed)
transformed_expected = {
'type': 'array', 'minItems': 670, 'maxItems': 670,
'items': {
'type': 'array', 'minItems': 7, 'maxItems': 7,
'items': [
{'description': 'duration', 'type': 'number'},
{'description': 'credit_amount', 'type': 'number'},
{'description': 'installment_commitment', 'type': 'number'},
{'description': 'residence_since', 'type': 'number'},
{'description': 'age', 'type': 'number'},
{'description': 'existing_credits', 'type': 'number'},
{'description': 'num_dependents', 'type': 'number'}]}}
self.maxDiff = None
self.assertEqual(transformed_schema, transformed_expected)
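# A minimal sketch of the same operator outside the test harness, assuming
# Project's `columns` hyperparameter also accepts explicit column indices
# (the JSON-Schema form is normalized to indices in fit() further below):
def project_by_index_example(train_X):
    from lale.lib.lale import Project
    trained = Project(columns=[1, 4]).fit(train_X)  # keep columns 1 and 4
    return trained.transform(train_X)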
def test_arff_to_schema(self):
from lale.datasets.data_schemas import to_schema
from lale.type_checking import validate_schema
train_X, train_y = self._creditG['X'], self._creditG['y']
assert hasattr(train_X, 'json_schema')
train_X_schema = to_schema(train_X)
validate_schema(train_X, train_X_schema, subsample_array=False)
assert hasattr(train_y, 'json_schema')
train_y_schema = to_schema(train_y)
validate_schema(train_y, train_y_schema, subsample_array=False)
train_X_expected = {
'type': 'array', 'minItems': 670, 'maxItems': 670,
'items': {
'type': 'array', 'minItems': 20, 'maxItems': 20,
'items': [
{'description': 'checking_status', 'enum': [
'<0', '0<=X<200', '>=200', 'no checking']},
{'description': 'duration', 'type': 'number'},
                {'description': 'credit_history', 'enum': [
                    'no credits/all paid', 'all paid',
                    'existing paid', 'delayed previously',
                    'critical/other existing credit']},
                # ... schemas for the remaining 17 of the 20 columns elided ...
            ]}}
        self.maxDiff = None
        self.assertEqual(train_X_schema, train_X_expected)
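# A minimal sketch of the same round trip on plain NumPy data, assuming
# to_schema accepts ndarrays the way it accepts the ARFF-backed data above:
def schema_roundtrip_example():
    import numpy as np
    from lale.datasets.data_schemas import to_schema
    from lale.type_checking import validate_schema
    tiny = np.array([[1.0, 2.0], [3.0, 4.0]])
    schema = to_schema(tiny)  # infer a JSON Schema for the 2x2 array
    validate_schema(tiny, schema, subsample_array=False)  # raises on mismatch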
    def test_transform_schema_Concat_irisDf(self):
        from lale.datasets.data_schemas import to_schema
        from lale.lib.lale import ConcatFeatures
data_X, data_y = self._irisDf['X'], self._irisDf['y']
s_in_X, s_in_y = to_schema(data_X), to_schema(data_y)
def check(s_actual, n_expected, s_expected):
assert s_actual['items']['minItems'] == n_expected, str(s_actual)
assert s_actual['items']['maxItems'] == n_expected, str(s_actual)
assert s_actual['items']['items'] == s_expected, str(s_actual)
s_out_X = ConcatFeatures.transform_schema({'items': [s_in_X]})
check(s_out_X, 4, {'type': 'number'})
s_out_y = ConcatFeatures.transform_schema({'items': [s_in_y]})
check(s_out_y, 1, {'description': 'target', 'type': 'integer'})
s_out_XX = ConcatFeatures.transform_schema({'items': [s_in_X, s_in_X]})
check(s_out_XX, 8, {'type': 'number'})
s_out_yy = ConcatFeatures.transform_schema({'items': [s_in_y, s_in_y]})
check(s_out_yy, 2, {'type': 'integer'})
s_out_Xy = ConcatFeatures.transform_schema({'items': [s_in_X, s_in_y]})
check(s_out_Xy, 5, {'type': 'number'})
        s_out_XXX = ConcatFeatures.transform_schema({
            'items': [s_in_X, s_in_X, s_in_X]})
        check(s_out_XXX, 12, {'type': 'number'})
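# A minimal sketch with hand-written schemas instead of to_schema output:
# concatenating two 2-column datasets should yield a 4-column schema,
# mirroring the 4+4=8 case checked above.
def concat_schema_example():
    from lale.lib.lale import ConcatFeatures
    s_two = {'type': 'array',
             'items': {'type': 'array', 'minItems': 2, 'maxItems': 2,
                       'items': {'type': 'number'}}}
    s_out = ConcatFeatures.transform_schema({'items': [s_two, s_two]})
    assert s_out['items']['minItems'] == 4
    assert s_out['items']['maxItems'] == 4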
    def test_transform_schema_higher_order(self):
        from lale.datasets.data_schemas import to_schema
        from lale.lib.sklearn import LogisticRegression
        from lale.lib.lale import IdentityWrapper  # assumed import path in this snippet
        inner = LogisticRegression
        outer = IdentityWrapper(op=LogisticRegression)
input_schema = to_schema(self._digits['X'])
transformed_inner = inner.transform_schema(input_schema)
transformed_outer = outer.transform_schema(input_schema)
self.maxDiff = None
self.assertEqual(transformed_inner, transformed_outer)
def fit(self, X, y=None):
columns = self._hyperparams['columns']
        if lale.type_checking.is_schema(columns):
            # Resolve a JSON-Schema `columns` specification to concrete
            # column indices by subschema-checking each column's schema.
            s_all = lale.datasets.data_schemas.to_schema(X)
s_row = s_all['items']
n_columns = s_row['minItems']
assert n_columns == s_row['maxItems']
s_cols = s_row['items']
if isinstance(s_cols, dict):
if lale.type_checking.is_subschema(s_cols, columns):
columns = [*range(n_columns)]
else:
columns = []
else:
assert isinstance(s_cols, list)
columns = [
i for i in range(n_columns)
if lale.type_checking.is_subschema(s_cols[i], columns)]
        self._col_tfm = sklearn.compose.ColumnTransformer(
            transformers=[('keep', 'passthrough', columns)])
        self._col_tfm.fit(X)
        return self
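# A minimal illustration of the subschema test that drives the column
# selection in fit() above (is_subschema is the same helper fit() calls):
def subschema_example():
    import lale.type_checking
    s_num = {'description': 'duration', 'type': 'number'}
    s_cat = {'description': 'checking_status',
             'enum': ['<0', '0<=X<200', '>=200', 'no checking']}
    assert lale.type_checking.is_subschema(s_num, {'type': 'number'})
    assert not lale.type_checking.is_subschema(s_cat, {'type': 'number'})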
def _transform_schema_col_tfm(self, s_X, col_tfm):
s_X = lale.datasets.data_schemas.to_schema(s_X)
s_row = s_X['items']
s_cols = s_row['items']
keep_cols = [col
for name, tfm, cols in col_tfm.transformers_
if tfm == 'passthrough'
for col in cols]
n_columns = len(keep_cols)
if isinstance(s_cols, dict):
s_cols_result = s_cols
else:
name2i = {s_cols[i]['description']: i for i in range(len(s_cols))}
keep_cols_i = [name2i[col] if isinstance(col, str) else col
for col in keep_cols]
s_cols_result = [s_cols[i] for i in keep_cols_i]
        s_result = {
            **s_X,
            'items': {
                **s_row,
                'minItems': n_columns, 'maxItems': n_columns,
                'items': s_cols_result}}
        return s_result
def combine_schemas(schemas):
n_datasets = len(schemas)
if n_datasets == 1:
result = schemas[0]
else:
result = {
'type': 'array',
'minItems': n_datasets, 'maxItems': n_datasets,
'items': [lale.datasets.data_schemas.to_schema(i)
for i in schemas]}
return result
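# A minimal usage sketch of combine_schemas above, assuming to_schema passes
# an existing schema dict through unchanged: a single schema comes back as-is,
# while several get wrapped in a fixed-length array-of-schemas.
def combine_schemas_example():
    s_X = {'type': 'array', 'items': {'type': 'number'}}
    assert combine_schemas([s_X]) == s_X
    s_pair = combine_schemas([s_X, s_X])
    assert s_pair['minItems'] == 2 and s_pair['maxItems'] == 2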