Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment — the enclosing CLI entry-point function is outside the
# visible chunk; indentation below is reconstructed.
if not net.has_internet_connection():
    # Without connectivity Rally cannot download track data, so force offline mode.
    console.warn("No Internet connection detected. Automatic download of track data sets etc. is disabled.",
                 logger=logger)
    cfg.add(config.Scope.applicationOverride, "system", "offline.mode", True)
else:
    logger.info("Detected a working Internet connection.")
# Run the chosen sub-command and report elapsed wall-clock time either way.
success = dispatch_sub_command(cfg, sub_command)
end = time.time()
if success:
    console.println("")
    console.info("SUCCESS (took %d seconds)" % (end - start), overline="-", underline="-")
else:
    console.println("")
    console.info("FAILURE (took %d seconds)" % (end - start), overline="-", underline="-")
    # Non-zero exit code signals failure to the invoking shell/CI.
    sys.exit(64)
# NOTE(review): this span is a byte-for-byte duplicate of the preceding
# offline-check/dispatch code — likely a chunk-extraction artifact; confirm
# against the original file before deduplicating.
if not net.has_internet_connection():
    # Without connectivity Rally cannot download track data, so force offline mode.
    console.warn("No Internet connection detected. Automatic download of track data sets etc. is disabled.",
                 logger=logger)
    cfg.add(config.Scope.applicationOverride, "system", "offline.mode", True)
else:
    logger.info("Detected a working Internet connection.")
# Run the chosen sub-command and report elapsed wall-clock time either way.
success = dispatch_sub_command(cfg, sub_command)
end = time.time()
if success:
    console.println("")
    console.info("SUCCESS (took %d seconds)" % (end - start), overline="-", underline="-")
else:
    console.println("")
    console.info("FAILURE (took %d seconds)" % (end - start), overline="-", underline="-")
    # Non-zero exit code signals failure to the invoking shell/CI.
    sys.exit(64)
# NOTE(review): fragment — begins mid-`if` (the matching condition for the
# chart.spec.path branch is outside the visible chunk) and ends with a dangling
# `if not args.offline:` whose body is also outside the chunk.
    cfg.add(config.Scope.applicationOverride, "generator", "chart.spec.path", args.chart_spec_path)
else:
    # other options are stored elsewhere already
    cfg.add(config.Scope.applicationOverride, "generator", "node.count", args.node_count)
# Map driver-related CLI flags into the layered config store.
cfg.add(config.Scope.applicationOverride, "driver", "profiling", args.enable_driver_profiling)
cfg.add(config.Scope.applicationOverride, "driver", "on.error", args.on_error)
cfg.add(config.Scope.applicationOverride, "driver", "load_driver_hosts", opts.csv_to_list(args.load_driver_hosts))
if sub_command != "list":
    # Also needed by mechanic (-> telemetry) - duplicate by module?
    target_hosts = opts.TargetHosts(args.target_hosts)
    cfg.add(config.Scope.applicationOverride, "client", "hosts", target_hosts)
    client_options = opts.ClientOptions(args.client_options, target_hosts=target_hosts)
    cfg.add(config.Scope.applicationOverride, "client", "options", client_options)
    if "timeout" not in client_options.default:
        console.info("You did not provide an explicit timeout in the client options. Assuming default of 60 seconds.")
    # Multi-cluster setups require hosts and client options to cover the same clusters.
    if list(target_hosts.all_hosts) != list(client_options.all_client_options):
        console.println("--target-hosts and --client-options must define the same keys for multi cluster setups.")
        exit(1)
# split by component?
if sub_command == "list":
    cfg.add(config.Scope.applicationOverride, "system", "list.config.option", args.configuration)
    cfg.add(config.Scope.applicationOverride, "system", "list.races.max_results", args.limit)
configure_logging(cfg)
# Log environment details up front to ease post-mortem debugging.
logger.info("OS [%s]" % str(os.uname()))
logger.info("Python [%s]" % str(sys.implementation))
logger.info("Rally version [%s]" % version.version())
logger.info("Command line arguments: %s" % args)
# Configure networking
net.init()
if not args.offline:
def __call__(self, *args, **kwargs):
    """
    Runs all registered suppliers and returns the collected binaries.

    Each phase (``fetch``, ``prepare``, ``add``) is run across *all* suppliers
    before the next phase starts, so every supplier has fetched its artifacts
    before any of them prepares or publishes binaries.

    :return: A dict mapping binary names to their locations, populated by the
             suppliers' ``add`` calls.
    """
    binaries = {}
    console.info("Preparing for race ...", flush=True)
    # The original ``try: ... except BaseException: raise`` was a no-op
    # (re-raise with no cleanup) and has been removed; behavior is unchanged.
    for supplier in self.suppliers:
        supplier.fetch()
    for supplier in self.suppliers:
        supplier.prepare()
    for supplier in self.suppliers:
        supplier.add(binaries)
    return binaries
