Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def worker(args, return_dict, **kwargs):
det = clevercsv.Detector()
filename, encoding, partial = args
return_dict["error"] = False
return_dict["dialect"] = None
return_dict["method"] = None
with gzip.open(filename, "rt", newline="", encoding=encoding) as fp:
data = fp.read(N_BYTES_PARTIAL) if partial else fp.read()
try:
return_dict["dialect"] = det.detect(data, **kwargs)
return_dict["method"] = det.method_
except clevercsv.Error:
return_dict["error"] = True
def worker(args, return_dict, **kwargs):
det = clevercsv.Detector()
filename, encoding = args
with gzip.open(filename, "rt", newline="", encoding=encoding) as fid:
return_dict["dialect"] = det.detect(fid.read(N_BYTES))
def detector(gz_filename, encoding, n_lines=None):
det = clevercsv.Detector()
sample = get_sample(gz_filename, encoding, n_lines=n_lines)
try:
dialect = det.detect(sample)
except clevercsv.Error:
raise DetectionError
if dialect is None:
return None
return dict(
delimiter=dialect.delimiter,
quotechar=dialect.quotechar,
escapechar=dialect.escapechar,
)