Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __new__(cls, details, parent=None, name=None):
if not isinstance(parent, (Node, NoneType)):
raise ValueError("Unexpected value for 'parent', must be either None"
" or a Node instance, got %r" % type(parent))
if not isinstance(name, (NoneType, str, tuple)):
raise ValueError("Unexpected value for 'name', must be either None,"
" a string or a tuple, got %r" % type(name))
if isinstance(details, list):
self = super().__new__(List)
elif isinstance(details, dict):
self = super().__new__(Node)
else:
self = super().__new__(Scalar)
self._init(details, parent, name)
return self
def __new__(cls, details, parent=None, name=None):
if not isinstance(parent, (Node, NoneType)):
raise ValueError("Unexpected value for 'parent', must be either None"
" or a Node instance, got %r" % type(parent))
if not isinstance(name, (NoneType, str, tuple)):
raise ValueError("Unexpected value for 'name', must be either None,"
" a string or a tuple, got %r" % type(name))
if isinstance(details, list):
self = super().__new__(List)
elif isinstance(details, dict):
self = super().__new__(Node)
else:
self = super().__new__(Scalar)
self._init(details, parent, name)
return self
:returns: a string with the equivalent prettified statement(s)
When `safety_belt` is ``True``, the resulting statement is parsed again and its *AST*
compared with the original statement: if they don't match, a warning is emitted and the
original statement is returned. This is a transient protection against possible bugs in the
serialization machinery that may disappear before 1.0.
"""
# Intentional lazy imports, so the modules are loaded on demand
import warnings
from .printer import IndentedStream
from . import printers # noqa
orig_pt = parse_sql(statement)
prettified = IndentedStream(**options)(Node(orig_pt))
if safety_belt:
try:
pretty_pt = parse_sql(prettified)
except Error as e: # pragma: no cover
print(prettified)
warnings.warn("Detected a bug in pglast serialization, please report: %s\n%s"
% (e, prettified), RuntimeWarning)
return statement
_remove_stmt_len_and_location(orig_pt)
_remove_stmt_len_and_location(pretty_pt)
if pretty_pt != orig_pt: # pragma: no cover
print(prettified)
warnings.warn("Detected a non-cosmetic difference between original and"
" prettified statements, please report", RuntimeWarning)
between double-quotes
:param bool is_symbol:
whether the nodes are actually an *operator name*, in which case the last one
must be printed verbatim (such as ``"MySchema".===``)
"""
if standalone_items is None:
clm = self.compact_lists_margin
if clm is not None and clm > 0:
rawlist = self._concat_nodes(nodes, sep, are_names)
if self.current_column + len(rawlist) < clm:
self.write(rawlist)
return
standalone_items = not all(
(isinstance(n, Node)
and n.node_tag in ('A_Const', 'ColumnRef', 'SetToDefault', 'RangeVar'))
for n in nodes)
if (((sep != ',' or not self.comma_at_eoln)
and len(nodes) > 1
and len(sep) > 1
and relative_indent is None
and not are_names
and not is_symbol
and standalone_items)):
self.write(' '*(len(sep) + 1)) # separator added automatically
super().print_list(nodes, sep, relative_indent, standalone_items, are_names, is_symbol)
def __call__(self, sql, plpgsql=False):
"""Main entry point: execute :meth:`print_node` on each statement in `sql`.
:param sql: either the source SQL in textual form, or a :class:`~.node.Node` instance
:param bool plpgsql: whether `sql` is really a ``plpgsql`` statement
:returns: a string with the equivalent SQL obtained by serializing the syntax tree
"""
if isinstance(sql, str):
sql = Node(parse_plpgsql(sql) if plpgsql else parse_sql(sql))
elif isinstance(sql, Node):
sql = [sql]
elif not isinstance(sql, List):
raise ValueError("Unexpected value for 'sql', must be either a string,"
" a Node instance or a List instance, got %r" % type(sql))
first = True
for statement in sql:
if first:
first = False
else:
self.write(';')
self.newline()
for _ in range(self.separate_statements):
self.newline()
self.print_node(statement)
if self.semicolon_after_last_statement:
def traverse(self, parent_node):
"""
Recursively walk down the AST starting at `parent_node`.
For every node, call any callback functions registered for that
particular node tag.
"""
for node in parent_node.traverse():
# Ignore scalar values
if not isinstance(node, pglast.node.Node):
continue
tag = node.node_tag
if tag not in self._hooks:
continue
child_ctx = Context(self._session)
for hook in self._hooks[tag]:
hook(child_ctx, node)
# children can set up their own hooks, so recurse
child_ctx.traverse(node)
for exit_fn in self._exit_hooks:
exit_fn(self)
:returns: a generator that will yield one statement at a time
When `safety_belt` is ``True``, the resulting statement is parsed again and its *AST*
compared with the original statement: if they don't match, an :class:`~.error.Error` is
raised. This is a transient protection against possible bugs in the serialization machinery
that may disappear before 1.0.
"""
from . import printers # noqa
if stream_class is None:
from .printer import RawStream
stream_class = RawStream
for orig_pt in parse_sql(statements):
printed = stream_class(**options)(Node(orig_pt))
if safety_belt:
printed_pt = parse_sql(printed)[0]
_remove_stmt_len_and_location(orig_pt)
_remove_stmt_len_and_location(printed_pt)
if printed_pt != orig_pt: # pragma: no cover
raise Error("Detected a non-cosmetic difference between original and"
" printed statement")
yield printed
options.append('VERBOSE')
if optint & enums.VacuumOption.VACOPT_ANALYZE:
options.append('ANALYZE')
if optint & enums.VacuumOption.VACOPT_DISABLE_PAGE_SKIPPING:
options.append('DISABLE_PAGE_SKIPPING')
if 'VACUUM' in options:
output.write('VACUUM ')
options.remove('VACUUM')
else:
output.write('ANALYZE ')
options.remove('ANALYZE')
if options:
# Try so emit a syntax compatible with PG < 11, if possible.
if 'DISABLE_PAGE_SKIPPING' in options:
output.write('(')
output.print_list(Node(options, node), ',')
output.write(') ')
else:
for option in options:
output.write(option)
output.space()
if node.relation:
output.print_node(node.relation)
if node.va_cols:
output.write('(')
output.print_list(node.va_cols, ',', are_names=True)
output.write(')')
def __call__(self, sql, plpgsql=False):
"""Main entry point: execute :meth:`print_node` on each statement in `sql`.
:param sql: either the source SQL in textual form, or a :class:`~.node.Node` instance
:param bool plpgsql: whether `sql` is really a ``plpgsql`` statement
:returns: a string with the equivalent SQL obtained by serializing the syntax tree
"""
if isinstance(sql, str):
sql = Node(parse_plpgsql(sql) if plpgsql else parse_sql(sql))
elif isinstance(sql, Node):
sql = [sql]
elif not isinstance(sql, List):
raise ValueError("Unexpected value for 'sql', must be either a string,"
" a Node instance or a List instance, got %r" % type(sql))
first = True
for statement in sql:
if first:
first = False
else:
self.write(';')
self.newline()
for _ in range(self.separate_statements):
self.newline()
self.print_node(statement)
output.write('OR REPLACE ')
output.write('FUNCTION ')
output.print_name(node.funcname)
output.write('(')
# Functions returning a SETOF needs special care, because the resulting record
# definition is intermixed with real parameters: split them into two separated
# lists
real_params = node.parameters
if node.returnType and node.returnType.setof:
fpm = enums.FunctionParameterMode
record_def = []
real_params = []
for param in node.parameters:
if param.mode == fpm.FUNC_PARAM_TABLE:
record_def.append(Node(
{'ColumnDef': {
'typeName': {'TypeName': param.argType.parse_tree},
'colname': param.name.value}}))
else:
real_params.append(param)
if real_params:
output.print_list(real_params)
output.write(')')
if node.returnType:
output.newline()
output.writes('RETURNS')
if node.returnType.setof and record_def:
# Do not treat them as argument
output.write('TABLE (')
output.print_list(record_def, ',', standalone_items=False)