def prepare(self, binary):
    """
    Installs Elasticsearch and all configured plugins, then applies their
    configuration files.

    :param binary: A dict with the Elasticsearch binary under the key
                   "elasticsearch" and plugin binaries keyed by plugin name.
    """
    if not self.preserve:
        console.info("Rally will delete the benchmark candidate after the benchmark")
    self.es_installer.install(binary["elasticsearch"])
    # Drop the bundled configuration immediately: plugins may copy their own
    # configuration files during installation.
    self.es_installer.delete_pre_bundled_configuration()
    # Resolve these only after installation, because some variables depend on
    # the install directory.
    es_home = self.es_installer.es_home_path
    config_vars = self._provisioner_variables()
    for source_path in self.es_installer.config_source_paths:
        self.apply_config(source_path, es_home, config_vars)
    for plugin_installer in self.plugin_installers:
        plugin_installer.install(es_home, binary.get(plugin_installer.plugin_name))
        for plugin_cfg_path in plugin_installer.config_source_paths:
            self.apply_config(plugin_cfg_path, es_home, config_vars)
def generate(cfg):
    """
    Generates chart structures for the configured race setups.

    :param cfg: The application config object.
    """
    logger = logging.getLogger(__name__)
    chart_spec_path = cfg.opts("generator", "chart.spec.path", mandatory=False)
    # Pick the chart family based on the configured chart type.
    chart_type = TimeSeriesCharts if cfg.opts("generator", "chart.type") == "time-series" else BarCharts
    console.info("Loading track data...", flush=True)
    race_configs = load_race_configs(cfg, chart_type, chart_spec_path)
    env = cfg.opts("system", "env.name")
    structures = []
    console.info("Generating charts...", flush=True)
    if not chart_spec_path:
        # Process a normal track
        structures = gen_charts_per_track(race_configs, chart_type, env, logger=logger)
    elif chart_type == BarCharts:
        # bar charts are flavor agnostic and split results based on a separate `user.setup` field
        structures = gen_charts_per_track(race_configs, chart_type, env, logger=logger)
    elif chart_type == TimeSeriesCharts:
        structures = gen_charts_from_track_combinations(race_configs, chart_type, env, logger)
def decompress(self, archive_path, documents_path, uncompressed_size):
    """
    Decompresses the given archive next to itself and verifies that the
    expected document file has been produced.

    :param archive_path: Path of the compressed archive.
    :param documents_path: Path of the file the decompression should create.
    :param uncompressed_size: Expected size in bytes of the decompressed file,
                              or a falsy value if unknown.
    :raises exceptions.DataError: If the expected file is missing or its size
                                  does not match ``uncompressed_size``.
    """
    # Build the progress message first so there is a single console call.
    if uncompressed_size:
        msg = "Decompressing track data from [%s] to [%s] (resulting size: %.2f GB) ... " % \
              (archive_path, documents_path, convert.bytes_to_gb(uncompressed_size))
    else:
        msg = "Decompressing track data from [%s] to [%s] ... " % (archive_path, documents_path)
    console.info(msg, end='', flush=True, logger=self.logger)
    io.decompress(archive_path, io.dirname(archive_path))
    console.println("[OK]")
    if not os.path.isfile(documents_path):
        raise exceptions.DataError("Decompressing [%s] did not create [%s]. Please check with the track author if the compressed "
                                   "archive has been created correctly." % (archive_path, documents_path))
    extracted_bytes = os.path.getsize(documents_path)
    if uncompressed_size is not None and extracted_bytes != uncompressed_size:
        raise exceptions.DataError("[%s] is corrupt. Extracted [%d] bytes but [%d] bytes are expected." %
                                   (documents_path, extracted_bytes, uncompressed_size))
# NOTE(review): fragment — the enclosing actor method (and the first half of
# the comment below) lies outside the visible chunk; indentation reconstructed.
# builds we always assume "master"
if not msg.sources and not self.cfg.exists("mechanic", "distribution.version"):
    # No explicit version configured and not building from sources: derive the
    # version from the target cluster so later components can rely on it.
    distribution_version = mechanic.cluster_distribution_version(self.cfg)
    if not distribution_version:
        raise exceptions.SystemSetupError("A distribution version is required. Please specify it with --distribution-version.")
    logger.info("Automatically derived distribution version [%s]" % distribution_version)
    self.cfg.add(config.Scope.benchmark, "mechanic", "distribution.version", distribution_version)
