import sys
import avro.datafile
import avro.io
import avro.schema

def _python_create_file(filename):
    # json_schema: the Avro schema as a JSON string, defined elsewhere in the example.
    if sys.version_info >= (3,):
        schema = avro.schema.Parse(json_schema)
    else:
        schema = avro.schema.parse(json_schema)
    fp = open(filename, 'wb')
    writer = avro.datafile.DataFileWriter(fp, avro.io.DatumWriter(), schema)
    for i in range(1):
        writer.append({"name": "Alyssa", "favorite_number": 256})
        writer.append({"name": "Ben", "favorite_number": 7, "favorite_color": "red"})
    writer.close()
    fp.close()
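
# Usage sketch (not from the original snippet; `_python_read_file` is a
# hypothetical helper): read the records back with DataFileReader, whose
# reader schema defaults to the writer schema stored in the file.
def _python_read_file(filename):
    reader = avro.datafile.DataFileReader(open(filename, 'rb'), avro.io.DatumReader())
    records = list(reader)  # e.g. [{'name': 'Alyssa', 'favorite_number': 256, ...}]
    reader.close()
    return records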

import apache_beam as beam

def avro_output(file_path, schema):
    return beam.io.WriteToAvro(
        file_path,
        avro.schema.Parse(schema),
        num_shards=1,
        use_fastavro=False,
        shard_name_template='S_N',
        file_name_suffix='.avro')
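
# Usage sketch (hypothetical pipeline; '/tmp/users' and the record are
# illustrative, and json_schema is assumed to hold the Avro schema as JSON):
def run_avro_output(json_schema):
    with beam.Pipeline() as p:
        _ = (p
             | beam.Create([{"name": "Alyssa", "favorite_number": 256}])
             | avro_output('/tmp/users', json_schema))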

import json
from io import BytesIO
from avro.io import BinaryDecoder, DatumReader
from avro.schema import Parse

def deserialize(flight_info_bytes):
    if flight_info_bytes is not None:
        bytes_reader = BytesIO(flight_info_bytes)
        decoder = BinaryDecoder(bytes_reader)
        schema_flight_info = Parse(open(dir_path + "/flight-info.schema.avsc", "rb").read())
        reader = DatumReader(schema_flight_info)
        flight_info = reader.read(decoder)
        # Return the decoded record rather than a hard-coded payload.
        return json.dumps(flight_info)
    else:
        return None

def make_avro_writer(schema, output):
    # avro-python3 renamed schema.parse() to schema.Parse().
    if sys.version_info >= (3, 0):
        parsed_schema = avro.schema.Parse(json.dumps(schema))
    else:
        parsed_schema = avro.schema.parse(json.dumps(schema))
    writer = avro.io.DatumWriter(parsed_schema)
    encoder = avro.io.BinaryEncoder(output)

    def write_func(datum):
        writer.write(datum, encoder)
    return write_func
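
# Usage sketch (illustrative schema and record; `buf` collects the raw
# Avro binary bytes produced by the writer factory above):
import io

example_schema = {
    "type": "record", "name": "User",
    "fields": [{"name": "name", "type": "string"}],
}
buf = io.BytesIO()
write_user = make_avro_writer(example_schema, buf)
write_user({"name": "Alyssa"})
raw_bytes = buf.getvalue()  # Avro binary encoding of the record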

def write_to_avro(self, file_output_path, schema):
    return beam.io.WriteToAvro(
        file_output_path,
        avro.schema.Parse(schema),
        num_shards=1,
        use_fastavro=False,
        shard_name_template='S_N',
        file_name_suffix='.avro')

from avro import schema

# Version of the container file format (per the Avro spec, the magic is
# the ASCII bytes 'Obj' followed by the version byte 1):
VERSION = 1

# Magic code that starts a data container file:
MAGIC = b'Obj' + bytes([VERSION])

# Size of the magic code, in number of bytes:
MAGIC_SIZE = len(MAGIC)

# Size of the synchronization marker, in number of bytes:
SYNC_SIZE = 16

# Interval between synchronization markers, in number of bytes:
# TODO: make configurable
SYNC_INTERVAL = 1000 * SYNC_SIZE

# Schema of the container header:
META_SCHEMA = schema.Parse("""
{
  "type": "record", "name": "org.apache.avro.file.Header",
  "fields": [{
    "name": "magic",
    "type": {"type": "fixed", "name": "magic", "size": %(magic_size)d}
  }, {
    "name": "meta",
    "type": {"type": "map", "values": "bytes"}
  }, {
    "name": "sync",
    "type": {"type": "fixed", "name": "sync", "size": %(sync_size)d}
  }]
}
""" % {
    'magic_size': MAGIC_SIZE,
    'sync_size': SYNC_SIZE,
})
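
# Illustrative sketch (not part of the module above): META_SCHEMA describes
# the header record at the start of every Avro container file, so it can be
# decoded directly with a DatumReader. `read_header` is a hypothetical helper.
from avro import io as avro_io

def read_header(fp):
    """Decode the container header (magic, meta map, sync marker) from fp."""
    decoder = avro_io.BinaryDecoder(fp)
    return avro_io.DatumReader(META_SCHEMA).read(decoder)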

def __init__(self, block_bytes, num_records, codec, schema_string,
             offset, size):
    # Decompress data early on (if needed) and thus decrease the number of
    # parallel copies of the data in memory at any given time during block
    # iteration.
    self._decompressed_block_bytes = self._decompress_bytes(block_bytes, codec)
    self._num_records = num_records
    self._schema = Parse(schema_string)
    self._offset = offset
    self._size = size
"""
# DEV NOTE: since Avro is not a requirement, do *not* import this
# module unconditionally anywhere in the main code (importing it in
# the Avro examples is OK, ofc).
import sys
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter, BinaryDecoder, BinaryEncoder
from pydoop.mapreduce.api import RecordWriter, RecordReader
import pydoop.hdfs as hdfs
from pydoop.utils.py3compat import StringIO
parse = avro.schema.Parse if sys.version_info[0] == 3 else avro.schema.parse
class Deserializer(object):

    def __init__(self, schema_str):
        schema = parse(schema_str)
        self.reader = DatumReader(schema)

    def deserialize(self, rec_bytes):
        return self.reader.read(BinaryDecoder(StringIO(rec_bytes)))

class Serializer(object):

    def __init__(self, schema_str):
        schema = parse(schema_str)
        self.writer = DatumWriter(schema)

    def serialize(self, record):
        # Mirror of Deserializer: encode into an in-memory buffer.
        f = StringIO()
        self.writer.write(record, BinaryEncoder(f))
        return f.getvalue()
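
# Usage sketch (illustrative schema, not from the original module): round-trip
# a record through the Serializer and Deserializer defined above.
_user_schema = ('{"type": "record", "name": "User",'
                ' "fields": [{"name": "name", "type": "string"}]}')
_blob = Serializer(_user_schema).serialize({"name": "Alyssa"})
assert Deserializer(_user_schema).deserialize(_blob) == {"name": "Alyssa"}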
# Handshake schemas are pulled in during the build:
HANDSHAKE_REQUEST_SCHEMA_JSON = LoadResource('HandshakeRequest.avsc')
HANDSHAKE_RESPONSE_SCHEMA_JSON = LoadResource('HandshakeResponse.avsc')
HANDSHAKE_REQUEST_SCHEMA = schema.Parse(HANDSHAKE_REQUEST_SCHEMA_JSON)
HANDSHAKE_RESPONSE_SCHEMA = schema.Parse(HANDSHAKE_RESPONSE_SCHEMA_JSON)
HANDSHAKE_REQUESTOR_WRITER = avro_io.DatumWriter(HANDSHAKE_REQUEST_SCHEMA)
HANDSHAKE_REQUESTOR_READER = avro_io.DatumReader(HANDSHAKE_RESPONSE_SCHEMA)
HANDSHAKE_RESPONDER_WRITER = avro_io.DatumWriter(HANDSHAKE_RESPONSE_SCHEMA)
HANDSHAKE_RESPONDER_READER = avro_io.DatumReader(HANDSHAKE_REQUEST_SCHEMA)
META_SCHEMA = schema.Parse('{"type": "map", "values": "bytes"}')
META_WRITER = avro_io.DatumWriter(META_SCHEMA)
META_READER = avro_io.DatumReader(META_SCHEMA)
SYSTEM_ERROR_SCHEMA = schema.Parse('["string"]')
AVRO_RPC_MIME = 'avro/binary'
# protocol cache
# Map: remote name -> remote MD5 hash
_REMOTE_HASHES = {}
# Decoder/encoder for a 32-bit big-endian integer.
UINT32_BE = avro_io.STRUCT_INT
# Default size of the buffers used to frame messages:
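
# Framing sketch (an assumption based on the Avro RPC wire convention, not
# code from the module above): each frame is a 4-byte big-endian length
# followed by that many payload bytes, and a zero-length frame terminates
# the message. The 8192-byte default buffer size is an assumption here.
import io

def write_framed_message(message_bytes, buffer_size=8192):
    out = io.BytesIO()
    for pos in range(0, len(message_bytes), buffer_size):
        chunk = message_bytes[pos:pos + buffer_size]
        out.write(UINT32_BE.pack(len(chunk)))  # frame length prefix
        out.write(chunk)                       # frame payload
    out.write(UINT32_BE.pack(0))  # zero-length frame ends the message
    return out.getvalue()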