Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
@actor.no_retry("race control")
def receiveMsg_PreparationComplete(self, msg, sender):
    """Copy the distribution details from ``msg`` onto the race, persist the race and start the run.

    :param msg: Message providing ``distribution_flavor``, ``distribution_version`` and ``revision``.
    :param sender: Address of the actor that sent the message (not used here).
    """
    race = self.race
    race.distribution_flavor = msg.distribution_flavor
    race.distribution_version = msg.distribution_version
    race.revision = msg.revision
    # Persist the race right away (it has no results yet) so other components can already retrieve its full metadata.
    self.race_store.store_race(race)
    if race.challenge.auto_generated:
        # the challenge name is left out of the announcement for auto-generated challenges
        announcement = "Racing on track [{}] and car {} with version [{}].\n".format(
            race.track_name, race.car, race.distribution_version)
    else:
        announcement = "Racing on track [{}], challenge [{}] and car {} with version [{}].\n".format(
            race.track_name, race.challenge_name, race.car, race.distribution_version)
    console.info(announcement)
    self.run()
@actor.no_retry("driver")
def receiveMsg_PrepareBenchmark(self, msg, sender):
    """Remember the requester and let a newly created ``Driver`` prepare the benchmark.

    :param msg: Message providing ``config`` (handed to the ``Driver``) and ``track`` (the track to prepare).
    :param sender: Address of the requesting actor; stored as ``start_sender``.
    """
    self.start_sender = sender
    driver = Driver(self, msg.config)
    self.coordinator = driver
    driver.prepare_benchmark(msg.track)
@actor.no_retry("mechanic")
def receiveMsg_NodesStarted(self, msg, sender):
    """Merge the reported system meta info, register the sender as a child and advance the state machine.

    :param msg: Message providing ``system_meta_info``.
    :param sender: Address of the child actor that reported its nodes as started.
    """
    self.metrics_store.merge_meta_info(msg.system_meta_info)
    # The child addresses are unknown at first, so ``children`` only holds ``None`` placeholders. Each address is
    # filled in once it becomes known.
    known_children = self.children
    if sender not in known_children:
        # Length-limited FIFO: put the new address at the front and drop the last entry.
        known_children.insert(0, sender)
        known_children.pop()
    self.transition_when_all_children_responded(
        sender, msg, "starting", "nodes_started", self.on_all_nodes_started)
@actor.no_retry("mechanic")
def receiveMsg_ResetRelativeTime(self, msg, sender):
    """Reset the relative time immediately, or schedule a wakeup when a positive delay is requested.

    :param msg: Message providing ``reset_in_seconds``.
    :param sender: Address of the actor that sent the message (not used here).
    """
    delay = msg.reset_in_seconds
    if delay > 0:
        # Defer: ask to be woken up after the delay with the ``WAKEUP_RESET_RELATIVE_TIME`` payload.
        self.wakeupAfter(delay, payload=MechanicActor.WAKEUP_RESET_RELATIVE_TIME)
        return
    self.reset_relative_time()
@actor.no_retry("track preparator")
def receiveMsg_PrepareTrack(self, msg, sender):
    """Prepare the requested track on this node and reply to the sender with ``TrackPrepared``.

    :param msg: Message providing ``config`` and ``track``.
    :param sender: Address that receives the ``TrackPrepared`` reply.
    """
    # The node-specific config is loaded so that the correct paths are available on this machine.
    local_cfg = load_local_config(msg.config)
    requested_track = msg.track
    self.logger.info("Preparing track [%s]", requested_track.name)
    # With a "proper" track repository this ensures that all state is identical to the coordinator node. Simple tracks
    # are usually self-contained, but if plugins are defined the track still has to be present on all machines.
    if requested_track.has_plugins:
        track.track_repo(local_cfg, fetch=True, update=True)
        # Track plugins are loaded eagerly as well because the respective parameter sources could require them.
        track.load_track_plugins(local_cfg, runner.register_runner, scheduler.register_scheduler)
    # Beware: this is potentially long-running and blocks the actor completely while it runs. Doing it in a background
    # thread might be preferable.
    track.prepare_track(requested_track, local_cfg)
    self.send(sender, TrackPrepared())
