Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __eq__(self, other):
if isinstance(other, RowProxy):
return self.as_tuple() == other.as_tuple()
elif isinstance(other, Sequence):
return self.as_tuple() == other
else:
return NotImplemented
from typing import List
from aiopg.sa.result import RowProxy
RowsProxy = List[RowProxy]
def _process_rows(self, rows):
process_row = RowProxy
metadata = self._metadata
keymap = metadata._keymap
processors = metadata._processors
return [process_row(metadata, row, processors, keymap)
for row in rows]
async def resolve_owner(self, info: ResolveInfo) -> List[RowProxy]:
app = info.context['request'].app
return await app['loaders'].users.load(self['owner_id'])
async def resolve_rooms(self, info: ResolveInfo) -> List[RowProxy]:
app = info.context['request'].app
async with app['db'].acquire() as conn:
return await select_rooms(conn)
from typing import List
from aiopg.sa.result import RowProxy
RowsProxy = List[RowProxy]