def test_delete_objects(bucket):
    write_fake_objects(bucket, "objs/", 3)
    awswrangler.s3.utils.delete_objects("s3://" + bucket + "/objs/", batch_size=2)


def test_delete_listed_objects(bucket):
    write_fake_objects(bucket, "objs/", 3)
    keys = awswrangler.s3.utils.list_objects("s3://" + bucket + "/objs/", batch_size=2)
    assert len(keys) == 3
    awswrangler.s3.utils.delete_listed_objects(bucket, keys, batch_size=2)
    keys = awswrangler.s3.utils.list_objects("s3://" + bucket + "/objs/", batch_size=2)
    assert len(keys) == 0
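# The tests above rely on a write_fake_objects() helper that is not shown in this
# snippet. A minimal sketch of what it might look like, assuming it only needs to put
# a few tiny objects under a prefix with boto3 (this body is an assumption, not the
# project's actual fixture):
import boto3


def write_fake_objects(bucket, prefix, count):
    """Put `count` small objects under `prefix` so the list/delete tests have data."""
    client = boto3.client("s3")
    for i in range(count):
        client.put_object(Bucket=bucket, Key=f"{prefix}obj_{i}", Body=b"x")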
def test_s3_write_single(bucket, database, file_format):
    df = pd.read_csv("data_samples/micro.csv")
    awswrangler.s3.write(
        df=df,
        database=database,
        path=f"s3://{bucket}/test/",
        file_format=file_format,
        preserve_index=False,
        mode="overwrite",
        num_procs=1,
    )
    df2 = awswrangler.athena.read(database, "select * from test")
    assert len(df.index) == len(df2.index)


def test_s3_write(bucket, database, file_format):
    df = pd.read_csv("data_samples/micro.csv")
    awswrangler.s3.write(
        df=df,
        database=database,
        path=f"s3://{bucket}/test/",
        file_format=file_format,
        preserve_index=True,
        mode="overwrite",
    )
    df2 = awswrangler.athena.read(database, "select * from test")
    assert len(df.index) == len(df2.index)
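# bucket, database and file_format arrive as pytest fixtures defined outside this
# snippet (e.g. in a conftest.py). A minimal sketch of plausible definitions; the
# TEST_BUCKET/TEST_DATABASE environment variable names are assumptions:
import os

import pytest


@pytest.fixture(scope="session")
def bucket():
    # assumed: the target bucket is provisioned ahead of the test run
    return os.environ["TEST_BUCKET"]


@pytest.fixture(scope="session")
def database():
    # assumed: a Glue/Athena database provisioned ahead of the test run
    return os.environ["TEST_DATABASE"]


@pytest.fixture(params=["csv", "parquet"])
def file_format(request):
    # runs each write test once per supported file format
    return request.param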
def _data_to_s3_object_writer(dataframe,
                              path,
                              preserve_index,
                              compression,
                              session_primitives,
                              file_format,
                              cast_columns=None,
                              extra_args=None,
                              isolated_dataframe=False):
    fs = s3.get_fs(session_primitives=session_primitives)
    fs = pa.filesystem._ensure_filesystem(fs)
    s3.mkdir_if_not_exists(fs, path)

    # Map the requested compression to the filename suffix used for the object.
    if compression is None:
        compression_end = ""
    elif compression == "snappy":
        compression_end = ".snappy"
    elif compression == "gzip":
        compression_end = ".gz"
    else:
        raise InvalidCompression(compression)

    guid = pa.compat.guid()
    if file_format == "parquet":
        outfile = f"{guid}.parquet{compression_end}"
    elif file_format == "csv":
        outfile = f"{guid}.csv{compression_end}"
    else:
        # Assumed completion: any other format is rejected, mirroring the
        # InvalidCompression handling above.
        raise UnsupportedFileFormat(file_format)
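# For illustration only (not part of the library): the branches above name each object
# as "<guid>.<format><compression suffix>", e.g. "3f2a.parquet.gz" for gzip'ed parquet.
def _example_object_name(guid, file_format, compression=None):
    suffix = {None: "", "snappy": ".snappy", "gzip": ".gz"}[compression]
    return f"{guid}.{file_format}{suffix}"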
:param usecols: Same as pandas.read_csv()
:param dtype: Same as pandas.read_csv()
:param sep: Same as pandas.read_csv()
:param thousands: Same as pandas.read_csv()
:param decimal: Same as pandas.read_csv()
:param lineterminator: Same as pandas.read_csv()
:param quotechar: Same as pandas.read_csv()
:param quoting: Same as pandas.read_csv()
:param escapechar: Same as pandas.read_csv()
:param parse_dates: Same as pandas.read_csv()
:param infer_datetime_format: Same as pandas.read_csv()
:param encoding: Same as pandas.read_csv()
:param converters: Same as pandas.read_csv()
:return: Pandas Dataframe
"""
metadata = s3.S3.head_object_with_retry(client=client_s3, bucket=bucket_name, key=key_path)
logger.debug(f"metadata: {metadata}")
total_size = metadata["ContentLength"]
logger.debug(f"total_size: {total_size}")
if total_size <= 0:
    raise EmptyS3Object(metadata)
elif total_size <= max_result_size:
    yield Pandas._read_csv_once(client_s3=client_s3,
                                bucket_name=bucket_name,
                                key_path=key_path,
                                header=header,
                                names=names,
                                usecols=usecols,
                                dtype=dtype,
                                sep=sep,
                                thousands=thousands,
                                decimal=decimal,