        names=names,
        usecols=usecols,
        dtype=dtype,
        sep=sep,
        thousands=thousands,
        decimal=decimal,
        lineterminator=lineterminator,
        quotechar=quotechar,
        quoting=quoting,
        escapechar=escapechar,
        parse_dates=parse_dates,
        infer_datetime_format=infer_datetime_format,
        encoding=encoding,
        converters=converters)
else:
    bounders = calculate_bounders(num_items=total_size, max_size=max_result_size)
    logger.debug(f"bounders: {bounders}")
    bounders_len = len(bounders)
    count = 0
    forgotten_bytes = 0
    for ini, end in bounders:
        count += 1
        ini -= forgotten_bytes
        end -= 1  # The HTTP Range header is inclusive, unlike Python's list slicing
        bytes_range = "bytes={}-{}".format(ini, end)
        logger.debug(f"bytes_range: {bytes_range}")
        body = client_s3.get_object(Bucket=bucket_name, Key=key_path, Range=bytes_range)["Body"].read()
        chunk_size = len(body)
        logger.debug(f"chunk_size (bytes): {chunk_size}")
        if count == 1:  # first chunk
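
The snippet cuts off at the first-chunk branch, but the bookkeeping above already hints at the idea: each byte range usually ends in the middle of a row, so only the bytes up to the last line terminator get parsed and the leftover byte count is carried into the next range as forgotten_bytes. A minimal standalone sketch of that realignment, with a hypothetical get_range(start, end) callable standing in for the ranged S3 GetObject (this is an illustration of the pattern, not the library's code):

def read_aligned_chunks(get_range, total_size, max_chunk_size, terminator=b"\n"):
    # get_range(start, end) must return the inclusive byte range [start, end],
    # e.g. an S3 GetObject call with Range=f"bytes={start}-{end}".
    # Assumes every chunk contains at least one terminator.
    forgotten_bytes = 0
    start = 0
    while start < total_size:
        end = min(start + max_chunk_size, total_size) - 1  # Range header is inclusive
        body = get_range(start, end)
        if end < total_size - 1:  # not the last chunk: drop the partial trailing row
            cut = body.rfind(terminator) + 1
            forgotten_bytes = len(body) - cut
            body = body[:cut]
        else:
            forgotten_bytes = 0
        yield body
        start = end + 1 - forgotten_bytes  # re-read the bytes of the partial row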
def _get_bounders(dataframe, num_partitions):
    """Split the dataframe's row indexes into one (start, end) range per partition."""
    num_rows = len(dataframe.index)
    return calculate_bounders(num_items=num_rows, num_groups=num_partitions)
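
calculate_bounders itself comes from the project's utils module and is not shown in these snippets. Judging by how its results are used (slicing with bounder[0]:bounder[1], or subtracting 1 to build an inclusive HTTP Range), it appears to return end-exclusive (start, end) pairs. A rough sketch of that contract, under a hypothetical name split_bounders, not the real implementation:

from math import ceil
from typing import List, Optional, Tuple

def split_bounders(num_items: int,
                   num_groups: Optional[int] = None,
                   max_size: Optional[int] = None) -> List[Tuple[int, int]]:
    # Split num_items into contiguous end-exclusive (start, end) ranges:
    # either a fixed number of groups, or groups capped at max_size items.
    if max_size is not None:
        num_groups = int(ceil(num_items / max_size))
    size = int(ceil(num_items / num_groups))
    bounders = []
    start = 0
    while start < num_items:
        end = min(start + size, num_items)
        bounders.append((start, end))
        start = end
    return bounders

print(split_bounders(10, num_groups=3))  # [(0, 4), (4, 8), (8, 10)]
print(split_bounders(10, max_size=4))    # [(0, 4), (4, 8), (8, 10)]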
def get_objects_sizes(self, objects_paths: List[str], procs_io_bound: Optional[int] = None) -> Dict[str, int]:
    if not procs_io_bound:
        procs_io_bound = self._session.procs_io_bound
    logger.debug(f"procs_io_bound: {procs_io_bound}")
    objects_sizes: Dict[str, int] = {}
    procs = []
    receive_pipes = []
    # One slice of the paths list per I/O-bound worker process
    bounders = calculate_bounders(len(objects_paths), procs_io_bound)
    logger.debug(f"len(bounders): {len(bounders)}")
    for bounder in bounders:
        receive_pipe, send_pipe = mp.Pipe()
        logger.debug(f"bounder: {bounder}")
        proc = mp.Process(
            target=self._get_objects_head_remote,
            args=(
                send_pipe,
                self._session.primitives,
                objects_paths[bounder[0]:bounder[1]],
            ),
        )
        proc.daemon = False
        proc.start()
        procs.append(proc)
        receive_pipes.append(receive_pipe)
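
get_objects_sizes is cut off after the fan-out; the natural continuation (not shown in the snippet) is to drain each pipe and join the workers. Assuming _get_objects_head_remote sends a Dict[str, int] of path to content length back through send_pipe, the collection side would look roughly like this:

    # Hypothetical continuation: gather the partial results and join the workers.
    for receive_pipe, proc in zip(receive_pipes, procs):
        objects_sizes.update(receive_pipe.recv())  # assumes a Dict[str, int] is sent back
        proc.join()
        receive_pipe.close()
    return objects_sizes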
def delete_objects_batch(session_primitives, bucket, batch):
    session = session_primitives.session
    client = session.boto3_session.client(service_name="s3", config=session.botocore_config)
    # S3 DeleteObjects accepts at most 1,000 keys per request
    num_requests = int(ceil(float(len(batch)) / 1000.0))
    bounders = calculate_bounders(len(batch), num_requests)
    logger.debug(f"Bounders: {bounders}")
    for bounder in bounders:
        client.delete_objects(Bucket=bucket, Delete={"Objects": batch[bounder[0]:bounder[1]]})
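
delete_objects_batch sizes its requests around the hard limit of the DeleteObjects API: at most 1,000 keys per call, hence ceil(len(batch) / 1000) requests. The batch entries are already dicts of the form {"Key": ...}, as required by the Delete payload. A self-contained equivalent using plain boto3 and simple slicing instead of calculate_bounders might look like this:

import boto3
from typing import List

def delete_keys_in_batches(bucket: str, keys: List[str]) -> None:
    # Delete keys in chunks of at most 1,000 (the S3 DeleteObjects limit).
    client = boto3.client("s3")
    for start in range(0, len(keys), 1000):
        chunk = keys[start:start + 1000]
        client.delete_objects(
            Bucket=bucket,
            Delete={"Objects": [{"Key": key} for key in chunk]},
        )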
        filters: Optional[Union[List[Tuple[Any]], List[List[Tuple[Any]]]]] = None,
        procs_cpu_bound: Optional[int] = None) -> pd.DataFrame:
    """
    Read Parquet data from S3.

    :param path: AWS S3 path or list of paths (e.g. s3://bucket-name/folder_name/)
    :param columns: Names of the columns to read from the file(s)
    :param filters: List of filters to apply, like ``[[('x', '=', 0), ...], ...]``
    :param procs_cpu_bound: Number of cores used for CPU bound tasks
    :return: Pandas DataFrame
    """
    if procs_cpu_bound is None:
        procs_cpu_bound = self._session.procs_cpu_bound if self._session.procs_cpu_bound is not None else 1
    logger.debug(f"procs_cpu_bound: {procs_cpu_bound}")
    df: Optional[pd.DataFrame] = None
    session_primitives = self._session.primitives
    path = [path] if type(path) == str else path  # type: ignore
    bounders = calculate_bounders(len(path), procs_cpu_bound)
    logger.debug(f"len(bounders): {len(bounders)}")
    if len(bounders) == 1:
        # Single worker: read every path in the current process
        df = Pandas._read_parquet_paths(session_primitives=session_primitives,
                                        path=path,
                                        columns=columns,
                                        filters=filters,
                                        procs_cpu_bound=procs_cpu_bound)
    else:
        # Fan out: one worker process per slice of paths
        procs = []
        receive_pipes = []
        for bounder in bounders:
            receive_pipe, send_pipe = mp.Pipe()
            logger.debug(f"bounder: {bounder}")
            proc = mp.Process(
                target=self._read_parquet_paths_remote,
                args=(
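
The read_parquet snippet is truncated mid fan-out, but the filters argument from its docstring deserves a note: the example ``[[('x', '=', 0), ...], ...]`` matches the pyarrow filter convention of disjunctive normal form, presumably because the paths are ultimately handed to a pyarrow reader. The outer list is OR-ed and each inner list of (column, op, value) tuples is AND-ed:

# Keep rows where (x == 0 AND y > 5) OR (z == "a")
filters = [
    [("x", "=", 0), ("y", ">", 5)],
    [("z", "=", "a")],
]
# Hypothetical invocation; the snippet does not show how the method is exposed:
# df = session.pandas.read_parquet(path="s3://bucket/prefix/", filters=filters)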