Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_uid_with_illegal_command(self):
    """Check that UID refuses every command other than COPY, FETCH or STORE.

    Logs in with a selected mailbox, then calls ``uid`` with each name in the
    symmetric difference between {COPY, FETCH, STORE} and the keys of
    ``Commands``, expecting an ``Abort`` carrying the exact rejection message.
    """
    imap_client = yield from self.login_user('user', 'pass', select=True)
    uid_capable = {'COPY', 'FETCH', 'STORE'}
    for illegal in uid_capable.symmetric_difference(Commands.keys()):
        message = 'command UID only possible with COPY, FETCH or STORE (was %s)' % illegal
        with self.assertRaises(aioimaplib.Abort) as raised:
            yield from imap_client.uid(illegal)
        # The exception is only available once the context manager has exited.
        self.assertEqual(raised.exception.args, (message,))
# Generator-based coroutine that sends `command` and waits for it to finish.
# NOTE(review): this snippet is truncated — the trailing `try:` has no visible
# `except`/`finally`, so whatever cleanup or timeout handling follows is not
# shown here. Indentation was also lost, so the nesting described below is the
# most plausible reading and should be confirmed against the real file.
@asyncio.coroutine
def execute(self, command):
# Refuse the command when the current state is not among the valid states
# registered for it in Commands.
if self.state not in Commands.get(command.name).valid_states:
raise Abort('command %s illegal in state %s' % (command.name, self.state))
# A pending sync command must complete before anything else is sent.
if self.pending_sync_command is not None:
yield from self.pending_sync_command.wait()
# Sync command: wait for any pending async commands first, then record this
# command as the pending sync command (presumably regardless of whether there
# was anything to wait for — TODO confirm nesting).
if Commands.get(command.name).exec == Exec.is_sync:
if self.pending_async_commands:
yield from self.wait_async_pending_commands()
self.pending_sync_command = command
else:
# Async command: pending async commands are keyed by untagged response name,
# so wait for any earlier command stored under the same key before
# registering this one.
if self.pending_async_commands.get(command.untagged_resp_name) is not None:
yield from self.pending_async_commands[command.untagged_resp_name].wait()
self.pending_async_commands[command.untagged_resp_name] = command
# The command is sent in its string form.
self.send(str(command))
try:
# Suspend until the command object signals completion.
yield from command.wait()
@asyncio.coroutine
def idle(self):
    """Execute an ``IdleCommand`` and return the result of ``execute``.

    The command is built with a fresh tag, this object's ``idle_queue`` and
    its event loop.

    Raises:
        Abort: if 'IDLE' is not among ``self.capabilities``.
    """
    if 'IDLE' in self.capabilities:
        idle_command = IdleCommand(self.new_tag(), self.idle_queue, loop=self.loop)
        return (yield from self.execute(idle_command))
    raise Abort('server has not IDLE capability')
@asyncio.coroutine
def uid(self, command, *criteria, timeout=None):
    """Run `command` in its UID form and return that command's result.

    The command name is matched case-insensitively. FETCH, STORE, COPY, MOVE
    and EXPUNGE are delegated to the method of the same name with
    ``by_uid=True``; only FETCH receives `timeout`, and it uses exactly the
    first two `criteria`.

    Raises:
        Abort: if the current state is not valid for UID, if EXPUNGE is
            requested without the UIDPLUS capability, or if `command` is not
            one of the supported names. Note that the final message does not
            list MOVE even though MOVE is accepted above.
    """
    if self.state not in Commands.get('UID').valid_states:
        raise Abort('command UID illegal in state %s' % self.state)

    verb = command.upper()

    if verb == 'FETCH':
        return (yield from self.fetch(criteria[0], criteria[1], by_uid=True, timeout=timeout))

    if verb in ('STORE', 'COPY', 'MOVE'):
        # These three share the same call shape, so look the method up by name.
        handler = getattr(self, verb.lower())
        return (yield from handler(*criteria, by_uid=True))

    if verb == 'EXPUNGE':
        if 'UIDPLUS' not in self.capabilities:
            raise Abort('EXPUNGE with uids is only valid with UIDPLUS capability. UIDPLUS not in (%s)' % self.capabilities)
        return (yield from self.expunge(*criteria, by_uid=True))

    raise Abort('command UID only possible with COPY, FETCH, EXPUNGE (w/UIDPLUS) or STORE (was %s)' % verb)
