)

if args.debug:
    logger.info("-" * 16)
    logger.info("Configuration:")
    logger.info(agent_config)

if args.save:
    save_dir = os.path.dirname(args.save)
    if not os.path.isdir(save_dir):
        try:
            os.mkdir(save_dir, 0o755)
        except OSError:
            raise OSError("Cannot save agent to directory {}".format(save_dir))
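# NOTE: os.mkdir only creates the final path component and raises if it already
# exists; a more forgiving standard-library alternative would be:
#     os.makedirs(save_dir, mode=0o755, exist_ok=True)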
runner = Runner(
    agent=agent,
    environment=environment,
    repeat_actions=1
)

# Report roughly 1000 times over the run; guard against a zero interval when
# fewer than 1000 episodes are requested.
report_episodes = max(1, args.episodes // 1000)
if args.debug:
    report_episodes = 1

def episode_finished(r):
    if r.episode % report_episodes == 0:
        sps = r.timestep / (time.time() - r.start_time)
        logger.info("Finished episode {ep} after {ts} timesteps. Steps per second: {sps}".format(
            ep=r.episode, ts=r.timestep, sps=sps
        ))
        logger.info("Episode reward: {}".format(r.episode_rewards[-1]))
        logger.info("Average of last 500 rewards: {}".format(
            sum(r.episode_rewards[-500:]) / min(500, len(r.episode_rewards))
        ))
        logger.info("Average of last 100 rewards: {}".format(
            sum(r.episode_rewards[-100:]) / min(100, len(r.episode_rewards))
        ))
    return True
def create_training_operations(self, config):
    num_actions = sum(util.prod(self.actions_config[name]['shape']) for name in sorted(self.action))

    # Get hidden layers from network generator, then add NAF outputs, same for target network
    flat_mean = layers['linear'](x=self.training_network.output, size=num_actions, scope='naf_action_means')
    n = 0
    for name in sorted(self.action):
        shape = self.actions_config[name]['shape']
        self.action_taken[name] = tf.reshape(tensor=flat_mean[:, n: n + util.prod(shape)], shape=((-1,) + shape))
        n += util.prod(shape)

    # Advantage computation
    # Network outputs entries of lower triangular matrix L
    lower_triangular_size = num_actions * (num_actions + 1) // 2
    l_entries = layers['linear'](x=self.training_network.output, size=lower_triangular_size, scope='naf_matrix_entries')
    l_matrix = tf.exp(x=tf.map_fn(fn=tf.diag, elems=l_entries[:, :num_actions]))
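# NOTE: the snippet above stops after exponentiating the diagonal entries. A
# minimal NumPy sketch of the rest of the NAF advantage computation, assuming
# the usual parameterization A(s, a) = -0.5 * (a - mu)^T (L L^T) (a - mu);
# the names (l_entries, mu, action) are illustrative, not the model's tensors.
import numpy as np

def naf_advantage(l_entries, mu, action):
    num_actions = mu.shape[0]
    # First num_actions entries parameterize the (exponentiated) diagonal,
    # the remaining entries fill the strictly lower triangle.
    L = np.diag(np.exp(l_entries[:num_actions]))
    rows, cols = np.tril_indices(num_actions, k=-1)
    L[rows, cols] = l_entries[num_actions:num_actions + rows.size]
    # P = L L^T is positive definite, so the advantage is maximal at action == mu.
    P = L @ L.T
    diff = action - mu
    return -0.5 * diff @ P @ diff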
def test_environment(self):
    self.start_tests(name='getting-started-environment')

    environment = Environment.create(
        environment='gym', level='CartPole', max_episode_timesteps=500
    )
    self.finished_test()

    environment = Environment.create(environment='gym', level='CartPole-v1')
    self.finished_test()

    environment = Environment.create(
        environment='test/data/environment.json', max_episode_timesteps=500
    )
    self.finished_test()

    environment = Environment.create(
        environment='test.data.custom_env.CustomEnvironment', max_episode_timesteps=10
    )
    self.finished_test()
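# NOTE: Environment.create also accepts a path to a JSON spec file, as in the
# 'test/data/environment.json' case above. A sketch of writing such a file;
# the file name and the exact keys here are illustrative assumptions.
import json

spec = dict(environment='gym', level='CartPole-v1')
with open('environment.json', 'w') as fp:
    json.dump(spec, fp)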
def test_execution(self):
    self.start_tests(name='getting-started-execution')

    runner = Runner(
        agent='test/data/agent.json', environment=dict(environment='gym', level='CartPole'),
        max_episode_timesteps=10
    )
    runner.run(num_episodes=10)
    runner.run(num_episodes=5, evaluation=True)
    runner.close()
    self.finished_test()

    # Create agent and environment
    environment = Environment.create(
        environment='test/data/environment.json', max_episode_timesteps=10
    )
    agent = Agent.create(agent='test/data/agent.json', environment=environment)

    # Train for 10 episodes
    for _ in range(10):
        states = environment.reset()
        terminal = False
        while not terminal:
            actions = agent.act(states=states)
            states, terminal, reward = environment.execute(actions=actions)
            agent.observe(terminal=terminal, reward=reward)

    # Evaluate for 5 episodes
    sum_rewards = 0.0
    for _ in range(5):
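        # NOTE: the original snippet breaks off here. A sketch of how the
        # evaluation loop typically continues with the independent-act
        # interface; the keyword names (internals, independent, deterministic)
        # are assumptions based on the common act/observe evaluation pattern,
        # not the repository's exact code.
        states = environment.reset()
        internals = agent.initial_internals()
        terminal = False
        while not terminal:
            actions, internals = agent.act(
                states=states, internals=internals, independent=True, deterministic=True
            )
            states, terminal, reward = environment.execute(actions=actions)
            sum_rewards += reward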
def test_agent(self):
    self.start_tests(name='getting-started-agent')

    environment = Environment.create(
        environment='gym', level='CartPole', max_episode_timesteps=50
    )
    self.finished_test()

    agent = Agent.create(
        agent='tensorforce', environment=environment, update=64,
        objective='policy_gradient', reward_estimation=dict(horizon=20)
    )
    self.finished_test()

    agent = Agent.create(
        agent='ppo', environment=environment, batch_size=10, learning_rate=1e-3
    )
    self.finished_test()

    agent = Agent.create(agent='test/data/agent.json', environment=environment)
def long_unittest(self, horizon):
    agent, environment = self.prepare(
        min_timesteps=3, reward_estimation=dict(horizon=horizon), memory=20
    )

    states = environment.reset()
    actions = agent.act(states=states)
    states, terminal, reward = environment.execute(actions=actions)
    _, horizon_output1 = agent.observe(terminal=terminal, reward=reward, query='horizon')
    self.assertIsInstance(horizon_output1, util.np_dtype(dtype='long'))

    if not isinstance(horizon, dict) or horizon['type'] == 'constant':
        actions = agent.act(states=states)
        states, terminal, reward = environment.execute(actions=actions)
        _, horizon_output2 = agent.observe(terminal=terminal, reward=reward, query='horizon')
        self.assertEqual(horizon_output2, horizon_output1)
    else:
        actions = agent.act(states=states)
        states, terminal, reward = environment.execute(actions=actions)
        _, horizon_output2 = agent.observe(terminal=terminal, reward=reward, query='horizon')
        self.assertNotEqual(horizon_output2, horizon_output1)

    actions = agent.act(states=states)
    _, terminal, reward = environment.execute(actions=actions)

    horizon_input = 3
def execute(self, actions):
    if not self.is_valid_actions(actions, self._states):
        raise TensorforceError.value(name='actions', value=actions)

    self.timestep += 1
    self._states = self.random_states()
    terminal = (self.timestep >= self.min_timesteps and random() < 0.25)
    reward = -1.0 + 2.0 * random()
    return self._states, terminal, reward
return (lambda action, name, states: (
    (
        (isinstance(action, util.np_dtype('int')) and shape == ()) or
        (
            isinstance(action, np.ndarray) and
            action.dtype == util.np_dtype('int') and action.shape == shape
        )
    ) and (0 <= action).all() and (action < num_values).all() and
    np.take_along_axis(
        states[name + '_mask'], indices=np.expand_dims(action, axis=-1), axis=-1
    ).all()
))
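# NOTE: a self-contained illustration of the mask check performed above; the
# action and mask values are made up for the example.
import numpy as np

action = np.array([2, 0])                  # int actions for two positions
mask = np.array([[True, False, True],      # allowed values at position 0
                 [True, True, False]])     # allowed values at position 1
valid = np.take_along_axis(
    mask, indices=np.expand_dims(action, axis=-1), axis=-1
).all()
# valid is True: value 2 is allowed at position 0 and value 0 at position 1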
return (lambda action, name, states: (
    (isinstance(action, util.np_dtype('bool')) and shape == ()) or
    (
        isinstance(action, np.ndarray) and
        action.dtype == util.np_dtype('bool') and action.shape == shape
    )
))
with open(args.experiment_spec) as fp:
    experiment_spec = json.load(fp=fp)

run_mode = experiment_spec.get("run_mode", "distributed")

if run_mode == "distributed":
    ps_hosts = args.ps_hosts.split(",")
    worker_hosts = args.worker_hosts.split(",")
    cluster = {'ps': ps_hosts, 'worker': worker_hosts}
    cluster_spec = tf.train.ClusterSpec(cluster)
else:
    cluster_spec = None

if "environment" not in experiment_spec:
    raise TensorForceError("No 'environment' configuration found in experiment-spec.")
environment_spec = experiment_spec["environment"]

# Check for a remote env and log it (remote envs are put into a separate container).
is_remote = environment_spec.pop("remote", False)
env_kwargs = {}
if is_remote:
    img = environment_spec.pop("image", "default")
    env_kwargs.update({"host": args.remote_env_host})
    logger.info("Experiment is run with RemoteEnvironment {} (in separate container).".format(img))

if run_mode != "multi-threaded":
    environments = [Environment.from_spec(experiment_spec["environment"], env_kwargs)]
else:
    # For remote envs in multi-threaded mode, we need to set a sequence of ports, as all envs
    # will be running in the same pod. For single mode: use the default port.
    environments = [Environment.from_spec(experiment_spec["environment"], env_kwargs)]
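# NOTE: the else branch above still creates a single environment even though its
# comment describes one port per environment. A sketch of what the multi-threaded
# branch presumably intends; the "num_workers" key, the `port` kwarg and
# args.remote_env_port are assumptions, not confirmed by the snippet.
environments = []
for i in range(experiment_spec.get("num_workers", 1)):
    kwargs = dict(env_kwargs)
    if is_remote:
        # One port per environment, since all of them run in the same pod.
        kwargs["port"] = args.remote_env_port + i
    environments.append(Environment.from_spec(experiment_spec["environment"], kwargs))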