Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_nonpartitioned_file(self):
    """Download one part file by its full remote path and check its text."""
    partname = 'part-r-00000'
    remote_path = 'dl/' + partname
    self._write(remote_path, b'world')
    with temppath() as tpath:
        # `download` returns the local path it wrote to; read that path
        # rather than assuming it equals `tpath`.
        local_path = self.client.download(remote_path, tpath)
        with open(local_path) as reader:
            content = reader.read()
        eq_(content, 'world')
def test_write(self):
    """Run the `write` command with JSONL on stdin and compare the result.

    The records in `weather.jsonl` (under `self.dpath`) are fed to `main`
    through `stdin`; the remote `weather.avro` it produces is downloaded and
    compared, via `self._get_data_bytes`, with the local reference
    `weather.avro` in the same directory.
    """
    source_path = osp.join(self.dpath, 'weather.jsonl')
    with open(source_path) as reader:
        argv = [
            'write', 'weather.avro',
            '--schema', dumps(self.schema),
            '--codec', 'null',
        ]
        main(argv, client=self.client, stdin=reader)
    with temppath() as tpath:
        self.client.download('weather.avro', tpath)
        actual = self._get_data_bytes(tpath)
        expected = self._get_data_bytes(osp.join(self.dpath, 'weather.avro'))
        eq_(actual, expected)
def test_missing_dir(self):
    """Download a remote file to a local path nested under the temp path.

    NOTE(review): no assertion is visible in this block. Judging by the test
    name, the download is presumably expected to fail because the target's
    parent directory is missing, with the failure checked by a decorator or
    the test runner -- confirm against the full file.
    """
    self._write('dl', b'hello')
    with temppath() as tpath:
        target = osp.join(tpath, 'foo')
        self.client.download('dl', target)
def test_download_folder_to_existing_folder_parallel(self):
    """Download a nested remote folder into an already-created local folder.

    The local directory is created before the download, and the remote
    folder is expected to land inside it under its own name (`foo`).
    """
    self._write('foo/dl', b'hello')
    self._write('foo/bar/dl', b'there')
    # Local path components (relative to the temp dir) -> expected text.
    expected_files = (
        (('foo', 'dl'), 'hello'),
        (('foo', 'bar', 'dl'), 'there'),
    )
    with temppath() as tpath:
        os.mkdir(tpath)
        # presumably n_threads=0 lets the client choose the degree of
        # parallelism -- confirm against the client documentation.
        self.client.download('foo', tpath, n_threads=0)
        for components, text in expected_files:
            with open(osp.join(tpath, *components)) as reader:
                eq_(reader.read(), text)
def test_overwrite_file(self):
    """Re-download a changed remote file over an existing local copy."""
    with temppath() as tpath:
        # First round: create the remote file and download it locally.
        self._write('dl', b'hello')
        self.client.download('dl', tpath)
        # Second round: change the remote content, then download to the same
        # local path with overwrite enabled.
        self.client.write('dl', b'there', overwrite=True)
        local_path = self.client.download('dl', tpath, overwrite=True)
        with open(local_path) as reader:
            content = reader.read()
        eq_(content, 'there')
def test_create_client_with_default_alias(self):
    """Save a config with a default alias and build a client from it.

    The config is saved with a global `default.alias` of `dev` and a
    `dev.alias` section holding a URL; a fresh `Config` loaded from the same
    path must then be able to return a client without an explicit alias.
    """
    with temppath() as tpath:
        cfg = Config(tpath)
        entries = (
            (cfg.global_section, 'default.alias', 'dev'),
            ('dev.alias', 'url', 'http://host:port'),
        )
        for section, option, value in entries:
            cfg.add_section(section)
            cfg.set(section, option, value)
        save_config(cfg)
        # Reload from disk so the client is built from the saved file.
        Config(tpath).get_client()
def test_with_progress(self):
    """Read a remote file in chunks and check the progress callback values.

    The 13-byte file is read with `chunk_size=5`; the callback is expected
    to receive 5, 10, 13 and finally -1.
    """
    # Explicit accumulator instead of a mutable default argument, so the
    # recorded values can be inspected without an extra sentinel call.
    chunk_lengths = []

    def cb(path, nbytes):
        """Record each `nbytes` value reported by the reader."""
        chunk_lengths.append(nbytes)
        # Return value kept so the client sees the same callback result as
        # before this refactoring.
        return chunk_lengths

    self._write('foo', b'hello, world!')
    with temppath() as tpath:
        with open(tpath, 'wb') as writer:
            with self.client.read('foo', chunk_size=5, progress=cb) as reader:
                for chunk in reader:
                    writer.write(chunk)
        with open(tpath, 'rb') as reader:
            eq_(reader.read(), b'hello, world!')
        eq_(chunk_lengths, [5, 10, 13, -1])
def _download_partitioned_file(self, n_threads):
    """Download a folder of part files and verify names and contents.

    Args:
        n_threads: Thread-count setting forwarded to `client.download`, so
            each caller exercises the threading mode it asks for.
    """
    parts = {
        'part-r-00000': b'fee',
        'part-r-00001': b'faa',
        'part-r-00002': b'foo',
    }
    for name, content in parts.items():
        self._write('dl/%s' % (name, ), content)
    with temppath() as tpath:
        # Forward the caller's setting; this was previously hard-coded to -1,
        # which silently ignored the `n_threads` argument.
        self.client.download('dl', tpath, n_threads=n_threads)
        local_parts = os.listdir(tpath)
        eq_(set(local_parts), set(parts))  # We have all the parts.
        for part in local_parts:
            with open(osp.join(tpath, part), mode='rb') as reader:
                eq_(reader.read(), parts[part])  # Their content is correct.
def test_download_empty_folder(self):
    """Download a remote folder that was created with no files in it.

    No assertion is made on the result in this block; it passes as long as
    the download call does not raise.
    """
    remote_dir = 'foo'
    self.client._mkdirs(remote_dir)
    with temppath() as local_path:
        self.client.download(remote_dir, local_path)
def test_upload_file(self):
    """Upload a local text file and check the remote bytes match."""
    with temppath() as tpath:
        with open(tpath, 'w') as writer:
            writer.write('hello, world!')
        self.client.upload('up', tpath)
        uploaded = self._read('up')
        eq_(uploaded, b'hello, world!')