Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
resp = response(250)
elif line.lower().startswith("starttls"):
if not ssl or not options.ssl:
resp = response(500)
else:
resp = response(220)
elif line.lower().startswith("vrfy"):
resp = response(252)
elif line.lower().startswith("quit"):
resp = response(221)
close = True
elif line.lower().startswith("data"):
resp = response(354)
mail_state.reading = True
else:
resp = response(500)
# this is a blocking action, sadly
# async non blocking methods did not
# work. =(
if options.delay > 0:
# import has to be called here for
# some reason...
import time
time.sleep(options.delay)
return resp, close
# Reset mail state
mail_state.reading = False
mail_state.email_id = new_id
resp = response(250)
elif line.lower().startswith("starttls"):
if not ssl or not options.ssl:
resp = response(500)
else:
resp = response(220)
elif line.lower().startswith("vrfy"):
resp = response(252)
elif line.lower().startswith("quit"):
resp = response(221)
close = True
elif line.lower().startswith("data"):
resp = response(354)
mail_state.reading = True
else:
resp = response(500)
# this is a blocking action, sadly
# async non blocking methods did not
# work. =(
if options.delay > 0:
# import has to be called here for
# some reason...
import time
time.sleep(options.delay)
return resp, close
new_id = email_id()
log.debug("[%s] RSET received, changing ID to [%s]" %
(mail_state.email_id, new_id))
# Reset mail state
mail_state.reading = False
mail_state.email_id = new_id
resp = response(250)
elif line.lower().startswith("starttls"):
if not ssl or not options.ssl:
resp = response(500)
else:
resp = response(220)
elif line.lower().startswith("vrfy"):
resp = response(252)
elif line.lower().startswith("quit"):
resp = response(221)
close = True
elif line.lower().startswith("data"):
resp = response(354)
mail_state.reading = True
else:
resp = response(500)
# this is a blocking action, sadly
# async non blocking methods did not
# work. =(
if options.delay > 0:
# import has to be called here for
# some reason...
import time
time.sleep(options.delay)
return resp, close
resp = response(250)
elif line.lower().startswith("rset"):
new_id = email_id()
log.debug("[%s] RSET received, changing ID to [%s]" %
(mail_state.email_id, new_id))
# Reset mail state
mail_state.reading = False
mail_state.email_id = new_id
resp = response(250)
elif line.lower().startswith("starttls"):
if not ssl or not options.ssl:
resp = response(500)
else:
resp = response(220)
elif line.lower().startswith("vrfy"):
resp = response(252)
elif line.lower().startswith("quit"):
resp = response(221)
close = True
elif line.lower().startswith("data"):
resp = response(354)
mail_state.reading = True
else:
resp = response(500)
# this is a blocking action, sadly
# async non blocking methods did not
# work. =(
if options.delay > 0:
# import has to be called here for
# some reason...
import time
close = False
if mail_state.reading:
resp = None
# Not exactly nice but it's only way I could safely figure
# out if it was the \n.\n
if line[0] == "." and len(line) == 3 and ord(line[0]) == 46:
mail_state.reading = False
resp = response()
elif line.lower().startswith("ehlo"):
resp = []
for k, r in enumerate(EHLO_RESPONSES):
r = r.format(options.message_size_limit) if k == 1 else r
resp.append("%s\r\n" % r)
elif any(line.lower().startswith(e) for e in ['helo', 'mail from',
'rcpt to', 'noop']):
resp = response(250)
elif line.lower().startswith("rset"):
new_id = email_id()
log.debug("[%s] RSET received, changing ID to [%s]" %
(mail_state.email_id, new_id))
# Reset mail state
mail_state.reading = False
mail_state.email_id = new_id
resp = response(250)
elif line.lower().startswith("starttls"):
if not ssl or not options.ssl:
resp = response(500)
else:
resp = response(220)
elif line.lower().startswith("vrfy"):
resp = response(252)
elif line.lower().startswith("quit"):
elif any(line.lower().startswith(e) for e in ['helo', 'mail from',
'rcpt to', 'noop']):
resp = response(250)
elif line.lower().startswith("rset"):
new_id = email_id()
log.debug("[%s] RSET received, changing ID to [%s]" %
(mail_state.email_id, new_id))
# Reset mail state
mail_state.reading = False
mail_state.email_id = new_id
resp = response(250)
elif line.lower().startswith("starttls"):
if not ssl or not options.ssl:
resp = response(500)
else:
resp = response(220)
elif line.lower().startswith("vrfy"):
resp = response(252)
elif line.lower().startswith("quit"):
resp = response(221)
close = True
elif line.lower().startswith("data"):
resp = response(354)
mail_state.reading = True
else:
resp = response(500)
# this is a blocking action, sadly
# async non blocking methods did not
# work. =(
if options.delay > 0:
# import has to be called here for
def handle_command(line, mail_state):
"""Handle each SMTP command as it's sent to the server
The parameter 'line' is the current stream of data
ending in '\\n'.
'mail_state' is an instance of 'blackhole.state.MailState'.
"""
close = False
if mail_state.reading:
resp = None
# Not exactly nice but it's only way I could safely figure
# out if it was the \n.\n
if line[0] == "." and len(line) == 3 and ord(line[0]) == 46:
mail_state.reading = False
resp = response()
elif line.lower().startswith("ehlo"):
resp = []
for k, r in enumerate(EHLO_RESPONSES):
r = r.format(options.message_size_limit) if k == 1 else r
resp.append("%s\r\n" % r)
elif any(line.lower().startswith(e) for e in ['helo', 'mail from',
'rcpt to', 'noop']):
resp = response(250)
elif line.lower().startswith("rset"):
new_id = email_id()
log.debug("[%s] RSET received, changing ID to [%s]" %
(mail_state.email_id, new_id))
# Reset mail state
mail_state.reading = False
mail_state.email_id = new_id
resp = response(250)