Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
container.image = self._container.image
# clear existing commands
del container.command[:]
container.command.extend(self._container.command)
# also clear existing args
del container.args[:]
container.args.extend(self._container.args)
resource_requirements = _lazy_k8s.io.api.core.v1.generated_pb2.ResourceRequirements()
for resource in self._container.resources.limits:
resource_requirements.limits[
_core_task.Resources.ResourceName.Name(resource.name).lower()].CopyFrom(
_lazy_k8s.io.apimachinery.pkg.api.resource.generated_pb2.Quantity(string=resource.value))
for resource in self._container.resources.requests:
resource_requirements.requests[
_core_task.Resources.ResourceName.Name(resource.name).lower()].CopyFrom(
_lazy_k8s.io.apimachinery.pkg.api.resource.generated_pb2.Quantity(string=resource.value))
if resource_requirements.ByteSize():
# Important! Only copy over resource requirements if they are non-empty.
container.resources.CopyFrom(resource_requirements)
del container.env[:]
container.env.extend(
[_lazy_k8s.io.api.core.v1.generated_pb2.EnvVar(name=key, value=val) for key, val in
_six.iteritems(self._container.env)])
final_containers.append(container)
del pod_spec.containers[:]
pod_spec.containers.extend(final_containers)
sidecar_job_plugin = _task_models.SidecarJob(
def to_flyte_idl(self):
    """
    Build the flyteidl message that carries this entry's name and value.

    :rtype: flyteidl.core.tasks_pb2.ResourceEntry
    """
    entry_message_cls = _core_task.Resources.ResourceEntry
    return entry_message_cls(name=self.name, value=self.value)
from flytekit.plugins import flyteidl as _lazy_flyteidl
from google.protobuf import json_format as _json_format, struct_pb2 as _struct
from flytekit.sdk.spark_types import SparkType as _spark_type
from flytekit.models import common as _common, literals as _literals, interface as _interface
from flytekit.models.core import identifier as _identifier
from flytekit.common.exceptions import user as _user_exceptions
class Resources(_common.FlyteIdlEntity):
# Re-exposes the resource-name enum values found on `_core_task.Resources` as plain class attributes.
# ResourceEntry documents its `name` argument as an "enum value from ResourceName".
class ResourceName(object):
UNKNOWN = _core_task.Resources.UNKNOWN
CPU = _core_task.Resources.CPU
GPU = _core_task.Resources.GPU
MEMORY = _core_task.Resources.MEMORY
STORAGE = _core_task.Resources.STORAGE
class ResourceEntry(_common.FlyteIdlEntity):
def __init__(self, name, value):
    """
    Record which resource this entry refers to and the quantity asked for.

    :param int name: enum value from ResourceName
    :param Text value: a textual value describing the resource need. Must be a valid k8s quantity.
    """
    self._name, self._value = name, value
@property
def name(self):
"""
enum value from ResourceName
:rtype: int
for container in containers:
# In the case of the primary container, we overwrite specific container attributes with the default values
# used in an SDK runnable task.
if container.name == primary_container_name:
container.image = self._container.image
# clear existing commands
del container.command[:]
container.command.extend(self._container.command)
# also clear existing args
del container.args[:]
container.args.extend(self._container.args)
resource_requirements = _lazy_k8s.io.api.core.v1.generated_pb2.ResourceRequirements()
for resource in self._container.resources.limits:
resource_requirements.limits[
_core_task.Resources.ResourceName.Name(resource.name).lower()].CopyFrom(
_lazy_k8s.io.apimachinery.pkg.api.resource.generated_pb2.Quantity(string=resource.value))
for resource in self._container.resources.requests:
resource_requirements.requests[
_core_task.Resources.ResourceName.Name(resource.name).lower()].CopyFrom(
_lazy_k8s.io.apimachinery.pkg.api.resource.generated_pb2.Quantity(string=resource.value))
if resource_requirements.ByteSize():
# Important! Only copy over resource requirements if they are non-empty.
container.resources.CopyFrom(resource_requirements)
del container.env[:]
container.env.extend(
[_lazy_k8s.io.api.core.v1.generated_pb2.EnvVar(name=key, value=val) for key, val in
_six.iteritems(self._container.env)])
final_containers.append(container)
def to_flyte_idl(self):
    """
    Build the flyteidl message for this object, serializing each request and limit entry in turn.

    :rtype: flyteidl.core.tasks_pb2.Resources
    """
    # Requests are serialized before limits, the same order in which the keyword arguments are passed.
    serialized_requests = [entry.to_flyte_idl() for entry in self.requests]
    serialized_limits = [entry.to_flyte_idl() for entry in self.limits]
    return _core_task.Resources(requests=serialized_requests, limits=serialized_limits)
from flyteidl.admin import task_pb2 as _admin_task
from flyteidl.core import tasks_pb2 as _core_task, literals_pb2 as _literals_pb2, compiler_pb2 as _compiler
from flyteidl.plugins import spark_pb2 as _spark_task
from flyteidl.plugins import pytorch_pb2 as _pytorch_task
from flytekit.plugins import flyteidl as _lazy_flyteidl
from google.protobuf import json_format as _json_format, struct_pb2 as _struct
from flytekit.sdk.spark_types import SparkType as _spark_type
from flytekit.models import common as _common, literals as _literals, interface as _interface
from flytekit.models.core import identifier as _identifier
from flytekit.common.exceptions import user as _user_exceptions
class Resources(_common.FlyteIdlEntity):
# Re-exposes the resource-name enum values found on `_core_task.Resources` (flyteidl's tasks_pb2 module)
# as plain class attributes. ResourceEntry documents its `name` argument as an "enum value from ResourceName".
class ResourceName(object):
UNKNOWN = _core_task.Resources.UNKNOWN
CPU = _core_task.Resources.CPU
GPU = _core_task.Resources.GPU
MEMORY = _core_task.Resources.MEMORY
STORAGE = _core_task.Resources.STORAGE
class ResourceEntry(_common.FlyteIdlEntity):
def __init__(self, name, value):
    """
    Record which resource this entry refers to and the quantity asked for.

    :param int name: enum value from ResourceName
    :param Text value: a textual value describing the resource need. Must be a valid k8s quantity.
    """
    self._name, self._value = name, value
@property