Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _transmit_data(self):
    """Send the current data block to the peer and update the counters.

    If ``self._current_block`` is ``None`` there is no data to send; the
    call is delegated to ``self._transmit_oack()`` and nothing else happens.

    Otherwise the block is packed into a datagram made of two unsigned
    16-bit fields in network byte order (``constants.OPCODE_DATA`` followed
    by ``self._last_block_sent``) and then the raw block bytes. The datagram
    is sent to ``self._peer`` through the socket returned by
    ``self._get_listener()``, and ``self._stats`` is updated with one more
    packet and the number of payload bytes (headers are not counted).
    """
    payload = self._current_block
    if payload is None:
        self._transmit_oack()
        return

    size = len(payload)
    # The "%ds" field is sized to the payload so it is packed verbatim,
    # without padding or truncation.
    datagram = struct.pack(
        "!HH%ds" % size, constants.OPCODE_DATA, self._last_block_sent, payload
    )
    self._get_listener().sendto(datagram, self._peer)

    self._stats.packets_sent += 1
    self._stats.bytes_sent += size

    # A block shorter than self._block_size is presumably the final one of
    # the transfer (TFTP convention -- confirm against the ACK handler), so
    # record that the last acknowledgement is now awaited.
    if size < self._block_size:
        self._waiting_last_ack = True