Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def fetch(
conn: asyncpg.connection.Connection, *,
schema_pattern: str=None,
domain_pattern: str=None) -> List[asyncpg.Record]:
return await conn.fetch("""
SELECT
t.oid AS oid,
t.typname AS name,
ns.nspname AS schema,
CASE WHEN t.typbasetype != 0 THEN
format_type(t.typbasetype, t.typtypmod)
ELSE
NULL
END AS basetype_full,
CASE WHEN t.typbasetype != 0 THEN
format_type(t.typbasetype, NULL)
ELSE
NULL
async def fetch(
conn: asyncpg.connection.Connection) -> List[asyncpg.Record]:
return await conn.fetch("""
SELECT
async def fetch(
conn: asyncpg.connection.Connection, *,
modules=None,
exclude_modules=None) -> List[asyncpg.Record]:
return await conn.fetch("""
SELECT
id,
name,
builtin
FROM
edgedb.module AS m
WHERE
($1::text[] IS NULL
OR split_part(m.name, '::', 1) = any($1::text[]))
AND ($2::text[] IS NULL
OR split_part(m.name, '::', 1) != all($2::text[]))
""", modules, exclude_modules)
async def fetch_properties(
conn: asyncpg.connection.Connection,
*,
modules=None,
exclude_modules=None) -> List[asyncpg.Record]:
return await conn.fetch("""
SELECT
p.id AS id,
p.name AS name,
edgedb._resolve_type_name(p.bases)
AS bases,
edgedb._resolve_type_name(p.ancestors)
AS ancestors,
p.cardinality AS cardinality,
p.required AS required,
p.expr AS expr,
p.readonly AS readonly,
p.default AS default,
edgedb._resolve_type_name(p.source)
AS source,
p.target AS target,
async def fetch(
conn: asyncpg.connection.Connection, *,
schema_pattern: str=None, type_pattern: str=None,
include_arrays: bool=True) -> List[asyncpg.Record]:
qry = """
SELECT
tp.oid AS oid,
tp.typrelid AS typrelid,
tp.typname AS name,
ns.nspname AS schema,
cmt.description AS comment
FROM
pg_type AS tp
INNER JOIN pg_namespace AS ns
ON ns.oid = tp.typnamespace
LEFT JOIN pg_description AS cmt
ON (cmt.objoid = tp.oid AND cmt.objsubid = 0)
WHERE
($1::text IS NULL OR ns.nspname LIKE $1::text) AND
async def fetch(
conn: asyncpg.connection.Connection) -> typing.List[asyncpg.Record]:
return await conn.fetch("""
SELECT
async def fetch_triggers(
conn: asyncpg.connection.Connection, *,
schema_pattern: str=None, table_pattern: str=None,
table_list: Optional[List[str]]=None,
trigger_pattern: str=None,
inheritable_only: bool=False) -> List[asyncpg.Record]:
qry = """
SELECT
table_name AS table_name,
array_agg((
trg_id,
trg_name,
trg_proc,
trg_constraint,
trg_granularity,
trg_timing,
trg_events,
trg_definition,
trg_metadata
)) AS triggers
FROM
def get_size(self, value):
if isinstance(value, (dict, asyncpg.Record)):
if "state" in value:
return len(value["state"])
if isinstance(value, list) and len(value) > 0:
# if its a list, guesss from first gey the length, and
# estimate it from the total lenghts on the list..
return getsizeof(value[0]) * len(value)
if isinstance(value, _basic_types):
return getsizeof(value)
return _default_size
async def fetch(
conn: asyncpg.connection.Connection, *,
schema_pattern: str=None,
constraint_pattern: str=None) -> typing.List[asyncpg.Record]:
return await conn.fetch("""
SELECT
(CASE WHEN cd.typname IS NOT NULL THEN
ARRAY[cdns.nspname, cd.typname]
ELSE NULL END) AS domain_name,
(CASE WHEN cc.relname IS NOT NULL THEN
ARRAY[ccns.nspname, cc.relname]
ELSE NULL END) AS table_name,
c.oid::int AS constraint_id,
c.contype::text AS constraint_type,
c.conname::text AS constraint_name,
-- Unique constraints have their expression stored
-- in index catalog.
async def fetch(
conn: asyncpg.connection.Connection, *,
schema_pattern: str=None,
constraint_pattern: str=None) -> List[asyncpg.Record]:
return await conn.fetch("""
SELECT
(CASE WHEN cd.typname IS NOT NULL THEN
ARRAY[cdns.nspname, cd.typname]
ELSE NULL END) AS domain_name,
(CASE WHEN cc.relname IS NOT NULL THEN
ARRAY[ccns.nspname, cc.relname]
ELSE NULL END) AS table_name,
c.oid::int AS constraint_id,
c.contype::text AS constraint_type,
c.conname::text AS constraint_name,
-- Unique constraints have their expression stored
-- in index catalog.