Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
buffersize=buffersize,
)
loc = res.headers['location']
def consumer(_data):
    """Thread target: send `_data` to the URL held in `loc`.

    The request is a POST when `append` is truthy and a PUT otherwise.
    When `encoding` is truthy each chunk of `_data` is encoded lazily with
    it; otherwise `_data` is passed through untouched.

    Raises whatever `_to_error` builds from the response when the
    response evaluates as falsy.
    """
    verb = 'POST' if append else 'PUT'
    if encoding:
        # Generator keeps the upload streaming instead of materializing it.
        payload = (chunk.encode(encoding) for chunk in _data)
    else:
        payload = _data
    response = self._request(method=verb, url=loc, data=payload)
    if response:
        return
    raise _to_error(response)
if data is None:
return AsyncWriter(consumer)
else:
consumer(data)
def _start_writer(self):
    """Create and enter the `AsyncWriter` that feeds records to fastavro.

    The entered writer is stored on `self._writer`. Its callback passes
    the records it receives to `fastavro.writer`, together with the
    entered `self._fo`, `self._schema` and any extra `self._writer_kwargs`.
    """
    _logger.debug('Starting underlying writer.')

    def write(records):
        """Write `records` with `fastavro.writer` into the entered `self._fo`."""
        sink = self._fo.__enter__()
        fastavro.writer(
            fo=sink,
            schema=self._schema,
            records=records,
            **self._writer_kwargs
        )

    async_writer = AsyncWriter(write)
    self._writer = async_writer.__enter__()