Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
stats.append('Size: %s' % (self.size_str(metadata['data_size']),))
fields['stats'] = 'Stats:\n %s\n' % ('\n '.join(stats),) if stats else ''
# Compute a nicely-formatted list of hard dependencies. Since this type of
# dependency is realized within this bundle as a symlink to another bundle,
# label these dependencies as 'references' in the UI.
fields['hard_dependencies'] = ''
if info['hard_dependencies']:
deps = info['hard_dependencies']
if len(deps) == 1 and not deps[0]['child_path']:
fields['hard_dependencies'] = 'Reference:\n %s\n' % (
path_util.safe_join(deps[0]['parent_uuid'], deps[0]['parent_path']),)
else:
fields['hard_dependencies'] = 'References:\n%s\n' % ('\n'.join(
' %s:%s' % (
dep['child_path'],
path_util.safe_join(dep['parent_uuid'], dep['parent_path']),
) for dep in sorted(deps, key=lambda dep: dep['child_path'])
))
# Compute a nicely-formatted failure message, if this bundle failed.
# It is possible for bundles that are not failed to have failure messages:
# for example, if a bundle is killed in the database after running for too
# long then succeeds afterwards, it will be in this state.
fields['failure_message'] = ''
if info['state'] == State.FAILED and metadata['failure_message']:
fields['failure_message'] = 'Failure message:\n %s\n' % ('\n '.join(
metadata['failure_message'].split('\n')
))
# Return the formatted summary of the bundle info.
return '''
{bundle_type}: {name}
{description}
UUID: {uuid}
def codalab_home(self):
from codalab.lib import path_util
# Default to this directory in the user's home directory.
# In the future, allow customization based on.
home = os.getenv('CODALAB_HOME', '~/.codalab')
home = path_util.normalize(home)
path_util.make_directory(home)
return home
def home(self):
from codalab.lib import path_util
result = path_util.normalize(self.config['home'])
path_util.make_directory(result)
return result
def start_bundle(self, bundle, bundle_store, parent_dict, username):
'''
Start a bundle in the background.
'''
if self.bundle != None: return None
temp_dir = canonicalize.get_current_location(bundle_store, bundle.uuid)
path_util.make_directory(temp_dir)
# We don't follow symlinks (for consistency with remote
# machine, where it is more secure, so people can't make us
# copy random files on the system). Of course in local mode,
# if some of those symlinks are absolute, the run can
# read/write those locations. But we're not sandboxed, so
# anything could happen. The dependencies are copied, so in
# practice, this is not a bit worry.
pairs = bundle.get_dependency_paths(bundle_store, parent_dict, temp_dir)
print >>sys.stderr, 'LocalMachine.start_bundle: copying dependencies of %s to %s' % (bundle.uuid, temp_dir)
for (source, target) in pairs:
path_util.copy(source, target, follow_symlinks=False)
script_file = temp_dir + '.sh'
with open(script_file, 'w') as f:
f.write("cd %s &&\n" % temp_dir)
def install_dependencies(self, bundle_store, parent_dict, path, rel):
'''
Symlink this bundle's dependencies into the directory at path.
The caller is responsible for cleaning up this directory.
'''
precondition(os.path.isabs(path), '%s is a relative path!' % (path,))
for dep in self.dependencies:
parent = parent_dict[dep.parent_uuid]
# Compute an absolute target and check that the dependency exists.
target = path_util.safe_join(
bundle_store.get_location(parent.data_hash),
dep.parent_path,
)
if not os.path.exists(target):
parent_spec = getattr(parent.metadata, 'name', parent.uuid)
target_text = path_util.safe_join(parent_spec, dep.parent_path)
raise UsageError('Target not found: %s' % (target_text,))
if rel:
# Create a symlink that points to the dependency's relative target.
target = path_util.safe_join(
(os.pardir if dep.child_path else ''),
bundle_store.get_location(parent.data_hash, relative=True),
dep.parent_path,
)
link_path = path_util.safe_join(path, dep.child_path)
os.symlink(target, link_path)
results = self.auth_handler.get_users('ids', [bundle.owner_id])
if results.get(bundle.owner_id):
username = results[bundle.owner_id].name
else:
username = str(bundle.owner_id)
status = self.machine.start_bundle(bundle, self.bundle_store, self.get_parent_dict(bundle), username)
if status != None:
status['started'] = int(time.time())
started = True
except Exception as e:
# If there's an exception, we just make the bundle fail
# (even if it's not the bundle's fault).
real_path = canonicalize.get_current_location(self.bundle_store, bundle.uuid)
path_util.make_directory(real_path)
status = {'bundle': bundle, 'success': False, 'failure_message': 'Internal error: ' + str(e), 'temp_dir': real_path}
print '=== INTERNAL ERROR: %s' % e
started = True # Force failing
traceback.print_exc()
else: # MakeBundle
started = True
if started:
print '-- START BUNDLE: %s' % (bundle,)
self._update_events_log('start_bundle', bundle, (bundle.uuid,))
# If we have a MakeBundle, then just process it immediately.
if isinstance(bundle, MakeBundle):
real_path = canonicalize.get_current_location(self.bundle_store, bundle.uuid)
path_util.make_directory(real_path)
status = {'bundle': bundle, 'success': True, 'temp_dir': real_path}
def __init__(self, codalab_home):
self.codalab_home = path_util.normalize(codalab_home)
self.partitions = os.path.join(self.codalab_home, 'partitions')
path_util.make_directory(self.partitions)
self.refresh_partitions()
if self.__get_num_partitions() == 0: # Ensure at least one partition exists.
self.add_partition(None, 'default')
self.lru_cache = OrderedDict()
def head_target(self, target, num_lines):
path = self.get_target_path(target)
return path_util.read_lines(path, num_lines)
def __get_num_partitions(self):
"""
Returns the current number of disks being used by this MultiDiskBundleStore.
This is calculated as the number of directories in self.partitions
"""
return reduce(lambda dirs, _: len(dirs), path_util.ls(self.partitions))