try:
    with open(DEFAULT_CONFIG, 'r') as f:
        raw_args = yaml.safe_load(f)
except yaml.YAMLError as yamlerr:
    raise RuntimeError('Illegal default config: {}'.format(yamlerr))
except OSError as ose:
    raise FileNotFoundError('Default config not found: {}'.format(ose))
else:
    test_timeout = raw_args['test_timeout']

if self._run_pipeline:
    experiment = self._experiment_name
    ###### Initialization ######
    host = 'ml-pipeline.%s.svc.cluster.local:8888' % self._namespace
    client = Client(host=host)

    ###### Get experiments ######
    experiment_id = client.get_experiment(experiment_name=experiment).id

    ###### Get runs ######
    list_runs_response = client.list_runs(page_size=RUN_LIST_PAGE_SIZE,
                                          experiment_id=experiment_id)

    ###### Check all runs ######
    for run in list_runs_response.runs:
        run_id = run.id
        response = client.wait_for_run_completion(run_id, test_timeout)
        succ = (response.run.status.lower() == 'succeeded')
        utils.add_junit_test(test_cases, 'job completion',
                             succ, 'waiting for job completion failure')
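The snippet above depends on surrounding test scaffolding (`DEFAULT_CONFIG`, `RUN_LIST_PAGE_SIZE`, `utils.add_junit_test`). A minimal self-contained sketch of the same wait-and-check pattern, assuming a reachable KFP endpoint and a known run ID (both placeholders here):

import kfp

client = kfp.Client(host='http://localhost:8888')  # assumed port-forwarded KFP API
run_id = '<your-run-id>'                           # placeholder
# Block until the run finishes or the timeout (in seconds) expires.
response = client.wait_for_run_completion(run_id, 1200)
succeeded = response.run.status.lower() == 'succeeded'
print('{}: {}'.format(run_id, response.run.status))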
FLAGS, unparsed = parser.parse_known_args()
model_version = FLAGS.model_version
dropout = FLAGS.dropout
learning_rate = FLAGS.learning_rate
commit = FLAGS.commit

arguments = {
    'learning_rate': learning_rate,
    'dropout': dropout,
    'model_version': model_version,
    'commit': commit,
}

KFP_SERVICE = "ml-pipeline.kubeflow.svc.cluster.local:8888"
client = kfp.Client(host=KFP_SERVICE)
client.create_run_from_pipeline_func(sample_pipeline, arguments=arguments)
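Here `parser` and `sample_pipeline` are defined elsewhere in the script. As a rough stand-in, `sample_pipeline` could be any function decorated with kfp v1's `@dsl.pipeline`; this hypothetical one-step pipeline accepts the same argument names:

from kfp import dsl

@dsl.pipeline(name='sample-pipeline', description='Illustrative stand-in only.')
def sample_pipeline(learning_rate='0.001', dropout='0.5', model_version='1', commit=''):
    # A trivial container step in place of the real training op.
    dsl.ContainerOp(
        name='train',
        image='python:3.9',
        command=['echo', 'training...'],
    )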
def __init__(self, flags_dict: Dict[Text, Any]):
    """Initialize Kubeflow handler.

    Args:
      flags_dict: A dictionary with flags provided in a command.
    """
    super(KubeflowHandler, self).__init__(flags_dict)
    # TODO(b/132286477): Change to setup config instead of flags if needed.
    if labels.NAMESPACE in self.flags_dict:
        self._client = kfp.Client(
            host=self.flags_dict[labels.ENDPOINT],
            client_id=self.flags_dict[labels.IAP_CLIENT_ID],
            namespace=self.flags_dict[labels.NAMESPACE])
    else:
        self._client = None
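For illustration, a hypothetical `flags_dict` for this handler might look like the following (the label constants come from TFX's CLI `labels` module; the values are placeholders, and the real CLI passes more flags than shown):

from tfx.tools.cli import labels

flags_dict = {
    labels.ENDPOINT: 'http://localhost:8888',
    labels.IAP_CLIENT_ID: '',
    labels.NAMESPACE: 'kubeflow',
}
handler = KubeflowHandler(flags_dict)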
def get_pipeline(run_id, namespace: str = Query(config.namespace)):
    client = kfclient(namespace=namespace)
    try:
        run = client.get_run(run_id)
        if run:
            run = run.to_dict()
    except Exception as e:
        log_and_raise(
            HTTPStatus.INTERNAL_SERVER_ERROR, reason="get kfp error: {}".format(e)
        )
    return run
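`kfclient` and `log_and_raise` are helpers from the surrounding project. Assuming this handler is mounted at a route like `/pipelines/{run_id}` (a hypothetical path), calling it could look like:

import requests

resp = requests.get(
    'http://localhost:8080/pipelines/<run-id>',   # hypothetical host and route
    params={'namespace': 'kubeflow'},
)
resp.raise_for_status()
print(resp.json())  # the run, as returned by run.to_dict() above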
def deploy_pipeline_to_kfp(self):
    import kfp.compiler as compiler
    import kfp

    # Import the generated pipeline code by adding the temp folder to PYTHONPATH.
    sys.path.append(self.temp_dirpath)
    from pipeline_code import auto_generated_pipeline

    pipeline_filename = self.pipeline_name + '.pipeline.tar.gz'
    compiler.Compiler().compile(auto_generated_pipeline, pipeline_filename)

    # Get or create an experiment and submit a pipeline run.
    client = kfp.Client(host=self.kfp_url)
    list_experiments_response = client.list_experiments()
    experiments = list_experiments_response.experiments
    print(experiments)
    if not experiments:
        # The user does not have any experiments available; create a new one.
        experiment = client.create_experiment(self.pipeline_name + ' experiment')
    else:
        experiment = experiments[-1]  # Use the most recent experiment.

    # Submit a pipeline run.
    run_name = self.pipeline_name + ' run'
    run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, {})
    print(run_result)
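The interesting part of `deploy_pipeline_to_kfp` is the compile-then-submit pattern; the rest is plumbing around the generated `pipeline_code` module. Stripped to that pattern, a self-contained sketch with a hypothetical pipeline and endpoint:

import kfp
import kfp.compiler as compiler
from kfp import dsl

@dsl.pipeline(name='demo', description='Illustrative one-step pipeline.')
def demo_pipeline():
    dsl.ContainerOp(name='step', image='alpine', command=['echo', 'hello'])

# Compile to a package, then submit it against an experiment.
compiler.Compiler().compile(demo_pipeline, 'demo.pipeline.tar.gz')
client = kfp.Client(host='http://localhost:8888')       # assumed endpoint
experiment = client.create_experiment('demo experiment')
run = client.run_pipeline(experiment.id, 'demo run', 'demo.pipeline.tar.gz', {})
print(run)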
# Parse arguments
parser = argparse.ArgumentParser()
parser.add_argument(
    '-f', '--file', help='Compiled pipeline file [.tar.gz, .yaml, .zip]', required=True)
parser.add_argument(
    '-e', '--experiment', help='Experiment name to run the pipeline on', default='Default')
parser.add_argument(
    '-r', '--run-name', help='Run name', default=None)
parser.add_argument(
    '-k', '--kubeflow', help='Host where the Kubeflow instance is running', required=True)
args = parser.parse_args()

# Create client
client = kfp.Client(args.kubeflow)
run_name = args.run_name or 'mnist_' + namesgenerator.get_random_name()

# Reuse the experiment if it exists, otherwise create it.
try:
    experiment_id = client.get_experiment(experiment_name=args.experiment).id
except Exception:
    experiment_id = client.create_experiment(args.experiment).id

# Submit a pipeline run
result = client.run_pipeline(experiment_id, run_name, args.file,
                             params={"drift_detector_steps": "500"})
print(result)
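One caveat with the get-or-create above: catching every `Exception` also swallows connection failures. In kfp v1, `get_experiment` raises `ValueError` when no experiment matches the given name, so the fallthrough can be narrowed (a sketch, assuming that kfp v1 behavior):

try:
    experiment_id = client.get_experiment(experiment_name=args.experiment).id
except ValueError:
    # Only "no experiment found" lands here; connection errors still propagate.
    experiment_id = client.create_experiment(args.experiment).id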
        ctype = ".zip"
    else:
        log_and_raise(
            HTTPStatus.BAD_REQUEST, reason="unsupported pipeline type {}".format(ctype)
        )
    logger.info("writing file {}".format(ctype))

    pipe_tmp = tempfile.mktemp(suffix=ctype)
    with open(pipe_tmp, "wb") as fp:
        fp.write(data)

    run = None
    try:
        client = kfclient(namespace=namespace)
        experiment = client.create_experiment(name=experiment_name)
        run = client.run_pipeline(experiment.id, run_name, pipe_tmp, params=arguments)
    except Exception as e:
        remove(pipe_tmp)
        log_and_raise(HTTPStatus.BAD_REQUEST, reason="kfp err: {}".format(e))
    remove(pipe_tmp)
    return run
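`tempfile.mktemp` only reserves a name: there is a window before the `open` in which another process could claim the path. A sketch of the same write-submit-cleanup flow using `NamedTemporaryFile` instead (`kfclient`, `data`, `ctype`, and the run parameters come from the surrounding handler):

import os
import tempfile

# delete=False keeps the file around after the context manager closes it,
# so the KFP client can read it back; we remove it ourselves afterwards.
with tempfile.NamedTemporaryFile(suffix=ctype, delete=False) as fp:
    fp.write(data)
    pipe_tmp = fp.name
try:
    client = kfclient(namespace=namespace)
    experiment = client.create_experiment(name=experiment_name)
    run = client.run_pipeline(experiment.id, run_name, pipe_tmp, params=arguments)
finally:
    os.remove(pipe_tmp)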
        raise ValueError(
            'run_pipeline requires access to the remote api-service,'
            ' please set the dbpath url'
        )
    id = mldb.submit_pipeline(
        pipeline,
        arguments,
        experiment=experiment,
        run=run,
        namespace=namespace,
        ops=ops,
        artifact_path=artifact_path,
    )
else:
    client = Client(namespace=namespace)
    if isinstance(pipeline, str):
        experiment = client.create_experiment(name=experiment)
        run_result = client.run_pipeline(
            experiment.id, run, pipeline, params=arguments
        )
    else:
        conf = new_pipe_meta(artifact_path, ttl, ops)
        run_result = client.create_run_from_pipeline_func(
            pipeline,
            arguments,
            run_name=run,
            experiment_name=experiment,
            pipeline_conf=conf,
        )
    id = run_result.run_id
def list_pipelines(
    full=False,
    page_token='',
    page_size=10,
    sort_by='',
    experiment_id=None,
    namespace=None,
):
    """List pipelines"""
    namespace = namespace or mlconf.namespace
    client = Client(namespace=namespace)
    resp = client._run_api.list_runs(
        page_token=page_token, page_size=page_size, sort_by=sort_by
    )
    runs = resp.runs
    if not full and runs:
        runs = []
        for run in resp.runs:
            runs.append(
                {
                    k: str(v)
                    for k, v in run.to_dict().items()
                    if k
                    in [
                        'id',
                        'name',
                        'status',