Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def read(self):
    """Remove and return the object at the front of ``self.lines``.

    Raises IndexError when no objects remain.
    """
    pending = self.lines
    return pending.pop(0)
def __iter__(self):
    """Iterate over the objects currently held in ``self.lines``."""
    return iter(self.lines)
def custom5_process(str_lines):
    """Decode every JSON string in ``str_lines`` with ``ujson.loads``.

    Returns the decoded values as a list, in the same order as the input.
    """
    return list(map(ujson.loads, str_lines))
# Reader that decodes JSON lines in a multiprocessing pool, handing each
# worker one group of lines (groups are formed round-robin below).
# NOTE(review): this view is truncated right after the map_async call; how the
# batched results are collected (presumably into self.lines) is not shown.
class Custom5Reader(jsonlines.jsonlines.ReaderWriterBase):
def __init__(self, fp):
# Read every line of fp into memory as raw strings.
len_timer = Timer("Extracting lines as strings").start()
str_lines = [f for f in fp]
len_timer.stop()
# Distribute lines round-robin: line i goes to group i % POOL_WORKERS.
# NOTE(review): round-robin interleaves lines across groups, so concatenating
# the per-group results in group order would NOT restore file order —
# confirm how the unseen remainder reassembles them.
group_timer = Timer("Grouping lines").start()
num_groups = POOL_WORKERS
line_groups = [[] for _ in range(POOL_WORKERS)]
for i, line in enumerate(str_lines):
group = i % num_groups
line_groups[group].append(line)
group_timer.stop()
# Each task decodes one whole group via custom5_process; map_async returns an
# AsyncResult immediately without waiting.
# NOTE(review): Pool.__exit__ calls terminate(), so async_batched_results.get()
# must be called inside this with-block — verify in the truncated remainder.
with mp.Pool(POOL_WORKERS) as p:
async_batched_results = p.map_async(custom5_process, line_groups)
def read(self):
    """Remove and return the object at the front of ``self.lines``.

    Raises IndexError when no objects remain.
    """
    pending = self.lines
    return pending.pop(0)
def __iter__(self):
    """Iterate over the objects currently held in ``self.lines``."""
    return iter(self.lines)
def custom4_process(str_line):
    """Decode one JSON-encoded string with ``ujson.loads`` and return the value."""
    return ujson.loads(str_line)
# Reader that decodes every JSON line up front on a thread pool, then serves
# the decoded objects from the in-memory list self.lines.
class Custom4Reader(jsonlines.jsonlines.ReaderWriterBase):
def __init__(self, fp):
# Read every line of fp into memory as raw strings.
# NOTE(review): the variable is named len_timer but the timer label says
# "Extracting lines as strings".
len_timer = Timer("Extracting lines as strings").start()
str_lines = [f for f in fp]
len_timer.stop()
# Decode each line with custom4_process on worker threads; Executor.map keeps
# results in input order and submits all calls up front.
# NOTE(review): JSON decoding is CPU work; unless ujson releases the GIL, the
# threads may not actually run in parallel — worth benchmarking against the
# process-pool readers.
with ThreadPoolExecutor() as executor:
results = executor.map(custom4_process, str_lines)
# Leaving the with-block waited for every submitted call, so draining the
# map() iterator here only collects already-finished results (re-raising any
# worker exception).
list_timer = Timer("Converting results generator to list").start()
self.lines = list(results)
list_timer.stop()
# Pop and return the first remaining decoded object (IndexError when empty).
# NOTE(review): list.pop(0) is O(n), so draining everything via read() is O(n^2).
def read(self):
return self.lines.pop(0)
def stop(self):
    """Stop the timer: record ``t2``, print the elapsed time, and return it.

    The returned value is ``self.t2 - self.t1`` in seconds.
    """
    self.t2 = time.time()
    elapsed = self.t2 - self.t1
    print(f'Timer "{self.name}" took {humanize_float(elapsed)} seconds')
    return elapsed
def custom1_process(args):
    """Decode one JSON line and report progress on a shared queue.

    ``args`` is a ``(str_line, queue)`` pair. After decoding ``str_line``
    with ``ujson.loads``, a ``1`` is put on the queue as a progress tick, and
    the decoded value is returned.
    """
    assert len(args) == 2
    line_text, progress_queue = args
    decoded = ujson.loads(line_text)
    progress_queue.put(1)
    return decoded
# Reader that decodes lines in a process pool while a tqdm progress bar tracks
# completion through a Manager queue that each worker posts to.
# NOTE(review): this view is truncated inside the `while True:` loop.
class Custom1Reader(jsonlines.jsonlines.ReaderWriterBase):
def __init__(self, fp):
# A Manager-backed queue proxy can be passed to pool tasks as an argument;
# presumably chosen because a plain mp.Queue cannot be pickled into Pool tasks.
# NOTE(review): the Manager is never shut down in the visible code.
m = mp.Manager()
shared_queue = m.Queue()
progress = 0
# Read every line into memory; the line count becomes the progress-bar total.
len_timer = Timer("Calculating file length").start()
str_lines = [f for f in fp]
total_size = len(str_lines)
len_timer.stop()
# Pair every line with the same queue proxy and decode asynchronously; the
# loop below (truncated) presumably drains the queue to advance the bar.
with mp.Pool(POOL_WORKERS) as p:
async_results = p.map_async(custom1_process, zip(str_lines, itertools.repeat(shared_queue)))
with tqdm(desc="Manifest Loading Progress", total=total_size, unit_scale=True) as tqdm_progress:
while True:
# NOTE(review): fragment — the start of this call and its enclosing `try:`
# are outside this view.
allow_none=allow_none,
skip_empty=skip_empty)
# Invalid lines are dropped when skip_invalid is set; otherwise re-raised.
except InvalidLineError:
if not skip_invalid:
raise
# EOFError ends iteration quietly (presumably raised by read() at end of input).
except EOFError:
pass
def __iter__(self):
    """
    See :py:meth:`~Reader.iter()`.

    Delegates to ``self.iter()`` called with no arguments.
    """
    iterator = self.iter()
    return iterator
