Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def run(self):
    """Thread body: subscribe to ``self.address`` and run the loop.

    This is the thread's main activity; call start() instead of invoking it
    directly. It creates a SUB socket with an empty subscription prefix,
    tags it with the session's ``bsession`` identity, connects it, wraps it
    in a ZMQStream on ``self.ioloop`` that delivers to ``self._handle_recv``,
    and finally calls ``self._run_loop()``.
    """
    sub = self.socket = self.context.socket(zmq.SUB)
    sub.linger = 1000
    # An empty prefix matches every incoming message.
    sub.setsockopt(zmq.SUBSCRIBE, b'')
    sub.setsockopt(zmq.IDENTITY, self.session.bsession)
    sub.connect(self.address)

    stream = self.stream = zmqstream.ZMQStream(sub, self.ioloop)
    stream.on_recv(self._handle_recv)
    self._run_loop()
def main():
    """Display everything published on ``CHANNEL`` in a GViewer.

    A SUB socket is connected to ``CHANNEL`` and subscribed to all messages.
    It is wrapped in a ZMQStream whose ``on_recv`` registration hook is handed
    to ``AsyncDataStore``; the resulting store backs a ``GViewer`` that runs
    on the Tornado IOLoop singleton through urwid's ``TornadoEventLoop``.
    """
    context = zmq.Context()
    socket = context.socket(zmq.SUB)
    socket.connect(CHANNEL)
    # The subscription prefix must be bytes: pyzmq rejects text strings for
    # setsockopt (TypeError on Python 3). b"" is identical to "" on Python 2.
    socket.setsockopt(zmq.SUBSCRIBE, b"")
    zmq_stream = zmqstream.ZMQStream(socket)
    data_store = AsyncDataStore(zmq_stream.on_recv)
    viewer = GViewer(
        DisplayerContext(data_store, Displayer()),
        config=Config(auto_scroll=True),
        event_loop=urwid.TornadoEventLoop(ioloop.IOLoop.instance()),
    )
    viewer.start()
