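# The first snippet below is from the TFX taxi pipeline example for the
# Kubeflow runner. A minimal sketch of the imports and module-level names it
# assumes follows; every value is a hypothetical placeholder, not taken from
# the original example:
import os

from kfp import onprem
from tfx.orchestration.kubeflow import kubeflow_dag_runner

_pipeline_name = 'taxi_pipeline_kubeflow'       # placeholder
_pipeline_root = '/mnt/tfx/pipelines'           # placeholder
_data_root = '/mnt/tfx/data'                    # placeholder
_module_file = '/mnt/tfx/taxi_utils.py'         # placeholder
_serving_model_dir = '/mnt/tfx/serving_model'   # placeholder
_persistent_volume_claim = 'my-pvc'             # placeholder
_persistent_volume = 'my-volume'                # placeholder
_persistent_volume_mount = '/mnt'               # placeholder
_beam_num_workers = 0                           # placeholder; 0 = auto-detect

# Metadata config; the default works with a full Kubeflow installation.
metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()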
# This pipeline automatically injects the Kubeflow TFX image if the
# environment variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx
# cli tool exports the environment variable to pass to the pipelines.
tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)
runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    kubeflow_metadata_config=metadata_config,
    # Specify custom docker image to use.
    tfx_image=tfx_image,
    pipeline_operator_funcs=(
        # If running on K8s Engine (GKE) on Google Cloud Platform (GCP),
        # kubeflow_dag_runner.get_default_pipeline_operator_funcs() provides
        # default configurations specifically for GKE on GCP, such as secrets.
        [
            onprem.mount_pvc(_persistent_volume_claim, _persistent_volume,
                             _persistent_volume_mount)
        ]))
kubeflow_dag_runner.KubeflowDagRunner(config=runner_config).run(
    _create_pipeline(
        pipeline_name=_pipeline_name,
        pipeline_root=_pipeline_root,
        data_root=_data_root,
        module_file=_module_file,
        serving_model_dir=_serving_model_dir,
        # 0 means auto-detect based on the number of CPUs available during
        # execution time.
        direct_num_workers=0))
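
# The next snippet is the body of a Kubeflow Pipelines (KFP v1) DSL pipeline
# function from the taxi-cab classification sample. A hedged sketch of the
# enclosing context it assumes: the decorator metadata, function name,
# parameter list, and the output_template helper are inferred from the body
# and are not authoritative, and the dataflow_tf_*_op / kubeflow_deploy_op
# factories are assumed to be component ops loaded elsewhere (e.g. via
# kfp.components).
import kfp.dsl as dsl
import kfp.gcp as gcp

platform = 'GCP'  # assumed module-level switch; any other value takes the PVC path


@dsl.pipeline(
    name='TFX Taxi Cab Classification Pipeline Example',  # assumed
    description='Classification with model analysis on taxi cab data.')  # assumed
def taxi_cab_classification(output, project, column_names, key_columns,
                            train, evaluation, mode):
    # Assumed helper: per-step output location on the shared volume.
    output_template = str(output) + '/{{workflow.uid}}/{{pod.name}}/data'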
    tf_server_name = 'taxi-cab-classification-model-{{workflow.uid}}'

    if platform != 'GCP':
        # Outside GCP, create a shared PVC and clone the pipelines repo onto it.
        vop = dsl.VolumeOp(
            name="create_pvc",
            resource_name="pipeline-pvc",
            modes=dsl.VOLUME_MODE_RWM,
            size="1Gi"
        )

        checkout = dsl.ContainerOp(
            name="checkout",
            image="alpine/git:latest",
            command=["git", "clone", "https://github.com/kubeflow/pipelines.git",
                     str(output) + "/pipelines"],
        ).apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
        checkout.after(vop)

    validation = dataflow_tf_data_validation_op(
        inference_data=train,
        validation_data=evaluation,
        column_names=column_names,
        key_columns=key_columns,
        gcp_project=project,
        run_mode=mode,
        validation_output=output_template,
    )
    if platform != 'GCP':
        validation.after(checkout)

    preprocess = dataflow_tf_transform_op(
        training_data_file_pattern=train,
        # ... remaining arguments are elided in this snippet ...
    )
    # ... the training, analysis, prediction, cm, and roc steps are elided ...

    if platform == 'GCP':
        ...  # the GCP deploy branch is elided in this snippet
    else:
        deploy = kubeflow_deploy_op(
            cluster_name=project,
            model_dir=str(training.output) + '/export/export',
            pvc_name='users-pvc',
            # pvc_name=vop.outputs["name"],
            server_name=tf_server_name,
            service_type='NodePort',
        )

    steps = [validation, preprocess, training, analysis, prediction, cm, roc, deploy]
    for step in steps:
        if platform == 'GCP':
            step.apply(gcp.use_gcp_secret('user-gcp-sa'))
        else:
            step.apply(onprem.mount_pvc('users-pvc', 'local-storage', output))
            # step.apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
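
# A hedged usage sketch (KFP v1 API): the decorated function above is
# typically compiled into a package that can be uploaded to the Kubeflow
# Pipelines UI; the output file name here is arbitrary.
import kfp.compiler

kfp.compiler.Compiler().compile(taxi_cab_classification, 'taxi_pipeline.tar.gz')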

# Metadata config. The defaults work with the installation of
# KF Pipelines using Kubeflow. If installing KF Pipelines using the
# lightweight deployment option, you may need to override the defaults.
metadata_config = kubeflow_dag_runner.get_default_kubeflow_metadata_config()

# This pipeline automatically injects the Kubeflow TFX image if the
# environment variable 'KUBEFLOW_TFX_IMAGE' is defined. Currently, the tfx
# cli tool exports the environment variable to pass to the pipelines.
tfx_image = os.environ.get('KUBEFLOW_TFX_IMAGE', None)
runner_config = kubeflow_dag_runner.KubeflowDagRunnerConfig(
    kubeflow_metadata_config=metadata_config,
    # Specify custom docker image to use.
    tfx_image=tfx_image,
    pipeline_operator_funcs=(
        # Combine the GKE defaults (e.g. GCP secrets) with an on-prem PVC mount.
        kubeflow_dag_runner.get_default_pipeline_operator_funcs() + [
            onprem.mount_pvc(_persistent_volume_claim, _persistent_volume,
                             _persistent_volume_mount)
        ]))
kubeflow_dag_runner.KubeflowDagRunner(config=runner_config).run(
    _create_pipeline(
        pipeline_name=_pipeline_name,
        pipeline_root=_pipeline_root,
        data_root=_data_root,
        module_file=_module_file,
        serving_model_dir=_serving_model_dir,
        direct_num_workers=_beam_num_workers))
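
# A minimal sketch of the _create_pipeline factory invoked above; the
# signature is taken from the call site, while the body is a placeholder
# (the real example constructs the full set of TFX components here).
from tfx.orchestration import pipeline


def _create_pipeline(pipeline_name, pipeline_root, data_root, module_file,
                     serving_model_dir, direct_num_workers):
    components = []  # ExampleGen, StatisticsGen, ..., Trainer, Pusher go here
    return pipeline.Pipeline(
        pipeline_name=pipeline_name,
        pipeline_root=pipeline_root,
        components=components,
        # Forward the worker count to the Beam DirectRunner; 0 = auto-detect.
        beam_pipeline_args=['--direct_num_workers=%d' % direct_num_workers])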