Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from lale.operators import make_pipeline
pca = PCA(copy=False)
tam = TAM(tans_class=pca, name='pca', col_names=['a', 'b', 'c'], col_dtypes=[np.dtype('float32'), np.dtype('float32'), np.dtype('float32')])
lgbm_classifier = LGBMClassifier(class_weight='balanced', learning_rate=0.18)
pipeline = make_pipeline(tam, lgbm_classifier)
expected = \
"""from lale.lib.autoai_libs import TAM
import sklearn.decomposition.pca
import numpy as np
from lightgbm import LGBMClassifier
from lale.operators import make_pipeline
tam = TAM(tans_class=sklearn.decomposition.pca.PCA(copy=False, iterated_power='auto', n_components=None, random_state=None, svd_solver='auto', tol=0.0, whiten=False), name='pca', col_names=['a', 'b', 'c'], col_dtypes=[np.dtype('float32'), np.dtype('float32'), np.dtype('float32')])
lgbm_classifier = LGBMClassifier(class_weight='balanced', learning_rate=0.18)
pipeline = make_pipeline(tam, lgbm_classifier)"""
self._roundtrip(expected, lale.pretty_print.to_string(pipeline, combinators=False))
pipeline = (Scaler | NoOp) >> (pca & Nystroem) >> Concat >> (KNN | lr)
expected = \
"""from lale.lib.sklearn import MinMaxScaler as Scaler
from lale.lib.lale import NoOp
from lale.lib.sklearn import PCA
from lale.lib.sklearn import Nystroem
from lale.lib.lale import ConcatFeatures as Concat
from lale.lib.sklearn import KNeighborsClassifier as KNN
from lale.lib.sklearn import LogisticRegression as LR
import lale
lale.wrap_imported_operators()
pca = PCA(copy=False)
lr = LR(solver='saga', C=0.9)
pipeline = (Scaler | NoOp) >> (pca & Nystroem) >> Concat >> (KNN | lr)"""
self._roundtrip(expected, lale.pretty_print.to_string(pipeline))
def test_higher_order(self):
from lale.lib.lale import Both
from lale.lib.sklearn import PCA
from lale.lib.sklearn import Nystroem
pipeline = Both(op1=PCA(n_components=2), op2=Nystroem)
expected = """from lale.lib.lale import Both
from lale.lib.sklearn import PCA
from lale.lib.sklearn import Nystroem
import lale
lale.wrap_imported_operators()
pca = PCA(n_components=2)
pipeline = Both(op1=pca, op2=Nystroem)"""
self._roundtrip(expected, lale.pretty_print.to_string(pipeline))
pipeline = get_pipeline_of_applicable_type(
steps=[PCA, MMS, KNN, pipeline_0],
edges=[(PCA, KNN), (PCA, pipeline_0), (MMS, pipeline_0)])
expected = \
"""from lale.lib.sklearn import PCA
from lale.lib.sklearn import MinMaxScaler as MMS
from lale.lib.sklearn import KNeighborsClassifier as KNN
from lale.lib.lale import ConcatFeatures as HStack
from lale.lib.sklearn import LogisticRegression as LR
from lale.operators import get_pipeline_of_applicable_type
import lale
lale.wrap_imported_operators()
pipeline_0 = HStack >> LR
pipeline = get_pipeline_of_applicable_type(steps=[PCA, MMS, KNN, pipeline_0], edges=[(PCA,KNN), (PCA,pipeline_0), (MMS,pipeline_0)])"""
self._roundtrip(expected, lale.pretty_print.to_string(pipeline))
def test_higher_order_2(self):
from lale.lib.sklearn import VotingClassifier as Vote
from lale.lib.sklearn import KNeighborsClassifier as KNN
from lale.lib.sklearn import PCA
from lale.lib.sklearn import LogisticRegression as LR
pipeline = Vote(estimators=[('knn',KNN), ('pipeline',PCA()>>LR)],
voting='soft')
expected = """from lale.lib.sklearn import VotingClassifier as Vote
from lale.lib.sklearn import KNeighborsClassifier as KNN
from lale.lib.sklearn import PCA
from lale.lib.sklearn import LogisticRegression as LR
import lale
lale.wrap_imported_operators()
pipeline = Vote(estimators=[('knn', KNN), ('pipeline', PCA() >> LR)], voting='soft')"""
self._roundtrip(expected, lale.pretty_print.to_string(pipeline))
Return the pretty-printed code as a plain old Python string.
- True:
Pretty-print in notebook cell output with syntax highlighting.
- 'input'
Create a new notebook cell with pretty-printed code as input.
Returns
-------
str or None
If called with ipython_display=False, return pretty-printed Python source code as a Python string.
"""
result = lale.pretty_print.to_string(self, show_imports, combinators, call_depth=2)
if ipython_display == False:
return result
elif ipython_display == 'input':
import IPython.core
ipython = IPython.core.getipython.get_ipython()
comment = "# generated by pretty_print(ipython_display='input') from previous cell\n"
ipython.set_next_input(comment+result, replace=False)
else:
assert ipython_display in [True, 'output']
import IPython.display
markdown = IPython.display.Markdown(f'```python\n{result}\n```')
return IPython.display.display(markdown)
if k != k2:
raise ValueError(
'Invalid keyword {} for argument {}.'.format(k2, v2))
else:
v2 = v
hyperparams[k] = v2
#using params_all instead of hyperparams to ensure the construction is consistent with schema
trainable_to_get_params = TrainableIndividualOp(_name=self.name(), _impl=None, _schemas=self._schemas)
trainable_to_get_params._hyperparams = hyperparams
params_all = trainable_to_get_params._get_params_all()
try:
lale.type_checking.validate_schema(params_all, self.hyperparam_schema())
except jsonschema.ValidationError as e_orig:
e = e_orig if e_orig.parent is None else e_orig.parent
lale.type_checking.validate_is_schema(e.schema)
schema = lale.pretty_print.to_string(e.schema)
if [*e.schema_path][:3] == ['allOf', 0, 'properties']:
arg = e.schema_path[3]
reason = f'invalid value {arg}={e.instance}'
schema_path = f'argument {arg}'
elif [*e.schema_path][:3] == ['allOf', 0, 'additionalProperties']:
pref, suff = 'Additional properties are not allowed (', ')'
assert e.message.startswith(pref) and e.message.endswith(suff)
reason = 'argument ' + e.message[len(pref):-len(suff)]
schema_path = 'arguments and their defaults'
schema = self.hyperparam_defaults()
elif e.schema_path[0] == 'allOf' and int(e.schema_path[1]) != 0:
assert e.schema_path[2] == 'anyOf'
descr = e.schema['description']
if descr.endswith('.'):
descr = descr[:-1]
reason = f'constraint {descr[0].lower()}{descr[1:]}'
def add_ranges(min_a, max_a, min_b, max_b):
min_ab = min_a + min_b
if max_a == 'unbounded' or max_b == 'unbounded':
max_ab = 'unbounded'
else:
max_ab = max_a + max_b
return min_ab, max_ab
for s_dataset in s_X['items']:
if s_dataset.get('laleType', None) == 'Any':
return {'laleType': 'Any'}
arr_1d_num = {'type': 'array', 'items': {'type': 'number'}}
arr_2d_num = {'type': 'array', 'items': arr_1d_num}
s_decision_func = {'anyOf': [arr_1d_num, arr_2d_num]}
if lale.type_checking.is_subschema(s_decision_func, s_dataset):
s_dataset = arr_2d_num
assert 'items' in s_dataset, lale.pretty_print.to_string(s_dataset)
s_rows = s_dataset['items']
if 'type' in s_rows and 'array' == s_rows['type']:
s_cols = s_rows['items']
if isinstance(s_cols, dict):
min_c = s_rows['minItems'] if 'minItems' in s_rows else 1
max_c = s_rows['maxItems'] if 'maxItems' in s_rows else 'unbounded'
elem_schema = lale.type_checking.join_schemas(elem_schema, s_cols)
else:
min_c, max_c = len(s_cols), len(s_cols)
for s_col in s_cols:
elem_schema = lale.type_checking.join_schemas(elem_schema, s_col)
min_cols, max_cols = add_ranges(min_cols,max_cols,min_c,max_c)
else:
elem_schema = lale.type_checking.join_schemas(elem_schema, s_rows)
min_cols, max_cols = add_ranges(min_cols, max_cols, 1, 1)
s_result = {