Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Sends the SMTP 'data' command (sends message data to server)
Raises SMTPDataError if there is an unexpected reply to the
DATA command.
Returns an SMTPResponse tuple (the last one, after all data is sent.)
"""
if timeout is _default:
timeout = self.timeout # type: ignore
self._raise_error_if_disconnected()
start_response = await self.protocol.execute_command( # type: ignore
'DATA', timeout=timeout)
if start_response.code != SMTPStatus.start_input:
raise SMTPDataError(start_response.code, start_response.message)
if isinstance(message, str):
message = message.encode('utf-8')
await self.protocol.write_message_data( # type: ignore
message, timeout=timeout)
response = await self.protocol.read_response( # type: ignore
timeout=timeout)
if response.code != SMTPStatus.completed:
raise SMTPDataError(response.code, response.message)
return response
"""
code = -1
message = bytearray()
offset = 0
message_complete = False
while True:
line_end_index = self._buffer.find(b"\n", offset)
if line_end_index == -1:
break
line = bytes(self._buffer[offset : line_end_index + 1])
if len(line) > MAX_LINE_LENGTH:
raise SMTPResponseException(
SMTPStatus.unrecognized_command, "Response too long"
)
try:
code = int(line[:3])
except ValueError:
raise SMTPResponseException(
SMTPStatus.invalid_response.value,
"Malformed SMTP response line: {!r}".format(line),
) from None
offset += len(line)
if len(message):
message.extend(b"\n")
message.extend(line[4:].strip(b" \t\r\n"))
if line[3:4] != b"-":
message_complete = True
host.
Returns an SMTPResponse namedtuple.
"""
if hostname is None:
hostname = self.source_address
if timeout is _default:
timeout = self.timeout # type: ignore
self._raise_error_if_disconnected()
response = await self.protocol.execute_command( # type: ignore
'HELO', hostname, timeout=timeout)
self.last_helo_response = response
if response.code != SMTPStatus.completed:
raise SMTPHeloError(response.code, response.message)
return response
) -> SMTPResponse:
"""
Send the SMTP HELO command.
Hostname to send for this command defaults to the FQDN of the local
host.
:raises SMTPHeloError: on unexpected server response code
"""
if hostname is None:
hostname = self.source_address
response = await self.execute_command(
b"HELO", hostname.encode("ascii"), timeout=timeout
)
self.last_helo_response = response
if response.code != SMTPStatus.completed:
raise SMTPHeloError(response.code, response.message)
return response
message = LINE_ENDINGS_REGEX.sub(b"\r\n", message)
message = PERIOD_REGEX.sub(b"..", message)
if not message.endswith(b"\r\n"):
message += b"\r\n"
message += b".\r\n"
async with self._command_lock:
self.write(b"DATA\r\n")
start_response = await self.read_response(timeout=timeout)
if start_response.code != SMTPStatus.start_input:
raise SMTPDataError(start_response.code, start_response.message)
self.write(message)
response = await self.read_response(timeout=timeout)
if response.code != SMTPStatus.completed:
raise SMTPDataError(response.code, response.message)
return response
self.transport = transport
try:
response = await protocol.read_response(timeout=self.timeout)
except SMTPServerDisconnected as exc:
raise SMTPConnectError(
"Error connecting to {host} on port {port}: {err}".format(
host=self.hostname, port=self.port, err=exc
)
) from exc
except SMTPTimeoutError as exc:
raise SMTPConnectTimeoutError(
"Timed out waiting for server ready message"
) from exc
if response.code != SMTPStatus.ready:
raise SMTPConnectError(str(response))
return response
async def help(self, timeout: Optional[Union[float, Default]] = _default) -> str:
    """
    Issue the SMTP HELP command and hand back the server's help text.

    An EHLO/HELO handshake is performed first if one is still needed.

    :param timeout: passed through unchanged to ``execute_command``
    :return: the message portion of the server's reply
    :raises SMTPResponseException: when the reply code is not one of the
        accepted codes (system status ok, help message, completed)
    """
    await self._ehlo_or_helo_if_needed()

    reply = await self.execute_command(b"HELP", timeout=timeout)

    # Any of these reply codes counts as a successful HELP response.
    accepted_codes = (
        SMTPStatus.system_status_ok,
        SMTPStatus.help_message,
        SMTPStatus.completed,
    )
    if reply.code in accepted_codes:
        return reply.message

    raise SMTPResponseException(reply.code, reply.message)
220-esmtp.example.com
AUTH PLAIN dGVzdAB0ZXN0AHRlc3RwYXNz
235 ok, go ahead (#2.0.0)
"""
username_bytes = username.encode("ascii")
password_bytes = password.encode("ascii")
username_and_password = b"\0" + username_bytes + b"\0" + password_bytes
encoded = base64.b64encode(username_and_password)
response = await self.execute_command(
b"AUTH", b"PLAIN", encoded, timeout=timeout
)
if response.code != SMTPStatus.auth_successful:
raise SMTPAuthenticationError(response.code, response.message)
return response