Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_vrfy_failure(self):
    # VRFY is issued with the address 'test@---'; the test passes only if
    # the client raises SMTPResponseException for it.
    expect_rejection = self.assertRaises(SMTPResponseException)
    with expect_rejection:
        yield from self.smtp.vrfy('test@---')
while True:
line_end_index = self._buffer.find(b"\n", offset)
if line_end_index == -1:
break
line = bytes(self._buffer[offset : line_end_index + 1])
if len(line) > MAX_LINE_LENGTH:
raise SMTPResponseException(
SMTPStatus.unrecognized_command, "Response too long"
)
try:
code = int(line[:3])
except ValueError:
raise SMTPResponseException(
SMTPStatus.invalid_response.value,
"Malformed SMTP response line: {!r}".format(line),
) from None
offset += len(line)
if len(message):
message.extend(b"\n")
message.extend(line[4:].strip(b" \t\r\n"))
if line[3:4] != b"-":
message_complete = True
break
if message_complete:
response = SMTPResponse(
code, bytes(message).decode("utf-8", "surrogateescape")
)
"""Parse the actual response (if any) from the data buffer
"""
code = -1
message = bytearray()
offset = 0
message_complete = False
while True:
line_end_index = self._buffer.find(b"\n", offset)
if line_end_index == -1:
break
line = bytes(self._buffer[offset : line_end_index + 1])
if len(line) > MAX_LINE_LENGTH:
raise SMTPResponseException(
SMTPStatus.unrecognized_command, "Response too long"
)
try:
code = int(line[:3])
except ValueError:
raise SMTPResponseException(
SMTPStatus.invalid_response.value,
"Malformed SMTP response line: {!r}".format(line),
) from None
offset += len(line)
if len(message):
message.extend(b"\n")
message.extend(line[4:].strip(b" \t\r\n"))
if line[3:4] != b"-":
"""
class SMTPDataError(SMTPResponseException):
    """
    Raised when the server refuses the content sent for DATA.
    """
class SMTPAuthenticationError(SMTPResponseException):
    """
    Raised when the server refuses our AUTH request.

    Invalid credentials are one possible cause.
    """
class SMTPSenderRefused(SMTPResponseException):
    """
    Raised when the SMTP server refuses the message sender.

    Carries the response ``code`` and ``message`` plus the refused
    ``sender``; the same three values make up ``args``.
    """

    def __init__(self, code: int, message: str, sender: str) -> None:
        # Store the fields individually, then mirror them in ``args``.
        self.code, self.message, self.sender = code, message, sender
        self.args = (code, message, sender)
class SMTPRecipientRefused(SMTPResponseException):
    """
    Raised when the SMTP server refuses a message recipient.
    """
"""
class SMTPSenderRefused(SMTPResponseException):
    """
    Raised when the SMTP server refuses the message sender.

    Exposes ``code``, ``message`` and ``sender`` as attributes, and the
    same triple as ``args``.
    """

    def __init__(self, code: int, message: str, sender: str) -> None:
        details = (code, message, sender)
        self.code, self.message, self.sender = details
        self.args = details
class SMTPRecipientRefused(SMTPResponseException):
    """
    Raised when the SMTP server refuses a message recipient.

    Exposes ``code``, ``message`` and ``recipient`` as attributes, and
    the same triple as ``args``.
    """

    def __init__(self, code: int, message: str, recipient: str) -> None:
        details = (code, message, recipient)
        self.code, self.message, self.recipient = details
        self.args = details
class SMTPRecipientsRefused(SMTPException):
    """
    Raised when the SMTP server refuses multiple recipients.
    """
async def rset(
    self, timeout: Optional[Union[float, Default]] = _default
) -> SMTPResponse:
    """
    Issue the SMTP RSET command to reset the server's envelope
    (sender, recipient, and mail data).

    EHLO/HELO is performed first if it is still needed.

    :param timeout: forwarded unchanged to ``execute_command``
    :raises SMTPResponseException: on unexpected server response code
    :return: the server's response to RSET
    """
    await self._ehlo_or_helo_if_needed()

    reply = await self.execute_command(b"RSET", timeout=timeout)
    if reply.code == SMTPStatus.completed:
        return reply

    # Any other status means the reset was not acknowledged.
    raise SMTPResponseException(reply.code, reply.message)
self.args = (code, message)
class SMTPHeloError(SMTPResponseException):
    """
    Raised when the server refuses HELO or EHLO.
    """
class SMTPDataError(SMTPResponseException):
    """
    Raised when the server refuses DATA content.
    """
class SMTPAuthenticationError(SMTPResponseException):
    """
    Raised when the server refuses our AUTH request; invalid
    credentials may be the cause.
    """
class SMTPSenderRefused(SMTPResponseException):
    """
    Raised when the SMTP server refuses the message sender.

    The response ``code`` and ``message`` and the refused ``sender`` are
    kept as attributes and also packed into ``args``.
    """

    def __init__(self, code: int, message: str, sender: str) -> None:
        self.code = code
        self.message = message
        self.sender = sender
        # ``args`` repeats the three fields in constructor order.
        self.args = (self.code, self.message, self.sender)
async def rset(
        self, timeout: OptionalDefaultNumber = _default) -> SMTPResponse:
    """
    Send an SMTP 'rset' command (resets session).

    When ``timeout`` is left at its default sentinel, the instance's
    ``timeout`` attribute is used instead.

    :raises SMTPResponseException: if the reply code is not
        ``SMTPStatus.completed``
    :return: the server's SMTPResponse
    """
    # Resolve the timeout before the connection check, as the value is
    # read from the instance either way.
    effective_timeout = self.timeout if timeout is _default else timeout
    self._raise_error_if_disconnected()

    reply = await self.protocol.execute_command(  # type: ignore
        'RSET', timeout=effective_timeout)
    if reply.code == SMTPStatus.completed:
        return reply

    raise SMTPResponseException(reply.code, reply.message)