Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_write(self):
    """Write JSON-lines records through the CLI using the ``null`` codec.

    The uploaded ``weather.avro`` is downloaded again and compared, via
    ``self._get_data_bytes``, against the local ``weather.avro`` fixture.
    """
    fixture_avro = osp.join(self.dpath, 'weather.avro')
    cli_args = [
        'write', 'weather.avro',
        '--schema', dumps(self.schema),
        '--codec', 'null',
    ]
    with open(osp.join(self.dpath, 'weather.jsonl')) as jsonl_source:
        main(cli_args, client=self.client, stdin=jsonl_source)
    with temppath() as downloaded_path:
        self.client.download('weather.avro', downloaded_path)
        # Keep the original evaluation order: downloaded file first, fixture second.
        actual_bytes = self._get_data_bytes(downloaded_path)
        expected_bytes = self._get_data_bytes(fixture_avro)
        eq_(actual_bytes, expected_bytes)
def test_write_codec(self):
    """Write JSON-lines records through the CLI using the ``deflate`` codec.

    Records read back with ``AvroReader`` must equal ``self.records``. The
    remote file's length must also differ from the local ``weather.avro``
    fixture's size.
    """
    cli_args = [
        'write', 'weather.avro',
        '--schema', dumps(self.schema),
        '--codec', 'deflate',
    ]
    with open(osp.join(self.dpath, 'weather.jsonl')) as jsonl_source:
        main(cli_args, client=self.client, stdin=jsonl_source)
    # The content must survive the round trip unchanged.
    with AvroReader(self.client, 'weather.avro') as avro_reader:
        read_back = list(avro_reader)
    eq_(read_back, self.records)
    # Only require the sizes to differ: the file is tiny, so compression is
    # not guaranteed to make it smaller.
    remote_length = self.client.content('weather.avro')['length']
    fixture_length = osp.getsize(osp.join(self.dpath, 'weather.avro'))
    ok_(remote_length != fixture_length)
def test_read_part_file(self):
    """Read one part-file of a partitioned ``data.avro`` directory via the CLI.

    Two part-files are written with ``AvroWriter``. The command is then run
    with ``--parts 1,``, and the assertion expects only the records of
    ``part-m-00001.avro`` in the output.
    """
    part_contents = {
        'part-m-00000.avro': [{'name': 'jane'}, {'name': 'bob'}],
        'part-m-00001.avro': [{'name': 'john'}, {'name': 'liz'}],
    }
    for part_name, part_records in part_contents.items():
        remote_path = 'data.avro/%s' % part_name
        with AvroWriter(self.client, remote_path) as avro_writer:
            for rec in part_records:
                avro_writer.write(rec)
    with temppath() as output_path:
        with open(output_path, 'w') as sink:
            main(
                ['read', 'data.avro', '--parts', '1,'],
                client=self.client,
                stdout=sink,
            )
        with open(output_path) as source:
            emitted = [loads(text) for text in source]
        eq_(emitted, part_contents['part-m-00001.avro'])
def test_schema(self):
    """Check the output of the ``schema`` CLI command for ``weather.avro``.

    The local fixture is uploaded first. The command's output, parsed with
    ``load``, must equal ``self.schema``.
    """
    fixture_avro = osp.join(self.dpath, 'weather.avro')
    self.client.upload('weather.avro', fixture_avro)
    with temppath() as output_path:
        with open(output_path, 'w') as sink:
            main(['schema', 'weather.avro'], client=self.client, stdout=sink)
        with open(output_path) as source:
            printed_schema = load(source)
        eq_(self.schema, printed_schema)
def test_read(self):
    """Check that ``read --num 2`` outputs the first two fixture records.

    Each output line is decoded with ``loads`` and the resulting list is
    compared to ``self.records[:2]``.
    """
    self.client.upload('weather.avro', osp.join(self.dpath, 'weather.avro'))
    cli_args = ['read', 'weather.avro', '--num', '2']
    with temppath() as output_path:
        with open(output_path, 'w') as sink:
            main(cli_args, client=self.client, stdout=sink)
        with open(output_path) as source:
            decoded = list(map(loads, source))
        eq_(decoded, self.records[:2])