Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def strongScaling(algorithm, threadSequence, input, inputTitle=None, repetitions=1, outPath=None):
    """ Evaluate strong scaling, i.e. how the performance varies with the number of threads
    for a fixed input size.

    :param algorithm object providing run(input) and toString()
    :param threadSequence iterable of thread counts to test, in the given order
    :param input the fixed input handed to algorithm.run on every repetition
    :param inputTitle label stored in the "input" column; if None it is taken from
        input.toString(), falling back to str(input)
    :param repetitions number of timed runs per thread count
    :param outPath if truthy, path of a CSV file that receives the collected rows
    :return list of dicts with the keys "algo", "input", "threads" and "time",
        one entry per timed run
    """
    if inputTitle is None:
        # best effort: not every input type offers toString()
        try:
            inputTitle = input.toString()
        except Exception:
            inputTitle = str(input)
    data = []  # collects data about the experiments
    threadsAvailable = getMaxNumberOfThreads()  # remember maximum number of threads and restore later
    try:
        for nThreads in threadSequence:
            setNumberOfThreads(nThreads)
            print("set number of threads to {0}".format(getMaxNumberOfThreads()))
            for _ in range(repetitions):
                algoName = algorithm.toString()
                print("running {0}".format(algoName))
                timer = stopwatch.Timer()
                algorithm.run(input)
                timer.stop()
                print("elapsed time: {0}".format(timer.elapsed))
                # append run data
                data.append({"algo": algoName, "input": inputTitle, "threads": nThreads, "time": timer.elapsed})
    finally:
        # restore the previous thread count even if a run fails
        setNumberOfThreads(threadsAvailable)
    if outPath:
        # newline="" lets the csv module control line endings itself
        with open(outPath, "w", newline="") as outFile:
            columns = ["algo", "input", "threads", "time"]
            writer = csv.DictWriter(outFile, fieldnames=columns)
            writer.writeheader()
            writer.writerows(data)
    return data
def weakScaling(algorithm, threadSequence, inputSequence, inputTitles=None, repetitions=1, outPath=None):
    """ Evaluate weak scaling, i.e. how the performance varies with the number of threads
    for a fixed input size per processor.

    :param algorithm object providing run(input) and toString()
    :param threadSequence iterable of thread counts; paired element-wise with inputSequence
    :param inputSequence iterable of inputs, one per thread count (surplus elements of the
        longer sequence are ignored, as with zip)
    :param inputTitles optional sequence of labels, indexed in step with the pairs; if None,
        each label is taken from the input's toString(), falling back to str(input)
    :param repetitions number of timed runs per (input, thread count) pair
    :param outPath if truthy, path of a CSV file that receives the collected rows
    :return list of dicts with the keys "algo", "input", "threads" and "time",
        one entry per timed run
    """
    data = []  # collects data about the experiments
    threadsAvailable = getMaxNumberOfThreads()  # remember maximum number of threads and restore later
    try:
        for i, (currentInput, nThreads) in enumerate(zip(inputSequence, threadSequence)):
            if inputTitles is None:
                # no labels supplied: derive one from the input itself (best effort)
                try:
                    title = currentInput.toString()
                except Exception:
                    title = str(currentInput)
            else:
                title = inputTitles[i]
            setNumberOfThreads(nThreads)
            print("set number of threads to {0}".format(getMaxNumberOfThreads()))
            for _ in range(repetitions):
                algoName = algorithm.toString()
                print("running {0}".format(algoName))
                timer = stopwatch.Timer()
                algorithm.run(currentInput)
                timer.stop()
                # append run data
                data.append({"algo": algoName, "input": title, "threads": nThreads, "time": timer.elapsed})
    finally:
        # restore the previous thread count even if a run fails
        setNumberOfThreads(threadsAvailable)
    if outPath:
        # newline="" lets the csv module control line endings itself
        with open(outPath, "w", newline="") as outFile:
            columns = ["algo", "input", "threads", "time"]
            writer = csv.DictWriter(outFile, fieldnames=columns)
            writer.writeheader()
            writer.writerows(data)
    return data
def detectCommunities(G, algo=None, inspect=True):
    """ Run a community detection algorithm on the graph and report the elapsed time.
    :param G the graph
    :param algo community detection algorithm instance; when None, PLM(G, refine=False) is used
    :param inspect if true, print the solution properties via inspectCommunities
    :return communities, i.e. whatever algo.getPartition() yields after the run
    """
    detector = PLM(G, refine=False) if algo is None else algo
    # the timed section covers both the run and the retrieval of the partition
    timer = stopwatch.Timer()
    detector.run()
    communities = detector.getPartition()
    timer.stop()
    summary = "{0} detected communities in {1} [s]".format(detector.toString(), timer.elapsed)
    print(summary)
    if inspect:
        print("solution properties:")
        inspectCommunities(communities, G)
    return communities
def evalCommunityDetection(algo, G):
    """ Run a community detection algorithm once and print a summary table.
    :param algo algorithm instance providing run() and getPartition()
    :param G the graph the partition's modularity is evaluated against
    :raises MissingDependencyError if the tabulate package is not available
    """
    if not have_tabulate:
        raise MissingDependencyError("tabulate")
    # the timed section covers both the run and the retrieval of the partition
    timer = stopwatch.Timer()
    algo.run()
    partition = algo.getPartition()
    timer.stop()
    # rows are added in the order they appear in the printed table
    summary = []
    summary.append(["time [s]", timer.elapsed])
    summary.append(["# communities", partition.numberOfSubsets()])
    summary.append(["modularity", Modularity().getQuality(partition, G)])
    print(tabulate.tabulate(summary))