Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'hidden_layer_sizes': (256, 256),
'squash': True,
}
},
sampler_params={
'type': 'SimpleSampler',
'kwargs': {
'max_path_length': HORIZON,
'min_pool_size': HORIZON,
'batch_size': 256,
}
},
run_params={
'seed': tune.sample_from(
lambda spec: np.random.randint(0, 10000)),
'checkpoint_at_end': True,
'checkpoint_frequency': HORIZON // N_CHECKPOINTS,
'checkpoint_replay_pool': False,
},
resources_per_trial={
'cpu': N_CPUS,
'gpu': N_GPUS,
'extra_cpu': 0,
'extra_gpu': 0,
}
)
class TestSoftActorCritic(unittest.TestCase):
def error_check(errors):
    """Tell whether enough errors have been collected so far.

    The required count depends on ``num_nodes`` and ``num_objects``, which
    are free variables defined outside this function.

    Returns:
        True when ``len(errors)`` has reached the required minimum.
    """
    # Single node: each object is evicted and reconstructed exactly once, so
    # exactly half the objects will produce an error during reconstruction.
    # Multiple nodes: each object is evicted zero or one times, so some of
    # the nondeterministic tasks may not be reexecuted; one error suffices.
    required = num_objects // 2 if num_nodes == 1 else 1
    return len(errors) >= required
# Collect errors using error_check as the acceptance predicate.
# NOTE(review): wait_for_errors is defined elsewhere; presumably it polls
# until the predicate returns True — confirm.
errors = wait_for_errors(error_check)
# Make sure all the errors have the correct type.
assert all(error["type"] == ray_constants.HASH_MISMATCH_PUSH_ERROR
for error in errors)
# NOTE(review): presumably checks that no cluster process died while the
# errors were being produced — confirm against the cluster helper.
assert cluster.remaining_processes_alive()
def test_int_dataframe():
    """Run the shared DataFrame checks on an all-integer frame.

    Builds a pandas DataFrame with four integer columns of four rows each,
    converts it with ``rdf.from_pandas`` and passes both frames to
    ``test_roundtrip`` and ``test_index``.
    """
    ray.init()

    column_values = {
        'col1': [0, 1, 2, 3],
        'col2': [4, 5, 6, 7],
        'col3': [8, 9, 10, 11],
        'col4': [12, 13, 14, 15],
    }
    reference_df = pd.DataFrame(column_values)
    # Second argument is 2; presumably the partition count — TODO confirm.
    distributed_df = rdf.from_pandas(reference_df, 2)

    # NOTE(review): testfuncs is not read by any statement visible here.
    testfuncs = [
        lambda x: x + 1,
        lambda x: str(x),
        lambda x: x * x,
        lambda x: x,
        lambda x: False,
    ]

    test_roundtrip(distributed_df, reference_df)
    test_index(distributed_df, reference_df)
def test_on_ring(self):
    """Launch ``ExperimentRunner`` through ``tune.run``.

    The keyword arguments for the run are produced by
    ``generate_experiment_kwargs`` from the variant spec that
    ``get_variant_spec`` builds out of ``sac_params``.
    """
    runner_cls = ExperimentRunner
    run_kwargs = generate_experiment_kwargs(get_variant_spec(sac_params))

    try:
        ray.init(num_cpus=N_CPUS, num_gpus=N_GPUS)
    except Exception:
        # Best effort: ray was already initialized.
        pass

    tune.run(runner_cls, reuse_actors=True, **run_kwargs)
def run_driver():
    """Run ``driver_script`` via ``run_string_as_driver`` and check its output.

    Raises:
        AssertionError: if the captured output does not contain "success".
    """
    assert "success" in run_string_as_driver(driver_script)
# Keep ``num_nodes`` driver tasks in flight forever: whenever one finishes,
# start a replacement and report timing for the iteration.
iteration = 0
# One initial driver per node index. Each requests zero CPUs and 0.01 of the
# custom resource keyed by the stringified index (presumably one such
# resource exists per node — confirm against the cluster setup).
running_ids = [
    run_driver._remote(
        args=[], kwargs={}, num_cpus=0, resources={str(i): 0.01})
    for i in range(num_nodes)
]
start_time = time.time()
previous_time = start_time
while True:
    # Wait for a driver to finish and start a new driver.
    [ready_id], running_ids = ray.wait(running_ids, num_returns=1)
    # Fetch the result so a failure inside the finished driver surfaces here.
    ray.get(ready_id)

    running_ids.append(
        run_driver._remote(
            args=[],
            kwargs={},
            num_cpus=0,
            resources={str(iteration % num_nodes): 0.01}))

    new_time = time.time()
    print("Iteration {}:\n"
          " - Iteration time: {}.\n"
          " - Absolute time: {}.\n"
          " - Total elapsed time: {}.".format(
              iteration, new_time - previous_time, new_time,
              new_time - start_time))
    # Advance the per-iteration state. Without these updates every pass
    # reports "Iteration 0", always targets resource "0", and prints the
    # total elapsed time as the iteration time.
    previous_time = new_time
    iteration += 1
