Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def main(reactor, loops):
    """
    Time `loops` consecutive insertions into an LruCache whose capacity is
    `loops // 2`, so that half of the inserted entries end up evicted.

    `reactor` is accepted but not used here. Returns the elapsed time
    measured with perf_counter().
    """
    lru = LruCache(loops // 2)

    began = perf_counter()
    for key in range(loops):
        lru[key] = True
    return perf_counter() - began
# Compute statistics comparing the first and second halves of the warmup
# values recorded after the first `nwarmup` entries of self.warmups.
# Each entry of self.warmups is unpacked as a (loops, value) pair; only the
# value is used here.
# NOTE(review): only the beginning of this method is visible. The code that
# consumes `outlier`, `mean_diff`, `q1_diff`, `q3_diff` and `unit`, and the
# value the method returns, are not shown here.
def test_calibrate_warmups(self, nwarmup, unit):
# index that splits the entries after the first `nwarmup` into two halves
half = nwarmup + (len(self.warmups) - nwarmup) // 2
sample1 = [value for loops, value in self.warmups[nwarmup:half]]
sample2 = [value for loops, value in self.warmups[half:]]
# NOTE(review): sample1 is empty when fewer than two entries follow the
# first `nwarmup`, in which case this indexing raises IndexError --
# confirm the caller guarantees enough values.
first_value = sample1[0]
# test if the first value is an outlier, relative to all the other values
# of both samples
values = sample1[1:] + sample2
# presumably the 25th and 75th percentiles; `percentile` is defined
# elsewhere -- confirm its argument convention
q1 = percentile(values, 0.25)
q3 = percentile(values, 0.75)
iqr = q3 - q1
# upper bound: 1.5 interquartile ranges above the third quartile
outlier_max = (q3 + 1.5 * iqr)
# only check maximum, not minimum
outlier = not(first_value <= outlier_max)
# relative difference between the means of the two halves, expressed as a
# fraction of the second half's mean
# NOTE(review): raises ZeroDivisionError if mean2 is 0.
mean1 = statistics.mean(sample1)
mean2 = statistics.mean(sample2)
mean_diff = (mean1 - mean2) / float(mean2)
# same relative comparison for the 0.25 and 0.75 percentiles of each half
s1_q1 = percentile(sample1, 0.25)
s2_q1 = percentile(sample2, 0.25)
s1_q3 = percentile(sample1, 0.75)
s2_q3 = percentile(sample2, 0.75)
# NOTE(review): both divisions raise ZeroDivisionError when the second
# half's percentile is 0.
q1_diff = (s1_q1 - s2_q1) / float(s2_q1)
q3_diff = (s1_q3 - s2_q3) / float(s2_q3)
# Time three phases of a spawn benchmark: calling `spawn` N times with
# `incr`, an optional `sleep(0)`, and an optional `options.join()`.
# `options` must provide `kwargs`, `sleep` and `join` attributes.
# NOTE(review): the function is cut off after the join timing; whatever
# follows (including any return value) is not visible here.
def _test(spawn, sleep, options):
# reset the module-level counter (presumably incremented by `incr`, which is
# defined elsewhere -- confirm)
global counter
counter = 0
before_spawn = perf_counter()
# NOTE(review): `xrange` is a Python 2 builtin; on Python 3 this relies on
# an alias defined elsewhere in the file -- confirm.
for _ in xrange(N):
spawn(incr, **options.kwargs)
spawn_duration = perf_counter() - before_spawn
if options.sleep:
# the counter must still be untouched before sleeping
assert counter == 0, counter
before_sleep = perf_counter()
sleep(0)
sleep_duration = perf_counter() - before_sleep
# after sleep(0) the counter must have reached N
assert counter == N, (counter, N)
else:
# -1 marks the sleep phase as not measured
sleep_duration = -1
if options.join:
# options.join is both the flag and the callable that is timed
before_join = perf_counter()
options.join()
join_duration = perf_counter() - before_join
# Script entry point: build a pyperf Runner, register a --cases option and
# turn its comma separated value into a list of case names.
# NOTE(review): the function is cut off after the empty-list error path; the
# handling of a missing --cases option and the benchmark registration are
# not visible here.
def main():
runner = pyperf.Runner(add_cmdline_args=add_cmdline_args)
# the help text lists every name in CASES (defined elsewhere in the file)
runner.argparser.add_argument("--cases",
help="Comma separated list of cases. Available cases: %s. By default, run all cases."
% ', '.join(CASES))
runner.metadata['description'] = "Benchmark json.dumps()"
args = runner.parse_args()
if args.cases:
# split on commas, strip surrounding whitespace and drop empty items
cases = []
for case in args.cases.split(','):
case = case.strip()
if case:
cases.append(case)
# --cases was given but contained no usable name: report and exit with
# status 1
if not cases:
print("ERROR: empty list of cases")
sys.exit(1)
else:
edit.destroy_constraint()
# Module-level global, initialised to None here.
# It is deliberately kept as a global to stay in the spirit of the original
# implementation, even though a global is not a pleasant design.
# NOTE(review): the code that assigns and reads `planner` is not visible in
# this excerpt.
planner = None
def delta_blue(n):
    """Run chain_test and then projection_test, passing `n` to each."""
    chain_test(n)
    projection_test(n)
if __name__ == "__main__":
    # Argument forwarded to delta_blue() on every benchmark call.
    n = 100

    # Describe the benchmark in the run metadata, then hand the function
    # over to pyperf.
    runner = pyperf.Runner()
    runner.metadata['description'] = "DeltaBlue benchmark"
    runner.bench_func('deltablue', delta_blue, n)
start=start)
start += nvalue
if self.test_calibrate_warmups(nwarmup, unit):
break
if len(self.warmups) >= MAX_WARMUP_VALUES:
print("ERROR: failed to calibrate the number of warmups")
values = [format_value(unit, value)
for loops, value in self.warmups]
print("Values (%s): %s" % (len(values), ', '.join(values)))
sys.exit(1)
nwarmup += 1
if self.args.verbose:
print("Calibration: use %s warmups" % format_number(nwarmup))
print()
if self.args.recalibrate_warmups:
self.metadata['recalibrate_warmups'] = nwarmup
else:
self.metadata['calibrate_warmups'] = nwarmup
if args.verbose:
text = format_value(unit, value)
if is_warmup:
text = ('%s (loops: %s, raw: %s)'
% (text,
format_number(self.loops),
format_value(unit, raw_value)))
print("%s %s: %s" % (value_name, start + index, text))
if calibrate_loops and raw_value < args.min_time:
if self.loops * 2 > MAX_LOOPS:
print("ERROR: failed to calibrate the number of loops")
print("Raw timing %s with %s is still smaller than "
"the minimum time of %s"
% (format_value(unit, raw_value),
format_number(self.loops, 'loop'),
format_timedelta(args.min_time)))
sys.exit(1)
self.loops *= 2
# need more values for the calibration
nvalue += 1
index += 1
# Build the text lines describing one run of a benchmark.
# `lines` is the list the output is appended to; a new list is created when
# the caller does not pass one.
# NOTE(review): the function is cut off inside the calibration branch; the
# use of `common_metadata`, `verbose`, `inner_loops`, `name`, `unit` and
# `format_value`, the non-calibration path and the return value are not
# visible here.
def format_run(bench, run_index, run, common_metadata=None, raw=False,
verbose=0, lines=None):
if lines is None:
lines = []
inner_loops = run.get_inner_loops()
if run._is_calibration():
# pick the description matching the kind of calibration run; the warmup
# variants are tested first, then loop recalibration, and the fallback is
# the plain calibration of the number of loops
if run._is_calibration_warmups():
warmups = run._get_calibration_warmups()
action = 'calibrate the number of warmups: %s' % format_number(warmups)
elif run._is_recalibration_warmups():
warmups = run._get_calibration_warmups()
action = 'recalibrate the number of warmups: %s' % format_number(warmups)
elif run._is_recalibration_loops():
loops = run._get_calibration_loops()
action = 'recalibrate the number of loops: %s' % format_number(loops)
else:
loops = run._get_calibration_loops()
action = 'calibrate the number of loops: %s' % format_number(loops)
lines.append("Run %s: %s" % (run_index, action))
# label selected by the `raw` flag
if raw:
name = 'raw calibrate'
else:
name = 'calibrate'
unit = bench.get_unit()
# local alias for the benchmark's value formatter
format_value = bench.format_value
else:
isolated = False
cpus = parse_cpu_list(cpus)
if set_cpu_affinity(cpus):
if self.args.verbose:
if isolated:
text = ("Pin process to isolated CPUs: %s"
% format_cpu_list(cpus))
else:
text = ("Pin process to CPUs: %s"
% format_cpu_list(cpus))
print(text)
if isolated:
self.args.affinity = format_cpu_list(cpus)
else:
if not isolated:
print("ERROR: CPU affinity not available.", file=sys.stderr)
print("Use Python 3.3 or newer, or install psutil dependency")
sys.exit(1)
elif not self.args.quiet:
print("WARNING: unable to pin worker processes to "
"isolated CPUs, CPU affinity not available")
print("Use Python 3.3 or newer, or install psutil dependency")
cpus = get_isolated_cpus()
if not cpus:
# no isolated CPUs or unable to get the isolated CPUs
return
else:
isolated = False
cpus = parse_cpu_list(cpus)
if set_cpu_affinity(cpus):
if self.args.verbose:
if isolated:
text = ("Pin process to isolated CPUs: %s"
% format_cpu_list(cpus))
else:
text = ("Pin process to CPUs: %s"
% format_cpu_list(cpus))
print(text)
if isolated:
self.args.affinity = format_cpu_list(cpus)
else:
if not isolated:
print("ERROR: CPU affinity not available.", file=sys.stderr)
print("Use Python 3.3 or newer, or install psutil dependency")
sys.exit(1)
elif not self.args.quiet:
print("WARNING: unable to pin worker processes to "
"isolated CPUs, CPU affinity not available")
print("Use Python 3.3 or newer, or install psutil dependency")