def _fail_unresponsive_bundles(self):
    """
    Fail bundles in the UPLOADING, STAGED, and RUNNING states if we haven't heard
    from them for more than BUNDLE_TIMEOUT_DAYS days.
    """
    bundles_to_fail = (
        self._model.batch_get_bundles(state=State.UPLOADING)
        + self._model.batch_get_bundles(state=State.STAGED)
        + self._model.batch_get_bundles(state=State.RUNNING)
    )
    now = time.time()
    for bundle in bundles_to_fail:
        # For simplicity, we use the metadata.created field to compute the timeout for now.
        # Ideally, we would use metadata.last_updated.
        if now - bundle.metadata.created > BUNDLE_TIMEOUT_DAYS * SECONDS_PER_DAY:
            failure_message = 'Bundle has been stuck in {} state for more than {} days.'.format(
                bundle.state, BUNDLE_TIMEOUT_DAYS
            )
            logger.info('Failing bundle %s: %s', bundle.uuid, failure_message)
            self._model.update_bundle(
                bundle,
                {'state': State.FAILED, 'metadata': {'failure_message': failure_message}},
            )
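# The constants referenced above are not part of this excerpt; a minimal sketch of
# plausible module-level definitions (the timeout value is an assumption, not taken
# from the source):
SECONDS_PER_DAY = 60 * 60 * 24
BUNDLE_TIMEOUT_DAYS = 60  # hypothetical cutoff; the real value is defined elsewhere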
relevant_uuids = local.model.get_self_and_descendants(uuids, depth=sys.maxsize)
if not recursive:
    # If any descendants exist, then we only delete uuids if force = True.
    if (not force) and set(uuids) != set(relevant_uuids):
        relevant = local.model.batch_get_bundles(uuid=(set(relevant_uuids) - set(uuids)))
        raise UsageError(
            'Can\'t delete bundles %s because the following bundles depend on them:\n %s'
            % (' '.join(uuids), '\n '.join(bundle.simple_str() for bundle in relevant))
        )
    relevant_uuids = uuids
check_bundles_have_all_permission(local.model, request.user, relevant_uuids)
# Make sure we don't delete bundles which are active.
states = local.model.get_bundle_states(uuids)
logger.debug('delete states: %s', states)
active_uuids = [uuid for (uuid, state) in states.items() if state in State.ACTIVE_STATES]
logger.debug('delete actives: %s', active_uuids)
if len(active_uuids) > 0:
    raise UsageError(
        'Can\'t delete bundles: %s. ' % (' '.join(active_uuids))
        + 'For run bundles, kill them first. '
        + 'Bundles stuck not running will eventually '
        + 'automatically be moved to a state where they '
        + 'can be deleted.'
    )
# Make sure that bundles are not referenced in multiple places (otherwise, it's very dangerous).
result = local.model.get_all_host_worksheet_uuids(relevant_uuids)
for uuid, host_worksheet_uuids in result.items():
    worksheets = local.model.batch_get_worksheets(fetch_items=False, uuid=host_worksheet_uuids)
    frozen_worksheets = [worksheet for worksheet in worksheets if worksheet.frozen]
    if len(frozen_worksheets) > 0:
        # The excerpt is cut off here; an assumed completion that refuses to touch
        # bundles hosted on frozen worksheets:
        raise UsageError(
            'Can\'t delete bundle %s because it appears in frozen worksheets:\n %s'
            % (uuid, '\n '.join(worksheet.simple_str() for worksheet in frozen_worksheets))
        )
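# A tiny worked illustration of the descendant check above (UUIDs are made up):
uuids = ['0xaaaa']
relevant_uuids = ['0xaaaa', '0xbbbb']  # 0xbbbb is a descendant of 0xaaaa
# recursive=False, force=False: the sets differ, so the first UsageError is raised.
# recursive=False, force=True:  descendants are left alone; only 0xaaaa is deleted.
# recursive=True:               both 0xaaaa and 0xbbbb become candidates for deletion.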
def server_state(self):
    if self.is_killed:
        return State.KILLED
    elif self.exitcode != 0:
        return State.FAILED
    else:
        return State.READY
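# The three terminal outcomes fall out of the checks in order: a kill request wins
# over any exit code, and a nonzero exit code marks failure. For example:
#   is_killed=True,  exitcode=137 -> State.KILLED
#   is_killed=False, exitcode=1   -> State.FAILED
#   is_killed=False, exitcode=0   -> State.READY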
"""
Takes in a list of bundle paths and a mapping of UUID to BundleModel, and returns a list of paths and
subpaths that need to be removed.
"""
to_delete = []
# Batch get information for all bundles stored on-disk
for bundle_path in bundle_paths:
uuid = _get_uuid(bundle_path)
# Screen for bundles stored on disk that are no longer in the database
bundle = db_bundle_by_uuid.get(uuid, None)
if bundle == None:
to_delete += [bundle_path]
continue
# Delete dependencies stored inside of READY or FAILED bundles
if bundle.state in [State.READY, State.FAILED]:
dep_paths = [
os.path.join(bundle_path, dep.child_path) for dep in bundle.dependencies
]
to_delete += list(filter(os.path.exists, dep_paths))
return to_delete
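# A hedged usage sketch (paths and bundle objects are illustrative; get_stale_paths
# is the assumed name introduced above):
bundle_paths = ['/data/bundles/0xaaaa', '/data/bundles/0xbbbb']
db_bundle_by_uuid = {'0xaaaa': ready_bundle}  # 0xbbbb no longer exists in the database
stale = get_stale_paths(bundle_paths, db_bundle_by_uuid)
# -> ['/data/bundles/0xbbbb'] plus any still-existing dependency paths inside 0xaaaa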
class RunStage(object):
    """
    Defines the finite set of possible stages and transition functions.
    Note that it is important that each state be able to be re-executed
    without unintended adverse effects (which happens upon worker resume).
    """

    WORKER_STATE_TO_SERVER_STATE = {}

    """
    This stage involves setting up the directory structure for the run
    and preparing to start the container.
    """
    PREPARING = 'RUN_STAGE.PREPARING'
    WORKER_STATE_TO_SERVER_STATE[PREPARING] = State.PREPARING

    """
    Running encompasses the state where the user's job is running.
    """
    RUNNING = 'RUN_STAGE.RUNNING'
    WORKER_STATE_TO_SERVER_STATE[RUNNING] = State.RUNNING

    """
    This stage encompasses cleaning up intermediary components like
    the dependency symlinks and also the releasing of dependencies.
    """
    CLEANING_UP = 'RUN_STAGE.CLEANING_UP'
    WORKER_STATE_TO_SERVER_STATE[CLEANING_UP] = State.RUNNING

    """
    Uploading results means the job's results are getting uploaded to the server.
    """
    UPLOADING_RESULTS = 'RUN_STAGE.UPLOADING_RESULTS'  # assumed completion, following the pattern above
    WORKER_STATE_TO_SERVER_STATE[UPLOADING_RESULTS] = State.RUNNING
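# A small sketch of how the mapping might be consumed when reporting status to the
# server (the helper below is hypothetical, not from the source):
def to_server_state(worker_stage):
    return RunStage.WORKER_STATE_TO_SERVER_STATE[worker_stage]

assert to_server_state(RunStage.CLEANING_UP) == State.RUNNING  # cleanup still reports as running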
def _is_available_locally(self, uuid):
    if self._bundle_model.get_bundle_state(uuid) in [State.RUNNING, State.PREPARING]:
        return self._bundle_model.get_bundle_worker(uuid)['shared_file_system']
    return True
"""
Transitions bundle to STARTING state:
Updates the last_updated metadata.
Adds a worker_run row that tracks which worker will run the bundle.
"""
with self.engine.begin() as connection:
# Check that it still exists.
row = connection.execute(
cl_bundle.select().where(cl_bundle.c.id == bundle.id)
).fetchone()
if not row:
# The user deleted the bundle.
return False
bundle_update = {
'state': State.STARTING,
'metadata': {'last_updated': int(time.time())},
}
self.update_bundle(bundle, bundle_update, connection)
worker_run_row = {'user_id': user_id, 'worker_id': worker_id, 'run_uuid': bundle.uuid}
connection.execute(cl_worker_run.insert().values(worker_run_row))
return True
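# A hedged sketch of the scheduler-side call (variable and helper names are
# assumptions): notify the worker only if the transition actually went through.
if model.transition_bundle_starting(bundle, user_id, worker_id):
    send_run_message(worker_id, bundle)  # hypothetical helper
# Otherwise the bundle was deleted out from under us, and there is nothing to do.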
def _stage_bundles(self):
    """
    Stages bundles by:
        1) Failing any bundles that have any missing or failed dependencies.
        2) Staging any bundles that have all ready dependencies.
    """
    bundles = self._model.batch_get_bundles(state=State.CREATED)
    parent_uuids = set(dep.parent_uuid for bundle in bundles for dep in bundle.dependencies)
    parents = self._model.batch_get_bundles(uuid=parent_uuids)
    all_parent_states = {parent.uuid: parent.state for parent in parents}
    all_parent_uuids = set(all_parent_states)
    bundles_to_fail = []
    bundles_to_stage = []
    for bundle in bundles:
        parent_uuids = set(dep.parent_uuid for dep in bundle.dependencies)
        try:
            check_bundles_have_read_permission(
                self._model, self._model.get_user(bundle.owner_id), parent_uuids
            )
        except PermissionError as e:
            # The excerpt is cut off here; an assumed completion that marks the
            # bundle to fail with the permission error as the reason:
            bundles_to_fail.append((bundle, str(e)))
            continue
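        # The rest of the loop is not shown; a hypothetical continuation implementing
        # the docstring's two rules (treating KILLED parents as failed is an assumption):
        missing_uuids = parent_uuids - all_parent_uuids
        failed_uuids = [
            uuid for uuid in parent_uuids & all_parent_uuids
            if all_parent_states[uuid] in (State.FAILED, State.KILLED)
        ]
        if missing_uuids or failed_uuids:
            bundles_to_fail.append((bundle, 'Some dependencies are missing or failed.'))
        elif all(all_parent_states[uuid] == State.READY for uuid in parent_uuids):
            bundles_to_stage.append(bundle)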
class AWSBatchRunStage(object):
    """
    Defines the finite set of possible stages and transition functions.
    Note that it is important that each state be able to be re-executed
    without unintended adverse effects (which happens upon worker resume).
    """

    WORKER_STATE_TO_SERVER_STATE = {}

    """
    This stage is while we're checking on the job for the first time on Batch.
    """
    INITIALIZING = 'AWS_BATCH_RUN.INITIALIZING'
    WORKER_STATE_TO_SERVER_STATE[INITIALIZING] = State.PREPARING

    """
    This stage is for creating and submitting a job definition to Batch.
    """
    SETTING_UP = 'AWS_BATCH_RUN.SETTING_UP'
    WORKER_STATE_TO_SERVER_STATE[SETTING_UP] = State.PREPARING

    """
    This stage is for submitting the job to Batch.
    """
    SUBMITTING = 'AWS_BATCH_RUN.SUBMITTING'
    WORKER_STATE_TO_SERVER_STATE[SUBMITTING] = State.PREPARING

    """
    Running encompasses the state where the user's job is running.
    """
    RUNNING = 'AWS_BATCH_RUN.RUNNING'  # assumed completion, following the pattern above
    WORKER_STATE_TO_SERVER_STATE[RUNNING] = State.RUNNING
def _restage_stuck_starting_bundles(self, workers):
    """
    Moves bundles that got stuck in the STARTING state back to the STAGED
    state so that they can be scheduled to run again.
    """
    for bundle in self._model.batch_get_bundles(state=State.STARTING, bundle_type='run'):
        # If the worker isn't running the bundle, or we haven't heard an update in
        # five minutes, assume the run message went missing.
        if (
            not workers.is_running(bundle.uuid)
            or time.time() - bundle.metadata.last_updated > 5 * 60
        ):
            logger.info('Re-staging run bundle %s', bundle.uuid)
            if self._model.transition_bundle_staged(bundle):
                workers.restage(bundle.uuid)
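# Taken together, the excerpts in this section sketch a scheduler loop; a hypothetical
# tick composing the methods shown above (the method name and ordering are assumptions):
def schedule_run_bundles_tick(self, workers):
    self._stage_bundles()                          # CREATED -> STAGED (or FAILED)
    self._restage_stuck_starting_bundles(workers)  # stuck STARTING -> STAGED
    self._fail_unresponsive_bundles()              # long-stuck bundles -> FAILED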