Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
import base64
from collections import deque
import pytest
from aiosmtplib.auth import SMTPAuth, crammd5_verify
from aiosmtplib.errors import SMTPAuthenticationError, SMTPException
from aiosmtplib.response import SMTPResponse
from aiosmtplib.status import SMTPStatus
pytestmark = pytest.mark.asyncio()
SUCCESS_RESPONSE = SMTPResponse(SMTPStatus.auth_successful, "OK")
FAILURE_RESPONSE = SMTPResponse(SMTPStatus.auth_failed, "Nope")
class DummySMTPAuth(SMTPAuth):
transport = None
def __init__(self):
super().__init__()
self.received_commands = []
self.responses = deque()
self.esmtp_extensions = {"auth": ""}
self.server_auth_methods = ["cram-md5", "login", "plain"]
self.supports_esmtp = True
def test_response_str(code, message):
response = SMTPResponse(code, message)
assert str(response) == "{} {}".format(response.code, response.message)
import base64
from collections import deque
import pytest
from aiosmtplib.auth import SMTPAuth, crammd5_verify
from aiosmtplib.errors import SMTPAuthenticationError, SMTPException
from aiosmtplib.response import SMTPResponse
from aiosmtplib.status import SMTPStatus
pytestmark = pytest.mark.asyncio()
SUCCESS_RESPONSE = SMTPResponse(SMTPStatus.auth_successful, "OK")
FAILURE_RESPONSE = SMTPResponse(SMTPStatus.auth_failed, "Nope")
class DummySMTPAuth(SMTPAuth):
transport = None
def __init__(self):
super().__init__()
self.received_commands = []
self.responses = deque()
self.esmtp_extensions = {"auth": ""}
self.server_auth_methods = ["cram-md5", "login", "plain"]
self.supports_esmtp = True
async def execute_command(self, *args, **kwargs):
def test_response_repr(code, message):
response = SMTPResponse(code, message)
assert repr(response) == "({}, {})".format(response.code, response.message)
username: Optional[str] = ...,
password: Optional[str] = ...,
mail_options: Optional[List[str]] = ...,
rcpt_options: Optional[List[str]] = ...,
timeout: Optional[float] = ...,
source_address: Optional[str] = ...,
use_tls: bool = ...,
start_tls: bool = ...,
validate_certs: bool = ...,
client_cert: Optional[str] = ...,
client_key: Optional[str] = ...,
tls_context: None = ...,
cert_bundle: Optional[str] = ...,
socket_path: None = ...,
sock: socket.socket = ...,
) -> Tuple[Dict[str, SMTPResponse], str]:
...
except ValueError:
raise SMTPResponseException(
SMTPStatus.invalid_response.value,
"Malformed SMTP response line: {!r}".format(line),
) from None
offset += len(line)
if len(message):
message.extend(b"\n")
message.extend(line[4:].strip(b" \t\r\n"))
if line[3:4] != b"-":
message_complete = True
break
if message_complete:
response = SMTPResponse(
code, bytes(message).decode("utf-8", "surrogateescape")
)
del self._buffer[:offset]
return response
else:
return None
async def _send_recipients(
self,
recipients: Sequence[str],
options: Iterable[str],
encoding: str = "ascii",
timeout: Optional[Union[float, Default]] = _default,
) -> Dict[str, SMTPResponse]:
"""
Send the recipients given to the server. Used as part of
:meth:`.sendmail`.
"""
recipient_errors = []
for address in recipients:
try:
await self.rcpt(
address, options=options, encoding=encoding, timeout=timeout
)
except SMTPRecipientRefused as exc:
recipient_errors.append(exc)
if len(recipient_errors) == len(recipients):
raise SMTPRecipientsRefused(recipient_errors)
:meth:`.sendmail`.
"""
recipient_errors = []
for address in recipients:
try:
await self.rcpt(
address, options=options, encoding=encoding, timeout=timeout
)
except SMTPRecipientRefused as exc:
recipient_errors.append(exc)
if len(recipient_errors) == len(recipients):
raise SMTPRecipientsRefused(recipient_errors)
formatted_errors = {
err.recipient: SMTPResponse(err.code, err.message)
for err in recipient_errors
}
return formatted_errors