Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def record_command(self, command, *args):
self.commands.append((command, *args))
def record_server_response(self, status):
self.responses.append(status)
def handle_message(self, message):
self.messages.append(message)
async def handle_EHLO(self, server, session, envelope, hostname):
"""Advertise auth login support."""
session.host_name = hostname
return "250-AUTH LOGIN\r\n250 HELP"
class TestSMTPD(SMTPD):
def _getaddr(self, arg):
"""
Don't raise an exception on unparsable email address
"""
try:
return super()._getaddr(arg)
except HeaderParseError:
return None, ""
async def _call_handler_hook(self, command, *args):
self.event_handler.record_command(command, *args)
return await super()._call_handler_hook(command, *args)
async def push(self, status):
result = await super().push(status)
self.event_handler.record_server_response(status)
self.messages = messages_list
self.commands = commands_list
self.responses = responses_list
super().__init__(message_class=Message)
def record_command(self, command, *args):
self.commands.append((command, *args))
def record_server_response(self, status):
self.responses.append(status)
def handle_message(self, message):
self.messages.append(message)
class TestSMTPD(SMTPD):
def _getaddr(self, arg):
"""
Don't raise an exception on unparsable email address
"""
try:
return super()._getaddr(arg)
except HeaderParseError:
return None, ""
async def _call_handler_hook(self, command, *args):
self.event_handler.record_command(command, *args)
hook_response = await super()._call_handler_hook(command, *args)
response_message = getattr(
self.event_handler, command + "_response_message", None
)
if args.setuid: # pragma: nomswin
if pwd is None:
print('Cannot import module "pwd"; try running with -n option.',
file=sys.stderr)
sys.exit(1)
nobody = pwd.getpwnam('nobody').pw_uid
try:
os.setuid(nobody)
except PermissionError:
print('Cannot setuid "nobody"; try running with -n option.',
file=sys.stderr)
sys.exit(1)
factory = partial(
SMTP, args.handler,
data_size_limit=args.size, enable_SMTPUTF8=args.smtputf8)
logging.basicConfig(level=logging.ERROR)
log = logging.getLogger('mail.log')
loop = asyncio.get_event_loop()
if args.debug > 0:
log.setLevel(logging.INFO)
if args.debug > 1:
log.setLevel(logging.DEBUG)
if args.debug > 2:
loop.set_debug(enabled=True)
log.info('Server listening on %s:%s', args.host, args.port)
server = loop.run_until_complete(
import time
import random
import base64
import socket
import asyncio
import logging
from aiosmtpd.smtp import SMTP, MISSING, syntax
from heralding.capabilities.handlerbase import HandlerBase
log = logging.getLogger(__name__)
class SMTPHandler(SMTP):
fqdn = ''
def __init__(self, reader, writer, session, options, loop):
self.banner = options['protocol_specific_data']['banner']
super().__init__(None, hostname=self.banner, loop=loop)
# Reset standard banner.
self.__ident__ = ""
self._reader = reader
self._writer = writer
self.transport = writer
self._set_rset_state()
self.session = session
self.session.peer = self.transport.get_extra_info('peername')
self.session.extended_smtp = None
self.session.host_name = None
def smtp_factory():
return SMTP(smtp.MsgHandler(loop=loop), enable_SMTPUTF8=True)
def factory(self):
"""``CustomIdentController`` factory.
Overrides ``super.factory()``.
Creates an aiosmtpd server object.
Returns:
Returns the server object.
"""
server = Server(self.handler)
server.hostname = self.ident_hostname
server.__ident__ = self.ident
return server
from aiosmtpd.smtp import SMTP, syntax
from public import public
@public
class LMTP(SMTP):
@syntax('LHLO hostname')
async def smtp_LHLO(self, arg):
"""The LMTP greeting, used instead of HELO/EHLO."""
await super().smtp_HELO(arg)
self.show_smtp_greeting = False
async def smtp_HELO(self, arg):
"""HELO is not a valid LMTP command."""
await self.push('500 Error: command "HELO" not recognized')
async def smtp_EHLO(self, arg):
"""EHLO is not a valid LMTP command."""
await self.push('500 Error: command "EHLO" not recognized')
import sys
import socket
import asyncio
import logging
from aiosmtpd.events import Debugging
from aiosmtpd.smtp import SMTP
class ExitableSMTP(SMTP):
@asyncio.coroutine
def smtp_EXIT(self, arg):
if arg:
yield from self.push('501 Syntax: NOOP')
else:
yield from self.push('250 OK')
self.loop.stop()
self._connection_closed = True
self._handler_coroutine.cancel()
logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('mail.log')
loop = asyncio.get_event_loop()
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
def factory(self):
"""Allow subclasses to customize the handler/server creation."""
return SMTP(self.handler, enable_SMTPUTF8=self.enable_SMTPUTF8)