# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): orphaned fragment — appears to be the body of a Kubeflow
# Pipelines (kfp.dsl) volume/snapshot sample; the enclosing @dsl.pipeline
# function header (which defines `vop1` and `dsl`) is not in this chunk, and
# body indentation was lost. Do not run as-is.
# Step 1: concatenate /data/file* into a gzip archive on the mounted volume.
step1 = dsl.ContainerOp(
name="step1_concat",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["cat /data/file*| gzip -c >/data/full.gz"],
pvolumes={"/data": vop1.volume}
)
# Snapshot the volume after step1 so later steps can clone its state.
step1_snap = dsl.VolumeSnapshotOp(
name="create_snapshot_1",
resource_name="snap1",
volume=step1.pvolume
)
# New volume cloned from the snapshot; its size comes from the snapshot's
# reported "size" output rather than a hard-coded value.
vop2 = dsl.VolumeOp(
name="create_volume_2",
resource_name="vol2",
data_source=step1_snap.snapshot,
size=step1_snap.outputs["size"]
)
# Step 2: decompress the archive on the cloned volume (-k keeps the .gz).
step2 = dsl.ContainerOp(
name="step2_gunzip",
image="library/bash:4.4.23",
command=["gunzip", "-k", "/data/full.gz"],
pvolumes={"/data": vop2.volume}
)
# NOTE(review): truncated here — this VolumeSnapshotOp call is cut off
# mid-arguments; the closing parenthesis and `volume=` argument are missing.
step2_snap = dsl.VolumeSnapshotOp(
name="create_snapshot_2",
resource_name="snap2",
# NOTE(review): fragment of a kfp.dsl sample. Body indentation was flattened
# to column 0 in this chunk (restore one level under the def before use), and
# the function is truncated at the final, unclosed VolumeSnapshotOp call.
def volume_snapshotop_rokurl(rok_url):
# Create a 1Gi ReadWriteMany volume seeded from a Rok snapshot via the
# "rok/origin" annotation — presumably `rok_url` points at existing data;
# verify against the Rok documentation.
vop1 = dsl.VolumeOp(
name="create_volume_1",
resource_name="vol1",
size="1Gi",
annotations={"rok/origin": rok_url},
modes=dsl.VOLUME_MODE_RWM
)
# Concatenate the seeded files into a single gzip archive on the volume.
step1 = dsl.ContainerOp(
name="step1_concat",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["cat /data/file*| gzip -c >/data/full.gz"],
pvolumes={"/data": vop1.volume}
)
# NOTE(review): truncated — the arguments of this call are missing.
step1_snap = dsl.VolumeSnapshotOp(
# NOTE(review): unrelated fragment spliced in here — this is the tail of a
# taxi-cab-classification pipeline's parameter list (the `def` line and the
# leading parameters, including `output`, `platform` and `train`, are missing
# from this chunk) followed by part of its body.
evaluation='gs://ml-pipeline-playground/tfx/taxi-cab-classification/eval.csv',
mode='local',
preprocess_module='gs://ml-pipeline-playground/tfx/taxi-cab-classification/preprocessing.py',
learning_rate=0.1,
hidden_layer_size='1500',
steps=3000,
analyze_slice_column='trip_start_hour'
):
# Per-run output path; {{workflow.uid}} / {{pod.name}} are Argo placeholders
# substituted at runtime.
output_template = str(output) + '/{{workflow.uid}}/{{pod.name}}/data'
# Lambdas passed as source strings to downstream components (evaluated there).
target_lambda = """lambda x: (x['target'] > x['fare'] * 0.2)"""
target_class_lambda = """lambda x: 1 if (x['target'] > x['fare'] * 0.2) else 0"""
tf_server_name = 'taxi-cab-classification-model-{{workflow.uid}}'
# On non-GCP platforms, provision a PVC and clone the pipelines repo into it
# instead of relying on GCS.
if platform != 'GCP':
vop = dsl.VolumeOp(
name="create_pvc",
resource_name="pipeline-pvc",
modes=dsl.VOLUME_MODE_RWM,
size="1Gi"
)
checkout = dsl.ContainerOp(
name="checkout",
image="alpine/git:latest",
command=["git", "clone", "https://github.com/kubeflow/pipelines.git", str(output) + "/pipelines"],
).apply(onprem.mount_pvc(vop.outputs["name"], 'local-storage', output))
# Explicit ordering: the clone must wait for the PVC to exist.
checkout.after(vop)
# NOTE(review): truncated — this call is cut off mid-arguments; `train`,
# `evaluation` and `dataflow_tf_data_validation_op` are defined outside
# this chunk.
validation = dataflow_tf_data_validation_op(
inference_data=train,
validation_data=evaluation,
# NOTE(review): fragment of a kfp.dsl sample. Body indentation was flattened
# to column 0 (restore before use); truncated at the second ContainerOp.
def volume_op_dag():
# Shared 10Gi ReadWriteMany PVC mounted by the steps below.
vop = dsl.VolumeOp(
name="create_pvc",
resource_name="my-pvc",
size="10Gi",
modes=dsl.VOLUME_MODE_RWM
)
# Write "1" to /mnt/file1 on the shared volume.
step1 = dsl.ContainerOp(
name="step1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo 1 | tee /mnt/file1"],
pvolumes={"/mnt": vop.volume}
)
# NOTE(review): truncated — the remaining arguments of this call are missing.
step2 = dsl.ContainerOp(
name="step2",
# NOTE(review): fragment of a kfp.dsl sample — near-duplicate of
# volume_op_dag above. Body indentation was flattened to column 0; truncated
# at the second ContainerOp.
def volumeop_parallel():
# Shared 10Gi ReadWriteMany PVC; presumably multiple steps mount it in
# parallel (per the function name) — the truncated tail would confirm.
vop = dsl.VolumeOp(
name="create_pvc",
resource_name="my-pvc",
size="10Gi",
modes=dsl.VOLUME_MODE_RWM
)
# Write "1" to /mnt/file1 on the shared volume.
step1 = dsl.ContainerOp(
name="step1",
image="library/bash:4.4.23",
command=["sh", "-c"],
arguments=["echo 1 | tee /mnt/file1"],
pvolumes={"/mnt": vop.volume}
)
# NOTE(review): truncated — the remaining arguments of this call are missing.
step2 = dsl.ContainerOp(
name="step2",
# NOTE(review): orphaned fragment of a train-and-build pipeline; the
# enclosing function (defining `k8s_client`, `docker_secret`,
# `training_branch`, `training_repo`, `training_files`) is not in this chunk,
# and the fragment ends on a dangling comment before the kaniko step.
#will be pushing image so need docker secret
#create from local with `kubectl create secret generic docker-config --from-file=config.json=${DOCKERHOME}/config.json --type=kubernetes.io/config`
secret = k8s_client.V1Volume(
name="docker-config-secret",
secret=k8s_client.V1SecretVolumeSource(secret_name=docker_secret)
)
#use volume for storing model
modelvolop = dsl.VolumeOp(
name="modelpvc",
resource_name="modelpvc",
size="50Mi",
modes=dsl.VOLUME_MODE_RWO
)
#and another as working directory between steps
wkdirop = dsl.VolumeOp(
name="wkdirpvc",
resource_name="wkdirpvc",
size="50Mi",
modes=dsl.VOLUME_MODE_RWO
)
#clone the training code and move to workspace dir as kaniko (next step) expects that
# NOTE(review): the shell command below is built by string-concatenating
# pipeline parameters (training_branch/repo/files) — if those can come from
# untrusted input this is shell-injectable; confirm they are operator-supplied.
clone = dsl.ContainerOp(
name="clone",
image="alpine/git:latest",
command=["sh", "-c"],
arguments=["git clone --depth 1 --branch "+str(training_branch)+" "+str(training_repo)+"; cp "+str(training_files)+" /workspace; ls /workspace/;"],
pvolumes={"/workspace": wkdirop.volume}
)
#build and push image for training
# NOTE(review): fragment of a kfp.dsl snapshot sample. Body indentation was
# flattened to column 0 (restore before use); truncated at the final,
# unclosed VolumeSnapshotOp call.
def volume_snapshotop_sequential(url):
# Fresh 1Gi ReadWriteMany volume for the pipeline's working data.
vop = dsl.VolumeOp(
name="create_volume",
resource_name="vol1",
size="1Gi",
modes=dsl.VOLUME_MODE_RWM
)
# Ingest: stream the GCS object at `url` via gsutil and store it gzipped
# under /data/step1 on the mounted volume.
step1 = dsl.ContainerOp(
name="step1_ingest",
image="google/cloud-sdk:272.0.0",
command=["sh", "-c"],
arguments=["mkdir /data/step1 && "
"gsutil cat %s | gzip -c >/data/step1/file1.gz" % url],
pvolumes={"/data": vop.volume}
)
# NOTE(review): truncated — the arguments of this call are missing.
step1_snap = dsl.VolumeSnapshotOp(