How to use the aiosmtpd.smtp.SMTP class in aiosmtpd

To help you get started, we’ve selected a few aiosmtpd examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github cole / aiosmtplib / tests / smtpd.py View on Github external
def record_command(self, command, *args):
        """Append the command and its arguments to ``self.commands`` as one tuple."""
        entry = (command,) + args
        self.commands.append(entry)

    def record_server_response(self, status):
        """Append ``status`` to ``self.responses``."""
        recorded = self.responses
        recorded.append(status)

    def handle_message(self, message):
        """Append ``message`` to ``self.messages``."""
        received = self.messages
        received.append(message)

    async def handle_EHLO(self, server, session, envelope, hostname):
        """Store the client hostname on the session and advertise AUTH LOGIN.

        ``hostname`` is saved as ``session.host_name``; the returned string is
        the two-line reply advertising ``AUTH LOGIN`` and ``HELP``.
        """
        reply = "250-AUTH LOGIN\r\n250 HELP"
        session.host_name = hostname
        return reply


class TestSMTPD(SMTPD):
    def _getaddr(self, arg):
        """
        Parse ``arg`` with the parent implementation, tolerating bad addresses.

        If the parent raises ``HeaderParseError``, the pair ``(None, "")`` is
        returned instead of letting the exception propagate.
        """
        try:
            parsed = super()._getaddr(arg)
        except HeaderParseError:
            parsed = (None, "")
        return parsed

    async def _call_handler_hook(self, command, *args):
        """Record the command on the event handler, then run the parent hook.

        Returns whatever the parent ``_call_handler_hook`` returns.
        """
        recorder = self.event_handler
        recorder.record_command(command, *args)
        outcome = await super()._call_handler_hook(command, *args)
        return outcome

    async def push(self, status):
        result = await super().push(status)
        self.event_handler.record_server_response(status)
github cole / aiosmtplib / tests / testserver / smptd_server.py View on Github external
self.messages = messages_list
        self.commands = commands_list
        self.responses = responses_list
        super().__init__(message_class=Message)

    def record_command(self, command, *args):
        """Append the command and its arguments to ``self.commands`` as one tuple."""
        entry = (command,) + args
        self.commands.append(entry)

    def record_server_response(self, status):
        """Append ``status`` to ``self.responses``."""
        recorded = self.responses
        recorded.append(status)

    def handle_message(self, message):
        """Append ``message`` to ``self.messages``."""
        received = self.messages
        received.append(message)


class TestSMTPD(SMTPD):
    def _getaddr(self, arg):
        """
        Parse ``arg`` with the parent implementation, tolerating bad addresses.

        If the parent raises ``HeaderParseError``, the pair ``(None, "")`` is
        returned instead of letting the exception propagate.
        """
        try:
            parsed = super()._getaddr(arg)
        except HeaderParseError:
            parsed = (None, "")
        return parsed

    async def _call_handler_hook(self, command, *args):
        self.event_handler.record_command(command, *args)

        hook_response = await super()._call_handler_hook(command, *args)
        response_message = getattr(
            self.event_handler, command + "_response_message", None
        )
