How to use the aiosmtpd.errors.TooMuchDataError function in aiosmtpd

To help you get started, we’ve selected a few aiosmtpd examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github aio-libs / aiosmtpd / aiosmtpd / streams.py View on Github external
not_enough = True

        while not_enough:
            while self._buffer and not_enough:
                ichar = self._buffer.find(const.LINE_TERM)
                if ichar < 0:
                    line.extend(self._buffer)
                    self._buffer.clear()
                else:
                    ichar += len(const.LINE_TERM)
                    line.extend(self._buffer[:ichar])
                    del self._buffer[:ichar]
                    not_enough = False

            if max_len and len(line) > max_len:
                raise errors.TooMuchDataError()

            if self._eof:
                break

            if not_enough:
                yield from self._wait_for_data('read_crlf_line')

        self._maybe_resume_transport()
        return bytes(line)