t = track.load_track(self.cfg)
challenge_name = self.cfg.opts("track", "challenge.name")
challenge = t.find_challenge_or_default(challenge_name)
if challenge is None:
    raise exceptions.SystemSetupError("Track [%s] does not provide challenge [%s]. List the available tracks with %s list tracks."
                                      % (t.name, challenge_name, PROGRAM_NAME))
if challenge.user_info:
    console.info(challenge.user_info, logger=logger)
# Set up race bookkeeping: race record, metrics store, lap counter, race store.
self.race = metrics.create_race(self.cfg, t, challenge)
self.metrics_store = metrics.metrics_store(
    self.cfg,
    track=self.race.track_name,
    challenge=self.race.challenge_name,
    read_only=False
)
self.lap_counter = LapCounter(self.race, self.metrics_store, self.cfg)
self.race_store = metrics.race_store(self.cfg)
logger.info("Asking mechanic to start the engine.")
cluster_settings = self.race.challenge.cluster_settings
# Hand off to the mechanic actor to provision and start the benchmark cluster.
self.send(self.mechanic, mechanic.StartEngine(self.cfg, self.metrics_store.open_context, cluster_settings, msg.sources, msg.build,
                                              msg.distribution, msg.external, msg.docker))
# NOTE(review): fragment — the enclosing retry loop and its `try:` are outside
# the visible chunk; indentation below is reconstructed to match the orphan
# `except KeyboardInterrupt:`.
        # note that this check will only evaluate to True for a TCP-based actor system.
        timeout = 15
        # Poll for up to 15 seconds for the actor system to go down.
        while actor.actor_system_already_running() and timeout > 0:
            logger.info("Actor system is still running. Waiting...")
            time.sleep(1)
            timeout -= 1
        if timeout > 0:
            shutdown_complete = True
            logger.info("Shutdown completed.")
        else:
            logger.warning("Shutdown timed out. Actor system is still running.")
            break
    except KeyboardInterrupt:
        # Count interrupts; repeated Ctrl-C is treated as "terminate now" below.
        times_interrupted += 1
        logger.warning("User interrupted shutdown of internal actor system.")
        console.info("Please wait a moment for Rally's internal components to shutdown.")
if not shutdown_complete and times_interrupted > 0:
    # User forced termination: warn loudly that child processes may survive.
    logger.warning("Terminating after user has interrupted actor system shutdown explicitly for [%d] times.", times_interrupted)
    console.println("")
    console.warn("Terminating now at the risk of leaving child processes behind.")
    console.println("")
    console.warn("The next race may fail due to an unclean shutdown.")
    console.println("")
    console.println(SKULL)
    console.println("")
elif not shutdown_complete:
    console.warn("Could not terminate all internal processes within timeout. Please check and force-terminate all Rally processes.")
def prepare_file_offset_table(data_file_path):
    """
    Creates a file that contains a mapping from line numbers to file offsets for the provided path. This file is used internally by
    #skip_lines(data_file_path, data_file) to speed up line skipping.

    :param data_file_path: The path to a text file that is readable by this process.
    :return The number of lines read or ``None`` if it did not have to build the file offset table.
    """
    offset_file_path = "%s.offset" % data_file_path
    # Building the table is time-consuming, so skip it when an up-to-date
    # offset file already exists.
    if os.path.exists(offset_file_path) and os.path.getmtime(offset_file_path) >= os.path.getmtime(data_file_path):
        return None
    console.info("Preparing file offset table for [%s] ... " % data_file_path, end="", flush=True)
    lines_read = 0
    with open(offset_file_path, mode="wt", encoding="utf-8") as offset_file, \
            open(data_file_path, mode="rt", encoding="utf-8") as data_file:
        # Read via readline() instead of iterating the file object: tell() may
        # not be called while iterating a text file — TODO confirm this was the
        # original author's rationale.
        line = data_file.readline()
        while line:
            lines_read += 1
            # Checkpoint every 50000 lines as "line_number;offset".
            if lines_read % 50000 == 0:
                print("%d;%d" % (lines_read, data_file.tell()), file=offset_file)
            line = data_file.readline()
    console.println("[OK]")
    return lines_read