task_env_param = parameter(scope=ParameterScope.children)
class EnvConfig(Config):
"""Databand's environment configuration"""
_conf__task_family = "env"
cloud_type = parameter(description="cloud type: gcp/aws/")[str]
env_label = parameter(
default=EnvLabel.dev, description="environment type: dev/int/prod"
)[
str
] # label
production = parameter(
description="indicates that environment is production"
).value(False)
conn_id = parameter(default=None, description="cloud connection settings")[str]
# MAIN OUTPUT FOLDER
root = parameter(description="Data outputs location").folder[DirTarget]
# DATABAND SYSTEM FOLDERS
dbnd_root = parameter(description="DBND system outputs location").output.folder(
default=None
)[DirTarget]
dbnd_local_root = parameter(
description="DBND home for the local engine environment"
).output.folder()[DirTarget]
dbnd_data_sync_root = parameter(
description="Rooted directory for target syncing against remote engine"
).output.folder()[DirTarget]
submit_driver = parameter(
description="override env.submit_driver for specific environment"
).none[bool]
submit_tasks = parameter(
description="override env.submit_tasks for specific environment"
).none[bool]
enable_airflow_kubernetes = parameter(
default=True,
description="Enable use of kubernetes executor for kubebernetes engine submission",
)[bool]
execution_date = parameter(default=None, description="Override execution date")[
datetime
]
# Execution specific
id = parameter(default=None, description="The list of task ids to run")[List[str]]
task = parameter(
default=None, description="Run only specified tasks (regular expresion)"
)[str]
ignore_dependencies = parameter(
description="Skip upstream task dependencies and run only the selected tasks"
).value(False)
ignore_first_depends_on_past = parameter(
description="Ignore depends_on_past dependencies for the first set of tasks only"
).value(False)
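# --- Illustrative sketch (not part of the dbnd sources above) ---
# The parameter builder pattern used throughout these configs: a Config subclass
# declares typed parameters via parameter(...)[type] and defaults via .value(...).
# The class and parameter names below are hypothetical.
from dbnd._core.parameter.parameter_builder import parameter
from dbnd._core.task import config

class MyToolConfig(config.Config):
    """Hypothetical user-defined configuration, mirroring the style above"""

    _conf__task_family = "my_tool"  # config section name, like "env" and "log" above

    endpoint = parameter(description="service endpoint URL")[str]  # required, typed
    retries = parameter(description="number of retry attempts").value(3)  # typed from default
    dry_run = parameter(description="do not write outputs").value(False)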
class _DbndDriverTask(Task):
_conf__no_child_params = True
task_is_system = True
task_in_memory_outputs = True
is_driver = parameter[bool]
is_submitter = parameter[bool]
execution_date = parameter[datetime]
sends_heartbeat = parameter[bool]
host_engine = parameter[EngineConfig]
target_engine = parameter[EngineConfig]
task_executor_type = parameter[str]
# all paths are system outputs; we don't want to check whether they exist
local_driver_root = output(system=True)[Target]
local_driver_log = output(system=True)[Target]
remote_driver_root = output(system=True)[Target]
driver_dump = output(system=True)[Target]
def _build_submit_task(self, run):
if run.root_task:
raise DatabandRuntimeError(
"Can't send to remote execution task created via code, only command line is supported"
)
# don't describe in local run, do it in remote run
settings = self.settings
import logging
import sys
import typing

from dbnd._core.parameter.parameter_builder import parameter
from dbnd._core.task import config
from targets import DirTarget
if typing.TYPE_CHECKING:
from dbnd._core.run.databand_run import DatabandRun
logger = logging.getLogger(__name__)
class EngineConfig(config.Config):
"""Databand's engine configuration (where tasks are executed)"""
require_submit = parameter(
description="Should the task engine be forced to submit tasks"
).value(False)
dbnd_local_root = parameter(
default=None, description="Local dbnd home directory at the engine environment"
)[DirTarget]
dbnd_executable = parameter(
default=[sys.executable, "-m", "dbnd"],
description="'dbnd' executable path at engine environment",
)[typing.List[str]]
def cleanup_after_run(self):
pass
def submit_to_engine_task(self, env, task_name, args, interactive=True):
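# --- Illustrative sketch (assumption, not EngineConfig's actual submission path) ---
# dbnd_executable above defaults to [sys.executable, "-m", "dbnd"], i.e. a ready
# argv prefix; a hypothetical helper showing how such a list could be combined
# with CLI arguments and launched:
import subprocess

def launch_dbnd_cli(dbnd_executable, args):
    # e.g. [sys.executable, "-m", "dbnd"] + ["run", "--help"]
    cmd = list(dbnd_executable) + list(args)
    return subprocess.run(cmd, check=True)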
_conf__task_family = "log"
disabled = parameter(description="Should logging be disabled").value(False)
capture_stdout_stderr = parameter(
description="Should logger retransmit all output wrtten to stdout\stderr"
).value(True)
capture_task_run_log = parameter.help("Capture task output into log").value(True)
override_airflow_logging_on_task_run = parameter(
description="Replace airflow logger with databand logger"
).value(True)
support_jupiter = parameter(
description="Support logging output to Jupiter UI"
).value(True)
level = parameter(description="Logging level: DEBUG/INFO/WARN/ERROR").value("INFO")
formatter = parameter(
description="Log formatting string (logging library convention)"
)[str]
formatter_colorlog = parameter(
description="Log formatting string for colored console output (colorlog convention)"
)[str]
formatter_simple = parameter(
description="Simplified log formatting string (logging library convention)"
)[str]
console_formatter_name = parameter(
description="The name of the formatter logging to console output"
)[str]
file_formatter_name = parameter(
description="The name of the formatter logging to file output"
)[str]
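# --- Illustrative values only (not dbnd's defaults) ---
# The formatter strings above follow the standard Python logging convention, e.g.:
example_formatter = "%(asctime)s %(levelname)-8s %(name)s - %(message)s"
# console_formatter_name / file_formatter_name then select one of the configured
# formatters by name, e.g. "formatter_simple".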
dag_id=dag_id,
execution_date=execution_date,
task_id=task_id,
try_number=try_number,
)
except Exception as ex:
logger.info("Failed to get airlfow context info from spark job: %s", ex)
return None
class AirflowOperatorRuntimeTask(Task):
task_is_system = False
_conf__track_source_code = False
dag_id = parameter[str]
execution_date = parameter[datetime.datetime]
def _initialize(self):
super(AirflowOperatorRuntimeTask, self)._initialize()
self.task_meta.task_functional_call = ""
self.task_meta.task_command_line = generate_airflow_cmd(
dag_id=self.dag_id,
task_id=self.task_id,
execution_date=self.execution_date,
is_root_task=False,
)
@classmethod
def build_from_airflow_context(cls, af_context):
# we can't actually run it; we don't even know when it's going to finish
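# --- Hypothetical sketch (helper name and attribute access are assumptions) ---
# build_from_airflow_context presumably maps an Airflow context onto the task's
# parameters; the fields used elsewhere in this snippet suggest something like:
def runtime_task_kwargs(af_context):
    return dict(
        dag_id=af_context.dag_id,
        task_id=af_context.task_id,
        execution_date=af_context.execution_date,
    )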
_conf_auto_read_params = True # enables autoread of params.
_conf_confirm_on_kill_msg = None # get user confirmation on task kill if not empty
_conf__require_run_dump_file = False
# this is the state of autoread
_task_auto_read_original = None
_task_auto_read = None
####
# execution
# will be used by Registry
task_definition = None # type: TaskDefinition
# the user can override this with their configuration
defaults = None # type: Dict[ParameterDefinition, Any]
validate_no_extra_params = parameter.enum(ParamValidation).system(
description="validate that all configured keys for a task have a matching parameter definition"
)
@classmethod
def get_task_family(cls):
return cls.task_definition.task_family
def __init__(self, **kwargs):
super(_BaseTask, self).__init__()
# most of the time we will use it as TaskMetaCtrl - we want this to be a type hint!
self.task_meta = kwargs["task_meta"] # type: TaskMeta
self._params = TaskParameters(self)
for p_value in self.task_meta.task_params:
setattr(self, p_value.name, p_value.value)
)[bool]
send_body_to_server_max_size = parameter(
default=16 * 1024 * 1024, # 16MB
description="Max log file size in bytes to be sent to server.\n"
"\t* use 0 for unlimited;"
"\t* use -1 to disable;"
"\t* use negative (e.g. -1000) to get log's 'head' instead of 'tail'."
"Default: 16MB.",
)[int]
remote_logging_disabled = parameter.help(
"for tasks using a cloud environment, don't copy the task log to cloud storage"
).value(False)
sqlalchemy_print = parameter(description="enable sqlalchemy logger").value(False)
sqlalchemy_trace = parameter(description="trace sqlalchemy queries").value(False)
api_profile = parameter(description="profile api calls").value(False)
def _initialize(self):
super(LoggingConfig, self)._initialize()
self.task_log_file_formatter = None
def format_exception_as_str(self, exc_info, isolate=True):
if self.exception_simple:
return format_exception_as_str(exc_info)
try:
tbvaccine = TBVaccine(
no_colors=self.exception_no_color,
show_vars=False,
skip_non_user_on_isolate=True,
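# --- Illustrative sketch of the send_body_to_server_max_size semantics above ---
# (this helper is not part of dbnd's API; it only mirrors the documented rules)
def truncate_log_body(body, max_size):
    if max_size == 0:        # 0 -> unlimited, send everything
        return body
    if max_size == -1:       # -1 -> sending disabled
        return None
    if max_size < 0:         # other negatives -> send the log's "head"
        return body[: -max_size]
    return body[-max_size:]  # positive -> send the log's "tail" (default 16MB)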
"""Databand's environment configuration"""
_conf__task_family = "env"
cloud_type = parameter(description="cloud type: gcp/aws/")[str]
env_label = parameter(
default=EnvLabel.dev, description="environment type: dev/int/prod"
)[
str
] # label
production = parameter(
description="indicates that environment is production"
).value(False)
conn_id = parameter(default=None, description="cloud connection settings")[str]
# MAIN OUTPUT FOLDER
root = parameter(description="Data outputs location").folder[DirTarget]
# DATABAND SYSTEM FOLDERS
dbnd_root = parameter(description="DBND system outputs location").output.folder(
default=None
)[DirTarget]
dbnd_local_root = parameter(
description="DBND home for the local engine environment"
).output.folder()[DirTarget]
dbnd_data_sync_root = parameter(
description="Rooted directory for target syncing against remote engine"
).output.folder()[DirTarget]
# execution
Note that setting this value with ``@property`` will not work, because this
is a class level value.
"""
_task_band_result = output(default=None, system=True)
_meta_output = output(
system=True,
output_name="meta",
output_ext="",
target_config=folder,
significant=False,
description="Location of all internal outputs (e.g. metrics)",
)
task_band = output.json(output_name="band")
task_enabled = parameter.system(scope=ParameterScope.children)[bool]
task_enabled_in_prod = parameter.system(scope=ParameterScope.children)[bool]
# for permanent bump of task version use Task.task_class_version
task_version = parameter(
description="task version, directly affects task signature ",
scope=ParameterScope.children,
)[VersionStr]
task_class_version = parameter.value(
default=DEFAULT_CLASS_VERSION,
system=True,
description="task code version, "
"use while you want persistent change in your task version",
)
task_env = parameter.value(