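# The Modin benchmark scripts below import a time_logger helper from a local
# utils module that is not included in this snippet. Based on how it is used,
# it is assumed to be a context manager that logs the wall-clock time of the
# wrapped block through the logging module. A minimal sketch under that
# assumption (illustrative only, not the actual utils implementation):
import logging
import time
from contextlib import contextmanager


@contextmanager
def time_logger(message):
    start = time.time()
    yield
    logging.info("%s; Time: %.3f seconds", message, time.time() - start)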
# groupby benchmark (Modin on Ray)
import argparse
import logging
import os

import ray

import modin.pandas as pd
from utils import time_logger
parser = argparse.ArgumentParser(description="groupby benchmark")
parser.add_argument("--path", dest="path", help="path to the csv data file")
parser.add_argument("--logfile", dest="logfile", help="path to the log file")
args = parser.parse_args()
file = args.path
file_size = os.path.getsize(file)
if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])
logging.basicConfig(filename=args.logfile, level=logging.INFO)
df = pd.read_csv(file)
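# Wait for every block partition so the CSV read above is fully materialized
# before the timed sections start (ray.wait with num_returns=len(blocks) blocks
# until all partitions are ready).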
blocks = df._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
with time_logger(
    "Groupby + sum aggregation on axis=0: {}; Size: {} bytes".format(file, file_size)
):
    df_groupby = df.groupby("1")
    blocks = df_groupby.sum()._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))
with time_logger("Groupby mean on axis=0: {}; Size: {} bytes".format(file, file_size)):
    blocks = df_groupby.mean()._block_partitions.flatten().tolist()
    ray.wait(blocks, len(blocks))
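# Example invocation (hypothetical script name and paths):
#   python groupby.py --path data/benchmark.csv --logfile logs/groupby.log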
parser.add_argument("--left", dest="left", help="path to the left csv data " "file")
parser.add_argument("--right", dest="right", help="path to the right csv data " "file")
parser.add_argument("--logfile", dest="logfile", help="path to the log file")
args = parser.parse_args()
file_left = args.left
file_size_left = os.path.getsize(file_left)
file_right = args.right
file_size_right = os.path.getsize(file_right)
if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])
logging.basicConfig(filename=args.logfile, level=logging.INFO)
df_left = pd.read_csv(file_left)
df_right = pd.read_csv(file_right)
blocks = df_left._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
blocks = df_right._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
with time_logger(
    "Inner Join: {} & {}; Left Size: {} bytes; Right Size: {} "
    "bytes".format(file_left, file_right, file_size_left, file_size_right)
):
    result = df_left.join(df_right, how="inner", lsuffix="left_")
    ray.wait(result._block_partitions.flatten().tolist())
with time_logger(
    "Outer Join: {} & {}; Left Size: {} bytes; Right Size: {} "
parser.add_argument("--right", dest="right", help="path to the right csv data " "file")
parser.add_argument("--logfile", dest="logfile", help="path to the log file")
args = parser.parse_args()
file_left = args.left
file_size_left = os.path.getsize(file_left)
file_right = args.right
file_size_right = os.path.getsize(file_right)
if not os.path.exists(os.path.split(args.logfile)[0]):
os.makedirs(os.path.split(args.logfile)[0])
logging.basicConfig(filename=args.logfile, level=logging.INFO)
df_left = pd.read_csv(file_left)
df_right = pd.read_csv(file_right)
blocks = df_left._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
blocks = df_right._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
with time_logger(
"Inner Join: {} & {}; Left Size: {} bytes; Right Size: {} "
"bytes".format(file_left, file_right, file_size_left, file_size_right)
):
result = df_left.join(df_right, how="inner", lsuffix="left_")
ray.wait(result._block_partitions.flatten().tolist())
with time_logger(
"Outer Join: {} & {}; Left Size: {} bytes; Right Size: {} "
"bytes".format(file_left, file_right, file_size_left, file_size_right)
# arithmetic benchmark (separate script; same imports as the groupby benchmark above)
from utils import time_logger
parser = argparse.ArgumentParser(description="arithmetic benchmark")
parser.add_argument("--path", dest="path", help="path to the csv data file")
parser.add_argument("--logfile", dest="logfile", help="path to the log file")
args = parser.parse_args()
file = args.path
file_size = os.path.getsize(file)
if not os.path.exists(os.path.split(args.logfile)[0]):
    os.makedirs(os.path.split(args.logfile)[0])
logging.basicConfig(filename=args.logfile, level=logging.INFO)
df = pd.read_csv(file)
blocks = df._block_partitions.flatten().tolist()
ray.wait(blocks, len(blocks))
with time_logger("Transpose: {}; Size: {} bytes".format(file, file_size)):
blocks = df.T.flatten().tolist()
ray.wait(blocks, len(blocks))
with time_logger("Sum on axis=0: {}; Size: {} bytes".format(file, file_size)):
df.sum()
with time_logger("Sum on axis=1: {}; Size: {} bytes".format(file, file_size)):
df.sum(axis=1)
with time_logger("Median on axis=0: {}; Size: {} bytes".format(file, file_size)):
df.median()
exec(open("./helpers.py").read())
src_x = os.environ['SRC_X_LOCAL']
ver = modin.__version__
git = modin.__git_revision__
task = "sort"
question = "by int KEY"
data_name = os.path.basename(src_x)
solution = "modin"
fun = ".sort"
cache = "TRUE"
print("loading dataset...")
x = pd.read_csv(data_name)
print("sorting...")
gc.collect()
t_start = timeit.default_timer()
ans = x.sort_values('KEY')
print(ans.shape)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['X2'].sum()]
chkt = timeit.default_timer() - t_start
write_log(
    task=task, data=data_name, in_rows=x.shape[0], question=question,
    out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
    version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m,
    cache=cache, chk=make_chk(chk), chk_time_sec=chkt,
)
del ans
gc.collect()
exec(open("./_helpers/helpers.py").read())
ver = modin.__version__
git = ""
task = "groupby"
solution = "modin"
fun = ".groupby"
cache = "TRUE"
on_disk = "FALSE"
data_name = os.environ['SRC_GRP_LOCAL']
src_grp = os.path.join("data", data_name+".csv")
print("loading dataset %s" % data_name, flush=True)
x = pd.read_csv(src_grp, dtype={'id1':'category', 'id2':'category', 'id3':'category'})
print(len(x.index), flush=True)
task_init = timeit.default_timer()
print("grouping...", flush=True)
question = "sum v1 by id1" # q1
gc.collect()
t_start = timeit.default_timer()
ans = x.groupby(['id1'], observed=True).agg({'v1':'sum'})
ans.reset_index(inplace=True) # #68
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = [ans['v1'].sum()]
chkt = timeit.default_timer() - t_start
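# In the full script this first groupby question is presumably followed by a
# write_log call mirroring the sort benchmark above; a sketch following that
# pattern (the real script's field set may differ):
write_log(
    task=task, data=data_name, in_rows=x.shape[0], question=question,
    out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution,
    version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m,
    cache=cache, chk=make_chk(chk), chk_time_sec=chkt,
)
del ans
gc.collect()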