def test_read_part_file(self):
  data = {
    'part-m-00000.avro': [{'name': 'jane'}, {'name': 'bob'}],
    'part-m-00001.avro': [{'name': 'john'}, {'name': 'liz'}],
  }
  for fname, records in data.items():
    with AvroWriter(self.client, 'data.avro/%s' % (fname, )) as writer:
      for record in records:
        writer.write(record)
  with temppath() as tpath:
    with open(tpath, 'w') as writer:
      main(
        ['read', 'data.avro', '--parts', '1,'],
        client=self.client,
        stdout=writer
      )
    with open(tpath) as reader:
      records = [loads(line) for line in reader]
  eq_(records, data['part-m-00001.avro'])

def test_write_empty(self):
  with AvroWriter(self.client, 'empty.avro', schema=self.schema):
    pass
  with AvroReader(self.client, 'empty.avro') as reader:
    eq_(reader.schema, self.schema)
    eq_(list(reader), [])

def test_write(self):
  writer = AvroWriter(
    self.client,
    'weather.avro',
    schema=self.schema,
  )
  with writer:
    for record in self.records:
      writer.write(record)
  with temppath() as tpath:
    self.client.download('weather.avro', tpath)
    eq_(
      self._get_data_bytes(osp.join(self.dpath, 'weather.avro')),
      self._get_data_bytes(tpath)
    )

@raises(HdfsError)
def test_write_overwrite_error(self):
  # To check that the background `AsyncWriter` thread doesn't hang when the
  # write fails (the target path already exists and isn't overwritten).
  self.client.makedirs('weather.avro')
  with AvroWriter(self.client, 'weather.avro', schema=self.schema) as writer:
    for record in self.records:
      writer.write(record)

def test_write_in_multiple_blocks(self):
  writer = AvroWriter(
    self.client,
    'weather.avro',
    schema=self.schema,
    sync_interval=1  # Flush block on every write.
  )
  with writer:
    for record in self.records:
      writer.write(record)
  with AvroReader(self.client, 'weather.avro') as reader:
    eq_(list(reader), self.records)

def test_infer_schema(self):
  with AvroWriter(self.client, 'weather.avro') as writer:
    for record in self.records:
      writer.write(record)
  with AvroReader(self.client, 'weather.avro') as reader:
    eq_(list(reader), self.records)

def main(argv=None, client=None, stdin=sys.stdin, stdout=sys.stdout):
  """Entry point.

  :param argv: Arguments list.
  :param client: For testing.

  """
  args = docopt(__doc__, argv=argv)
  if not client:
    client = configure_client('hdfscli-avro', args)
  elif args['--log']:
    raise HdfsError('Logging is only available when no client is specified.')
  overwrite = args['--force']
  parts = parse_arg(args, '--parts', int, ',')
  if args['write']:
    writer = AvroWriter(
      client,
      args['HDFS_PATH'],
      overwrite=overwrite,
      schema=parse_arg(args, '--schema', loads),
      codec=args['--codec'],
    )
    with writer:
      records = (loads(line) for line in stdin)
      for record in records:
        writer.write(record)
  else:
    reader = AvroReader(client, args['HDFS_PATH'], parts=parts)
    with reader:
      if args['schema']:
        stdout.write('%s\n' % (dumps(reader.schema, indent=2), ))
      elif args['read']:
        # Emit each record as a single JSON-encoded line, which is what the
        # `test_read_part_file` test above parses back with `loads`.
        for record in reader:
          stdout.write('%s\n' % (dumps(record), ))

from hdfs import Config
from hdfs.ext.avro import AvroReader, AvroWriter


# Get the default alias' client.
client = Config().get_client()

# Some sample data.
records = [
  {'name': 'Ann', 'age': 23},
  {'name': 'Bob', 'age': 22},
]

# Write an Avro file to HDFS (since our records' schema is very simple, we let
# the writer infer it automatically; otherwise we would pass it as an argument).
with AvroWriter(client, 'names.avro', overwrite=True) as writer:
  for record in records:
    writer.write(record)

# Read it back.
with AvroReader(client, 'names.avro') as reader:
  schema = reader.schema  # The inferred schema.
  content = reader.content  # The remote file's HDFS content object.
  assert list(reader) == records  # The records match!
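
# A small additional sketch (not part of the original example): `AvroReader`
# also accepts a `parts` argument, as used by `main` and `test_read_part_file`
# above, to read only a subset of a distributed Avro file's part-files. The
# 'data.avro' path is illustrative and assumes part-files were written as in
# that test.
with AvroReader(client, 'data.avro', parts=[1]) as reader:
  subset = list(reader)  # Only records from 'part-m-00001.avro'.
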
def write_dataframe(client, hdfs_path, df, **kwargs):
  """Save dataframe to HDFS as Avro.

  :param client: :class:`hdfs.client.Client` instance.
  :param hdfs_path: Remote path where the dataframe will be stored.
  :param df: Dataframe to store.
  :param \*\*kwargs: Keyword arguments passed through to
    :class:`hdfs.ext.avro.AvroWriter`.

  """
  metadata = {'pandas.columns': json.dumps(df.columns.tolist())}
  with AvroWriter(client, hdfs_path, metadata=metadata, **kwargs) as writer:
    for _, row in df.iterrows():
      writer.write(row.to_dict())
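
# A possible read counterpart to `write_dataframe` (a sketch, not from the
# original snippet): it rebuilds the dataframe from the Avro records, taking
# the column order from the record schema's `fields` list. It assumes `pandas`
# is imported as `pd` and that the file holds a record-type schema.
def read_dataframe(client, hdfs_path):
  """Load an Avro file from HDFS into a dataframe.

  :param client: :class:`hdfs.client.Client` instance.
  :param hdfs_path: Remote path to the Avro file (e.g. one written by
    `write_dataframe`).

  """
  with AvroReader(client, hdfs_path) as reader:
    columns = [field['name'] for field in reader.schema['fields']]
    return pd.DataFrame.from_records(list(reader), columns=columns)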