Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
super().append_to_resp(line, result)
else:
self.buffer.append(line)
def flush(self):
    """Hand the buffered lines over to the queue and empty the buffer.

    Does nothing when ``self.buffer`` is empty, so no empty batch is ever
    queued.
    """
    if self.buffer:
        # Queue a copy: the buffer is cleared in place on the next line, so
        # enqueuing the buffer object itself would hand consumers an empty
        # container.
        self.queue.put_nowait(copy(self.buffer))
        self.buffer.clear()
class AioImapException(Exception):
    """Root of the exception hierarchy defined in this module."""

    def __init__(self, reason):
        """Create the exception with *reason* as its message."""
        super().__init__(reason)


class Error(AioImapException):
    """AioImapException subclass constructed from a single *reason*."""

    def __init__(self, reason):
        """Create the exception with *reason* as its message."""
        super().__init__(reason)


class Abort(Error):
    """Error subclass constructed from a single *reason*."""

    def __init__(self, reason):
        """Create the exception with *reason* as its message."""
        super().__init__(reason)


class CommandTimeout(AioImapException):
    """AioImapException subclass that records the command it was raised for."""

    def __init__(self, command):
        """Keep *command* available to handlers as ``self.command``."""
        self.command = command
class IncompleteRead(AioImapException):
def __init__(self, cmd, data=b''):
class Error(AioImapException):
    """AioImapException subclass constructed from a single *reason*."""

    def __init__(self, reason):
        """Create the exception with *reason* as its message."""
        super().__init__(reason)


class Abort(Error):
    """Error subclass constructed from a single *reason*."""

    def __init__(self, reason):
        """Create the exception with *reason* as its message."""
        super().__init__(reason)


class CommandTimeout(AioImapException):
    """AioImapException subclass that records the command it was raised for."""

    def __init__(self, command):
        """Keep *command* available to handlers as ``self.command``."""
        self.command = command


class IncompleteRead(AioImapException):
    """AioImapException subclass that records a command and some data."""

    def __init__(self, cmd, data=b''):
        """Store *cmd* and *data* (empty bytes by default) on the instance.

        NOTE(review): *data* is presumably what had been read before the
        failure -- confirm against the raising site.
        """
        self.cmd = cmd
        self.data = data
def change_state(coro):
    """Decorate a coroutine method so it runs while holding ``self.state_condition``.

    The wrapper acquires ``self.state_condition``, delegates to *coro*, logs
    ``self.state`` at debug level, wakes every waiter with ``notify_all()``
    and returns whatever *coro* returned.  The condition is released on every
    exit path, including when *coro* raises (in which case nothing is logged
    and no waiter is notified).

    NOTE(review): ``self.state_condition`` is presumably an
    ``asyncio.Condition`` -- confirm where the attribute is created.
    """
    @functools.wraps(coro)
    @asyncio.coroutine
    def wrapper(self, *args, **kargs):
        # Explicit acquire/release instead of ``with (yield from cond)``:
        # that form was deprecated in Python 3.7 and removed in 3.9.
        yield from self.state_condition.acquire()
        try:
            res = yield from coro(self, *args, **kargs)
            # Lazy logging arguments: the message is only formatted when the
            # debug level is actually enabled.
            log.debug('state -> %s', self.state)
            self.state_condition.notify_all()
            return res
        finally:
            self.state_condition.release()
    return wrapper
class AioImapException(Exception):
    """Root of the exception hierarchy defined in this module."""

    def __init__(self, reason):
        """Create the exception with *reason* as its message."""
        super().__init__(reason)


class Error(AioImapException):
    """AioImapException subclass constructed from a single *reason*."""

    def __init__(self, reason):
        """Create the exception with *reason* as its message."""
        super().__init__(reason)


class Abort(Error):
    """Error subclass constructed from a single *reason*."""

    def __init__(self, reason):
        """Create the exception with *reason* as its message."""
        super().__init__(reason)


class CommandTimeout(AioImapException):
    """AioImapException subclass that records the command it was raised for."""

    def __init__(self, command):
        """Keep *command* available to handlers as ``self.command``."""
        self.command = command


class IncompleteRead(AioImapException):
    """AioImapException subclass that records a command and some data."""

    def __init__(self, cmd, data=b''):
        """Store *cmd* and *data* (empty bytes by default) on the instance.

        NOTE(review): *data* is presumably what had been read before the
        failure -- confirm against the raising site.
        """
        self.cmd = cmd
        self.data = data
def change_state(coro):
@functools.wraps(coro)
@asyncio.coroutine
def wrapper(self, *args, **kargs):
with (yield from self.state_condition):
res = yield from coro(self, *args, **kargs)