def delete_context(self, fq_context_name, remote, force):
    """
    Delete a branch (data context), named either as <remote context>/<local context>
    or just <local context>; optionally remove the remote copy on S3.

    Args:
        fq_context_name (str): The unique string for a context
        remote (bool): whether to also remove the remote on S3
        force (bool): whether to force delete a dirty context

    Returns:
        None
    """
    repo, local_context = DisdatFS._parse_fq_context_name(fq_context_name)
    ctxt_dir = os.path.join(DisdatConfig.instance().get_context_dir(), local_context)

    if self.curr_context is not None and (fq_context_name == self.curr_context_name):
        print("Disdat deleting the current context {}, remember to 'dsdt switch <ctxt>' afterwards!".format(fq_context_name))
        os.remove(os.path.join(DisdatConfig.instance().get_meta_dir(), META_FS_FILE))

    remote_context_url = None
    if local_context in self._all_contexts:
        dc = self._all_contexts[local_context]
        remote_context_url = dc.get_remote_object_dir()
        dc.delete_context(force=force)
        del self._all_contexts[local_context]

    if os.path.exists(ctxt_dir):
        shutil.rmtree(ctxt_dir)
        _logger.info("Disdat deleted local data context {}.".format(local_context))
        if remote and remote_context_url is not None:
            # Only attempt the S3 delete if this context actually had a remote attached.
            aws_s3.delete_s3_dir(remote_context_url)
            _logger.info("Disdat deleted remote data context {}.".format(remote_context_url))
    else:
        _logger.info("Disdat local data context {} appears to already have been deleted.".format(local_context))
def get_fq_docker_repo_name(is_sagemaker, pipeline_setup_file):
"""
Produce the fully qualified docker repo name.
Args:
is_sagemaker (bool): for sagemaker image
pipeline_setup_file (str): the path to the setup.py file used to dockerize this pipeline
Returns:
(str): The fully qualified docker image repository name
"""
disdat_config = DisdatConfig.instance()
repository_prefix = None
if disdat_config.parser.has_option('docker', 'repository_prefix'):
repository_prefix = disdat_config.parser.get('docker', 'repository_prefix')
if is_sagemaker:
repository_name = common.make_sagemaker_project_repository_name(repository_prefix, pipeline_setup_file)
else:
repository_name = common.make_project_repository_name(repository_prefix, pipeline_setup_file)
    # Figure out the fully-qualified repository name, i.e., the name
    # including the registry.
    registry_name = disdat_config.parser.get('docker', 'registry').strip('/')
    if registry_name == '*ECR*':
        fq_repository_name = aws.ecr_get_fq_repository_name(repository_name)
    else:
        fq_repository_name = '{}/{}'.format(registry_name, repository_name)

    return fq_repository_name
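# Worked example of the assembly above (all values hypothetical): with
# repository_prefix = 'myteam', a repository name such as 'myteam-mypipeline',
# and registry = 'docker.example.com', the fully qualified name becomes
#
#   'docker.example.com/myteam-mypipeline'
#
# With registry = '*ECR*', the registry host is resolved via AWS ECR instead.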
"""
def check_role_arn(job_dict, jra):
""" Check to see if the job desc dictionary contains the same job_role_arn (jra)
"""
if jra is None:
if 'jobRoleArn' not in job_dict['containerProperties']:
return True
else:
if 'jobRoleArn' in job_dict['containerProperties']:
if job_dict['containerProperties']['jobRoleArn'] == jra:
return True
return False
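# Self-contained check of the predicate above, using hypothetical AWS Batch
# job-description fragments (the ARN and dict shapes are illustrative only):
job_with_role = {'containerProperties': {'jobRoleArn': 'arn:aws:iam::123456789012:role/batch-job'}}
job_without_role = {'containerProperties': {}}

assert check_role_arn(job_with_role, 'arn:aws:iam::123456789012:role/batch-job')
assert not check_role_arn(job_with_role, None)
assert check_role_arn(job_without_role, None)
assert not check_role_arn(job_without_role, 'arn:aws:iam::123456789012:role/batch-job')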
disdat_config = DisdatConfig.instance()

# Get the parameter values required to kick off an AWS Batch job.
# Every batch job must:
# 1. Have a name
# 2. Have a job definition that declares which ECR-hosted Docker
#    image to use.
# 3. Have a queue that feeds jobs into a compute cluster.
# 4. Have a command to execute inside the Docker image; the command
#    args are more-or-less the same as the ones used to execute
#    locally using 'dsdt run'

# Create a Job Definition and upload it.
# We create per-user job definitions so multiple users do not clobber each other.
# In addition, we never re-use a job definition, since the user may update
# the vcpu or memory requirements and those are stuck in the job definition.
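# A hedged boto3 sketch of the two steps the comments above describe:
# registering a per-user job definition and submitting a job against it.
# All names (job definition, queue, image, command args) are hypothetical;
# this illustrates the AWS Batch API, not Disdat's exact calls.
import boto3

batch = boto3.client('batch')

# 1./2. Name the job definition and point it at an ECR-hosted image.
resp = batch.register_job_definition(
    jobDefinitionName='disdat-alice-mypipeline',  # per-user name, hypothetical
    type='container',
    containerProperties={
        'image': '123456789012.dkr.ecr.us-east-1.amazonaws.com/mypipeline:latest',
        'vcpus': 2,
        'memory': 4096,
        'command': [],  # the actual command is supplied at submit time
    },
)

# 3./4. Submit to a queue, passing the 'dsdt run'-style args as overrides.
batch.submit_job(
    jobName='mypipeline-job',
    jobQueue='my-batch-queue',
    jobDefinition=resp['jobDefinitionArn'],
    containerOverrides={'command': ['--branch', 'mycontext', 'MyPipeClass']},
)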
"""
Given the current version, see if it is different than found_version
Note, if either version is dirty, we are forced to say they are different
Typically we get the code_version from the pipe and the lineage object from the
bundle. We then see if the current code == the information in lineage object.
Args:
current_version (CodeVersion) :
lineage_obj (LineageObject):
Returns:
"""
conf = common.DisdatConfig.instance()
if conf.ignore_code_version:
return False
# If there were uncommitted changes, then we have to re-run, mark as different
if code_version.dirty:
return True
if code_version.semver != lineage_obj.pb.code_semver:
return True
if code_version.hash != lineage_obj.pb.code_hash:
return True
## Currently ignoring tstamp, branch, url
## CodeVersion = collections.namedtuple('CodeVersion', 'semver hash tstamp branch url dirty')
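# A minimal illustration of the comparison above (assumes a configured Disdat
# install, since the function consults DisdatConfig; in Disdat the lineage
# record is a protobuf, mocked here with namedtuples for the sketch):
import collections

CodeVersion = collections.namedtuple('CodeVersion', 'semver hash tstamp branch url dirty')
FakePB = collections.namedtuple('FakePB', 'code_semver code_hash')
FakeLineage = collections.namedtuple('FakeLineage', 'pb')

current = CodeVersion('0.9.1', 'abc123', None, 'master', None, False)
lineage = FakeLineage(pb=FakePB(code_semver='0.9.1', code_hash='abc123'))

assert different_code_versions(current, lineage) is False                       # same code
assert different_code_versions(current._replace(dirty=True), lineage) is True   # dirty forces re-run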
def load(target_contexts=[]):
    """
    Load the data contexts described at meta_dir. Each of these is a "remote."

    Args:
        target_contexts (list(str)): If non-empty, only load the named contexts.

    Returns:
        (dict): 'name': context pairs.
    """
    ctxt_dir = DisdatConfig.instance().get_context_dir()
    if ctxt_dir is None:
        raise Exception("Unable to load context without a metadata directory argument")

    contexts = {}

    files = glob.glob(os.path.join(ctxt_dir, '*'))

    for ctxt in files:
        ctxt = os.path.basename(ctxt)  # glob returns full paths; keep only the context name
        if len(target_contexts) > 0 and ctxt not in target_contexts:
            continue
        #_logger.debug("Loading context {}...".format(ctxt))
        meta_file = os.path.join(ctxt_dir, ctxt, META_CTXT_FILE)
        if not os.path.isfile(meta_file):
            continue
        # ... (per-context deserialization elided in this snippet) ...

    return contexts
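# Example calls (context names hypothetical):
#   all_ctxts = load()              # load every context found under ctxt_dir
#   just_one  = load(['examples'])  # load only the context named 'examples'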
def save(self):
    """
    Write out the json describing the fs.  At the moment this records
    only the current (active) context.

    Returns:
        None
    """
meta_file = os.path.join(DisdatConfig.instance().get_meta_dir(), META_FS_FILE)
with open(meta_file, 'w') as json_file:
state_dict = {self.JsonConfig.ACTIVE_CONTEXT: self.curr_context_name}
json_file.write(json.dumps(state_dict))
def load(self):
"""
Load the fs object found in the meta_dir.
:return: (string) name of active context
"""
meta_file = os.path.join(DisdatConfig.instance().get_meta_dir(), META_FS_FILE)
if not os.path.isfile(meta_file):
_logger.debug("No disdat {} meta fs data file found.".format(meta_file))
raise RuntimeError("No current local context, please change context with 'dsdt switch'")
else:
with open(meta_file, 'r') as json_file:
state_dict = json.loads(json_file.readline())
try:
return state_dict[self.JsonConfig.ACTIVE_CONTEXT]
except KeyError:
raise RuntimeError("No current local context, please change context with 'dsdt switch'")
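# Illustration of the state file that save()/load() above round-trip.
# Assuming JsonConfig.ACTIVE_CONTEXT == 'active_context' (an assumption; only
# the constant's name appears in this snippet), META_FS_FILE holds a single
# JSON line such as:
#
#   {"active_context": "examples"}
#
# so load() would return 'examples' as the active context name.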