class Writer(ReaderWriterBase):
"""
Writer for the jsonlines format.
The `fp` argument must be a file-like object with a ``.write()``
method accepting either text (unicode) or bytes.
The `compact` argument can be used to produce smaller output.
The `sort_keys` argument can be used to sort keys in json objects,
and will produce deterministic output.
For more control, provide a custom encoder callable using the
`dumps` argument. The callable must produce (unicode) string output.
If specified, the `compact` and `sort_keys` arguments will be ignored.
When the `flush` argument is set to ``True``, the writer will call
def read(self):
    """Remove and return the object at the front of ``self.lines``.

    Raises IndexError when no objects remain.
    """
    pending = self.lines
    return pending.pop(0)
def __iter__(self):
    """Iterate over the objects currently held in ``self.lines``."""
    return iter(self.lines)
def custom2_process(str_line):
    """Decode one JSON-encoded string with ``ujson.loads`` and return the value."""
    return ujson.loads(str_line)
# Reader that decodes every JSON line up front in a multiprocessing pool and
# serves the decoded objects from the in-memory list self.lines.
class Custom2Reader(jsonlines.jsonlines.ReaderWriterBase):
def __init__(self, fp):
# Read every line of fp into memory as raw strings.
len_timer = Timer("Extracting lines as strings").start()
str_lines = [f for f in fp]
len_timer.stop()
# Decode lines across POOL_WORKERS processes; map_async keeps input order.
# NOTE(review): Pool.__exit__ calls terminate(), so get() must run inside the
# with-block (presumably it does; indentation is lost in this copy).
with mp.Pool(POOL_WORKERS) as p:
async_results = p.map_async(custom2_process, str_lines)
results = async_results.get()
# NOTE(review): get() already re-raises any worker exception, so once this
# line is reached successful() is True; the assert is redundant and is also
# stripped under `python -O`.
assert async_results.successful(), "There was an uncaught error"
self.lines = results
# Pop and return the first remaining decoded object (IndexError when empty).
# NOTE(review): list.pop(0) is O(n), so draining everything via read() is O(n^2).
def read(self):
return self.lines.pop(0)
# NOTE(review): fragment — text identical to the tail of Custom2Reader.__init__;
# the enclosing def and `with mp.Pool(...) as p:` are outside this view.
async_results = p.map_async(custom2_process, str_lines)
results = async_results.get()
# get() re-raises worker exceptions, so this assert cannot fail once reached.
assert async_results.successful(), "There was an uncaught error"
self.lines = results
def read(self):
    """Remove and return the object at the front of ``self.lines``.

    Raises IndexError when no objects remain.
    """
    pending = self.lines
    return pending.pop(0)
def __iter__(self):
    """Iterate over the objects currently held in ``self.lines``."""
    return iter(self.lines)
# Reader that decodes the whole input with a single ujson.loads call by joining
# all lines into one JSON array string.
class Custom3Reader(jsonlines.jsonlines.ReaderWriterBase):
def __init__(self, fp):
# Read every line of fp into memory as raw strings.
len_timer = Timer("Extracting lines as strings").start()
str_lines = [f for f in fp]
len_timer.stop()
# Build "[line1,\nline2,\n...]". Any trailing newline a line still carries is
# just JSON whitespace and does no harm.
# NOTE(review): a blank or whitespace-only line (e.g. a trailing empty line)
# produces an empty array element and makes the whole string invalid JSON,
# so one bad line fails the entire load. Holding both str_lines and the joined
# copy also roughly doubles peak memory for the raw text.
array_timer = Timer("Converting lines to one big JSON array string").start()
json_array = ",\n".join(str_lines)
json_array = f"[{json_array}]"
array_timer.stop()
self.lines = ujson.loads(json_array)
# Pop and return the first remaining decoded object (IndexError when empty).
# NOTE(review): list.pop(0) is O(n), so draining everything via read() is O(n^2).
def read(self):
return self.lines.pop(0)
# NOTE(review): fragment — the enclosing __repr__ and the matching `if` branch
# are outside this view.
else:
# Fallback label for the wrapped object: its type name and hex id.
wrapping = '<{} at 0x{:x}>'.format(
type(self._fp).__name__,
id(self._fp))
# BUG(review): the format string below is empty, so this always returns ''
# and silently ignores its three arguments. It was presumably meant to be
# something like '<{} at 0x{:x} wrapping {}>'; the text between the angle
# brackets may have been lost as an HTML tag when the code was copied.
return ''.format(
type(self).__name__, id(self), wrapping)
def __enter__(self):
    """Support ``with`` usage by handing back this same instance."""
    this = self
    return this
def __exit__(self, *exc_info):
    """Close this object when leaving a ``with`` block.

    Always returns False, so any exception raised inside the block
    propagates to the caller.
    """
    self.close()
    suppress_exception = False
    return suppress_exception
class Reader(ReaderWriterBase):
"""
Reader for the jsonlines format.
The first argument must be an iterable that yields JSON encoded
strings. Usually this will be a readable file-like object, such as
an open file or an ``io.StringIO`` instance, but it can also be
something else as long as it yields strings when iterated over.
The `loads` argument can be used to replace the standard json
decoder. If specified, it must be a callable that accepts a
(unicode) string and returns the decoded object.
Instances are iterable and can be used as a context manager.
:param file-like iterable: iterable yielding lines as strings
:param callable loads: custom json decoder callable