# Run a scikit-learn classifier on the tasks of an OpenML benchmark suite.
# The suite name and the pipeline below are illustrative placeholders; any
# benchmark suite and any sklearn estimator can be used the same way.
import openml
import sklearn.metrics
import sklearn.pipeline
import sklearn.tree
from sklearn.impute import SimpleImputer
from openml.testing import TestBase  # only used by the test-suite cleanup hooks

benchmark_suite = openml.study.get_suite('OpenML100')  # download the suite
clf = sklearn.pipeline.Pipeline(
    steps=[
        ('imputer', SimpleImputer()),
        ('estimator', sklearn.tree.DecisionTreeClassifier()),
    ]
)  # build a sklearn classifier
for task_id in benchmark_suite.tasks[:1]:  # first task only; drop the slice to run all tasks
    task = openml.tasks.get_task(task_id)  # download the OpenML task
    X, y = task.get_X_and_y()  # get the data (not used in this example)
    openml.config.apikey = 'FILL_IN_OPENML_API_KEY'  # set your OpenML API key
    run = openml.runs.run_model_on_task(
        clf, task, avoid_duplicate_runs=False
    )  # run classifier on splits (requires API key)
    score = run.get_metric_fn(
        sklearn.metrics.accuracy_score
    )  # compute the per-fold accuracy scores
    print('Data set: %s; Accuracy: %0.2f' % (task.get_dataset().name, score.mean()))
    run.publish()  # publish the experiment on OpenML (optional)
    # Test-harness bookkeeping: register the run so the test suite can delete
    # it from the server afterwards.
    TestBase._mark_entity_for_removal('run', run.run_id)
    TestBase.logger.info("collected from {}: {}".format(__file__.split('/')[-1],
                                                        run.run_id))
    print('URL for run: %s/run/%d' % (openml.config.server, run.run_id))
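
# Assumed follow-up (not part of the original example): a published run can be
# re-fetched from the server to inspect the ids it was assigned.
fetched_run = openml.runs.get_run(run.run_id)
print(fetched_run.task_id, fetched_run.flow_id)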
import sklearn.base

__version__ = 0.1


class Model(sklearn.base.BaseEstimator):
    """Minimal custom estimator used as a serialization fixture."""

    def __init__(self, boolean, integer, floating_point_value):
        self.boolean = boolean
        self.integer = integer
        self.floating_point_value = floating_point_value

    def fit(self, X, y):
        pass
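
# Sketch (assumed usage): the scikit-learn extension can serialize the fixture
# above into an OpenML flow; its constructor arguments become flow parameters.
from openml.extensions.sklearn import SklearnExtension

extension = SklearnExtension()
flow = extension.model_to_flow(Model(boolean=True, integer=1, floating_point_value=0.1))
print(sorted(flow.parameters))  # ['boolean', 'floating_point_value', 'integer']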
class TestSklearnExtensionFlowFunctions(TestBase):
    # Splitting the test class is not helpful: these tests don't rely on the
    # server and take less than a second.

    def setUp(self):
        super().setUp(n_levels=2)
        iris = sklearn.datasets.load_iris()
        self.X = iris.data
        self.y = iris.target
        self.extension = SklearnExtension()
    def test_serialize_model(self):
        with mock.patch.object(self.extension, '_check_dependencies') as check_dependencies_mock:
            model = sklearn.tree.DecisionTreeClassifier(criterion='entropy',
                                                        max_features='auto',
                                                        max_leaf_nodes=2000)
            # Assumed completion of the truncated body: serialize the model and
            # confirm the mocked dependency check ran.
            serialization = self.extension.model_to_flow(model)
            self.assertTrue(check_dependencies_mock.called)
        # Flow class names used as fixtures by the longer original test; the
        # variable name below is illustrative.
        expected_flow_names = [
            'sklearn.pipeline.Pipeline',
            'sklearn.linear_model.base.LinearRegression',
        ]
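
# Round-trip sketch (assumed usage): a sklearn model converts to an OpenML
# flow and back to an equivalent, unfitted model.
clf = sklearn.tree.DecisionTreeClassifier(criterion='entropy')
extension = SklearnExtension()
flow = extension.model_to_flow(clf)
clf_again = extension.flow_to_model(flow)
print(type(clf_again).__name__)  # DecisionTreeClassifier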
    # Excerpt from a run-and-publish test helper: `clf`, `sentinel`, `task_id`,
    # `seed` and `n_missing_vals` are supplied by the enclosing test.
    def _remove_random_state(flow):
        if 'random_state' in flow.parameters:
            del flow.parameters['random_state']
        for component in flow.components.values():
            _remove_random_state(component)

    flow = self.extension.model_to_flow(clf)
    flow, _ = self._add_sentinel_to_flow_name(flow, sentinel)
    if not openml.flows.flow_exists(flow.name, flow.external_version):
        flow.publish()
        TestBase._mark_entity_for_removal('flow', (flow.flow_id, flow.name))
        TestBase.logger.info("collected from test_run_functions: {}".format(flow.flow_id))
    task = openml.tasks.get_task(task_id)
    X, y = task.get_X_and_y()
    self.assertEqual(np.count_nonzero(np.isnan(X)), n_missing_vals)
    run = openml.runs.run_flow_on_task(
        flow=flow,
        task=task,
        seed=seed,
        avoid_duplicate_runs=openml.config.avoid_duplicate_runs,
    )
    run_ = run.publish()
    TestBase._mark_entity_for_removal('run', run.run_id)
    TestBase.logger.info("collected from test_run_functions: {}".format(run.run_id))
    self.assertEqual(run_, run)
    self.assertIsInstance(run.dataset_id, int)
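
    # Assumed continuation: the published flow can be re-downloaded and, after
    # stripping seeds with the helper above, compared to the local copy.
    server_flow = openml.flows.get_flow(flow.flow_id)
    _remove_random_state(server_flow)
    _remove_random_state(flow)
    self.assertEqual(server_flow.name, flow.name)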
# License: BSD 3-Clause
import openml
import openml.study
from openml.testing import TestBase
import pandas as pd


class TestStudyFunctions(TestBase):
    _multiprocess_can_split_ = True

    def test_get_study_old(self):
        openml.config.server = self.production_server
        study = openml.study.get_study(34)
        self.assertEqual(len(study.data), 105)
        self.assertEqual(len(study.tasks), 105)
        self.assertEqual(len(study.flows), 27)
        self.assertEqual(len(study.setups), 30)
        self.assertIsNone(study.runs)

    def test_get_study_new(self):
        openml.config.server = self.production_server
        study = openml.study.get_study(123)
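
# Sketch (assumed usage outside the test suite): a study bundles dataset, task,
# flow and setup ids that can seed further API calls.
study = openml.study.get_study(34)
print(study.name)
print(study.tasks[:5])  # first few task ids in the study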
    def _delete_entity_from_tracker(self, entity_type, entity):
        """Delete entity records from the static file tracker.

        Given an entity type and corresponding ID, deletes all entries,
        including duplicate entries of the ID for the entity type.
        """
        if entity_type in TestBase.publish_tracker:
            # remove duplicate entries
            TestBase.publish_tracker[entity_type] = list(
                set(TestBase.publish_tracker[entity_type])
            )
            if entity_type == 'flow':
                delete_index = [i for i, (id_, _) in
                                enumerate(TestBase.publish_tracker[entity_type])
                                if id_ == entity][0]
            else:
                delete_index = [i for i, id_ in
                                enumerate(TestBase.publish_tracker[entity_type])
                                if id_ == entity][0]
            TestBase.publish_tracker[entity_type].pop(delete_index)
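
# Standalone sketch of the dedup-then-remove logic above (plain Python, no
# server interaction).
tracker = {'run': [10, 10, 12]}
tracker['run'] = list(set(tracker['run']))
delete_index = [i for i, id_ in enumerate(tracker['run']) if id_ == 10][0]
tracker['run'].pop(delete_index)
print(tracker['run'])  # [12]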
import os

from oslo_concurrency import lockutils

import openml
from openml.datasets.functions import (
    create_dataset,
    attributes_arff_from_df,
    _get_cached_dataset,
    _get_cached_dataset_features,
    _get_cached_dataset_qualities,
    _get_cached_datasets,
    _get_dataset_arff,
    _get_dataset_description,
    _get_dataset_features,
    _get_dataset_qualities,
    _get_online_dataset_arff,
    _get_online_dataset_format,
    DATASETS_CACHE_DIR_NAME,
)
class TestOpenMLDataset(TestBase):
    _multiprocess_can_split_ = True

    def setUp(self):
        super(TestOpenMLDataset, self).setUp()

    def tearDown(self):
        self._remove_pickle_files()
        super(TestOpenMLDataset, self).tearDown()

    def _remove_pickle_files(self):
        self.lock_path = os.path.join(openml.config.get_cache_directory(), 'locks')
        for did in ['-1', '2']:
            with lockutils.external_lock(
                name='datasets.functions.get_dataset:%s' % did,
                lock_path=self.lock_path,
            ):
                # Assumed completion of the truncated body: drop the cached
                # pickle for this dataset id, ignoring missing files.
                pickle_path = os.path.join(
                    openml.config.get_cache_directory(), 'datasets', did, 'dataset.pkl'
                )
                try:
                    os.remove(pickle_path)
                except OSError:
                    pass
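
# Sketch of the caching behaviour these helpers guard (assumed usage): the
# second get_dataset call is served from the local cache created by the first.
dataset = openml.datasets.get_dataset(2)  # downloads and caches
dataset = openml.datasets.get_dataset(2)  # cache hit, no network traffic
X, y, categorical, names = dataset.get_data(target=dataset.default_target_attribute)
print(X.shape, len(names))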
from unittest import mock

import numpy as np

import openml
from openml.testing import TestBase


class OpenMLTaskTest(TestBase):
    _multiprocess_can_split_ = True
    _batch_size = 25
    def mocked_perform_api_call(call, request_method):
        # TODO: JvR: Why is this not a staticmethod?
        url = openml.config.server + '/' + call
        return openml._api_calls._download_text_file(url)

    def test_list_all(self):
        openml.utils._list_all(listing_call=openml.tasks.functions._list_tasks)

    @mock.patch('openml._api_calls._perform_api_call',
                side_effect=mocked_perform_api_call)
    def test_list_all_few_results_available(self, _perform_api_call):
        # We want to make sure that the number of api calls is only 1.
        # Although we have multiple versions of the iris dataset, there is only
        # one result for this name/version combination, so a single batch
        # suffices. (Assumed completion of the truncated test body.)
        datasets = openml.datasets.list_datasets(size=1000, data_name='iris',
                                                 data_version=1)
        self.assertEqual(len(datasets), 1)
        self.assertEqual(_perform_api_call.call_count, 1)
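
# Plain pagination sketch (assumed usage): _list_all batches what a caller
# would otherwise page through manually with offset/size.
first_page = openml.tasks.list_tasks(offset=0, size=25)
second_page = openml.tasks.list_tasks(offset=25, size=25)
print(len(first_page), len(second_page))  # up to 25 entries each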
# License: BSD 3-Clause
import inspect
import os

import numpy as np

from openml import OpenMLSplit
from openml.testing import TestBase


class OpenMLSplitTest(TestBase):
    # Splitting the test class is not helpful: these tests don't rely on the
    # server, take less than 5 seconds, and rebuilding the fixture would be
    # costly.

    def setUp(self):
        __file__ = inspect.getfile(OpenMLSplitTest)
        self.directory = os.path.dirname(__file__)
        # Path to the data-splits fixture for one task.
        self.arff_filename = os.path.join(
            self.directory, "..", "files", "org", "openml", "test",
            "tasks", "1882", "datasplits.arff"
        )
        self.pd_filename = self.arff_filename.replace(".arff", ".pkl.py3")

    def tearDown(self):
        try:
            os.remove(self.pd_filename)
        except OSError:
            # the pickle cache may not have been created by every test
            pass
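
# Sketch (assumed usage): the datasplits.arff fixture parses into an
# OpenMLSplit whose (repeat, fold) entries hold train/test index arrays.
split = OpenMLSplit._from_arff_file(arff_filename)  # path as built in setUp above
train_indices, test_indices = split.get(repeat=0, fold=0)
print(len(train_indices), len(test_indices))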
    def test_numpy_type_allowed_in_flow(self):
        """Simple numpy types should be serializable."""
        dt = sklearn.tree.DecisionTreeClassifier(
            max_depth=np.float64(3.0),
            min_samples_leaf=np.int32(5),
        )
        self.extension.model_to_flow(dt)

    def test_numpy_array_not_allowed_in_flow(self):
        """Simple numpy arrays should not be serializable."""
        binarizer = sklearn.preprocessing.MultiLabelBinarizer(
            classes=np.asarray([1, 2, 3])
        )
        with self.assertRaises(TypeError):
            self.extension.model_to_flow(binarizer)
class TestSklearnExtensionRunFunctions(TestBase):
    _multiprocess_can_split_ = True

    def setUp(self):
        super().setUp(n_levels=2)
        self.extension = SklearnExtension()

    ############################################################################
    # Test methods for performing runs with this extension module

    def test_run_model_on_task(self):
        class MyPipe(sklearn.pipeline.Pipeline):
            pass

        task = openml.tasks.get_task(1)
        pipe = MyPipe([('imp', SimpleImputer()),
                       ('dummy', sklearn.dummy.DummyClassifier())])
        openml.runs.run_model_on_task(pipe, task)
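
# Follow-up sketch (assumed usage; imports as in the earlier snippets):
# run_model_on_task returns an OpenMLRun that can be scored locally before
# deciding whether to publish it.
task = openml.tasks.get_task(1)
pipe = sklearn.pipeline.Pipeline([('imp', SimpleImputer()),
                                  ('dummy', sklearn.dummy.DummyClassifier())])
run = openml.runs.run_model_on_task(pipe, task, avoid_duplicate_runs=False)
scores = run.get_metric_fn(sklearn.metrics.accuracy_score)
print(scores.mean())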
from collections import OrderedDict
import copy
import unittest
from distutils.version import LooseVersion

import sklearn
from sklearn import ensemble
import pandas as pd

import openml
import openml.extensions.sklearn
from openml.testing import TestBase


class TestFlowFunctions(TestBase):
    _multiprocess_can_split_ = True

    def setUp(self):
        super(TestFlowFunctions, self).setUp()

    def tearDown(self):
        super(TestFlowFunctions, self).tearDown()
    def _check_flow(self, flow):
        self.assertEqual(type(flow), dict)
        self.assertEqual(len(flow), 6)
        self.assertIsInstance(flow['id'], int)
        self.assertIsInstance(flow['name'], str)
        self.assertIsInstance(flow['full_name'], str)
        self.assertIsInstance(flow['version'], str)
        # There are some runs on openml.org that can have an empty external version