Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
print('producer.py [-e true] [-l true]')
sys.exit()
elif opt in ("-e", "--extra"):
print("extra header requested")
extra = True
elif opt in ("-l", "--loop"):
print("loop mode")
loopMode = True
rangeValue=1000
# Optional prefix prepended to every payload; stays empty unless the
# "extra" option was requested. A bytes literal is used because
# bytes('') raises TypeError on Python 3 (a str argument needs an encoding).
extrabytes = b''
if extra:
    # NOTE(review): meaning of this fixed 5-byte header is not visible
    # here -- confirm against the consumer.
    extrabytes = bytes(bytearray([0, 0, 0, 0, 23]))

# The datum writer depends only on the schema, so build it once instead of
# once per message.
writer = avro.io.DatumWriter(schema)

# range() works on both Python 2 and 3 (xrange is Python 2 only).
for i in range(rangeValue):
    # Fresh buffer and encoder per message so each payload holds one datum.
    bytes_writer = io.BytesIO()
    encoder = avro.io.BinaryEncoder(bytes_writer)
    # Prepare our msg data
    rawvarie = "python-random-" + str(random.randint(10, 10000)) + "-loop-" + str(i)
    writer.write({"timestamp": current_milli_time(), "src": "ESC", "host_ip": "my_ipv6", "rawdata": rawvarie}, encoder)
    raw_bytes = bytes_writer.getvalue()
    producer.send_messages(topic, extrabytes + raw_bytes)
    if rangeValue > 1:
        # Pause between messages when more than one is being sent.
        time.sleep(0.5)
def encode(self, message_avro_representation):
    """Write the given datum through ``self.avro_writer`` and return the result.

    The datum is encoded with a binary encoder into an in-memory buffer,
    and the buffer's full contents are returned.
    """
    # A brand-new buffer (and encoder) is created on every call on purpose:
    # benchmarking showed that this isn't slower than truncating and
    # reusing an existing StringIO object, see
    # http://stackoverflow.com/questions/4330812/how-do-i-clear-a-stringio-object
    out = cStringIO.StringIO()
    binary_encoder = avro.io.BinaryEncoder(out)
    self.avro_writer.write(message_avro_representation, binary_encoder)
    encoded = out.getvalue()
    return encoded
# Number of passes over the loaded records in the timed section.
LOOPS = 1

# Avro container files are binary data, so open in 'rb'. Text mode breaks
# on Python 3 and on platforms that translate line endings.
with open(sys.argv[1], 'rb') as reader:
    datum_reader = DatumReader()
    file_reader = DataFileReader(reader, datum_reader)
    # Read after the DataFileReader is constructed; it is expected to have
    # filled in the writer's schema from the file by then.
    SCHEMA = datum_reader.writers_schema
    # Materialize all records while the file is still open so the timed
    # loop below measures encoding only.
    RECORDS = list(file_reader)

buf = BytesIO()
datum_writer = DatumWriter(SCHEMA)
start = time()
n = 0
for _ in repeat(None, LOOPS):
    for record in RECORDS:
        # Rewind and reuse one buffer; each record overwrites the last.
        buf.seek(0)
        encoder = BinaryEncoder(buf)
        datum_writer.write(record, encoder)
        n += 1
# Elapsed time per written record, scaled by 1000 (milliseconds if time()
# returns seconds). Parenthesized so it runs on both Python 2 and 3.
print(1000. * (time() - start) / n)
def Request(self, message_name, request_datum):
    """Encodes a handshake plus a call request and issues them together.

    Both parts are written, handshake first, into one in-memory buffer
    through a shared binary encoder. The buffer's bytes are then handed to
    self._IssueRequest, whose result is returned.

    Args:
      message_name: Name of the IPC method.
      request_datum: IPC request.
    Returns:
      The IPC response.
    """
    payload = io.BytesIO()
    encoder = avro_io.BinaryEncoder(payload)

    # Handshake goes into the stream ahead of the call request.
    self._WriteHandshakeRequest(encoder)
    self._WriteCallRequest(message_name, request_datum, encoder)

    # Send handshake and call request in one go; block until call response.
    return self._IssueRequest(payload.getvalue(), message_name, request_datum)
def collect(self, record, partition=None):
    """Serialize one map or reduce output value and send it to the output client

    Parameters
    ------------------------------------------------------
    record - The record to write
    partition - Indicates the partition for a pre-partitioned map output
              - currently not supported
    """
    # A fresh buffer and encoder are created on every call; the encoder is
    # also stored on self.encoder, as before.
    with io.BytesIO() as scratch:
        self.encoder = avro.io.BinaryEncoder(scratch)
        self.datum_writer.write(record, self.encoder)
        # Copy the bytes out before the buffer is closed by the with-block.
        message = {"datum": scratch.getvalue()}

    # The partition key is only included when the caller supplied one.
    if partition is not None:
        message["partition"] = partition

    self.outputClient.request("output", message)
async def serialize(self, schema: Schema, value: dict) -> bytes:
    """Encode ``value`` with ``schema`` and return the header-prefixed bytes.

    The schema id is looked up in ``self.schemas_to_ids`` under the key
    produced by ``self.serialize_schema(schema)``. The output is a header
    packed with ``HEADER_FORMAT`` from ``START_BYTE`` and the schema id,
    followed by the binary-encoded value.

    Raises:
        InvalidMessageSchema: if writing the value raises
            ``avro.io.AvroTypeException``.
    """
    schema_id = self.schemas_to_ids[self.serialize_schema(schema)]
    datum_writer = DatumWriter(schema)
    with io.BytesIO() as out:
        encoder = BinaryEncoder(out)
        # Header goes first; the encoded value follows it in the same buffer.
        header = struct.pack(HEADER_FORMAT, START_BYTE, schema_id)
        out.write(header)
        try:
            datum_writer.write(value, encoder)
        except avro.io.AvroTypeException as e:
            raise InvalidMessageSchema("Object does not fit to stored schema") from e
        return out.getvalue()
"name": doc_type,
"fields": []
}
# Every key of the message becomes a field of type "string" in the schema.
for k in m.keys():
    schema['fields'].append({"name": k, "type": "string"})

# Serialize the schema once and reuse the text for logging, the registry
# upload and parsing, instead of calling json.dumps three times.
schema_json = json.dumps(schema)
# print(...) with a single argument behaves the same on Python 2 and 3.
print(schema_json)

# put the schema in the avro registry
print(requests.put("http://localhost:8080/ingest/v1/set_avro_schema.json/" + topic, data=schema_json))

# Encode the message with the schema just built and send the raw bytes.
avro_schema = avro.schema.parse(schema_json)
writer = avro.io.DatumWriter(avro_schema)
bytes_writer = io.BytesIO()
encoder = avro.io.BinaryEncoder(bytes_writer)
writer.write(m, encoder)
raw_bytes = bytes_writer.getvalue()
producer.send_messages(topic, raw_bytes)
def __init__(self, writer, datum_writer, writers_schema=None):
"""
Set up the encoders, block buffer and metadata for a data file writer.

If the schema is not present, presume we're appending.

writer - output file object; wrapped directly in a BinaryEncoder
datum_writer - stored on the instance as _datum_writer
writers_schema - when given, a sync marker is generated, the 'codec' and
'schema' metadata entries are set and _write_header() is called; when
None, the file is opened through a DataFileReader to collect metadata
"""
# NOTE(review): `io` must be a module providing BinaryEncoder and
# DatumReader (presumably avro's io module, not the stdlib one) -- confirm.
self._writer = writer
# Encoder that writes straight to the output file object.
self._encoder = io.BinaryEncoder(writer)
self._datum_writer = datum_writer
# Separate in-memory buffer with its own encoder, alongside a block counter.
self._buffer_writer = cStringIO.StringIO()
self._buffer_encoder = io.BinaryEncoder(self._buffer_writer)
self._block_count = 0
self._meta = {}
if writers_schema is not None:
# New file: generate a sync marker and record codec and schema metadata.
self._sync_marker = DataFileWriter.generate_sync_marker()
self.set_meta('codec', 'null')
self.set_meta('schema', str(writers_schema))
# NOTE(review): this reads self.datum_writer, while the attribute assigned
# above is self._datum_writer -- presumably a property exposes it; confirm.
self.datum_writer.writers_schema = writers_schema
self._write_header()
else:
# open writer for reading to collect metadata
# NOTE(review): the remainder of this branch is not visible in this excerpt.
dfr = DataFileReader(writer, io.DatumReader())
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter, BinaryEncoder, BinaryDecoder
import sys
# Number of passes over the pre-encoded buffers in the timed section.
LOOPS = 1

# Avro container files are binary data, so open in 'rb'. Text mode breaks
# on Python 3 and on platforms that translate line endings.
with open(sys.argv[1], 'rb') as reader:
    datum_reader = DatumReader()
    file_reader = DataFileReader(reader, datum_reader)
    # Read after the DataFileReader is constructed; it is expected to have
    # filled in the writer's schema from the file by then.
    SCHEMA = datum_reader.writers_schema
    BUFS = []
    datum_writer = DatumWriter(SCHEMA)
    # Re-encode every record into its own buffer up front, so the timed
    # loop below measures decoding only.
    for record in file_reader:
        buf = BytesIO()
        encoder = BinaryEncoder(buf)
        datum_writer.write(record, encoder)
        BUFS.append(buf)

# A reader bound to the schema, used for the standalone buffers.
datum_reader = DatumReader(SCHEMA)
start = time()
n = 0
for _ in repeat(None, LOOPS):
    for buf in BUFS:
        n += 1
        # Rewind so the buffer can be decoded again on each pass.
        buf.seek(0)
        record = datum_reader.read(BinaryDecoder(buf))
# Elapsed time per decoded record, scaled by 1000 (milliseconds if time()
# returns seconds). Parenthesized so it runs on both Python 2 and 3.
print(1000. * (time() - start) / n)
def Respond(self, call_request):
"""Entry point to process one procedure call.
Args:
call_request: Serialized procedure call request.
Returns:
Serialized procedure call response.
Raises:
???
"""
buffer_reader = io.BytesIO(call_request)
buffer_decoder = avro_io.BinaryDecoder(buffer_reader)
buffer_writer = io.BytesIO()
buffer_encoder = avro_io.BinaryEncoder(buffer_writer)
error = None
response_metadata = {}
try:
remote_protocol = self._ProcessHandshake(buffer_decoder, buffer_encoder)
# handshake failure
if remote_protocol is None:
return buffer_writer.getvalue()
# read request using remote protocol
request_metadata = META_READER.read(buffer_decoder)
remote_message_name = buffer_decoder.read_utf8()
# get remote and local request schemas so we can do
# schema resolution (one fine day)
remote_message = remote_protocol.message_map.get(remote_message_name)