def _response_done(self, line):
    """Handle a tagged status line by closing the command it belongs to.

    `line` is split into its tag and the remaining response. If a sync command
    is pending, the tag must be its tag. Otherwise exactly one pending async
    command must match the tag, and it is removed from
    ``pending_async_commands``. The matched command is then closed with the
    response text and result word.

    Raises:
        Abort: if the tag does not match the pending sync command, or if no
            pending async command has that tag.
        Error: if more than one pending async command has that tag.
    """
    log.debug('tagged status %s' % line)
    tag, _, response = line.partition(' ')

    sync_cmd = self.pending_sync_command
    if sync_cmd is None:
        matches = self._find_pending_async_cmd_by_tag(tag)
        if len(matches) == 0:
            raise Abort('unexpected tagged (%s) response: %s' % (tag, response))
        if len(matches) > 1:
            raise Error('inconsistent state : two commands have the same tag (%s)' % matches)
        command = matches.pop()
        self.pending_async_commands.pop(command.untagged_resp_name)
    else:
        if sync_cmd.tag != tag:
            raise Abort('unexpected tagged response with pending sync command (%s) response: %s' %
                        (sync_cmd, response))
        command = sync_cmd
        self.pending_sync_command = None

    # First word of the response is the result, the rest is free text.
    status, _, text = response.partition(' ')
    command.close(text, result=status)
@asyncio.coroutine
def namespace(self):
    """Execute a NAMESPACE command and return the result of ``execute``.

    Raises:
        Abort: if 'NAMESPACE' is not among ``self.capabilities``.
    """
    if 'NAMESPACE' in self.capabilities:
        namespace_command = Command('NAMESPACE', self.new_tag(), loop=self.loop)
        return (yield from self.execute(namespace_command))
    raise Abort('server has not NAMESPACE capability')
@asyncio.coroutine
def move(self, uid_set, mailbox, by_uid=False):
    """Execute a MOVE command for `uid_set` and `mailbox`.

    When `by_uid` is true the command is built with the 'UID' prefix,
    otherwise with an empty prefix. Returns the result of ``execute``.

    Raises:
        Abort: if 'MOVE' is not among ``self.capabilities``.
    """
    if 'MOVE' not in self.capabilities:
        raise Abort('server has not MOVE capability')

    if by_uid:
        prefix = 'UID'
    else:
        prefix = ''
    move_command = Command('MOVE', self.new_tag(), uid_set, mailbox, prefix=prefix, loop=self.loop)
    return (yield from self.execute(move_command))
def _continuation(self, line):
    """Handle a continuation line received from the server.

    Three cases, depending on the pending sync command:

    * it is an APPEND: write the stored literal data followed by CRLF to the
      transport, then clear ``literal_data``;
    * it is any other command: append `line` to that command's response and
      flush it;
    * there is none: log the line and ignore it.

    Raises:
        Abort: if an APPEND is pending but there is no literal data to send.
    """
    pending = self.pending_sync_command
    if pending is not None and pending.name == 'APPEND':
        if self.literal_data is None:
            # The original built this exception without raising it, so the
            # code fell through and handed None to transport.write().
            raise Abort('asked for literal data but have no literal data to send')
        self.transport.write(self.literal_data)
        self.transport.write(CRLF)
        self.literal_data = None
    elif pending is not None:
        log.debug('continuation line appended to pending sync command %s : %s' % (pending, line))
        pending.append_to_resp(line)
        pending.flush()
    else:
        log.info('server says %s (ignored)' % line)