def run_exp(flow_params, **kwargs):
alg_run, env_name, config = setup_rllib_exps(flow_params, 1, 1, **kwargs)
try:
ray.init(num_cpus=1)
except Exception as e:
print("ERROR", e)
config['train_batch_size'] = 50
config['horizon'] = 50
config['sample_batch_size'] = 50
config['num_workers'] = 0
config['sgd_minibatch_size'] = 32
run_experiments({
'test': {
'run': alg_run,
'env': env_name,
'config': {
**config
},
'checkpoint_freq': 1,
'stop': {
'training_iteration': 1,
},
def ping(self):
    """Do nothing and return None."""
    return None
@ray.remote
class Worker:
    """Remote class that forwards ``ping`` calls to another actor handle."""

    def __init__(self, actor):
        # Handle of the actor that ping() will call.
        self.actor = actor

    def ping(self):
        """Invoke ``ping`` on the stored actor and return its result."""
        pending = self.actor.ping.remote()
        return ray.get(pending)
# A single actor is shared by 100 workers.
a = Actor.remote()
workers = [Worker.remote(a) for _ in range(100)]
# Ten rounds: every worker pings and each reply must be None.
for _ in range(10):
    out = ray.get([w.ping.remote() for w in workers])
    assert out == [None] * len(workers)
@ray.remote
def multiple_dependency(i, arg1, arg2, arg3):
    """Return a copy of ``arg1`` whose first element is replaced by ``i``.

    ``arg2`` and ``arg3`` are accepted but never read in the body.
    """
    result = np.copy(arg1)
    result[0] = i
    return result
# Record the duration of the sample that was just timed.
# NOTE(review): the loop that sets start_time/end_time and initialises
# elapsed_times is not visible from here.
elapsed_times.append(end_time - start_time)
# Sort ascending so that fixed indices can be read off as percentiles.
elapsed_times = np.sort(elapsed_times)
# The divisor and the indices 900/990/999 below assume exactly 1000 samples
# — TODO confirm against the timing loop.
average_elapsed_time = sum(elapsed_times) / 1000
print("Time required to submit a trivial function call and get the "
"result:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(elapsed_times[900]))
print(" 99th percentile: {}".format(elapsed_times[990]))
print(" worst: {}".format(elapsed_times[999]))
# average_elapsed_time should be about 0.0013.
# Measure the time required to do a put.
# The divisor and the percentile indices are all derived from this one
# sample count, so they cannot drift apart if it is changed.
num_samples = 1000
elapsed_times = []
for _ in range(num_samples):
    # perf_counter is monotonic and high-resolution, which suits the
    # sub-millisecond intervals measured here; time.time() can jump when
    # the system clock is adjusted.
    start_time = time.perf_counter()
    ray.put(1)
    end_time = time.perf_counter()
    elapsed_times.append(end_time - start_time)
# Sort ascending so the percentiles can be read off by index.
elapsed_times = np.sort(elapsed_times)
average_elapsed_time = sum(elapsed_times) / num_samples
print("Time required to put an int:")
print(" Average: {}".format(average_elapsed_time))
print(" 90th percentile: {}".format(
    elapsed_times[num_samples * 90 // 100]))
print(" 99th percentile: {}".format(
    elapsed_times[num_samples * 99 // 100]))
print(" worst: {}".format(elapsed_times[-1]))
# average_elapsed_time should be about 0.00087.
def tune_run(config):
    """Build a Tune experiment from ``config`` and run it.

    Args:
        config: mapping that must provide ``config['env_config']['agent']``;
            that value and the whole ``config`` are passed to
            ``get_tune_experiment``.
    """
    selected_agent = config['env_config']['agent']
    experiment, scheduler = get_tune_experiment(config, selected_agent)
    tune.run_experiments(experiment, scheduler=scheduler)