# From datatable's test suite; module-level imports:
import random
import datatable as dt
from datatable import f, by, count
# frame_integrity_check is a helper from datatable's own test utilities.

def test_key_after_group():
    n = 1000
    DT = dt.Frame(A=[random.choice("abcd") for _ in range(n)])
    tmp = DT[:, dt.count(), dt.by(0)]
    frame_integrity_check(tmp)
    tmp.key = "A"
    assert tmp.to_list()[0] == ["a", "b", "c", "d"]
    assert sum(tmp.to_list()[1]) == n
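# Side note: keying a frame (as tmp.key = "A" above) also makes it usable as
# the right side of a join. A minimal sketch on toy data (illustrative only,
# not part of the test suite):
lookup = dt.Frame(A=["a", "b"], label=["first", "second"])
lookup.key = "A"                      # key columns must hold unique values
DT2 = dt.Frame(A=["b", "a", "b"])
joined = DT2[:, :, dt.join(lookup)]   # natural left join on the key column
assert joined.to_list() == [["b", "a", "b"], ["second", "first", "second"]]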
def test_groupby_multi():
    DT = dt.Frame(A=[1, 2, 3] * 3, B=[1, 2] * 4 + [1], C=range(9))
    res = DT[:, dt.sum(f.C), by("A", "B")]
    assert res.to_list() == [[1, 1, 2, 2, 3, 3],
                             [1, 2, 1, 2, 1, 2],
                             [6, 3, 4, 8, 10, 5]]
def test_groups_large1():
    n = 251 * 4000
    xs = [(i * 19) % 251 for i in range(n)]
    f0 = dt.Frame({"A": xs})
    f1 = f0[:, count(), by("A")]
    assert f1.to_list() == [list(range(251)), [4000] * 251]
question = "largest two v3 by id6" # q8
gc.collect()
t_start = timeit.default_timer()
ans = x[:2, {"largest2_v3": f.v3}, by(f.id6), sort(-f.v3)]
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans[:, sum(f.largest2_v3)]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(flatten(chk.to_list())), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x[:2, {"largest2_v3": f.v3}, by(f.id6), sort(-f.v3)]
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans[:, sum(f.largest2_v3)]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(flatten(chk.to_list())), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans
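# The q8 expression above combines three pieces: sort(-f.v3) orders rows
# descending within each by(f.id6) group, and the i-slice [:2] is then applied
# per group, yielding the two largest v3 per id6. A minimal sketch of the same
# pattern on toy data (names here are illustrative, not from the benchmark):
import datatable as dt
from datatable import f, by, sort

toy = dt.Frame(id=[1, 1, 1, 2, 2], v=[5, 9, 7, 3, 8])
top2 = toy[:2, {"largest2_v": f.v}, by(f.id), sort(-f.v)]
assert top2.to_list() == [[1, 1, 2, 2], [9, 7, 8, 3]]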
question = "regression v1 v2 by id2 id4" # q9 # not yet implemeneted https://github.com/h2oai/datatable/issues/1543
gc.collect()
t_start = timeit.default_timer()
ans = x[:, {"r2": corr(f.v1, f.v2)**2}, by(f.id2, f.id4)]
print(ans.shape, flush=True)
    print('=' * 50)
    print("MyIEEEGroupBysTransformers name {} {}".format(
        self._output_feature_names, self._feature_desc
    ))
    return X[:, dt.f["col_cnt"] / dt.f["daily_cnt"]]
elif self.group_type == 'hour':
    X = dt.Frame(X[:, self.group_col])
    # ieee_datetime: a pandas datetime Series defined elsewhere in the transformer
    X[:, 'date'] = dt.Frame(ieee_datetime.dt.strftime('%Y%m%d_%H').values)
    # Compute hourly counts
    hourly_cnt = X[:, {"hourly_cnt": dt.count()}, dt.by("date")]
    hourly_cnt.key = ["date"]
    X = X[:, :, dt.join(hourly_cnt)]
    # Compute counts per (hour, group column)
    col_cnt = X[:, {"col_cnt": dt.count()}, dt.by("date", self.group_col)]
    col_cnt.key = ["date", self.group_col]
    X = X[:, :, dt.join(col_cnt)]
    self._output_feature_names = ["IEEEGroupBys_{}_{}".format(self.group_col, self.group_type)]
    self._feature_desc = ["IEEEGroupBys_{}_{}".format(self.group_col, self.group_type)]
    print('=' * 50)
    print("MyIEEEGroupBysTransformers name {} {}".format(
        self._output_feature_names, self._feature_desc
    ))
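# The branch above count-encodes a column in two steps: aggregate counts per
# group, key the aggregate, then join it back so every row carries its group's
# count. A minimal sketch of that pattern, with a hypothetical 'grp' column
# standing in for self.group_col:
import datatable as dt

X0 = dt.Frame(grp=["x", "y", "x", "x"])
cnt = X0[:, {"grp_cnt": dt.count()}, dt.by("grp")]
cnt.key = ["grp"]               # groupby output is unique per group, so keyable
X0 = X0[:, :, dt.join(cnt)]     # broadcasts each group's count back to its rows
assert X0["grp_cnt"].to_list() == [[3, 1, 3, 3]]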
# datatable doesn't seem to have a per-group transform yet
# (expressions apply either per group-aggregate or over the whole frame)
res = op.sources[0].eval_implementation(
    data_map=data_map, eval_env=eval_env, data_model=self
)
if len(op.order_by) > 0:
    reverse_set = set(op.reverse)
    ascending = [ci not in reverse_set for ci in op.order_by]
    if not all(ascending):
        raise RuntimeError(
            "reverse isn't implemented for datatable yet"
        )  # TODO: implement
    syms = [datatable.f[c] for c in op.order_by]
    res = res.sort(*syms)
if len(op.partition_by) > 0:
    for (col, expr) in op.ops.items():
        dt_expr = expr_to_dt_expr(expr)
        res[col] = res[:, {col: dt_expr}, datatable.by(*op.partition_by)][col]
else:
    for (col, expr) in op.ops.items():
        dt_expr = expr_to_dt_expr(expr)
        res[col] = res[:, {col: dt_expr}][col]
return res
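# The partition_by branch above leans on datatable's broadcasting: when a
# grouped j-expression still references row-level columns, the reducer's value
# is repeated within each group, so the result keeps one row per input row and
# can be assigned back. A minimal sketch (toy data; note that by() orders rows
# by group, so the round-trip assumes the frame is already in group order, as
# it is here):
import datatable as dt
from datatable import f, by

res0 = dt.Frame(g=["a", "a", "b"], x=[1, 2, 10])
res0["centered"] = res0[:, {"centered": f.x - dt.mean(f.x)}, by(f.g)]["centered"]
assert res0["centered"].to_list() == [[-0.5, 0.5, 0.0]]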
question = "median v3 sd v3 by id4 id5" # q6
gc.collect()
t_start = timeit.default_timer()
ans = x[:, {'median_v3': median(f.v3), 'sd_v3': sd(f.v3)}, by(f.id4, f.id5)]
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans[:, [sum(f.median_v3), sum(f.sd_v3)]]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=1, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(flatten(chk.to_list())), chk_time_sec=chkt, on_disk=on_disk)
del ans
gc.collect()
t_start = timeit.default_timer()
ans = x[:, {'median_v3': median(f.v3), 'sd_v3': sd(f.v3)}, by(f.id4, f.id5)]
print(ans.shape, flush=True)
t = timeit.default_timer() - t_start
m = memory_usage()
t_start = timeit.default_timer()
chk = ans[:, [sum(f.median_v3), sum(f.sd_v3)]]
chkt = timeit.default_timer() - t_start
write_log(task=task, data=data_name, in_rows=x.shape[0], question=question, out_rows=ans.shape[0], out_cols=ans.shape[1], solution=solution, version=ver, git=git, fun=fun, run=2, time_sec=t, mem_gb=m, cache=cache, chk=make_chk(flatten(chk.to_list())), chk_time_sec=chkt, on_disk=on_disk)
print(ans.head(3), flush=True)
print(ans.tail(3), flush=True)
del ans
question = "max v1 - min v2 by id3" # q7
gc.collect()
t_start = timeit.default_timer()
ans = x[:, {"range_v1_v2": max(f.v1)-min(f.v2)}, by(f.id3)]
print(ans.shape, flush=True)
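# Same max-min range pattern as q7, reduced to toy data for reference
# (column names mirror the benchmark's; the values are made up). dt.max and
# dt.min are used explicitly here to avoid shadowing the Python builtins:
import datatable as dt
from datatable import f, by

toy = dt.Frame(id3=[1, 1, 2], v1=[3, 7, 5], v2=[2, 1, 4])
rng = toy[:, {"range_v1_v2": dt.max(f.v1) - dt.min(f.v2)}, by(f.id3)]
assert rng.to_list() == [[1, 2], [6, 1]]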