# Configure the two sending sockets: cap the send high-water mark at
# eventLimit and use a send timeout of 0.
# NOTE(review): per the ZeroMQ docs a zero SNDTIMEO makes a send that cannot
# proceed fail immediately instead of blocking - confirm that is the intent.
self._broker_pub_socket.setsockopt(zmq.SNDHWM, eventLimit)
self._broker_pub_socket.setsockopt(zmq.SNDTIMEO, 0)
self._data_socket.setsockopt(zmq.SNDHWM, eventLimit)
self._data_socket.setsockopt(zmq.SNDTIMEO, 0)
# Bind the control socket and both broker sockets on all interfaces, then
# connect the data socket to the broker's sub port over loopback.
self._ctrl_socket.bind("tcp://*:%d" % (self._ctrl_port))
self._broker_pub_socket.bind("tcp://*:%d" % (self._broker_pub_port))
self._broker_sub_socket.bind("tcp://*:%d" % (self._broker_sub_port))
self._data_socket.connect("tcp://127.0.0.1:%d" % (self._broker_sub_port))
# We are installing event handlers for those sockets
# but also for the data stream, since a PUB socket can actually
# leak data if it is never asked to process its events.
# (According to some vague discussions.)
# (e.g. https://github.com/zeromq/libzmq/issues/1256 )
# The data stream gets no receive callback; it is only wrapped in a ZMQStream.
self._data_stream = zmq.eventloop.zmqstream.ZMQStream(self._data_socket)
# Control messages are handled by _answer_command.
self._ctrl_stream = zmq.eventloop.zmqstream.ZMQStream(self._ctrl_socket)
self._ctrl_stream.on_recv_stream(self._answer_command)
# Messages arriving on either broker socket go to the matching _forward_* handler.
self._xpub_stream = zmq.eventloop.zmqstream.ZMQStream(self._broker_pub_socket)
self._xpub_stream.on_recv_stream(self._forward_xpub)
self._xsub_stream = zmq.eventloop.zmqstream.ZMQStream(self._broker_sub_socket)
self._xsub_stream.on_recv_stream(self._forward_xsub)
# The uuid is built from the hostname and the broker pub port.
ipc.uuid = ipc.hostname+':'+str(self._broker_pub_port)
# Run self._ioloop on a background thread.
t = threading.Thread(target=self._ioloop)
# Make sure the program exits even when the thread still exists
t.daemon = True
t.start()
# Bind the monitor socket to a random TCP port on localhost() and subscribe
# it to every message (empty prefix).
mport = mon.bind_to_random_port('tcp://%s' % localhost())
mon.setsockopt(zmq.SUBSCRIBE, b"")
# Messages received on the monitor socket are delivered to _report_ping.
self._hb_listener = zmqstream.ZMQStream(mon, self.loop)
self._hb_listener.on_recv(self._report_ping)
# Address of the port bound above; passed to Heart as its third argument.
hb_monitor = "tcp://%s:%i" % (localhost(), mport)
heart = Heart(hb_ping, hb_pong, hb_monitor , heart_id=identity)
heart.start()
# create Shell Connections (MUX, Task, etc.):
shell_addrs = url('mux'), url('task')
# Use only one shell stream for mux and tasks
stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
stream.setsockopt(zmq.IDENTITY, identity)
shell_streams = [stream]
# The single ROUTER stream is connected to both the mux and the task address.
for addr in shell_addrs:
connect(stream, addr)
# control stream:
control_addr = url('control')
control_stream = zmqstream.ZMQStream(ctx.socket(zmq.ROUTER), loop)
control_stream.setsockopt(zmq.IDENTITY, identity)
connect(control_stream, control_addr)
# create iopub stream:
# Unlike the streams above, this is a plain PUB socket, not a ZMQStream.
iopub_addr = url('iopub')
iopub_socket = ctx.socket(zmq.PUB)
iopub_socket.setsockopt(zmq.IDENTITY, identity)
connect(iopub_socket, iopub_addr)
def get_features(self):
    """Send a detection request for the current image to the remote CV host.

    Opens a REQ socket to ``options.REMOTECV_HOST:REMOTECV_PORT``, schedules
    ``self.on_timeout`` after ``options.REMOTECV_TIMEOUT`` seconds, registers
    ``self.on_result`` for the reply, and sends a BSON-encoded request with
    the detection type, image size, mode and URL. The raw image data is
    included only when ``options.REMOTECV_SEND_IMAGE`` is set.
    """
    engine = self.context['engine']
    image_data = engine.get_image_data()

    req_socket = self.get_context().socket(zmq.REQ)
    endpoint = 'tcp://%s:%s' % (options.REMOTECV_HOST, options.REMOTECV_PORT)
    req_socket.connect(endpoint)
    req_socket.setsockopt(zmq.LINGER, 0)

    # The timeout is scheduled before the request goes out.
    self.timeout_handle = self.ioloop.add_timeout(
        datetime.timedelta(seconds=options.REMOTECV_TIMEOUT),
        self.on_timeout,
    )

    self.stream = zmqstream.ZMQStream(req_socket, self.ioloop)
    self.stream.on_recv(self.on_result)

    request = {
        'type': self.detection_type,
        'size': engine.size,
        'mode': engine.get_image_mode(),
        'path': self.context['image_url'],
    }
    if options.REMOTECV_SEND_IMAGE:
        request['image'] = image_data

    self.stream.send(bson.dumps(request))
