Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_child_heartbeat_started(event_loop):
up_read, up_write = os.pipe()
down_read, down_write = os.pipe()
os.close(up_write)
os.close(down_read)
child = Child(up_read, down_write, [], "1")
child.loop = event_loop
child._started = True
sp = StreamProtocol()
sp.reader = asyncio.StreamReader()
async def _reset():
sp.reader.feed_data(protocols.PING)
child._started = False
reset_task = asyncio.Task(_reset())
with mock.patch(
"blackhole.streams.StreamProtocol", return_value=sp
), mock.patch("asyncio.Task") as mock_task, mock.patch(
"blackhole.child.Child._start"
) as mock_start, mock.patch(
"blackhole.child.Child.stop"
) as mock_stop:
await child.heartbeat()
reset_task.cancel()
async def test_client_not_connected():
sp = StreamProtocol()
assert sp.is_connected() is False
async def test_client_connected(event_loop):
sp = StreamProtocol(loop=event_loop)
sp.connection_made(asyncio.Transport())
assert sp.is_connected() is True
3 bytes are used in the communication channel.
- b'x01' -- :const:`blackhole.protocols.PING`
- b'x02' -- :const:`blackhole.protocols.PONG`
These message values are defined in the :mod:`blackhole.protocols`
schema. Documentation is available at --
https://kura.github.io/blackhole/api-protocols.html
"""
read_fd = os.fdopen(self.up_read, "rb")
r_trans, r_proto = await self.loop.connect_read_pipe(
StreamProtocol, read_fd
)
write_fd = os.fdopen(self.down_write, "wb")
w_trans, w_proto = await self.loop.connect_write_pipe(
StreamProtocol, write_fd
)
reader = r_proto.reader
writer = asyncio.StreamWriter(w_trans, w_proto, reader, self.loop)
self.server_task = asyncio.Task(self._start())
while self._started:
try:
msg = await reader.read(3)
except: # noqa
break
if msg == protocols.PING:
logger.debug(
f"child.{self.idx}.heartbeat: Ping request received from "
"parent"
)
writer.write(protocols.PONG)
and child will be spawned.
.. note::
3 bytes are used in the communication channel.
- b'x01' -- :const:`blackhole.protocols.PING`
- b'x02' -- :const:`blackhole.protocols.PONG`
These message values are defined in the :mod:`blackhole.protocols`
schema. Documentation is available at --
https://kura.github.io/blackhole/api-protocols.html
"""
read_fd = os.fdopen(self.up_read, "rb")
r_trans, r_proto = await self.loop.connect_read_pipe(
StreamProtocol, read_fd
)
write_fd = os.fdopen(self.down_write, "wb")
w_trans, w_proto = await self.loop.connect_write_pipe(
StreamProtocol, write_fd
)
reader = r_proto.reader
writer = asyncio.StreamWriter(w_trans, w_proto, reader, self.loop)
self.server_task = asyncio.Task(self._start())
while self._started:
try:
msg = await reader.read(3)
except: # noqa
break
if msg == protocols.PING:
logger.debug(
async def connect(self):
"""
Connect the child and worker so they can communicate.
:param int up_write: A file descriptor.
:param int down_read: A file descriptor.
"""
read_fd = os.fdopen(self.down_read, "rb")
r_trans, r_proto = await self.loop.connect_read_pipe(
StreamProtocol, read_fd
)
write_fd = os.fdopen(self.up_write, "wb")
w_trans, w_proto = await self.loop.connect_write_pipe(
StreamProtocol, write_fd
)
reader = r_proto.reader
writer = asyncio.StreamWriter(w_trans, w_proto, reader, self.loop)
self.ping = time.monotonic()
self.rtransport = r_trans
self.wtransport = w_trans
self.chat_task = asyncio.ensure_future(self.chat(reader))
self.heartbeat_task = asyncio.ensure_future(self.heartbeat(writer))
async def connect(self):
"""
Connect the child and worker so they can communicate.
:param int up_write: A file descriptor.
:param int down_read: A file descriptor.
"""
read_fd = os.fdopen(self.down_read, "rb")
r_trans, r_proto = await self.loop.connect_read_pipe(
StreamProtocol, read_fd
)
write_fd = os.fdopen(self.up_write, "wb")
w_trans, w_proto = await self.loop.connect_write_pipe(
StreamProtocol, write_fd
)
reader = r_proto.reader
writer = asyncio.StreamWriter(w_trans, w_proto, reader, self.loop)
self.ping = time.monotonic()
self.rtransport = r_trans
self.wtransport = w_trans
self.chat_task = asyncio.ensure_future(self.chat(reader))
self.heartbeat_task = asyncio.ensure_future(self.heartbeat(writer))