github aio-libs / aiosmtpd / aiosmtpd / main.py View on Github external
if args.setuid:                               # pragma: nomswin
        if pwd is None:
            print('Cannot import module "pwd"; try running with -n option.',
                  file=sys.stderr)
            sys.exit(1)
        nobody = pwd.getpwnam('nobody').pw_uid
        try:
            os.setuid(nobody)
        except PermissionError:
            print('Cannot setuid "nobody"; try running with -n option.',
                  file=sys.stderr)
            sys.exit(1)

    factory = partial(
        SMTP, args.handler,
        data_size_limit=args.size, enable_SMTPUTF8=args.smtputf8)

    logging.basicConfig(level=logging.ERROR)
    log = logging.getLogger('mail.log')
    loop = asyncio.get_event_loop()

    if args.debug > 0:
        log.setLevel(logging.INFO)
    if args.debug > 1:
        log.setLevel(logging.DEBUG)
    if args.debug > 2:
        loop.set_debug(enabled=True)

    log.info('Server listening on %s:%s', args.host, args.port)

    server = loop.run_until_complete(
github johnnykv / heralding / heralding / capabilities / smtp.py View on Github external
import time
import random
import base64
import socket
import asyncio
import logging

from aiosmtpd.smtp import SMTP, MISSING, syntax

from heralding.capabilities.handlerbase import HandlerBase

log = logging.getLogger(__name__)


class SMTPHandler(SMTP):
  fqdn = ''

  def __init__(self, reader, writer, session, options, loop):
    self.banner = options['protocol_specific_data']['banner']
    super().__init__(None, hostname=self.banner, loop=loop)
    # Reset standard banner.
    self.__ident__ = ""
    self._reader = reader
    self._writer = writer
    self.transport = writer

    self._set_rset_state()
    self.session = session
    self.session.peer = self.transport.get_extra_info('peername')
    self.session.extended_smtp = None
    self.session.host_name = None
github jrmi / byemail / byemail / commands.py View on Github external
def smtp_factory():
        """Build an ``SMTP`` instance around a new ``smtp.MsgHandler`` with SMTPUTF8 enabled."""
        message_handler = smtp.MsgHandler(loop=loop)
        return SMTP(message_handler, enable_SMTPUTF8=True)
github scrapbird / sarlacc / smtpd / src / mailer.py View on Github external
def factory(self):
        """Create the server object for ``CustomIdentController``.

        Overrides ``super.factory()``. An aiosmtpd server object is built
        around ``self.handler``, then its ``hostname`` and ``__ident__`` are
        set from ``self.ident_hostname`` and ``self.ident``.

        Returns:
            The configured server object.
        """

        smtp_server = Server(self.handler)
        smtp_server.hostname = self.ident_hostname
        smtp_server.__ident__ = self.ident
        return smtp_server
github aio-libs / aiosmtpd / aiosmtpd / lmtp.py View on Github external
from aiosmtpd.smtp import SMTP, syntax
from public import public


@public
class LMTP(SMTP):
    """SMTP subclass that accepts LHLO and rejects HELO and EHLO."""

    @syntax('LHLO hostname')
    async def smtp_LHLO(self, arg):
        """Greet via LHLO, the LMTP replacement for HELO/EHLO."""
        # Reuse the parent's HELO handling, then turn the greeting flag off.
        await super().smtp_HELO(arg)
        self.show_smtp_greeting = False

    async def smtp_HELO(self, arg):
        """Reject HELO, which is not a valid LMTP command."""
        reply = '500 Error: command "HELO" not recognized'
        await self.push(reply)

    async def smtp_EHLO(self, arg):
        """Reject EHLO, which is not a valid LMTP command."""
        reply = '500 Error: command "EHLO" not recognized'
        await self.push(reply)
github aio-libs / aiosmtpd / server.py View on Github external
import sys
import socket
import asyncio
import logging

from aiosmtpd.events import Debugging
from aiosmtpd.smtp import SMTP

class ExitableSMTP(SMTP):
    """SMTP server with an extra ``EXIT`` command that stops the event loop."""

    async def smtp_EXIT(self, arg):
        """Handle the ``EXIT`` command.

        ``EXIT`` takes no argument. If one is given, a 501 syntax error is
        sent and nothing else happens. Otherwise the command is acknowledged
        with ``250 OK``, the event loop is stopped, the connection is flagged
        as closed and the handler coroutine is cancelled.

        Written as a native coroutine because ``asyncio.coroutine`` no longer
        exists on Python 3.11+.
        """
        if arg:
            # Report the syntax of this command; the message used to name NOOP.
            await self.push('501 Syntax: EXIT')
            return
        await self.push('250 OK')
        self.loop.stop()
        self._connection_closed = True
        self._handler_coroutine.cancel()


logging.basicConfig(level=logging.DEBUG)
log = logging.getLogger('mail.log')

loop = asyncio.get_event_loop()
sock = socket.socket(socket.AF_INET6, socket.SOCK_STREAM)
github aio-libs / aiosmtpd / aiosmtpd / controller.py View on Github external
def factory(self):
        """Create the ``SMTP`` instance for this controller.

        Subclasses can override this to customize the handler/server creation.
        """
        handler = self.handler
        utf8_enabled = self.enable_SMTPUTF8
        return SMTP(handler, enable_SMTPUTF8=utf8_enabled)