Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_quote_address_with_display_names(address, expected_address):
quoted_address = quote_address(address)
assert quoted_address == expected_address
def test_quote_address(email):
assert quote_address(email) == "<{}>".format(email)
async def rcpt(
self, recipient: str,
options: Iterable[str] = None,
timeout: OptionalDefaultNumber = _default) -> SMTPResponse:
"""
Sends the SMTP 'rcpt' command (specifies a recipient for the message)
Returns an SMTPResponse namedtuple.
"""
if timeout is _default:
timeout = self.timeout # type: ignore
if options is None:
options = []
to = 'TO:{}'.format(quote_address(recipient))
self._raise_error_if_disconnected()
response = await self.protocol.execute_command( # type: ignore
'RCPT', to, *options, timeout=timeout)
success_codes = (SMTPStatus.completed, SMTPStatus.will_forward)
if response.code not in success_codes:
raise SMTPRecipientRefused(
response.code, response.message, recipient)
return response
options: Optional[Iterable[str]] = None,
encoding: str = "ascii",
timeout: Optional[Union[float, Default]] = _default,
) -> SMTPResponse:
"""
Send an SMTP MAIL command, which specifies the message sender and
begins a new mail transfer session ("envelope").
:raises SMTPSenderRefused: on unexpected server response code
"""
if options is None:
options = []
await self._ehlo_or_helo_if_needed()
quoted_sender = quote_address(sender)
addr_bytes = quoted_sender.encode(encoding)
options_bytes = [option.encode("ascii") for option in options]
response = await self.execute_command(
b"MAIL", b"FROM:" + addr_bytes, *options_bytes, timeout=timeout
)
if response.code != SMTPStatus.completed:
raise SMTPSenderRefused(response.code, response.message, sender)
return response
encoding: str = "ascii",
timeout: Optional[Union[float, Default]] = _default,
) -> SMTPResponse:
"""
Send an SMTP RCPT command, which specifies a single recipient for
the message. This command is sent once per recipient and must be
preceded by 'MAIL'.
:raises SMTPRecipientRefused: on unexpected server response code
"""
if options is None:
options = []
await self._ehlo_or_helo_if_needed()
quoted_recipient = quote_address(recipient)
addr_bytes = quoted_recipient.encode(encoding)
options_bytes = [option.encode("ascii") for option in options]
response = await self.execute_command(
b"RCPT", b"TO:" + addr_bytes, *options_bytes, timeout=timeout
)
if response.code not in (SMTPStatus.completed, SMTPStatus.will_forward):
raise SMTPRecipientRefused(response.code, response.message, recipient)
return response
async def mail(
self, sender: str, options: Iterable[str] = None,
timeout: OptionalDefaultNumber = _default) -> SMTPResponse:
"""
Sends the SMTP 'mail' command (begins mail transfer session)
Returns an SMTPResponse namedtuple.
"""
if timeout is _default:
timeout = self.timeout # type: ignore
if options is None:
options = []
from_string = 'FROM:{}'.format(quote_address(sender))
self._raise_error_if_disconnected()
response = await self.protocol.execute_command( # type: ignore
'MAIL', from_string, *options, timeout=timeout)
if response.code != SMTPStatus.completed:
raise SMTPSenderRefused(response.code, response.message, sender)
return response