Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def groupby_disk(b, grouper, npartitions=None, blocksize=2 ** 20):
    """Assemble the graph pieces for a disk-backed groupby of ``b``.

    Three dictionaries are built: one whose single key ``("partd-" + token,)``
    maps to a tuple describing a ``partd.Python`` store, one with a
    ``partition`` entry per input partition of ``b``, and one with a single
    barrier entry listing every partition key.

    NOTE(review): only the leading part of this function is visible here;
    the excerpt ends after the barrier dictionary and contains no return
    statement. Confirm the remainder against the full source.

    Parameters
    ----------
    b
        Object exposing ``name`` and ``npartitions``.
    grouper
        Forwarded to ``partition``; its name (via ``funcname``) is embedded
        in the partition keys. Presumably a callable -- TODO confirm.
    npartitions : optional
        Falls back to ``b.npartitions`` when ``None``.
    blocksize
        Forwarded unchanged to ``partition``.
    """
    if npartitions is None:
        npartitions = b.npartitions
    token = tokenize(b, grouper, npartitions, blocksize)

    import partd

    p = ("partd-" + token,)

    # Pass ``dir`` to partd.File only when a temporary directory is configured.
    tmp_dir = config.get("temporary_directory", None)
    file_task = (apply, partd.File, (), {"dir": tmp_dir}) if tmp_dir else (partd.File,)

    # If looking up ``partd.Snappy`` raises AttributeError, fall back to
    # wrapping the file directly.
    try:
        store_task = (partd.Python, (partd.Snappy, file_task))
    except AttributeError:
        store_task = (partd.Python, file_task)
    dsk1 = {p: store_task}

    # Partition data on disk
    name = "groupby-part-{0}-{1}".format(funcname(grouper), token)
    dsk2 = {
        (name, idx): (partition, grouper, (b.name, idx), npartitions, p, blocksize)
        for idx in range(b.npartitions)
    }

    # Barrier
    barrier_token = "groupby-barrier-" + token
    dsk3 = {barrier_token: (chunk.barrier,) + tuple(dsk2.keys())}
def __call__(self, *args, **kwargs):
    """Create a ``partd.PandasBlocks`` store backed by a ``partd.File``.

    The file is created with ``dir=self.tempdir`` when ``self.tempdir`` is
    truthy, otherwise with no arguments. When ``self.buffer`` is truthy the
    file is wrapped in ``partd.Buffer(partd.Dict(), file)`` first.
    Positional and keyword arguments are accepted but not used.
    """
    import partd

    backing = partd.File(dir=self.tempdir) if self.tempdir else partd.File()
    if not self.buffer:
        return partd.PandasBlocks(backing)
    return partd.PandasBlocks(partd.Buffer(partd.Dict(), backing))
def __call__(self, *args, **kwargs):
    """Create a ``partd.PandasBlocks`` store backed by a ``partd.File``.

    The file is created with ``dir=self.tempdir`` when ``self.tempdir`` is
    truthy, otherwise with no arguments. When ``self.buffer`` is truthy the
    file is wrapped in ``partd.Buffer(partd.Dict(), file)`` first.
    Positional and keyword arguments are accepted but not used.
    """
    import partd

    backing = partd.File(dir=self.tempdir) if self.tempdir else partd.File()
    if not self.buffer:
        return partd.PandasBlocks(backing)
    return partd.PandasBlocks(partd.Buffer(partd.Dict(), backing))