# use UUID to authenticate pipe messages
self._pipe_uuid = uuid.uuid4().bytes
# PULL socket receiving the pipe messages; linger 0 so closing never blocks
# on unsent data (NOTE(review): linger semantics per ZeroMQ docs).
pipe_in = ctx.socket(zmq.PULL)
pipe_in.linger = 0
try:
# Bind on loopback only and remember the randomly chosen port.
self._pipe_port = pipe_in.bind_to_random_port("tcp://127.0.0.1")
except zmq.ZMQError as e:
# Binding failed: warn, clear the pipe flag, release the socket and stop here.
warn("Couldn't bind IOPub Pipe to 127.0.0.1: %s" % e +
"\nsubprocess output will be unavailable."
)
self._pipe_flag = False
pipe_in.close()
return
# Binding succeeded: deliver incoming pipe messages to _handle_pipe_msg.
self._pipe_in = ZMQStream(pipe_in, self.io_loop)
self._pipe_in.on_recv(self._handle_pipe_msg)
def create_connected_stream(self, port, socket_type):
    """Return a ZMQStream connected to ``port`` on this kernel's IP.

    A socket of ``socket_type`` is created from ``self.context``, connected
    over TCP to the address the kernel manager reports for
    ``self.kernel_id``, and wrapped in a ZMQStream. The target address is
    logged at info level before connecting.
    """
    sock = self.context.socket(socket_type)
    kernel_ip = self.kernel_manager.get_kernel_ip(self.kernel_id)
    endpoint = "tcp://%s:%i" % (kernel_ip, port)
    self.log.info("Connecting to: %s" % endpoint)
    sock.connect(endpoint)
    return ZMQStream(sock)
:return: A ZMQ connection stream.
:rtype: ZMQStream
"""
logger.debug("Connecting %s to %s." % (socktype, address))
socket = self._context.socket(socktype)
# configure curve authentication
if self.use_curve:
# Obtain this client's key pair (the helper name suggests it is created on
# first use - TODO confirm) from the config prefix.
public, private = maybe_create_and_get_certificates(
self._config_prefix, "client")
# Only the server's public key is needed; the second value returned by
# load_certificate is discarded.
server_public_file = os.path.join(
self._config_prefix, PUBLIC_KEYS_PREFIX, "server.key")
server_public, _ = zmq.auth.load_certificate(server_public_file)
socket.curve_publickey = public
socket.curve_secretkey = private
socket.curve_serverkey = server_public
# The stream wraps the socket before it is connected; the curve options
# above are already set by the time connect() runs.
stream = zmqstream.ZMQStream(socket, self._loop)
socket.connect(address)
return stream
try:
# Inbound side: a SUB socket, bound or connected depending on in_mode, that
# subscribes to every message (empty prefix).
# NOTE(review): if in_mode is neither BIND nor CONNECT the socket is left
# unattached without an error; the same holds for out_mode below.
instances['in'] = self.context.socket(zmq.SUB)
if in_mode == self.BIND:
instances['in'].bind(in_address)
elif in_mode == self.CONNECT:
instances['in'].connect(in_address)
instances['in'].setsockopt(zmq.SUBSCRIBE, b'')
# Outbound side: a PUB socket, bound or connected depending on out_mode.
instances['out'] = self.context.socket(zmq.PUB)
if out_mode == self.BIND:
instances['out'].bind(out_address)
elif out_mode == self.CONNECT:
instances['out'].connect(out_address)
# Transfer data from subscriber to publisher.
# NOTE(review): only msg[0] is forwarded, so any additional frames of a
# multipart message are dropped - confirm this is intended.
instances['bridge'] = ZMQStream(instances['in'], io_loop=self.loop)
instances['bridge'].on_recv(lambda msg: instances['out'].send(msg[0]))
except ZMQError as e:
# ZeroMQ errors are re-raised as the project's OmnibusException.
raise ex.OmnibusException(e)
# Register the bridge under its (in_address, in_mode, out_address, out_mode) key.
self.bridges[in_address][in_mode][out_address][out_mode] = instances
return instances
def start(self):
    """Create the CURVE-enabled ROUTER socket and start listening.

    The socket is configured as a CURVE server with this object's
    ``secret``/``public`` key pair, wrapped in a ZMQStream that delivers
    incoming messages to ``self.on_message``, and bound to TCP port 9000 on
    all interfaces.
    """
    router = self.socket = self.context.socket(zmq.ROUTER)
    # most socket options must be set before bind/connect
    router.curve_secretkey = self.secret
    router.curve_publickey = self.public
    router.curve_server = True

    stream = self.stream = zmqstream.ZMQStream(router)
    stream.on_recv(self.on_message)
    stream.bind('tcp://*:9000')