global convert_to_json_array
global stats_interval
global time_count
global elem_count
#
# XXX: data enrichments, manipulations, correlations, etc. go here
#
if stats_interval:
    time_now = int(time.time())
if avro_schema:
    inputio = StringIO.StringIO(body)
    decoder = avro.io.BinaryDecoder(inputio)
    datum_reader = avro.io.DatumReader(avro_schema)
    avro_data = []
    # read datums until the input buffer is exhausted
    while inputio.tell() < len(inputio.getvalue()):
        x = datum_reader.read(decoder)
        avro_data.append(str(x))
    if stats_interval:
        elem_count += len(avro_data)
    if print_stdout:
        print " [x] Received %r" % (",".join(avro_data),)
        sys.stdout.flush()
        print_stdout_num += 1
        if print_stdout_max == print_stdout_num:
            sys.exit(0)
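
The snippet above is Python 2 (StringIO module, print statement). A minimal Python 3 sketch of the same pattern, decoding a buffer that holds several concatenated Avro datums; the Point schema and sample payload are illustrative assumptions, and it relies on recent avro releases exposing avro.schema.parse on Python 3:

import io
import json
import avro.io
import avro.schema

# hypothetical schema standing in for avro_schema above
schema = avro.schema.parse(json.dumps({
    "type": "record", "name": "Point",
    "fields": [{"name": "x", "type": "int"}, {"name": "y", "type": "int"}],
}))

def decode_all(body, schema):
    buf = io.BytesIO(body)
    decoder = avro.io.BinaryDecoder(buf)
    reader = avro.io.DatumReader(schema)
    records = []
    while buf.tell() < len(body):  # same end-of-buffer test as the loop above
        records.append(reader.read(decoder))
    return records

# build a sample payload of two concatenated datums, then decode it
out = io.BytesIO()
encoder = avro.io.BinaryEncoder(out)
writer = avro.io.DatumWriter(schema)
writer.write({"x": 1, "y": 2}, encoder)
writer.write({"x": 3, "y": 4}, encoder)
print(decode_all(out.getvalue(), schema))  # [{'x': 1, 'y': 2}, {'x': 3, 'y': 4}]
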
def _ReadResponse(self, writer_schema, reader_schema, decoder):
    datum_reader = avro_io.DatumReader(writer_schema, reader_schema)
    result = datum_reader.read(decoder)
    return result
def records(self):
    decoder = avroio.BinaryDecoder(
        io.BytesIO(self._decompressed_block_bytes))
    writer_schema = self._schema
    reader_schema = self._schema
    reader = avroio.DatumReader(writer_schema, reader_schema)
    current_record = 0
    while current_record < self._num_records:
        yield reader.read(decoder)
        current_record += 1
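
Both snippets above pass writer and reader schemas to DatumReader separately. When the two differ, DatumReader resolves data written with one schema into the shape of the other; a hedged round-trip sketch with illustrative schemas:

import io
import avro.io
import avro.schema

writer_schema = avro.schema.parse(
    '{"type": "record", "name": "User", "fields": ['
    '{"name": "name", "type": "string"},'
    '{"name": "age", "type": "int"}]}')
reader_schema = avro.schema.parse(
    '{"type": "record", "name": "User", "fields": ['
    '{"name": "name", "type": "string"}]}')  # reader drops "age"

buf = io.BytesIO()
avro.io.DatumWriter(writer_schema).write({"name": "ada", "age": 36},
                                         avro.io.BinaryEncoder(buf))
buf.seek(0)
resolved = avro.io.DatumReader(writer_schema, reader_schema)
print(resolved.read(avro.io.BinaryDecoder(buf)))  # {'name': 'ada'}
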
self._buffer_writer = StringIO()
self._buffer_encoder = io.BinaryEncoder(self._buffer_writer)
self._block_count = 0
self._meta = {}
self._header_written = False
if writers_schema is not None:
    if codec not in VALID_CODECS:
        raise DataFileException("Unknown codec: %r" % codec)
    self._sync_marker = DataFileWriter.generate_sync_marker()
    self.set_meta('avro.codec', codec)
    self.set_meta('avro.schema', str(writers_schema))
    self.datum_writer.writers_schema = writers_schema
else:
    # open writer for reading to collect metadata
    dfr = DataFileReader(writer, io.DatumReader())
    # TODO(hammer): collect arbitrary metadata
    self._sync_marker = dfr.sync_marker
    self.set_meta('avro.codec', dfr.get_meta('avro.codec'))
    # get schema used to write existing file
    schema_from_file = dfr.get_meta('avro.schema')
    self.set_meta('avro.schema', schema_from_file)
    self.datum_writer.writers_schema = schema.parse(schema_from_file)
    # seek to the end of the file and prepare for writing
    writer.seek(0, 2)
    self._header_written = True
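
This is the constructor of the reference DataFileWriter; the else branch implements append mode, recovering the existing container's sync marker, codec, and schema before seeking to the end. A usage sketch, assuming a hypothetical users.avro that already exists:

from avro.datafile import DataFileWriter
from avro.io import DatumWriter

# no schema argument: it is recovered from the existing file's metadata
writer = DataFileWriter(open("users.avro", "a+b"), DatumWriter())
writer.append({"name": "grace"})  # record shape must match the stored schema
writer.close()
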
def json_avro_schema(self):
    if self._json_avro_schema is None:
        # depend on the avro python reference implementation, since getting
        # the full json avro schema out of the c-api is elusive
        from avro.datafile import DataFileReader
        from avro.io import DatumReader
        import json
        with open(self.filename, 'rb') as fo:  # container files are binary
            with DataFileReader(fo, DatumReader()) as avf:
                self._json_avro_schema = json.loads(avf.meta['avro.schema'])
    return self._json_avro_schema
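
The same metadata lookup works standalone; a short sketch, with data.avro as a hypothetical input file:

import json
from avro.datafile import DataFileReader
from avro.io import DatumReader

with DataFileReader(open("data.avro", "rb"), DatumReader()) as avf:
    full_schema = json.loads(avf.meta['avro.schema'])
print(full_schema["name"])  # e.g. the record name, for a record schema
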
def make_avro_reader(schema):
    if sys.version_info >= (3, 0):
        # the avro-python3 package renamed schema.parse to schema.Parse
        parsed_schema = avro.schema.Parse(json.dumps(schema))
    else:
        parsed_schema = avro.schema.parse(json.dumps(schema))
    reader = avro.io.DatumReader(parsed_schema)

    def read_func(data):
        bytes_reader = io.BytesIO(data)
        decoder = avro.io.BinaryDecoder(bytes_reader)
        return reader.read(decoder)

    return read_func
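
A hedged usage sketch for make_avro_reader, reusing the imports the function already assumes (avro.schema, avro.io, io, json); the Ping schema is illustrative:

schema = {"type": "record", "name": "Ping",
          "fields": [{"name": "seq", "type": "long"}]}
read_ping = make_avro_reader(schema)

parsed = avro.schema.parse(json.dumps(schema))  # Parse on avro-python3
out = io.BytesIO()
avro.io.DatumWriter(parsed).write({"seq": 1}, avro.io.BinaryEncoder(out))
print(read_ping(out.getvalue()))  # {'seq': 1}
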
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter

reader = DataFileReader(open("devices.avro", "rb"), DatumReader())  # "rb": container files are binary
for device in reader:
    print device
reader.close()
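
DataFileWriter and DatumWriter are imported above but unused; a matching write-side sketch, assuming a hypothetical device.avsc schema file whose fields match the appended record:

schema = avro.schema.parse(open("device.avsc").read())
writer = DataFileWriter(open("devices.avro", "wb"), DatumWriter(), schema)
writer.append({"id": "dev-1", "status": "on"})
writer.close()
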
def _get_jc_for_avro_input(self, file_in, job_conf):
    jc = dict(job_conf)
    if self.avro_input:
        jc[AVRO_INPUT] = self.avro_input
        reader = DataFileReader(file_in, DatumReader())
        if sys.version_info[0] == 3:
            schema = reader.GetMeta(SCHEMA_KEY)
        else:
            schema = reader.get_meta(SCHEMA_KEY)
        file_in.seek(0)
        if self.avro_input == 'V':
            jc[AVRO_VALUE_INPUT_SCHEMA] = schema
        elif self.avro_input == 'K':
            jc[AVRO_KEY_INPUT_SCHEMA] = schema
        else:
            schema_obj = json.loads(schema)
            for field in schema_obj['fields']:
                if field['name'] == 'key':
                    key_schema = field['type']
                else:
                    value_schema = field['type']
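
The final branch handles combined key/value input and assumes the top-level record wraps exactly two fields, one of them named key; an illustrative wrapper schema of that shape (names and types are hypothetical):

kv_schema = {
    "type": "record",
    "name": "KVPair",
    "fields": [
        {"name": "key",   "type": "string"},
        {"name": "value", "type": "long"},
    ],
}
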
try:
    json.dumps(data)
    return data
except Exception:
    LOG.debug('Failed to dump data as JSON, falling back to raw data.')
cleaned = {}
lim = [0]
if isinstance(data, str):  # not JSON-dumpable, so some sort of bytestring or byte data
    # detect an avro container file by its magic bytes, 'Obj'
    if data[:3] == '\x4F\x62\x6A':
        # write the data to an in-memory file
        output = io.StringIO()
        output.write(data)
        # read and parse the avro records (DataFileReader seeks back to the header itself)
        rec_reader = io.DatumReader()
        df_reader = datafile.DataFileReader(output, rec_reader)
        return json.dumps(clean([record for record in df_reader]))
    return base64.b64encode(data)
if hasattr(data, "__iter__"):
    if type(data) is dict:
        for i in data:
            cleaned[i] = clean(data[i])
    elif type(data) is list:
        cleaned = []
        for i, item in enumerate(data):
            cleaned += [clean(item)]
    else:
        for i, item in enumerate(data):
            cleaned[i] = clean(item)
else: