Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def fetch_mail(host, user, password):
    """Print the message count reported when selecting the default mailbox.

    This is a generator function built on ``yield from``; it is presumably
    meant to be driven as a generator-based asyncio coroutine -- confirm how
    callers schedule it.

    Args:
        host: IMAP server host name passed to ``aioimaplib.IMAP4_SSL``.
        user: Login name passed to ``login``.
        password: Password passed to ``login``.
    """
    imap_client = aioimaplib.IMAP4_SSL(host=host)
    yield from imap_client.wait_hello_from_server()
    try:
        yield from imap_client.login(user, password)
        _res, data = yield from imap_client.select()
        print('there is %s messages INBOX' % data[0])
    finally:
        # Always end the session, even when login/select raised, so the
        # connection is not left open on the error path.
        yield from imap_client.logout()
for handler in self.handlers:
# initialize handler
handler_instance = handler()
handler_instance.set_session(self)
if hasattr(handler_instance, 'init'):
await handler_instance.init()
if hasattr(handler_instance, 'on_event'):
self.log.debug("subscribing to topic %s", handler_instance.subscribe_topic)
# Used with base handler defined subscribe_topic
if handler_instance.subscribe_topic is not None:
self.log.debug("subscribed to topic: %s", handler_instance.subscribe_topic)
while True:
imap_client = aioimaplib.IMAP4_SSL(host='imap.gmail.com')
emails = await self.fetch_emails()
for mail in emails:
groups = re.search(BODY_PATTERN, str(mail))
if groups:
await handler_instance.on_event(groups[1])
else:
# Used with config.json defined topics
if self.subscribed_topics is not None:
emails = await imap_client.fetch('20', 'BODY[]')
for mail in emails:
groups = re.search(BODY_PATTERN, str(mail))
if groups:
await handler_instance.on_event(groups[1])
else:
raise Exception("Failed to connect to gmail")
async def fetch_emails(self):
    """
    Get emails from server

    Opens a new IMAP-over-SSL connection to ``IMAP_HOST``, logs in with
    ``GMAIL_USERNAME`` / ``GMAIL_PASSWORD``, selects the default mailbox and
    fetches ``BODY[]`` for message set ``'20'``.

    Returns:
        The response object produced by ``imap_client.fetch``.
    """
    imap_client = aioimaplib.IMAP4_SSL(host=IMAP_HOST)
    await imap_client.wait_hello_from_server()
    try:
        await imap_client.login(GMAIL_USERNAME, GMAIL_PASSWORD)
        # FETCH is only valid once a mailbox has been selected; without this
        # the fetch below is rejected.
        await imap_client.select()
        return await imap_client.fetch('20', 'BODY[]')
    finally:
        # Each call opens its own connection, so close it on every path.
        await imap_client.logout()
def check_mailbox(host, user, password):
    """Check whether *user* / *password* can log in to the IMAP server at *host*.

    This is a generator function built on ``yield from``; it is presumably
    meant to be driven as a generator-based asyncio coroutine -- confirm how
    callers schedule it.

    Args:
        host: IMAP server host name passed to ``aioimaplib.IMAP4_SSL``.
        user: Login name passed to ``login``.
        password: Password passed to ``login``.

    Returns:
        ``True`` when ``'OK'`` is a member of the login response, ``False``
        for every other outcome.
    """
    imap_client = aioimaplib.IMAP4_SSL(host=host)
    yield from imap_client.wait_hello_from_server()
    login_response = yield from imap_client.login(user, password)
    yield from imap_client.logout()
    # Any non-OK response is a failed check. Returning the membership test
    # directly gives callers an explicit bool instead of falling through to
    # an implicit None when the failure text does not match exactly.
    return 'OK' in login_response