@actor.no_retry("load generator")
def receiveMsg_WakeupMessage(self, msg, sender):
# Wakeup handler of the load generator. While ``start_driving`` is set, the wakeup clears the flag and starts
# driving. Otherwise it sends samples and then checks for a cancellation or a finished executor future.
# NOTE(review): indentation was lost in this excerpt and the handler looks cut off - the failure branch only logs
# "Notifying master..." without a visible send, and ``current_samples`` is not used below. Confirm against the
# full source before changing anything here.
# it would be better if we could send ourselves a message at a specific time, simulate this with a boolean...
if self.start_driving:
self.logger.info("LoadGenerator[%s] starts driving now.", str(self.client_id))
# clear the flag so that subsequent wakeups take the sampling branch
self.start_driving = False
self.drive()
else:
current_samples = self.send_samples()
# a set ``cancel`` event is checked before the state of the executor future
if self.cancel.is_set():
self.logger.info("LoadGenerator[%s] has detected that benchmark has been cancelled. Notifying master...",
str(self.client_id))
self.send(self.master, actor.BenchmarkCancelled())
elif self.executor_future is not None and self.executor_future.done():
# the future is already done here, so a zero timeout is sufficient to fetch its exception (if any)
e = self.executor_future.exception(timeout=0)
if e:
self.logger.info("LoadGenerator[%s] has detected a benchmark failure. Notifying master...", str(self.client_id))
@actor.no_retry("race control")
def receiveMsg_BenchmarkFailure(self, msg, sender):
    """Mark this actor as failed and forward the failure message unchanged to ``start_sender``.

    :param msg: The benchmark failure message to forward.
    :param sender: Address of the actor that reported the failure (only used for logging).
    """
    self.logger.info("Received a benchmark failure from [%s] and will forward it now.", sender)
    self.error = True
    recipient = self.start_sender
    self.send(recipient, msg)
@actor.no_retry("mechanic")
def receiveMsg_OnBenchmarkStart(self, msg, sender):
    """Record the current lap on the metrics store and pass the message on to the children.

    :param msg: Message providing ``lap``.
    :param sender: Address of the actor that sent the message.
    """
    self.metrics_store.lap = msg.lap
    # The first lap arrives in state "cluster_started", every later lap in state "benchmark_stopped".
    expected_states = ["cluster_started", "benchmark_stopped"]
    self.send_to_children_and_transition(sender, msg, expected_states, "benchmark_starting")
@actor.no_retry("mechanic dispatcher")
def receiveMsg_StartEngine(self, startmsg, sender):
    """Split the start request into one message per host and queue each for local or remote handling.

    Messages for ``127.0.0.1`` are paired with a newly created ``NodeMechanicActor`` in ``self.pending``; all other
    messages are grouped by IP in ``self.remotes``.

    :param startmsg: Start request providing ``hosts`` and ``for_nodes(...)``.
    :param sender: Address of the requester; stored as ``start_sender`` and set as ``reply_to`` on every sub-message.
    """
    self.start_sender = sender
    self.pending = []
    self.remotes = defaultdict(list)
    endpoints = to_ip_port(startmsg.hosts)
    node_ips = extract_all_node_ips(endpoints)
    for (ip, port), node in nodes_by_host(endpoints).items():
        node_msg = startmsg.for_nodes(node_ips, ip, port, node)
        node_msg.reply_to = sender
        is_local = '127.0.0.1' == ip
        if not is_local:
            self.remotes[ip].append(node_msg)
            continue
        # the local node mechanic is requested with the "coordinator" actor requirement
        local_mechanic = self.createActor(NodeMechanicActor, targetActorRequirements={"coordinator": True})
        self.pending.append((local_mechanic, node_msg))
@actor.no_retry("mechanic")
def receiveMsg_BenchmarkStopped(self, msg, sender):
    """Store the reported system metrics and advance the state machine once all children have responded.

    :param msg: Message providing ``system_metrics``.
    :param sender: Address of the child actor that reported the stopped benchmark.
    """
    self.metrics_store.bulk_add(msg.system_metrics)
    on_all_responded = self.on_benchmark_stopped
    self.transition_when_all_children_responded(
        sender, msg, "benchmark_stopping", "benchmark_stopped", on_all_responded)