Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_quit_reconnect_ok(self):
    # Scenario: QUIT is acknowledged, the following command fails with
    # SMTPServerDisconnected, and after reconnect() commands succeed again.
    quit_code, _ = yield from self.smtp.quit()
    quit_acknowledged = 200 <= quit_code <= 299
    self.assertTrue(quit_acknowledged)

    # The session was ended by QUIT, so this NOOP is expected to raise.
    with self.assertRaises(SMTPServerDisconnected):
        _unexpected_code, _ = yield from self.smtp.noop()

    yield from self.smtp.reconnect()

    # With the connection re-established, NOOP must get a 2xx reply again.
    noop_code, _ = yield from self.smtp.noop()
    noop_acknowledged = 200 <= noop_code <= 299
    self.assertTrue(noop_acknowledged)
async def execute_command(
    self, *args: bytes, timeout: Optional[Union[float, Default]] = _default
) -> SMTPResponse:
    """
    Send a command through the underlying protocol and return its response.

    The call is refused when no protocol is attached. When ``timeout`` is
    left at the ``_default`` sentinel, the instance-level ``self.timeout``
    is used instead.

    :param args: command parts, forwarded unchanged to the protocol
    :param timeout: per-call timeout; ``_default`` means "use self.timeout"
    :return: the response produced by the protocol
    :raises SMTPServerDisconnected: connection lost
    """
    protocol = self.protocol
    if protocol is None:
        raise SMTPServerDisconnected("Server not connected")

    effective_timeout = self.timeout if timeout is _default else timeout
    reply = await protocol.execute_command(*args, timeout=effective_timeout)

    # A "domain unavailable" reply means the server is going away; close our
    # side of the connection as a courtesy before handing the reply back.
    server_unavailable = reply.code == SMTPStatus.domain_unavailable
    if server_unavailable:
        self.close()

    return reply
def write(self, data: bytes) -> None:
    """
    Hand ``data`` to the transport for sending.

    :param data: raw bytes to write
    :raises SMTPServerDisconnected: there is no transport, or the transport
        is already closing
    """
    transport = self.transport
    writable = transport is not None and not transport.is_closing()
    if not writable:
        raise SMTPServerDisconnected("Connection lost")

    transport.write(data)
def eof_received(self) -> bool:
    """
    Handle an EOF from the peer by failing any waiters that are still pending.

    Both the response waiter and the connection-lost waiter, when present and
    not yet done, receive the same ``SMTPServerDisconnected`` exception.

    :return: always ``False``
    """
    disconnect_error = SMTPServerDisconnected("Unexpected EOF received")

    pending_candidates = (self._response_waiter, self._connection_lost_waiter)
    for waiter in pending_candidates:
        if waiter and not waiter.done():
            waiter.set_exception(disconnect_error)

    # Returning false closes the transport
    return False
def connection_lost(self, exc: Optional[Exception]) -> None:
    """
    Resolve pending waiters and drop connection state once the link is gone.

    When ``exc`` is given, pending waiters fail with an
    ``SMTPServerDisconnected`` whose ``__cause__`` is ``exc``. Otherwise the
    response waiter is cancelled and the connection-lost waiter is resolved
    with ``None``.

    :param exc: the error that ended the connection, or ``None``
    """
    super().connection_lost(exc)

    # Build the wrapped error once; None marks the "no error" case.
    disconnect_error = None
    if exc:
        disconnect_error = SMTPServerDisconnected("Connection lost")
        disconnect_error.__cause__ = exc

    response_waiter = self._response_waiter
    if response_waiter and not response_waiter.done():
        if disconnect_error is None:
            response_waiter.cancel()
        else:
            response_waiter.set_exception(disconnect_error)

    lost_waiter = self._connection_lost_waiter
    if lost_waiter and not lost_waiter.done():
        if disconnect_error is None:
            lost_waiter.set_result(None)
        else:
            lost_waiter.set_exception(disconnect_error)

    # Forget the transport and the command lock.
    self.transport = None
    self._command_lock = None
tls_context,
server_side=False,
server_hostname=server_hostname,
ssl_handshake_timeout=timeout,
)
except asyncio.TimeoutError as exc:
raise SMTPTimeoutError("Timed out while upgrading transport") from exc
# SSLProtocol only raises ConnectionAbortedError on timeout
except ConnectionAbortedError as exc:
raise SMTPTimeoutError(exc.args[0]) from exc
except ConnectionResetError as exc:
if exc.args:
message = exc.args[0]
else:
message = "Connection was reset while upgrading transport"
raise SMTPServerDisconnected(message) from exc
self.transport = tls_transport
return response
async def execute_data_command(
self, message: bytes, timeout: Optional[float] = None
) -> SMTPResponse:
"""
Sends an SMTP DATA command to the server, followed by encoded message content.
Automatically quotes lines beginning with a period per RFC821.
Lone \\\\r and \\\\n characters are converted to \\\\r\\\\n
characters.
"""
if self._command_lock is None:
raise SMTPServerDisconnected("Server not connected")
message = LINE_ENDINGS_REGEX.sub(b"\r\n", message)
message = PERIOD_REGEX.sub(b"..", message)
if not message.endswith(b"\r\n"):
message += b"\r\n"
message += b".\r\n"
async with self._command_lock:
self.write(b"DATA\r\n")
start_response = await self.read_response(timeout=timeout)
if start_response.code != SMTPStatus.start_input:
raise SMTPDataError(start_response.code, start_response.message)
self.write(message)
response = await self.read_response(timeout=timeout)
if response.code != SMTPStatus.completed:
Get extra info from the transport.
Supported keys:
- ``peername``
- ``socket``
- ``sockname``
- ``compression``
- ``cipher``
- ``peercert``
- ``sslcontext``
- ``sslobject``
:raises SMTPServerDisconnected: connection lost
"""
if self.transport is None:
raise SMTPServerDisconnected("Server not connected")
return self.transport.get_extra_info(key)
)
self._validate_config()
if server_hostname is None:
server_hostname = self.hostname
tls_context = self._get_tls_context()
if not self.supports_extension("starttls"):
raise SMTPException("SMTP STARTTLS extension not supported by server.")
response = await self.protocol.start_tls(
tls_context, server_hostname=server_hostname, timeout=self.timeout
)
if self.protocol is None:
raise SMTPServerDisconnected("Connection lost")
# Update our transport reference
self.transport = self.protocol.transport
# RFC 3207 part 4.2:
# The client MUST discard any knowledge obtained from the server, such
# as the list of SMTP service extensions, which was not obtained from
# the TLS negotiation itself.
self._reset_server_state()
return response