def test_write(self):
    write_dataframe(self.client, 'weather.avro', self.df)
    with AvroReader(self.client, 'weather.avro') as reader:
        eq_(list(reader), self.records)

def test_infer_schema(self):
    with AvroWriter(self.client, 'weather.avro') as writer:
        for record in self.records:
            writer.write(record)
    with AvroReader(self.client, 'weather.avro') as reader:
        eq_(list(reader), self.records)

def test_read_with_compatible_schema(self):
    self.client.upload('w.avro', osp.join(self.dpath, 'weather.avro'))
    schema = {
        'name': 'test.Weather',
        'type': 'record',
        'fields': [
            {'name': 'temp', 'type': 'int'},
            {'name': 'tag', 'type': 'string', 'default': ''},
        ],
    }
    with AvroReader(self.client, 'w.avro', reader_schema=schema) as reader:
        eq_(
            list(reader),
            [{'temp': r['temp'], 'tag': ''} for r in self.records]
        )

def test_write_codec(self):
    with open(osp.join(self.dpath, 'weather.jsonl')) as reader:
        main(
            [
                'write', 'weather.avro',
                '--schema', dumps(self.schema),
                '--codec', 'deflate',
            ],
            client=self.client,
            stdin=reader
        )
    # Correct content.
    with AvroReader(self.client, 'weather.avro') as reader:
        records = list(reader)
        eq_(records, self.records)
    # Different size (it might not be smaller, since the file is very small).
    compressed_size = self.client.content('weather.avro')['length']
    uncompressed_size = osp.getsize(osp.join(self.dpath, 'weather.avro'))
    ok_(compressed_size != uncompressed_size)

def test_read(self):
    self.client.upload('weather.avro', osp.join(self.dpath, 'weather.avro'))
    with AvroReader(self.client, 'weather.avro') as reader:
        eq_(list(reader), self.records)

def test_write_empty(self):
    with AvroWriter(self.client, 'empty.avro', schema=self.schema):
        pass
    with AvroReader(self.client, 'empty.avro') as reader:
        eq_(reader.schema, self.schema)
        eq_(list(reader), [])

import json

import pandas as pd
from hdfs.ext.avro import AvroReader


def read_dataframe(client, hdfs_path):
    """Read a dataframe from an HDFS Avro file.

    :param client: :class:`hdfs.client.Client` instance.
    :param hdfs_path: Remote path to an Avro file (potentially distributed).

    """
    with AvroReader(client, hdfs_path) as reader:
        # Hack-ish, but load all elements in memory first to get the length.
        if 'pandas.columns' in reader.metadata:
            columns = json.loads(reader.metadata['pandas.columns'])
        else:
            columns = None
        return pd.DataFrame.from_records(list(reader), columns=columns)
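
# A minimal, hypothetical round trip for read_dataframe. It assumes the
# companion write_dataframe helper used in test_write above (which stores the
# dataframe's column order under the 'pandas.columns' metadata key) and an
# already configured HDFS client; the path and records are illustrative only.
from hdfs import Config

client = Config().get_client()
df = pd.DataFrame.from_records([
    {'name': 'Ann', 'age': 23},
    {'name': 'Bob', 'age': 22},
])
write_dataframe(client, 'people.avro', df)
print(read_dataframe(client, 'people.avro'))  # Same rows, same column order.
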
overwrite = args['--force']
parts = parse_arg(args, '--parts', int, ',')
if args['write']:
    writer = AvroWriter(
        client,
        args['HDFS_PATH'],
        overwrite=overwrite,
        schema=parse_arg(args, '--schema', loads),
        codec=args['--codec'],
    )
    with writer:
        records = (loads(line) for line in stdin)
        for record in records:
            writer.write(record)
else:
    reader = AvroReader(client, args['HDFS_PATH'], parts=parts)
    with reader:
        if args['schema']:
            stdout.write('%s\n' % (dumps(reader.schema, indent=2), ))
        elif args['read']:
            encoder = _Encoder()
            num = parse_arg(args, '--num', int)
            freq = parse_arg(args, '--freq', float)
            if freq:
                for record in reader:
                    if random() <= freq:
                        stdout.write(encoder.encode(record))
                        stdout.write('\n')
            else:
                for record in islice(reader, num):
                    stdout.write(encoder.encode(record))
                    stdout.write('\n')
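
# Hypothetical invocations of main() exercising the branches above, patterned
# on the call in test_write_codec; the command and option names ('schema',
# 'read', '--num') come from the argument handling shown here, while the
# client setup and the output going to sys.stdout are assumptions.
from hdfs import Config

client = Config().get_client()
main(['schema', 'weather.avro'], client=client)  # Pretty-print the file's schema.
main(['read', 'weather.avro', '--num', '2'], client=client)  # First two records as JSON lines.
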
from hdfs import Config
from hdfs.ext.avro import AvroReader, AvroWriter

client = Config().get_client()

# Some sample data.
records = [
    {'name': 'Ann', 'age': 23},
    {'name': 'Bob', 'age': 22},
]

# Write an Avro file to HDFS (since our records' schema is very simple, we let
# the writer infer it automatically; otherwise we would pass it as an argument).
with AvroWriter(client, 'names.avro', overwrite=True) as writer:
    for record in records:
        writer.write(record)

# Read it back.
with AvroReader(client, 'names.avro') as reader:
    schema = reader.schema  # The inferred schema.
    content = reader.content  # The remote file's HDFS content object.
    assert list(reader) == records  # The records match!