Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# NOTE(review): fragment of a benchmark entry point. `parser`, `argv` and `timeit`
# are defined outside the visible lines, and the original indentation was lost.
# Positional argument: path of the Avro file to iterate over.
parser.add_argument('avro_file', help='Avro file for iterate')
# Optional boolean flag (off by default) that enables the second benchmark below.
parser.add_argument('--pyavro', default=False, action='store_true',
help='run the avro python benchmark as well')
# The first element of argv is skipped (presumably the program name -- confirm).
args = parser.parse_args(argv[1:])
from fastavro import reader
# Prints the reader object itself, which shows which implementation got imported.
print('Using {0}'.format(reader))
# Benchmark 1: hand fastavro's reader, wrapped around the binary file, to `timeit`.
with open(args.avro_file, 'rb') as fo:
timeit('fastavro', reader(fo))
# Benchmark 2 (only with --pyavro): same file through avro's DataFileReader.
if args.pyavro:
import avro.io
import avro.datafile
with open(args.avro_file, 'rb') as fo:
# Rebinds the name `reader`, shadowing the fastavro import above.
reader = avro.datafile.DataFileReader(fo, avro.io.DatumReader())
timeit('avro', reader)
def read_avro(fin):
    """Load every record of an Avro container file into a list.

    Args:
        fin: Handle passed straight to ``avro.datafile.DataFileReader``
            (presumably a file object opened in binary mode -- confirm
            against callers).

    Returns:
        A list holding each datum produced by iterating the reader.
    """
    # The reader is intentionally not closed here; the caller keeps
    # ownership of ``fin``.
    records = []
    records.extend(avro.datafile.DataFileReader(fin, avro.io.DatumReader()))
    return records
# Read Avro data from `fhandle` starting at `offset` and build a text rendering
# with one str(datum) per line.
# NOTE(review): fragment -- the handler for the outer `try:` and the function's
# return are not visible here, and the original indentation was lost.
# `path` and `stats` are not used in the visible lines.
def _read_avro(fhandle, path, offset, length, stats):
contents = ''
try:
fhandle.seek(offset)
data_file_reader = datafile.DataFileReader(fhandle, io.DatumReader())
try:
contents_list = []
# Handle position once the reader has been constructed; the baseline for
# measuring how far iteration has advanced the handle.
read_start = fhandle.tell()
# Iterate over the entire sought file.
for datum in data_file_reader:
read_length = fhandle.tell() - read_start
# Stop once the handle has moved more than `length` past the baseline, but only
# after at least one datum was collected, so an oversized first datum is kept.
if read_length > length and len(contents_list) > 0:
break
else:
datum_str = str(datum) + "\n"
contents_list.append(datum_str)
# Presumably pairs with the inner `try:` so the reader is closed even when
# iteration raises -- confirm against the original indentation.
finally:
data_file_reader.close()
contents = "".join(contents_list)
# NOTE(review): fragment of a generator body. The enclosing `def` is not visible,
# and `_format`, `_file`, `st_ino`, `agent_config_vars`, `logger` and the helper
# functions come from outside these lines. Indentation was lost, so the exact
# nesting of the later statements should be confirmed against the original.
# Spreadsheet formats are delegated to read_xls and re-yielded line by line.
if _format in {'XLS', 'XLSX'}:
for line in read_xls(_file):
yield line
else:
# AVRO is opened in binary mode; every other format is opened as text.
mode = 'r' if _format != 'AVRO' else 'rb'
with open(_file, mode) as data:
# preformatting on all data
if 'TAIL' in _format:
# Record this file in the state as a JSON object mapping st_ino to the path,
# then resume from the offset stored in the state.
update_state('current_file', json.dumps({st_ino: _file}))
data.seek(int(agent_config_vars['state']['current_file_offset'])) # read from state
if _format == 'XML':
# XML is parsed in one go and yielded as a single item.
data = xml2dict.parse(data)
yield data
else:
if _format == 'AVRO':
# Rebind `data` to an Avro reader so the loop below iterates over datums.
data = avro.datafile.DataFileReader(data, avro.io.DatumReader())
# read each line
logger.debug('reading each line')
for line in data:
yield reader_next_line(_format, data, line)
if 'TAIL' in _format:
# 'TAILF' contains 'TAIL', so this inner branch is a refinement of the outer one.
if 'TAILF' in _format:
logger.debug('tailing file')
# keep reading file
for line2 in tail_file(_file, data):
yield reader_next_line(_format, data, line2)
# move from current file to completed, reset position
update_state('completed_files_st_ino', st_ino, append=True)
update_state('current_file', '')
update_state('current_file_offset', 0)