tweet.metadata.isBareRetweet.known = True
tweet.metadata.isBareRetweet.data = False
tweet.metadata.isRetweet.known = True
tweet.metadata.isRetweet.data = True
tweet.metadata.venueID.known = False
tweet.metadata.venueID.data = None
tweet.metadata.venuePoint.known = False
tweet.metadata.venuePoint.data = None
# Round-trip the tweet through a temporary Avro container file
tmp_file = tempfile.mktemp()
with open(tmp_file, "w+b") as f:
    df = datafile.DataFileWriter(f, io.DatumWriter(), schema.parse(schema_json))
    df.append(tweet)
    df.close()
with open(tmp_file, "rb") as f:
    df = datafile.DataFileReader(f, SpecificDatumReader())
    tweet1 = next(df)
    df.close()
self.assertEqual(tweet.ID, tweet1.ID)
self.assertEqual(tweet.text, tweet1.text)
self.assertEqual(tweet.authorScreenName, tweet1.authorScreenName)
self.assertEqual(tweet.authorProfileImageURL, tweet1.authorProfileImageURL)
self.assertEqual(tweet.authorUserID, tweet1.authorUserID)
self.assertTrue(isinstance(tweet1.location, AvroPoint))
self.assertEqual(tweet.location.latitude, tweet1.location.latitude)
def write_avro_manual_close(foutd):
    schema = avro.schema.parse(avroSchemaOut)
    dictRes = data.to_dict(orient='records')
    writer = avro.datafile.DataFileWriter(foutd, avro.io.DatumWriter(), schema)
    for row in dictRes:
        writer.append(row)
    writer.close()
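This helper relies on two names from its enclosing module: a pandas DataFrame `data` and a JSON schema string `avroSchemaOut`. A minimal sketch of a call site, assuming both names exist (the output file name is illustrative):

# Assumes `data` (pandas DataFrame) and `avroSchemaOut` (JSON schema string)
# are defined at module level, as the function above expects
with open("records.avro", "wb") as foutd:
    write_avro_manual_close(foutd)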
"type": "record",
"name": "Message",
"fields" : [
{"name": "message_id", "type": "int"},
{"name": "topic", "type": "string"},
{"name": "user_id", "type": "int"}
]
}"""
SCHEMA = schema.parse(SCHEMA_STR)
# Create a 'record' (datum) writer
rec_writer = io.DatumWriter(SCHEMA)
# Create a 'data file' (avro file) writer
df_writer = datafile.DataFileWriter(
    open(OUTFILE_NAME, 'wb'),
    rec_writer,
    writers_schema=SCHEMA
)
df_writer.append({"message_id": 11, "topic": "Hello galaxy", "user_id": 1})
df_writer.append({"message_id": 12, "topic": "Jim is silly!", "user_id": 1})
df_writer.append({"message_id": 23, "topic": "I like apples.", "user_id": 2})
df_writer.close()
# Test reading avros
rec_reader = io.DatumReader()
# Create a 'data file' (avro file) reader
df_reader = datafile.DataFileReader(
    open(OUTFILE_NAME, 'rb'),   # Avro container files must be read in binary mode
    rec_reader
)
for record in df_reader:
    print(record)
df_reader.close()
def _create_avro_file(schema, items, file_prefix):
    _, result_file_path = tempfile.mkstemp(prefix=file_prefix, suffix='.avro')
    parsed_schema = avro.schema.Parse(schema)
    with open(result_file_path, 'wb') as f:
        writer = DataFileWriter(f, DatumWriter(), parsed_schema)
        for s in items:
            writer.append(s)
        writer.close()
    return result_file_path
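A sketch of calling this helper; the schema and records below are invented for illustration (the snippet uses the `avro.schema.Parse` spelling, so the Avro Python 3 API is assumed):

# Hypothetical schema and items, purely for illustration
user_schema = '''{"type": "record", "name": "User",
                  "fields": [{"name": "name", "type": "string"}]}'''
path = _create_avro_file(user_schema, [{"name": "Ada"}, {"name": "Grace"}], 'users-')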
'''
@author: anant bhardwaj
@date: Oct 9, 2013

Sample serialization/deserialization code (Avro)
'''
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
schema = avro.schema.parse(open("sample.avsc").read())
writer = DataFileWriter(open("data.avro", "wb"), DatumWriter(), schema)  # binary mode
csail = {"id": 1, "name": "MIT Computer Science & Artificial Intelligence Lab"}
writer.append({"id": 1, "name": "Sam Madden", "dept": csail, "sex": "Male"})
writer.append({"id": 2, "name": "David Karger", "dept": csail, "sex": "Male"})
writer.close()
reader = DataFileReader(open("data.avro", "rb"), DatumReader())  # binary mode
for user in reader:
    print(user)
reader.close()
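The snippet reads its schema from sample.avsc, which is not shown; a plausible schema inferred from the records appended above (the nested dept record and the exact type names are assumptions):

{"type": "record", "name": "User", "fields": [
    {"name": "id", "type": "int"},
    {"name": "name", "type": "string"},
    {"name": "dept", "type": {"type": "record", "name": "Dept", "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"}]}},
    {"name": "sex", "type": "string"}
]}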
def init_avro(output_dir, part_id, schema_path):
    out_filename = '%(output_dir)s/part-%(part_id)s.avro' % \
        {"output_dir": output_dir, "part_id": str(part_id)}
    schema_string = open(schema_path, 'r').read()
    email_schema = schema.parse(schema_string)
    rec_writer = io.DatumWriter(email_schema)
    df_writer = datafile.DataFileWriter(
        open(out_filename, 'wb'),
        rec_writer,
        email_schema
    )
    return df_writer
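A sketch of using the returned writer; the schema file name and record fields are invented here for illustration:

# Hypothetical usage; 'email.avsc' and the record fields are assumptions
df_writer = init_avro('/tmp/out', 0, 'email.avsc')
df_writer.append({"sender": "a@example.com", "subject": "hello"})
df_writer.close()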
def open(self, temp_path):
    file_handle = super(_AvroSink, self).open(temp_path)
    return avro.datafile.DataFileWriter(
        file_handle, avro.io.DatumWriter(), self._schema, self._codec)
output_dir = self.init_directory(output_path)
output_dirtmp = self.init_directory(output_path + 'tmp')  # handle Avro write errors
out_filename = '%(output_dir)s/part-%(part_id)s.avro' % \
    {"output_dir": output_dir, "part_id": str(part_id)}
out_filenametmp = '%(output_dirtmp)s/part-%(part_id)s.avro' % \
    {"output_dirtmp": output_dirtmp, "part_id": str(part_id)}  # handle Avro write errors
self.schema = open(schema_path, 'r').read()
email_schema = schema.parse(self.schema)
rec_writer = io.DatumWriter(email_schema)
self.avro_writer = datafile.DataFileWriter(
    open(out_filename, 'wb'),
    rec_writer,
    email_schema
)
# A second, temporary writer used to work around a UnicodeDecodeError
# seen when writing into AvroStorage
self.avro_writertmp = datafile.DataFileWriter(
    open(out_filenametmp, 'wb'),
    rec_writer,
    email_schema
)
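The code that consumes the fallback writer is not shown; one plausible pattern, an assumption rather than the original author's logic, is to retry a failed append against the temporary writer:

# Assumed usage pattern, not from the original source
try:
    self.avro_writer.append(record)
except UnicodeDecodeError:
    self.avro_writertmp.append(record)  # keep the main part file intact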