# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_s3_noprefix():
    """Check that is_s3 parses a bucket-only URL ("s3://a") as
    (on_s3=True, bucket="a", prefix="")."""
    on_s3, bucket, prefix = is_s3("s3://a")
    assert on_s3
    assert bucket == "a"
    assert prefix == ""
# NOTE(review): this fragment begins mid-function — the enclosing `def` that
# binds `c`, `path`, `trial_name`, `num_tensors`, and `num_steps` is not
# visible in this chunk, so it is kept byte-for-byte as found.
# Register a "default" collection and give it num_tensors tensor names.
c.add("default")
c.get("default").tensor_names = ["foo_" + str(i) for i in range(num_tensors)]
c.export(path + trial_name, DEFAULT_COLLECTIONS_FILE_NAME)
# NOTE(review): the export above is repeated verbatim — presumably a duplicated
# line; confirm whether writing the collections file twice is intentional.
c.export(path + trial_name, DEFAULT_COLLECTIONS_FILE_NAME)
# Write num_steps steps of synthetic tensor data for worker "algo-1".
for i in range(num_steps):
generate_data(
path=path,
trial=trial_name,
num_tensors=num_tensors,
step=i,
tname_prefix="foo",
worker="algo-1",
shape=(3, 3, 3),
rank=0,
)
# Build an S3Trial over the freshly written data and return it with its name.
_, bucket, prefix = is_s3(os.path.join(path, trial_name))
trial_obj = S3Trial(name=prefix, bucket_name=bucket, prefix_name=prefix)
return trial_obj, trial_name
def helper_test_multi_save_configs_trial(trial_dir):
    """Assert the expected step/tensor counts for a trial written with
    multiple save configs, then remove the trial data (local or S3)."""
    tr = create_trial(trial_dir)
    all_steps = tr.steps()
    train_steps = tr.steps(mode=smd.modes.TRAIN)
    eval_steps = tr.steps(mode=smd.modes.EVAL)
    print(all_steps, train_steps, eval_steps)
    assert len(all_steps) == 4
    assert len(train_steps) == 3
    assert len(eval_steps) == 1
    assert len(tr.tensor_names()) == 1
    # Clean up: S3 prefixes and local directories need different removal.
    on_s3, bucket, prefix = is_s3(trial_dir)
    if on_s3:
        delete_s3_prefix(bucket, prefix)
    else:
        shutil.rmtree(trial_dir)
def create_trial(path, name=None, **kwargs):
    """Construct a trial object for *path*.

    Returns an S3Trial when the path is an s3:// location, otherwise a
    LocalTrial. When *name* is omitted, the path's basename is used.
    Extra keyword arguments are forwarded to the trial constructor.
    """
    trial_name = os.path.basename(path) if name is None else name
    on_s3, bucket, prefix = is_s3(path)
    if not on_s3:
        return LocalTrial(name=trial_name, dirname=path, **kwargs)
    return S3Trial(name=trial_name, bucket_name=bucket, prefix_name=prefix, **kwargs)
def has_training_ended(trial_prefix):
    """Return True iff the end-of-job marker file exists under *trial_prefix*.

    For S3 prefixes this lists the marker key; a non-empty listing means the
    job has ended. On a 4xx client error the exception is re-raised,
    otherwise False is returned.
    """
    file_path = os.path.join(trial_prefix, END_OF_JOB_FILENAME)
    s3, bucket_name, key_name = is_s3(file_path)
    if s3:
        try:
            request = ListRequest(bucket_name, key_name)
            # list_prefixes returns one result list per request; take ours.
            file_available = S3Handler.list_prefixes([request])[0]
            if len(file_available) > 0:
                return True
            else:
                return False
        except ClientError as ex:
            status_code = ex.response["ResponseMetadata"]["HTTPStatusCode"]
            logger.info(f"Client error occurred : {ex}")
            # NOTE(review): boto3 reports HTTPStatusCode as an int, so calling
            # .startswith() on it would raise AttributeError — this likely
            # needs str(status_code).startswith("4"); confirm against boto3.
            if status_code.startswith("4"):
                raise ex
            else:
                return False
    else:
        # NOTE(review): the local-filesystem branch is missing from this
        # chunk — presumably it checks os.path.exists(file_path); the body
        # appears truncated by the paste.
def __init__(self, path, start=None, length=None):
    """Describe a (possibly ranged) read of *path*.

    The path may be an s3:// URL (bucket/key are parsed out) or a local
    file path (bucket is None, key is the path itself). *start* and
    *length* select a byte range; omitting *start* means "read the whole
    file from offset 0".
    """
    self.is_s3, self.bucket, self.key = is_s3(path)
    if not self.is_s3:
        # Local file: keep the raw path as the key, no bucket involved.
        self.bucket = None
        self.key = path
    # Reading the entire file is signalled by an unspecified start offset.
    self.download_entire_file = start is None
    self.start = 0 if start is None else start
    self.length = length
def check_dir_exists(path):
    """Raise RuntimeError if *path* already contains a finished trial.

    For S3 paths, lists the prefix and raises when it is non-empty and its
    last entry carries the end-of-job marker. A missing bucket is treated
    as "does not exist" and ignored.
    """
    from smdebug.core.access_layer.s3handler import S3Handler, ListRequest
    s3, bucket_name, key_name = is_s3(path)
    if s3:
        try:
            request = ListRequest(bucket_name, key_name)
            # list_prefixes returns one result list per request; take ours.
            folder = S3Handler.list_prefixes([request])[0]
            if len(folder) > 0 and has_training_ended(folder[-1]):
                raise RuntimeError(
                    "The path:{} already exists on s3. "
                    "Please provide a directory path that does "
                    "not already exist.".format(path)
                )
        except ClientError as ex:
            if ex.response["Error"]["Code"] == "NoSuchBucket":
                # then we do not need to raise any error
                pass
            else:
                # do not know the error
                # NOTE(review): this else branch has no statement in this
                # chunk — presumably `raise ex`; the body (and any
                # local-filesystem branch) appears truncated by the paste.
def delete_prefix(path=None, delete_request=None):
    """Delete an S3 prefix.

    Exactly one of *path* (an s3:// URL) or *delete_request* (a pre-built
    DeleteRequest) may be supplied; passing both raises ValueError, as does
    a path that is not an S3 location. Passing neither is a no-op.
    """
    if path is not None:
        if delete_request is not None:
            raise ValueError("Only one of path or delete_request can be passed")
        on_s3, bucket, prefix = is_s3(path)
        if on_s3 is False:
            raise ValueError("Given path is not an S3 location")
        S3Handler.delete_prefixes([DeleteRequest(bucket, prefix)])
    elif delete_request is not None:
        S3Handler.delete_prefixes([delete_request])
def __init__(self, path):
    """Initialize an S3-backed reader for *path*, splitting it into
    bucket/prefix and setting up an index-file cache."""
    super().__init__(path)
    self.path = path
    parsed = is_s3(path)
    self.bucket_name = parsed[1]
    self.prefix_name = parsed[2]
    self.index_file_cache = ReadIndexFilesCache()