# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def workflow1(
input_handle_eval: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-eval', value='gs://aju-dev-demos-codelabs/KF/taxidata/eval/data.csv'),
input_handle_train: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-train', value='gs://aju-dev-demos-codelabs/KF/taxidata/train/data.csv'),
outfile_prefix_eval: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-eval', value='eval_transformed'),
outfile_prefix_train: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-train', value='train_transformed'),
train_steps: dsl.PipelineParam=dsl.PipelineParam(name='train-steps', value=10000),
project: dsl.PipelineParam=dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),
working_dir: dsl.PipelineParam=dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),
tft_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tft-setup-file', value='/ml/transform/setup.py'),
tfma_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tfma-setup-file', value='/ml/analysis/setup.py'),
workers: dsl.PipelineParam=dsl.PipelineParam(name='workers', value=1),
pss: dsl.PipelineParam=dsl.PipelineParam(name='pss', value=1),
max_rows: dsl.PipelineParam=dsl.PipelineParam(name='max-rows', value=10000),
ts1: dsl.PipelineParam=dsl.PipelineParam(name='ts1', value=''),
ts2: dsl.PipelineParam=dsl.PipelineParam(name='ts2', value=''),
preprocessing_module1: dsl.PipelineParam=dsl.PipelineParam(name='preprocessing-module1', value='gs://aju-dev-demos-codelabs/KF/taxi-preproc/preprocessing.py'),
preprocessing_module2: dsl.PipelineParam=dsl.PipelineParam(name='preprocessing-module2', value='gs://aju-dev-demos-codelabs/KF/taxi-preproc/preprocessing2.py'),
preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(name='preprocess-mode', value='local'),
tfma_mode: dsl.PipelineParam=dsl.PipelineParam(name='tfma-mode', value='local')):
tfteval = dsl.ContainerOp(
name = 'tft-eval',
image = 'gcr.io/google-samples/ml-pipeline-dataflow-tftbq-taxi',
arguments = [ "--input_handle", input_handle_eval, "--outfile_prefix", outfile_prefix_eval,
"--working_dir", '%s/%s/tft-eval' % (working_dir, '{{workflow.name}}'),
"--project", project,
def gh_summ( #pylint: disable=unused-argument
train_steps: dsl.PipelineParam = dsl.PipelineParam(name='train-steps', value=2019300),
project: dsl.PipelineParam = dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),
github_token: dsl.PipelineParam = dsl.PipelineParam(
name='github-token', value='YOUR_GITHUB_TOKEN_HERE'),
working_dir: dsl.PipelineParam = dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),
checkpoint_dir: dsl.PipelineParam = dsl.PipelineParam(
name='checkpoint-dir',
value='gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000'),
deploy_webapp: dsl.PipelineParam = dsl.PipelineParam(name='deploy-webapp', value='true'),
data_dir: dsl.PipelineParam = dsl.PipelineParam(
name='data-dir', value='gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/')):
train = dsl.ContainerOp(
name='train',
image='gcr.io/google-samples/ml-pipeline-t2ttrain',
arguments=["--data-dir", data_dir,
"--checkpoint-dir", checkpoint_dir,
def workflow1(
input_handle_eval: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-eval', value='gs://aju-dev-demos-codelabs/KF/taxidata/eval/data.csv'),
input_handle_train: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-train', value='gs://aju-dev-demos-codelabs/KF/taxidata/train/data.csv'),
outfile_prefix_eval: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-eval', value='eval_transformed'),
outfile_prefix_train: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-train', value='train_transformed'),
train_steps: dsl.PipelineParam=dsl.PipelineParam(name='train-steps', value=10000),
project: dsl.PipelineParam=dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),
working_dir: dsl.PipelineParam=dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),
tft_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tft-setup-file', value='/ml/transform/setup.py'),
tfma_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tfma-setup-file', value='/ml/analysis/setup.py'),
workers: dsl.PipelineParam=dsl.PipelineParam(name='workers', value=1),
pss: dsl.PipelineParam=dsl.PipelineParam(name='pss', value=1),
max_rows: dsl.PipelineParam=dsl.PipelineParam(name='max-rows', value=10000),
ts1: dsl.PipelineParam=dsl.PipelineParam(name='ts1', value=''),
ts2: dsl.PipelineParam=dsl.PipelineParam(name='ts2', value=''),
preprocessing_module1: dsl.PipelineParam=dsl.PipelineParam(name='preprocessing-module1', value='gs://aju-dev-demos-codelabs/KF/taxi-preproc/preprocessing.py'),
preprocessing_module2: dsl.PipelineParam=dsl.PipelineParam(name='preprocessing-module2', value='gs://aju-dev-demos-codelabs/KF/taxi-preproc/preprocessing2.py'),
preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(name='preprocess-mode', value='local'),
tfma_mode: dsl.PipelineParam=dsl.PipelineParam(name='tfma-mode', value='local')):
tfteval = dsl.ContainerOp(
name = 'tft-eval',
image = 'gcr.io/google-samples/ml-pipeline-dataflow-tftbq-taxi',
arguments = [ "--input_handle", input_handle_eval, "--outfile_prefix", outfile_prefix_eval,
input_handle_eval: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-eval', value='bigquery-public-data.chicago_taxi_trips.taxi_trips'),
input_handle_train: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-train', value='bigquery-public-data.chicago_taxi_trips.taxi_trips'),
outfile_prefix_eval: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-eval', value='eval_transformed'),
outfile_prefix_train: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-train', value='train_transformed'),
train_steps: dsl.PipelineParam=dsl.PipelineParam(name='train-steps', value=10000),
project: dsl.PipelineParam=dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),
working_dir: dsl.PipelineParam=dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),
tft_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tft-setup-file', value='/ml/transform/setup.py'),
tfma_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tfma-setup-file', value='/ml/analysis/setup.py'),
workers: dsl.PipelineParam=dsl.PipelineParam(name='workers', value=2),
pss: dsl.PipelineParam=dsl.PipelineParam(name='pss', value=1),
max_rows: dsl.PipelineParam=dsl.PipelineParam(name='max-rows', value=10000),
ts1_1: dsl.PipelineParam=dsl.PipelineParam(name='ts1-1', value='2016-02-01 00:00:00'),
ts2_1: dsl.PipelineParam=dsl.PipelineParam(name='ts2-1', value='2016-03-01 00:00:00'),
ts1_2: dsl.PipelineParam=dsl.PipelineParam(name='ts1-2', value='2013-01-01 00:00:00'),
ts2_2: dsl.PipelineParam=dsl.PipelineParam(name='ts2-2', value='2016-03-01 00:00:00'),
preprocessing_module: dsl.PipelineParam=dsl.PipelineParam(name='preprocessing-module1', value='gs://aju-dev-demos-codelabs/KF/taxi-preproc/preprocessing.py'),
preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(name='preprocess-mode', value='local'),
tfma_mode: dsl.PipelineParam=dsl.PipelineParam(name='tfma-mode', value='local')):
tfteval = dsl.ContainerOp(
name = 'tft-eval',
image = 'gcr.io/google-samples/ml-pipeline-dataflow-tftbq-taxi',
arguments = [ "--input_handle", input_handle_eval, "--outfile_prefix", outfile_prefix_eval,
"--working_dir", '%s/%s/tft-eval' % (working_dir, '{{workflow.name}}'),
"--project", project,
"--mode", preprocess_mode,
"--setup_file", tft_setup_file,
"--max_rows", max_rows,
"--ts1", ts1_1,
"--ts2", ts2_1,
def workflow1(
input_handle_eval: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-eval', value='gs://aju-dev-demos-codelabs/KF/taxidata/eval/data.csv'),
input_handle_train: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-train', value='gs://aju-dev-demos-codelabs/KF/taxidata/train/data.csv'),
outfile_prefix_eval: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-eval', value='eval_transformed'),
outfile_prefix_train: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-train', value='train_transformed'),
train_steps: dsl.PipelineParam=dsl.PipelineParam(name='train-steps', value=10000),
project: dsl.PipelineParam=dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),
working_dir: dsl.PipelineParam=dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),
tft_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tft-setup-file', value='/ml/transform/setup.py'),
tfma_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tfma-setup-file', value='/ml/analysis/setup.py'),
workers: dsl.PipelineParam=dsl.PipelineParam(name='workers', value=1),
pss: dsl.PipelineParam=dsl.PipelineParam(name='pss', value=1),
max_rows: dsl.PipelineParam=dsl.PipelineParam(name='max-rows', value=10000),
ts1: dsl.PipelineParam=dsl.PipelineParam(name='ts1', value=''),
ts2: dsl.PipelineParam=dsl.PipelineParam(name='ts2', value=''),
preprocessing_module1: dsl.PipelineParam=dsl.PipelineParam(name='preprocessing-module1', value='gs://aju-dev-demos-codelabs/KF/taxi-preproc/preprocessing.py'),
preprocessing_module2: dsl.PipelineParam=dsl.PipelineParam(name='preprocessing-module2', value='gs://aju-dev-demos-codelabs/KF/taxi-preproc/preprocessing2.py'),
preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(name='preprocess-mode', value='local'),
tfma_mode: dsl.PipelineParam=dsl.PipelineParam(name='tfma-mode', value='local')):
tfteval = dsl.ContainerOp(
name = 'tft-eval',
image = 'gcr.io/google-samples/ml-pipeline-dataflow-tftbq-taxi',
arguments = [ "--input_handle", input_handle_eval, "--outfile_prefix", outfile_prefix_eval,
"--working_dir", '%s/%s/tft-eval' % (working_dir, '{{workflow.name}}'),
"--project", project,
"--mode", preprocess_mode,
"--setup_file", tft_setup_file,
"--max-rows", '5000',
def workflow2(
input_handle_eval: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-eval', value='bigquery-public-data.chicago_taxi_trips.taxi_trips'),
input_handle_train: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-train', value='bigquery-public-data.chicago_taxi_trips.taxi_trips'),
outfile_prefix_eval: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-eval', value='eval_transformed'),
outfile_prefix_train: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-train', value='train_transformed'),
train_steps: dsl.PipelineParam=dsl.PipelineParam(name='train-steps', value=10000),
project: dsl.PipelineParam=dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),
working_dir: dsl.PipelineParam=dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),
tft_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tft-setup-file', value='/ml/transform/setup.py'),
tfma_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tfma-setup-file', value='/ml/analysis/setup.py'),
workers: dsl.PipelineParam=dsl.PipelineParam(name='workers', value=2),
pss: dsl.PipelineParam=dsl.PipelineParam(name='pss', value=1),
max_rows: dsl.PipelineParam=dsl.PipelineParam(name='max-rows', value=10000),
ts1_1: dsl.PipelineParam=dsl.PipelineParam(name='ts1-1', value='2016-02-01 00:00:00'),
ts2_1: dsl.PipelineParam=dsl.PipelineParam(name='ts2-1', value='2016-03-01 00:00:00'),
ts1_2: dsl.PipelineParam=dsl.PipelineParam(name='ts1-2', value='2013-01-01 00:00:00'),
ts2_2: dsl.PipelineParam=dsl.PipelineParam(name='ts2-2', value='2016-03-01 00:00:00'),
preprocessing_module: dsl.PipelineParam=dsl.PipelineParam(name='preprocessing-module1', value='gs://aju-dev-demos-codelabs/KF/taxi-preproc/preprocessing.py'),
preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(name='preprocess-mode', value='local'),
tfma_mode: dsl.PipelineParam=dsl.PipelineParam(name='tfma-mode', value='local')):
tfteval = dsl.ContainerOp(
name = 'tft-eval',
image = 'gcr.io/google-samples/ml-pipeline-dataflow-tftbq-taxi',
arguments = [ "--input_handle", input_handle_eval, "--outfile_prefix", outfile_prefix_eval,
def workflow2(
input_handle_eval: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-eval', value='bigquery-public-data.chicago_taxi_trips.taxi_trips'),
input_handle_train: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-train', value='bigquery-public-data.chicago_taxi_trips.taxi_trips'),
outfile_prefix_eval: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-eval', value='eval_transformed'),
outfile_prefix_train: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-train', value='train_transformed'),
train_steps: dsl.PipelineParam=dsl.PipelineParam(name='train-steps', value=10000),
project: dsl.PipelineParam=dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),
working_dir: dsl.PipelineParam=dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),
tft_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tft-setup-file', value='/ml/transform/setup.py'),
tfma_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tfma-setup-file', value='/ml/analysis/setup.py'),
workers: dsl.PipelineParam=dsl.PipelineParam(name='workers', value=2),
pss: dsl.PipelineParam=dsl.PipelineParam(name='pss', value=1),
max_rows: dsl.PipelineParam=dsl.PipelineParam(name='max-rows', value=10000),
ts1_1: dsl.PipelineParam=dsl.PipelineParam(name='ts1-1', value='2016-02-01 00:00:00'),
ts2_1: dsl.PipelineParam=dsl.PipelineParam(name='ts2-1', value='2016-03-01 00:00:00'),
ts1_2: dsl.PipelineParam=dsl.PipelineParam(name='ts1-2', value='2013-01-01 00:00:00'),
ts2_2: dsl.PipelineParam=dsl.PipelineParam(name='ts2-2', value='2016-03-01 00:00:00'),
preprocessing_module: dsl.PipelineParam=dsl.PipelineParam(name='preprocessing-module1', value='gs://aju-dev-demos-codelabs/KF/taxi-preproc/preprocessing.py'),
specs=[
evaluator_pb2.SingleSlicingSpec(
column_for_slicing=['trip_start_hour']
)
]
),
)
model_validator = ModelValidator(
examples=example_gen.outputs['examples'], model=trainer.outputs['model']
)
# Hack: ensuring push_destination can be correctly parameterized and interpreted.
# pipeline root will be specified as a dsl.PipelineParam with the name
# pipeline-root, see:
# https://github.com/tensorflow/tfx/blob/1c670e92143c7856f67a866f721b8a9368ede385/tfx/orchestration/kubeflow/kubeflow_dag_runner.py#L226
_pipeline_root_param = dsl.PipelineParam(name='pipeline-root')
pusher = Pusher(
model_export=trainer.outputs['model'],
model_blessing=model_validator.outputs['blessing'],
push_destination=pusher_pb2.PushDestination(
filesystem=pusher_pb2.PushDestination.Filesystem(
base_directory=os.path.
join(str(_pipeline_root_param), 'model_serving')
)
),
)
return pipeline.Pipeline(
pipeline_name='parameterized_tfx_oss',
pipeline_root=pipeline_root,
components=[
example_gen, statistics_gen, infer_schema, validate_stats, transform,
def workflow2(
input_handle_eval: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-eval', value='bigquery-public-data.chicago_taxi_trips.taxi_trips'),
input_handle_train: dsl.PipelineParam=dsl.PipelineParam(name='input-handle-train', value='bigquery-public-data.chicago_taxi_trips.taxi_trips'),
outfile_prefix_eval: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-eval', value='eval_transformed'),
outfile_prefix_train: dsl.PipelineParam=dsl.PipelineParam(name='outfile-prefix-train', value='train_transformed'),
train_steps: dsl.PipelineParam=dsl.PipelineParam(name='train-steps', value=10000),
project: dsl.PipelineParam=dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),
working_dir: dsl.PipelineParam=dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),
tft_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tft-setup-file', value='/ml/transform/setup.py'),
tfma_setup_file: dsl.PipelineParam=dsl.PipelineParam(name='tfma-setup-file', value='/ml/analysis/setup.py'),
workers: dsl.PipelineParam=dsl.PipelineParam(name='workers', value=2),
pss: dsl.PipelineParam=dsl.PipelineParam(name='pss', value=1),
max_rows: dsl.PipelineParam=dsl.PipelineParam(name='max-rows', value=10000),
ts1_1: dsl.PipelineParam=dsl.PipelineParam(name='ts1-1', value='2016-02-01 00:00:00'),
ts2_1: dsl.PipelineParam=dsl.PipelineParam(name='ts2-1', value='2016-03-01 00:00:00'),
ts1_2: dsl.PipelineParam=dsl.PipelineParam(name='ts1-2', value='2013-01-01 00:00:00'),
ts2_2: dsl.PipelineParam=dsl.PipelineParam(name='ts2-2', value='2016-03-01 00:00:00'),
preprocessing_module: dsl.PipelineParam=dsl.PipelineParam(name='preprocessing-module1', value='gs://aju-dev-demos-codelabs/KF/taxi-preproc/preprocessing.py'),
preprocess_mode: dsl.PipelineParam=dsl.PipelineParam(name='preprocess-mode', value='local'),
tfma_mode: dsl.PipelineParam=dsl.PipelineParam(name='tfma-mode', value='local')):
tfteval = dsl.ContainerOp(
name = 'tft-eval',
image = 'gcr.io/google-samples/ml-pipeline-dataflow-tftbq-taxi',
arguments = [ "--input_handle", input_handle_eval, "--outfile_prefix", outfile_prefix_eval,
"--working_dir", '%s/%s/tft-eval' % (working_dir, '{{workflow.name}}'),
"--project", project,
"--mode", preprocess_mode,
from tfx.components.model_validator.component import ModelValidator
from tfx.components.pusher.component import Pusher
from tfx.components.schema_gen.component import SchemaGen
from tfx.components.statistics_gen.component import StatisticsGen
from tfx.components.trainer.component import Trainer
from tfx.components.transform.component import Transform
from tfx.orchestration import pipeline
from tfx.orchestration.kubeflow import kubeflow_dag_runner
from tfx.proto import evaluator_pb2
from tfx.utils.dsl_utils import external_input
from tfx.proto import pusher_pb2
from tfx.proto import trainer_pb2
# Define pipeline params used for pipeline execution.
# NOTE(review): these module-level definitions rely on `dsl`, `os`, and `kfp`
# being imported earlier in the file -- confirm against the import block.
# Path to the module file; should be a GCS path.
_taxi_module_file_param = dsl.PipelineParam(
name='module-file',
value='gs://ml-pipeline-playground/tfx_taxi_simple/modules/taxi_utils.py'
)
# Path to the CSV data directory, under which there should be a data.csv file.
_data_root_param = dsl.PipelineParam(
name='data-root', value='gs://ml-pipeline-playground/tfx_taxi_simple/data'
)
# Path of pipeline root; should be a GCS path. 'gs://your-bucket' is a
# placeholder the user must replace with a real bucket.
# NOTE(review): kfp.dsl.RUN_ID_PLACEHOLDER is presumably substituted with the
# run id at execution time so each run gets a unique root -- confirm against
# the KFP SDK version in use.
pipeline_root = os.path.join(
'gs://your-bucket', 'tfx_taxi_simple', kfp.dsl.RUN_ID_PLACEHOLDER
)
def _create_test_pipeline(