Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self):
ctx = zmq.Context(1)
sub = ctx.socket(zmq.SUB)
sub.connect('tcp://172.16.79.128:7002')
sub.setsockopt(zmq.SUBSCRIBE, b"")
self._pub = ctx.socket(zmq.PUB)
self._pub.connect('tcp://172.16.79.128:7003')
self._ready.set()
while not self._complete.is_set():
try:
parts = sub.recv_multipart(flags=zmq.NOBLOCK)
except zmq.ZMQError:
self._complete.wait(timeout=1)
continue
if parts[1] == b'connect':
self._pub.send(b'subscribe', zmq.SNDMORE)
self._pub.send(parts[0], zmq.SNDMORE)
self._pub.send(b'ceph:completion')
self._pub.send(b'subscribe', zmq.SNDMORE)
self._pub.send(parts[0], zmq.SNDMORE)
self._pub.send(b'ceph:sync')
def update(self):
self._chunks = {}
try:
while True:
[topic, data, meta] = self._socket.recv_serialized(
self._deserializer, zmq.NOBLOCK
)
if not topic in self._chunks:
self._chunks[topic] = {"data": [], "meta": {}}
self._append_data(topic, data)
self._append_meta(topic, meta)
except zmq.ZMQError:
pass # No more data
self._update_ports()
start_ts_ms_utc = mauka_message.makai_trigger.event_start_timestamp_ms
end_ts_ms_utc = mauka_message.makai_trigger.event_end_timestamp_ms
# pylint: disable=E1101
trigger_type = protobuf.opq_pb2.RequestEventMessage.TriggerType.Value(event_type)
device_ids = list(map(int, [mauka_message.makai_trigger.box_id]))
event_msg = self.request_event_message(start_ts_ms_utc, end_ts_ms_utc, trigger_type,
mauka_message.makai_trigger.max_value,
device_ids, "AcquisitionTriggerPlugin",
"{} {}-{}".format(str(trigger_type), start_ts_ms_utc, end_ts_ms_utc),
request_data)
self.event_type_to_last_event[event_type] = str(event_msg)
try:
self.debug("Sending event msg: {}".format(event_msg))
self.push_socket.send(event_msg)
except zmq.ZMQError as err:
self.logger.error("Error sending req to Makai: %s", str(err))
else:
self.logger.error("Received incorrect mauka message [%s] in AcquisitionTriggerPlugin",
protobuf.util.which_message_oneof(mauka_message))
def read_socket(self):
try:
while True:
msg = self.socket.recv(zmq.NOBLOCK)
params_values = paramsvalues_pb2.ParamsValues.FromString(msg)
for param_value in params_values.param_values:
index = param_value.param_no
if index in self.msgs:
self.msgs[index].append(param_value)
else:
self.msgs[index] = [param_value]
except zmq.ZMQError as e:
if e.errno != zmq.EAGAIN:
raise
current_time = int(time.mktime(time.gmtime()))
if current_time - self.saving_time > self.saving_interval:
saving_time = self.save_msgs()
if saving_time is not None:
self.heartbeat(saving_time)
self.saving_time = current_time
pending_messages = True
while pending_messages is True and self.running.is_set():
try:
# Try to read a message. If this fail we will get a
# zmq.ZMQError exception and then pending_messages will be
# set to False so that we exit the while loop.
assert (self._zmq_pull_socket is not None)
message = self._zmq_pull_socket.recv_string(flags=zmq.NOBLOCK)
# If we are here that means that a new message was
# successfully received from the client. Let's call the
# _parse_progress_message method to parse the message and
# update the self._client_data_list member variable.
self._parse_progress_message(message)
except zmq.ZMQError:
pending_messages = False
# if we're given the NOBLOCK flag act as normal
# and let the EAGAIN get raised
if flags & zmq.NOBLOCK:
res = super().send(data, flags, copy, track)
fut.set_result(res)
return fut
# ensure the zmq.NOBLOCK flag is part of flags
flags |= zmq.NOBLOCK
# Attempt to complete this operation indefinitely
try:
res = super().send(data, flags, copy, track)
fut.set_result(res)
return fut
except zmq.ZMQError as exc:
if exc.errno != zmq.EAGAIN:
fut.set_exception(exc)
return
self._loop.add_writer(self._sock_fd, self._send_ready)
self._buffer.append((fut, data, flags, copy, track))
return fut
def recv_message(self):
"""
Read and deserialize a message off the 0mq channel.
If no message is there available, we just return None
"""
try:
msg = self.subscriber.recv(zmq.NOBLOCK)
try:
return json.loads(msg)
except:
return msg
except zmq.ZMQError, zmq_err:
# on read error, we don't do anything as we're in
# non-blocking mode
pass
Parameters
----------
socket : ZMQStream or Socket
The socket or stream to use in receiving.
Returns
-------
[idents], msg
[idents] is a list of idents and msg is a nested message dict of
same format as self.msg returns.
"""
if isinstance(socket, ZMQStream):
socket = socket.socket
try:
msg_list = socket.recv_multipart(mode, copy=copy)
except zmq.ZMQError as e:
if e.errno == zmq.EAGAIN:
# We can convert EAGAIN to None as we know in this case
# recv_multipart won't return None.
return None,None
else:
raise
# split multipart message into identity list and message dict
# invalid large messages can cause very expensive string comparisons
idents, msg_list = self.feed_identities(msg_list, copy)
try:
return idents, self.unserialize(msg_list, content=content, copy=copy)
except Exception as e:
# TODO: handle it
raise e
addr = address_range["addr"]
try:
port_min = address_range["port_min"]
port_max = address_range["port_max"]
if port_min and port_max:
port = socket.bind_to_random_port(addr, port_min, port_max)
else:
port = socket.bind_to_random_port(addr)
bind_addr = "{0}:{1}".format(addr, port)
return bind_addr
except KeyError:
socket.bind(addr)
return addr
except (zmq.ZMQError, zmq.ZMQBindError) as e:
print('error binding to address %s: %s' % (address, e), file=sys.stderr)
if option_hint:
print('use %s <address> to specify a different port' %\
(option_hint,), file=sys.stderr)
raise
</address>
signal.signal(signal.SIGINT, self.signal_handler)
# secure the sockets from the world
self.secure()
log.info('FSCache started')
log.debug('FSCache started')
while self.running:
# we check for new events with the poller
try:
socks = dict(poller.poll())
except KeyboardInterrupt:
self.stop()
except zmq.ZMQError as t:
self.stop()
# check for next cache-request
if socks.get(creq_in) == zmq.POLLIN:
msg = serial.loads(creq_in.recv())
log.debug('Received request: {0}'.format(msg))
# we only accept requests as lists [req_id,