Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def write_response(mail_state, resp):
    """Send ``resp`` to the client over ``mail_state.stream``.

    Before writing, the outgoing text is logged at debug level, tagged
    with the session's ``email_id``. The logged copy is upper-cased and
    has trailing whitespace stripped; the text written to the stream is
    ``resp`` exactly as given.
    """
    # Evaluate the id first, then the text, matching the original order.
    session_id = mail_state.email_id
    logged_text = resp.upper().rstrip()
    log.debug("[%s] SEND: %s" % (session_id, logged_text))

    out_stream = mail_state.stream
    out_stream.write(resp)
# Multiple responses, i.e. EHLO
if isinstance(resp, list):
for r in resp:
write_response(mail_state, r)
else:
# Otherwise it's a single response
write_response(mail_state, resp)
# Switch to SSL connection if starttls is called
# and we have an SSL library
if line.lower().startswith("starttls") and ssl and options.ssl:
fileno = mail_state.stream.socket.fileno()
IOLoop.current().remove_handler(fileno)
mail_state.stream = ssl_connection(connection)
# Close connection
if close is True:
log.debug("Closing")
mail_state.stream.close()
del mail_state.stream
return
else:
loop()
# Not exactly nice but it's the only way I could safely figure
# out if it was the \n.\n
if line[0] == "." and len(line) == 3 and ord(line[0]) == 46:
mail_state.reading = False
resp = response()
elif line.lower().startswith("ehlo"):
resp = []
for k, r in enumerate(EHLO_RESPONSES):
r = r.format(options.message_size_limit) if k == 1 else r
resp.append("%s\r\n" % r)
elif any(line.lower().startswith(e) for e in ['helo', 'mail from',
'rcpt to', 'noop']):
resp = response(250)
elif line.lower().startswith("rset"):
new_id = email_id()
log.debug("[%s] RSET received, changing ID to [%s]" %
(mail_state.email_id, new_id))
# Reset mail state
mail_state.reading = False
mail_state.email_id = new_id
resp = response(250)
elif line.lower().startswith("starttls"):
if not ssl or not options.ssl:
resp = response(500)
else:
resp = response(220)
elif line.lower().startswith("vrfy"):
resp = response(252)
elif line.lower().startswith("quit"):
resp = response(221)
close = True
elif line.lower().startswith("data"):
Accepts the socket connections and passes them off
to be handled.
'sock' is an instance of 'socket'.
'fd' is an open file descriptor for the current connection.
'events' is an integer of the number of events on the socket.
"""
while True:
try:
connection, address = sock.accept()
except socket.error as e:
if e.errno not in (errno.EWOULDBLOCK, errno.EAGAIN):
raise
return
log.debug("Connection from '%s'" % address[0])
connection.setblocking(0)
stream = connection_stream(connection)
# No stream, bail out
if not stream:
return
mail_state = MailState()
mail_state.email_id = email_id()
mail_state.stream = stream
# Sadly there is nothing I can do about the handle and loop
# functions. They have to exist within connection_ready
def handle(line):
"""
Handle a line of socket data, figure out if
it's a valid SMTP keyword and handle it