import json
from typing import Callable, List, Text

import absl.logging  # used below as absl.logging.info
from kfp import dsl
from tfx import version
from tfx.orchestration.config import pipeline_config
from tfx.orchestration.kubeflow import base_component
from tfx.orchestration.kubeflow import utils
from tfx.orchestration.kubeflow.proto import kubeflow_pb2
from tfx.orchestration.launcher import base_component_launcher
from tfx.orchestration.launcher import in_process_component_launcher
from tfx.orchestration.launcher import kubernetes_component_launcher
from tfx.utils import json_utils
# OpFunc represents the type of a function that takes as input a
# dsl.ContainerOp and returns the same object. Common operations such as adding
# k8s secrets, mounting volumes, specifying the use of TPUs and so on can be
# specified as an OpFunc.
# See example usage here:
# https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/gcp.py
OpFunc = Callable[[dsl.ContainerOp], dsl.ContainerOp]
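

# For illustration only: a minimal OpFunc sketch. The secret and variable
# names ('db-secret', 'DB_PASSWORD') are hypothetical; the sketch injects an
# environment variable sourced from a Kubernetes secret into any step it is
# applied to via ContainerOp.apply(...).
from kubernetes import client as k8s_client


def _use_db_password(container_op: dsl.ContainerOp) -> dsl.ContainerOp:
  container_op.add_env_variable(
      k8s_client.V1EnvVar(
          name='DB_PASSWORD',
          value_from=k8s_client.V1EnvVarSource(
              secret_key_ref=k8s_client.V1SecretKeySelector(
                  name='db-secret', key='password'))))
  return container_op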
# Default secret name for GCP credentials. This secret is installed as part of
# a typical Kubeflow installation when the cluster runs on GKE.
_KUBEFLOW_GCP_SECRET_NAME = 'user-gcp-sa'
# Default TFX container image to use in KubeflowDagRunner.
_KUBEFLOW_TFX_IMAGE = 'tensorflow/tfx:%s' % (version.__version__)
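

# Sketch, modeled on TFX's own defaults: assemble the list of OpFuncs that
# KubeflowDagRunner applies to every ContainerOp. It combines the kfp.gcp
# secret helper with the ConfigMap mount defined below; the ConfigMap name
# 'metadata-grpc-configmap' is assumed here.
from kfp import gcp


def get_default_pipeline_operator_funcs() -> List[OpFunc]:
  gcp_secret_op = gcp.use_gcp_secret(_KUBEFLOW_GCP_SECRET_NAME)
  mount_metadata_config_op = _mount_config_map_op('metadata-grpc-configmap')
  return [gcp_secret_op, mount_metadata_config_op]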
def _mount_config_map_op(config_map_name: Text) -> OpFunc:
  """Mounts all key-value pairs found in the named Kubernetes ConfigMap.

  All key-value pairs in the ConfigMap are mounted as environment variables.

  Args:
    config_map_name: The name of the ConfigMap resource.

  Returns:
    An OpFunc for mounting the ConfigMap.
  """
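
  # The function body is truncated in the source snippet; a minimal sketch of
  # the returned OpFunc, mirroring the docstring (ConfigMap keys become
  # environment variables):
  def mount_config_map(container_op: dsl.ContainerOp):
    config_map_ref = k8s_client.V1ConfigMapEnvSource(
        name=config_map_name, optional=True)
    container_op.container.add_env_from(
        k8s_client.V1EnvFromSource(config_map_ref=config_map_ref))

  return mount_config_map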
file_outputs={"model_version": "/model_version.txt"},
arguments=[
"--data-path", sample.outputs["data_path"],
"--model-name", model_name,
"--models-path", train.outputs["model_path"],
"--accuracy", train.outputs["accuracy"],
"--hydrosphere-address", hydrosphere_address,
"--learning-rate", learning_rate,
"--epochs", epochs,
"--batch-size", batch_size,
]
).apply(use_aws_secret())
release.after(train)
# 4. Deploy model to stage application
deploy_to_stage = dsl.ContainerOp(
name="deploy_to_stage",
image="hydrosphere/mnist-pipeline-deploy-to-stage:v1", # <-- Replace with correct docker image
arguments=[
"--model-version", release.outputs["model_version"],
"--hydrosphere-address", hydrosphere_address,
"--model-name", model_name,
],
).apply(use_aws_secret())
deploy_to_stage.after(release)
# 5. Test the model via stage application
test = dsl.ContainerOp(
name="test",
image="hydrosphere/mnist-pipeline-test:v1", # <-- Replace with correct docker image
arguments=[
"--data-path", sample.outputs["data_path"],
'--beam_pipeline_args',
json.dumps(pipeline.beam_pipeline_args),
'--additional_pipeline_args',
json.dumps(pipeline.additional_pipeline_args),
'--component_launcher_class_path',
component_launcher_class_path,
'--serialized_component',
serialized_component,
'--component_config',
json_utils.dumps(component_config),
]
if pipeline.enable_cache:
arguments.append('--enable_cache')
self.container_op = dsl.ContainerOp(
name=component.id.replace('.', '_'),
command=_COMMAND,
image=tfx_image,
arguments=arguments,
output_artifact_paths={
'mlpipeline-ui-metadata': '/mlpipeline-ui-metadata.json',
},
)
absl.logging.info('Adding upstream dependencies for component {}'.format(
self.container_op.name))
for op in depends_on:
absl.logging.info(' -> Component: {}'.format(op.name))
self.container_op.after(op)
# TODO(b/140172100): Document the use of additional_pipeline_args.


# --- kfp "immediate value" sample pipeline ---
# (the @dsl.pipeline decorator is reconstructed; name/description assumed)
@dsl.pipeline(
    name='Immediate value',
    description='A pipeline with a hard-coded parameter value.'
)
def immediate_value_pipeline():
  # "url" is a pipeline parameter whose value is hard-coded. This is useful
  # when you want to fix a parameter for a component instead of exposing it
  # as a pipeline parameter.
  url = dsl.PipelineParam(name='url', value='gs://ml-pipeline-playground/shakespeare1.txt')
op1 = dsl.ContainerOp(
name='download',
image='google/cloud-sdk:216.0.0',
command=['sh', '-c'],
arguments=['gsutil cat %s | tee /tmp/results.txt' % url],
file_outputs={'downloaded': '/tmp/results.txt'})
op2 = dsl.ContainerOp(
name='echo',
image='library/bash:4.4.23',
command=['sh', '-c'],
arguments=['echo %s' % op1.output])
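
# Usage sketch: compile the pipeline above into an Argo workflow package with
# the kfp v1 SDK compiler (output filename is arbitrary).
from kfp import compiler

compiler.Compiler().compile(immediate_value_pipeline, 'immediate_value_pipeline.yaml')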


# --- TF-Slim model optimization pipeline (separate snippet) ---
def tf_slim_optimize(
model_name='resnet_v1_50',
num_classes=1000,
checkpoint_url='http://download.tensorflow.org/models/resnet_v1_50_2016_08_28.tar.gz',
batch_size=1,
export_dir='/tmp/export',
generated_model_dir='gs://your-bucket/folder',
mo_options='--saved_model_dir .',
input_numpy_file='gs://intelai_public_models/images/imgs.npy',
label_numpy_file='gs://intelai_public_models/images/lbs.npy'
):
slim = dsl.ContainerOp(
name='Create_model',
image='gcr.io/constant-cubist-173123/inference_server/ml_slim:6',
command=['python', 'slim_model.py'],
arguments=[
'--model_name', model_name,
'--batch_size', batch_size,
'--checkpoint_url', checkpoint_url,
'--num_classes', num_classes,
'--saved_model_dir', generated_model_dir,
'--export_dir', export_dir],
file_outputs={'generated-model-dir': '/tmp/saved_model_dir.txt'})
    mo = dsl.ContainerOp(
        name='Optimize_model',
        image='gcr.io/constant-cubist-173123/inference_server/ml_mo:12',
        command=['convert_model.py'],
        # the remaining arguments (mo_options, the generated model dir, ...)
        # are truncated in the source snippet
    )


# --- kfp ParallelFor loop sample ---
# (the @dsl.pipeline decorator is reconstructed; a name is required to compile)
@dsl.pipeline(name='my-pipeline')
def pipeline():
op0 = dsl.ContainerOp(
name="my-out-cop0",
image='python:alpine3.6',
command=["sh", "-c"],
arguments=[
'python -c "import json; import sys; json.dump([i for i in range(20, 31)], open(\'/tmp/out.json\', \'w\'))"'],
file_outputs={'out': '/tmp/out.json'},
)
with dsl.ParallelFor(op0.output) as item:
op1 = dsl.ContainerOp(
name="my-in-cop1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo do output op1 item: %s" % item],
)
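
    # ParallelFor also accepts a static list (sketch); each iteration receives
    # one element, and dict items are addressable by attribute.
    with dsl.ParallelFor([{'a': 1, 'b': 2}, {'a': 10, 'b': 20}]) as item2:
        op2 = dsl.ContainerOp(
            name="my-in-cop2",
            image="library/bash:4.4.23",
            command=["sh", "-c"],
            arguments=["echo a: %s b: %s" % (item2.a, item2.b)],
        )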


# --- kfp echo sample op ---
def echo1_op(text1):
return dsl.ContainerOp(
name='echo1',
image='library/bash:4.4.23',
command=['sh', '-c'],
arguments=['echo "$0"', text1])
@dsl.pipeline(
name='Retry random failures',
description='The pipeline includes two steps which fail randomly. It shows how to use ContainerOp(...).set_retry(...).'
)
def retry_sample_pipeline():
op1 = random_failure_op('0,1,2,3').set_retry(10)
op2 = random_failure_op('0,1').set_retry(5)
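
# Usage sketch: compile the retry pipeline the same way as the samples above.
from kfp import compiler

compiler.Compiler().compile(retry_sample_pipeline, 'retry_sample_pipeline.yaml')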


# --- Parallel cloud copy (xcp) op ---
def xcp_op(src, dst, f='', recursive=False, mtime='', log_level='info', minsize=0, maxsize=0):
"""Parallel cloud copy."""
from kfp import dsl
args = [
# '-f', f,
# '-t', mtime,
# '-m', maxsize,
# '-n', minsize,
# '-v', log_level,
src, dst,
]
if recursive:
args = ['-r'] + args
return dsl.ContainerOp(
name='xcp',
image='yhaviv/invoke',
command=['xcp'],
arguments=args,
)
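
# Usage sketch (hypothetical bucket paths): call xcp_op inside a pipeline to
# copy a prefix recursively; the remaining keyword flags stay commented out in
# xcp_op above.
@dsl.pipeline(name='xcp-copy', description='Copies a prefix with xcp.')
def xcp_copy_pipeline():
    xcp_op('s3://src-bucket/data', 's3://dst-bucket/data', recursive=True)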


# --- Kubeflow arena sample pipeline ---
@dsl.pipeline(
    name='pipeline to run jobs',
    description='Shows how to run pipeline jobs.'
)
def sample_pipeline(learning_rate='0.01',
                    dropout='0.9',
                    model_version='1',
                    commit='f097575656f927d86d99dd64931042e1a9003cb2'):
    """A pipeline for an end-to-end machine learning workflow."""
data=["user-susan:/training"]
gpus=1
# 1. prepare data
prepare_data = arena.standalone_job_op(
name="prepare-data",
image="byrnedo/alpine-curl",
data=data,