def run(self):
    res_queue = Queue()
    cmd_queues: List[Queue] = [Queue() for _ in range(self.args.num_workers)]
    all_children = []
    # Create (n) actors for gathering trajectories
    actor_epsilons = np.linspace(0.001, 0.5, self.args.num_workers)
    actors = [
        MathyActor(
            args=self.args,
            command_queue=cmd_queues[i],
            experience=self.experience,
            greedy_epsilon=actor_epsilons[i],
            result_queue=res_queue,
            teacher=self.teacher,
            worker_idx=i,
            writer=self.writer,
        )
        for i in range(self.args.num_workers)
    ]
    all_children += actors
    # Create one learner for training on replay data
    learner = MathyLearner(
        args=self.args,
        writer=self.writer,
    )
    all_children.append(learner)
    for worker in all_children:
        worker.start()
    # Drain experience frames from the actors until a None sentinel arrives
    # or the user interrupts training.
    try:
        while True:
            experience_frame: Optional[ExperienceFrame] = res_queue.get()
            if experience_frame is not None:
                self.experience.add_frame(experience_frame)
            else:
                break
    except KeyboardInterrupt:
        print("Received Keyboard Interrupt. Shutting down.")
        MathyActor.request_quit = True
        MathyLearner.request_quit = True
        learner.model.save()
    for child in all_children:
        child.join()
    print("Done. Bye!")
    last_state: MathyEnvState,
):
    env_name = self.teacher.get_env(self.worker_idx, self.iteration)
    reward_sum = 0.0  # terminal
    discounted_rewards: List[float] = []
    for reward in episode_memory.rewards[::-1]:
        reward_sum = reward + self.args.gamma * reward_sum
        discounted_rewards.append(reward_sum)
    discounted_rewards.reverse()
    discounted_rewards = tf.convert_to_tensor(
        value=np.array(discounted_rewards)[:, None], dtype=tf.float32
    )
    # Store experience frames now that we have finalized discounted
    # reward values.
    episode_memory.commit_frames(self.worker_idx, discounted_rewards)
    MathyActor.global_moving_average_reward = record(
        episode_reward,
        self.worker_idx,
        episode_steps,
        env_name,
        self.experience.is_full(),
    )
    self.maybe_write_episode_summaries(episode_reward, episode_steps, last_state)
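# The fragment above walks episode_memory.rewards in reverse so that each
# discounted return is G_t = r_t + gamma * G_{t+1}, then restores the original
# order. A standalone sketch of that accumulation, without the TensorFlow
# conversion, follows; compute_discounted_returns is a hypothetical helper,
# not a Mathy API.
from typing import List

import numpy as np


def compute_discounted_returns(rewards: List[float], gamma: float) -> np.ndarray:
    running = 0.0  # return beyond the terminal step
    returns: List[float] = []
    for reward in rewards[::-1]:
        running = reward + gamma * running
        returns.append(running)
    returns.reverse()
    # Shape (T, 1), matching the [:, None] expansion used above.
    return np.array(returns, dtype=np.float32)[:, None]


# A reward given only at the last step propagates backwards, scaled by gamma:
print(compute_discounted_returns([0.0, 0.0, 1.0], gamma=0.9))
# approximately [[0.81], [0.9], [1.0]]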
def __init__(
    self,
    args: BaseConfig,
    result_queue: Queue,
    command_queue: Queue,
    experience: Experience,
    worker_idx: int,
    greedy_epsilon: float,
    writer: tf.summary.SummaryWriter,
    teacher: Teacher,
):
    super(MathyActor, self).__init__()
    self.args = args
    self.iteration = 0
    self.experience = experience
    self.greedy_epsilon = greedy_epsilon
    self.worker_step_count = 0
    self.result_queue = result_queue
    self.command_queue = command_queue
    self.worker_idx = worker_idx
    self.teacher = teacher
    self.envs = {}
    env_name = self.teacher.get_env(self.worker_idx, self.iteration)
    self.envs[env_name] = gym.make(env_name)
    self.action_size = self.envs[env_name].action_space.n
    self.writer = writer
    self.model = ActorCriticModel(args=args, predictions=self.action_size)
    self.model.maybe_load(self.envs[env_name].initial_state())
def run_episode(self, episode_memory: EpisodeMemory):
    env_name = self.teacher.get_env(self.worker_idx, self.iteration)
    if env_name not in self.envs:
        self.envs[env_name] = gym.make(env_name)
    env = self.envs[env_name]
    episode_memory.clear()
    self.ep_loss = 0
    ep_reward = 0.0
    ep_steps = 0
    done = False
    last_state = env.reset()
    last_text = env.state.agent.problem
    last_action = -1
    last_reward = -1
    while not done and MathyActor.request_quit is False:
        # Store the RNN state for replay training
        rnn_state_h = self.model.embedding.state_h.numpy()
        rnn_state_c = self.model.embedding.state_c.numpy()
        sample = episode_memory.get_current_batch(last_state, env.state)
        if not self.experience.is_full():
            # Select a random action from the last timestep mask
            action_mask = sample.mask[-1][-1][:]
            # Normalize all valid actions to equal probability
            actions = action_mask / np.sum(action_mask)
            action = np.random.choice(len(actions), p=actions)
            value = np.random.random()
        elif np.random.random() < self.greedy_epsilon:
            _, value = self.model.predict_next(sample)
            # Select a random action from the last timestep mask
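# Until the replay buffer fills, the branch above samples uniformly over the
# valid actions in the last timestep's mask. A minimal sketch of that masked
# sampling follows; sample_masked_action is an illustrative name, not a Mathy
# API.
import numpy as np


def sample_masked_action(action_mask: np.ndarray) -> int:
    # Normalize the 0/1 mask into a probability distribution over valid actions.
    probs = action_mask / np.sum(action_mask)
    return int(np.random.choice(len(probs), p=probs))


mask = np.array([0, 1, 1, 0, 1], dtype=np.float64)
print(sample_masked_action(mask))  # prints 1, 2, or 4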