Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setup(self, run_id, columns=20):
    """Build the "Bin width" input controls for a run.

    Reads the task status rows recorded for ``run_id``, divides the time
    between the earliest and the latest timestamp by ``columns`` and offers
    that bin width, split into minutes and seconds, as the initial value of
    the two number inputs.

    Args:
        run_id: Value bound to the ``run_id`` parameter of the SQL query.
        columns: Number of bins the run's time span is divided into.

    Returns:
        A list with an ``html.H4`` heading and two ``html.Div`` elements,
        each wrapping a label and a ``dcc.Input`` (ids
        ``bin_width_minutes`` and ``bin_width_seconds``).
    """
    sql_conn = get_db()
    try:
        df_status = pd.read_sql_query('SELECT run_id, task_id, task_status_name, timestamp FROM task_status WHERE run_id=(?)',
                                      sql_conn, params=(run_id, ))
    finally:
        # Always run close_db(), even when the query raises.
        close_db()

    timestamps = df_status['timestamp']
    if timestamps.empty:
        # No status rows for this run: min()/max() would raise ValueError on
        # an empty sequence, so fall back to a zero bin width.
        time_step = 0
    else:
        min_time = timestamp_to_int(min(timestamps))
        max_time = timestamp_to_int(max(timestamps))
        time_step = int((max_time - min_time) / columns)

    minutes, seconds = divmod(time_step, 60)

    return [html.H4('Bin width'),
            html.Div(children=[html.Label(htmlFor='bin_width_minutes', children='Minutes'),
                               dcc.Input(id='bin_width_minutes', type='number', min=0, value=minutes)]),
            html.Div(children=[html.Label(htmlFor='bin_width_seconds', children='Seconds'),
                               dcc.Input(id='bin_width_seconds', type='number', min=0, value=seconds)])]
def setup(self, run_id, columns=20):
    """Build the flat "Bin width" input controls for a run.

    Reads the task status rows recorded for ``run_id``, divides the time
    between the earliest and the latest timestamp by ``columns`` and offers
    that bin width, split into minutes and seconds, as the initial value of
    the two number inputs.

    Args:
        run_id: Value bound to the ``run_id`` parameter of the SQL query.
        columns: Number of bins the run's time span is divided into.

    Returns:
        A flat list: an ``html.P`` caption followed by a label and a
        ``dcc.Input`` for minutes (id ``bin_width_minutes``) and for seconds
        (id ``bin_width_seconds``).
    """
    sql_conn = get_db()
    try:
        df_status = pd.read_sql_query('SELECT run_id, task_id, task_status_name, timestamp FROM task_status WHERE run_id=(?)',
                                      sql_conn, params=(run_id, ))
    finally:
        # Always run close_db(), even when the query raises.
        close_db()

    timestamps = df_status['timestamp']
    if timestamps.empty:
        # No status rows for this run: min()/max() would raise ValueError on
        # an empty sequence, so fall back to a zero bin width.
        time_step = 0
    else:
        min_time = timestamp_to_int(min(timestamps))
        max_time = timestamp_to_int(max(timestamps))
        time_step = int((max_time - min_time) / columns)

    minutes, seconds = divmod(time_step, 60)

    return [html.P('Bin width'),
            html.Label(htmlFor='bin_width_minutes', children='Minutes'),
            dcc.Input(id='bin_width_minutes', type='number', min=0, value=minutes),
            html.Label(htmlFor='bin_width_seconds', children='Seconds'),
            dcc.Input(id='bin_width_seconds', type='number', min=0, value=seconds)]
if __name__ == '__main__':
    # Script entry point: parse the command line, optionally load a parsl
    # configuration from the module named by --sitespec, then run the
    # parallel-for test with the requested number of apps.
    import importlib
    import sys

    parser = argparse.ArgumentParser()
    parser.add_argument("-s", "--sitespec", default=None)
    parser.add_argument("-c", "--count", default="10",
                        help="Count of apps to launch")
    parser.add_argument("-d", "--debug", action='store_true',
                        help="Enable debug logging")
    args = parser.parse_args()

    if args.sitespec:
        config = None
        try:
            import parsl
            # Import the module by name instead of exec()-ing a string built
            # from a command-line argument, which would run arbitrary code.
            config = importlib.import_module(args.sitespec).config
            parsl.load(config)
        except Exception as e:
            print("Failed to load the requested config : ", args.sitespec)
            print("Reason : ", repr(e))
            # A failed load is an error, so report a non-zero exit status.
            sys.exit(1)

    # if args.debug:
    #     parsl.set_stream_logger()

    # x = test_simple(int(args.count))
    # x = test_imports()
    x = test_parallel_for(int(args.count))
    # x = test_parallel_for(int(args.count))
@python_app
def sleep10ms():
    """Sleep for 0.01 seconds and return None."""
    # Imported inside the body, as in the original.
    from time import sleep

    sleep(0.01)
@python_app
def noop():
    """Do nothing and return None."""
    return None
#needed_time = args.tasks_per_trial * args.trials * 2 / target_workers
#if needed_time <= 1800: needed_time = 1800
# Format needed_time (a number of seconds) as an HH:MM:SS walltime string.
# NOTE(review): '%H' wraps around for durations of 24 hours or more - confirm
# needed_time always stays below one day.
walltime = time.strftime('%H:%M:%S', time.gmtime(needed_time))
print("The walltime for {} workers is {}".format(target_workers, walltime))

# Split the workers over nodes: an exact multiple of cores_per_node fills
# whole nodes; anything else is placed on a single node that runs only the
# remainder.
leftover_workers = target_workers % args.cores_per_node
if leftover_workers == 0:
    nodes_per_block = int(target_workers / args.cores_per_node)
    tasks_per_node = args.cores_per_node
