Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
import avro.io as avio
from spavro.datafile import DataFileReader,DataFileWriter
from spavro import schema
# Write every item of ``lines`` as an Avro string record into the container
# file ``fname``, creating any missing parent directories first.
# NOTE(review): ``fname`` and ``lines`` are free names here; this fragment
# presumably lives inside a function that supplies them - confirm.

# Create all missing parent directories. os.path.dirname handles relative and
# absolute paths alike; the previous manual ``os.sep + os.sep.join(...)``
# rebuild always prefixed a separator, which re-rooted relative paths at "/".
parent_dir = os.path.dirname(fname)
if parent_dir and not os.path.isdir(parent_dir):
    try:
        os.makedirs(parent_dir)
    except OSError:
        # Another process may have created it between the check and the
        # call; only re-raise if the directory still does not exist.
        if not os.path.isdir(parent_dir):
            raise

inschema = """{"type":"string"}"""
parsed_schema = schema.parse(inschema)
# Avro container files are binary: open in "wb". open() (unlike the
# Python-2-only file() builtin) works on both Python 2 and 3.
with open(fname, 'wb') as hf:
    writer = DataFileWriter(hf, avio.DatumWriter(parsed_schema),
                            writers_schema=parsed_schema)
    for datum in lines:
        writer.append(datum)
    # close() flushes the last buffered block before the file is released.
    writer.close()
def _WriteResponse(self, writer_schema, response_datum, encoder):
    """Serialize ``response_datum`` against ``writer_schema`` onto ``encoder``.

    A new ``avro_io.DatumWriter`` bound to the given schema is created for
    every call; nothing is returned.
    """
    avro_io.DatumWriter(writer_schema).write(response_datum, encoder)
def main():
    """Convert newline-delimited JSON on stdin into a deflate-compressed Avro file.

    Usage: ``cat input.json | python2.7 JSONtoAvro.py output``

    The record schema is read from ``tweet.avsc`` (relative to the current
    working directory) and the output path is taken from ``sys.argv[1]``.
    Lines that are not valid JSON are skipped. Records rejected by the writer
    with ``io.AvroTypeException`` are echoed to stdout and counted, and the
    count is reported on stderr once the input is exhausted.

    Returns:
        None. Prints a usage message and returns early when no output path
        is given.
    """
    if len(sys.argv) < 2:
        print("Usage: cat input.json | python2.7 JSONtoAvro.py output")
        return
    # Read the schema in a with-block so its file handle is released promptly.
    with open("tweet.avsc") as schema_file:
        s = schema.parse(schema_file.read())
    f = open(sys.argv[1], "wb")
    writer = datafile.DataFileWriter(f, io.DatumWriter(), s, codec='deflate')
    failed = 0
    try:
        for line in sys.stdin:
            line = line.strip()
            try:
                data = json.loads(line)
            except ValueError:
                # Blank or malformed JSON line: skip it.
                continue
            try:
                writer.append(data)
            except io.AvroTypeException:
                # Echo the rejected record so it can be inspected later.
                print(line)
                failed += 1
    finally:
        # Without close() the last buffered block is never flushed and the
        # output file is left open; close() also closes ``f``.
        writer.close()
    if failed:
        sys.stderr.write("%d record(s) did not match the schema\n" % failed)
def _WriteRequest(self, request_schema, request_datum, encoder):
    """Log ``request_datum`` at INFO level, then encode it onto ``encoder``.

    Serialization uses a fresh ``avro_io.DatumWriter`` bound to
    ``request_schema``; nothing is returned.
    """
    logger.info('writing request: %s', request_datum)
    avro_io.DatumWriter(request_schema).write(request_datum, encoder)
def init_avro(output_dir, part_id, schema_path):
    """Open a new Avro container file ``<output_dir>/part-<part_id>.avro``.

    Args:
        output_dir: Directory in which the part file is created. It is not
            created here, so it must already exist.
        part_id: Identifier placed in the file name (converted with ``str``).
        schema_path: Path to a file containing the JSON Avro schema.

    Returns:
        A ``datafile.DataFileWriter`` for records of the parsed schema. The
        caller owns it and must call ``close()`` to flush and close the file.
    """
    out_filename = '%(output_dir)s/part-%(part_id)s.avro' % {
        "output_dir": output_dir,
        "part_id": str(part_id),
    }
    # Read the schema in a with-block so the handle is closed immediately
    # instead of being left for the garbage collector.
    with open(schema_path, 'r') as schema_file:
        schema_string = schema_file.read()
    email_schema = schema.parse(schema_string)
    rec_writer = io.DatumWriter(email_schema)
    return datafile.DataFileWriter(
        open(out_filename, 'wb'),
        rec_writer,
        email_schema,
    )
def write_error(self, writers_schema, error_exception, encoder):
    """Encode the text of ``error_exception`` against ``writers_schema``.

    The exception is converted with ``str()`` and written onto ``encoder``
    by a fresh ``io.DatumWriter``; nothing is returned.
    """
    message = str(error_exception)
    io.DatumWriter(writers_schema).write(message, encoder)
def make_avro_writer(schema, output):
    """Return a callable that writes Avro binary-encoded datums to ``output``.

    Args:
        schema: Avro schema as a JSON-serializable object (e.g. a dict). It
            is serialized with ``json.dumps`` and parsed by ``avro``.
        output: Writable binary file-like object wrapped in
            ``avro.io.BinaryEncoder``.

    Returns:
        ``write_func(datum)``, which encodes ``datum`` against the parsed
        schema and writes it to ``output``. Only raw datum bytes are written;
        no container-file header is emitted.
    """
    # The parse function's name depends on the installed package, not on the
    # interpreter: avro-python3 names it ``Parse``, while the Python 2 ``avro``
    # package and the unified ``avro`` >= 1.10 (which also runs on Python 3)
    # name it ``parse``. Detect it instead of checking sys.version_info.
    parse = getattr(avro.schema, 'parse', None) or avro.schema.Parse
    parsed_schema = parse(json.dumps(schema))
    writer = avro.io.DatumWriter(parsed_schema)
    encoder = avro.io.BinaryEncoder(output)

    def write_func(datum):
        """Encode one datum onto the shared encoder."""
        writer.write(datum, encoder)

    return write_func