Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_pointless_attributes_remotion():
    """Check that two parses become equal once cleaned up.

    The two inputs contain the same statements and differ only in the
    whitespace separating them.  Their parse results must compare unequal
    at first, and equal after ``_remove_stmt_len_and_location`` has been
    applied to each of them.
    """
    compact = parse_sql('select a from x; select b from y')
    spaced = parse_sql('select a from x;\n\nselect b from y')

    # Sanity check: the raw parse results are expected to differ.
    assert compact != spaced

    for tree in (compact, spaced):
        _remove_stmt_len_and_location(tree)

    assert compact == spaced
def test_errors():
    """Verify the error type, location and message raised for invalid input.

    Three malformed inputs are fed to the parsers; each must raise an
    ``Error`` whose concrete type name is ``ParseError`` and whose
    ``location`` attribute and message match the expected values.
    """
    # Case 1: a lone word that is not valid SQL.
    with pytest.raises(Error) as excinfo:
        parse_sql('FooBar')
    # The assertions live outside the ``with`` block: ``excinfo.value`` is
    # only usable once the context manager has captured the exception.
    assert excinfo.typename == 'ParseError'
    assert excinfo.value.location == 1
    assert 'syntax error ' in str(excinfo.value)

    # Case 2: misspelled FROM keyword.
    with pytest.raises(Error) as excinfo:
        parse_sql('SELECT foo FRON bar')
    assert excinfo.typename == 'ParseError'
    assert excinfo.value.location == 17
    message = str(excinfo.value)
    assert 'syntax error at or near "bar"' in message
    assert 'location 17' in message

    # Case 3: misspelled FUNCTION keyword, going through the PL/pgSQL parser.
    broken_function = ('CREATE FUMCTION add (a integer, b integer)'
                       ' RETURNS integer AS $$ BEGIN RETURN a + b; END; $$'
                       ' LANGUAGE plpgsql')
    with pytest.raises(Error) as excinfo:
        parse_plpgsql(broken_function)
    assert excinfo.typename == 'ParseError'
    assert excinfo.value.location == 8
    message = str(excinfo.value)
    assert 'syntax error at or near "FUMCTION"' in message
    assert 'location 8' in message
def _parse_column_type(typ):
    """
    Run a column type name through pglast and return its formatted name.

    The type is embedded in a throwaway ``CREATE TABLE`` statement, the
    statement is parsed, and the type name of the single column is handed
    to ``format_type_name``.

    >>> _parse_column_type('real')
    'pg_catalog.float4'

    >>> _parse_column_type('double precision')
    'pg_catalog.float8'
    """
    statement_template = 'CREATE TABLE _(_ {0});'
    parsed = pglast.parse_sql(statement_template.format(typ))

    # First (and only) statement of the parsed SQL.
    create_stmt = pglast.Node(parsed)[0].stmt

    # First (and only) column of the dummy table.
    column = create_stmt.tableElts[0]
    return format_type_name(column.typeName)
def parse_column_type(typ):
    """
    Feed the column type through pglast to normalize naming

    e.g. `timestamp with time zone => timestamptz`.

    The type is embedded in a throwaway ``CREATE TABLE`` statement which is
    parsed, and the type reported by ``_normalize_columns`` for its single
    column is returned.

    Raises ``RuleConfigurationException`` (for the ``RequireColumns`` rule)
    when pglast cannot parse the statement; the original parse error is
    attached as the exception's cause.

    >>> parse_column_type('integer')
    'integer'

    >>> parse_column_type('custom(type)')
    'custom(type)'
    """
    sql = 'CREATE TABLE _(_ {0});'.format(typ)

    try:
        create_table = pglast.Node(pglast.parse_sql(sql))[0].stmt
        # Each normalized column is a pair; only the type half is needed.
        _, normalized = _normalize_columns(create_table.tableElts)[0]
        return normalized
    except pglast.parser.ParseError as err:
        # The closing double quote was missing from the original message.
        raise RuleConfigurationException(
            RequireColumns,
            'unable to parse column type "{0}"'.format(typ)) from err
def _parse_string(text):
    """
    Parse ``text`` with ``pglast`` and wrap the result as an AST node.

    When parsing yields nothing, ``pglast.node.Scalar(None)`` is returned
    instead of a node.  This is a hack, but it keeps the downstream logic
    simple, as ``Context.traverse`` will simply ignore scalar values.

    >>> _parse_string('SELECT 1')
    [1*{RawStmt}]

    >>> _parse_string('-- just a comment')
    """
    parsed = pglast.parse_sql(text)

    # Nothing parseable (falsy result): hand back a placeholder scalar.
    if not parsed:
        return pglast.node.Scalar(None)

    return pglast.Node(parsed)