else:
    nodes_per_block = 1
    tasks_per_node = leftover_workers
config = Config(
executors=[
HighThroughputExecutor(
label="funcx_local",
# worker_debug=True,
worker_mode="singularity_reuse",
container_image=os.path.expanduser("~/sing-run.simg"),
cores_per_worker=int(args.cores_per_node / tasks_per_node),
max_workers=1,
address=address_by_interface("eth0"),
provider=CobaltProvider(
launcher=SingleNodeLauncher(),
init_blocks=1,
max_blocks=1,
queue=args.queue,
account='DLHub',
worker_init="source activate funcx_5"
),
)
import os
import os.path as osp
from parsl.providers import LocalProvider
from parsl.channels import LocalChannel
from parsl.executors import HighThroughputExecutor
from parsl.addresses import address_by_hostname
from parsl.config import Config
# Build and load a parsl configuration with one HighThroughputExecutor backed
# by a LocalProvider limited to a single block on a single node.
host_address = address_by_hostname()
# Half of the CPU count reported by multiprocessing, but never less than one.
worker_cores = max(1, multiprocessing.cpu_count() // 2)
local_provider = LocalProvider(
    channel=LocalChannel(),
    init_blocks=1,
    max_blocks=1,
    nodes_per_block=1,
)
parsl_config = Config(
    executors=[
        HighThroughputExecutor(
            label="coffea_parsl_default",
            address=host_address,
            cores_per_worker=worker_cores,
            max_workers=1,
            provider=local_provider,
        ),
    ],
    strategy=None,
)
parsl.load(parsl_config)

# Input files per dataset name, resolved against the current working directory.
working_dir = os.getcwd()
filelist = dict(
    ZJets=[osp.join(working_dir, 'tests/samples/nano_dy.root')],
    Data=[osp.join(working_dir, 'tests/samples/nano_dimuon.root')],
)
provider=CobaltProvider(
launcher=SingleNodeLauncher(),
init_blocks=1,
max_blocks=1,
queue=args.queue,
account='DLHub',
worker_init="source activate funcx_5"
),
)
],
run_dir="/home/tskluzac/FuncX/evaluation/runinfo",
strategy=None,
)
# NOTE(review): parsl.clear() presumably discards any previously loaded
# configuration so that load() can be called again - confirm against parsl docs.
parsl.clear()
# Keep the object returned by parsl.load() so its executors can be inspected.
dfk = parsl.load(config)
# Pick the first value of the dfk.executors mapping; raises IndexError if the
# mapping is empty.
executor = list(dfk.executors.values())[0]
@python_app
def noop():
    """Do nothing and return None."""
    return None
@python_app
def sleep10ms():
    """Sleep for 0.01 seconds and return None."""
    # Imported inside the body, as in the original.
    from time import sleep

    sleep(0.01)
@python_app
def sleep100ms():
    """Sleep for 0.1 seconds and return None."""
    # Imported inside the body, as in the original.
    from time import sleep

    sleep(0.1)
'''
job_name = "{0}.{1}".format(job_name, time.time())
# Set script path
script_path = "{0}/{1}.sh".format(self.script_dir, job_name)
script_path = os.path.abspath(script_path)
wrap_command = self.worker_init + '\n' + self.launcher(command, tasks_per_node, self.nodes_per_block)
self._write_submit_script(wrap_command, script_path)
job_id = None
proc = None
remote_pid = None
if (self.move_files is None and not isinstance(self.channel, LocalChannel)) or (self.move_files):
logger.debug("Pushing start script")
script_path = self.channel.push_file(script_path, self.channel.script_dir)
if not isinstance(self.channel, LocalChannel):
logger.debug("Launching in remote mode")
# Bash would return until the streams are closed. So we redirect to a outs file
cmd = 'bash {0} > {0}.out 2>&1 & \n echo "PID:$!" '.format(script_path)
retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout)
for line in stdout.split('\n'):
if line.startswith("PID:"):
remote_pid = line.split("PID:")[1].strip()
job_id = remote_pid
if job_id is None:
logger.warning("Channel failed to start remote command/retrieve PID")
else:
# Set script path
script_path = "{0}/{1}.sh".format(self.script_dir, job_name)
script_path = os.path.abspath(script_path)
wrap_command = self.worker_init + '\n' + self.launcher(command, tasks_per_node, self.nodes_per_block)
self._write_submit_script(wrap_command, script_path)
job_id = None
proc = None
remote_pid = None
if (self.move_files is None and not isinstance(self.channel, LocalChannel)) or (self.move_files):
logger.debug("Moving start script")
script_path = self.channel.push_file(script_path, self.channel.script_dir)
if not isinstance(self.channel, LocalChannel):
logger.debug("Launching in remote mode")
# Bash would return until the streams are closed. So we redirect to a outs file
cmd = 'bash {0} &> {0}.out & \n echo "PID:$!" '.format(script_path)
retcode, stdout, stderr = self.channel.execute_wait(cmd, self.cmd_timeout)
for line in stdout.split('\n'):
if line.startswith("PID:"):
remote_pid = line.split("PID:")[1].strip()
job_id = remote_pid
if job_id is None:
logger.warning("Channel failed to start remote command/retrieve PID")
else:
try:
job_id, proc = self.channel.execute_no_wait('bash {0}'.format(script_path), self.cmd_timeout)
except Exception as e:
logger.debug("Channel execute failed for: {}, {}".format(self.channel, e))