Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_get_name(text, expected):
    """Check that ``get_name`` resolves ``text`` to ``expected`` for both parsers.

    The same source is parsed with the stdlib ``ast`` module and with
    ``astroid``; the name extracted from the expression must match in
    both cases.
    """
    # stdlib ast: the expression under test is the first statement
    ast_tree = ast.parse(text)
    print(ast.dump(ast_tree))
    assert get_name(expr=ast_tree.body[0].value) == expected

    # astroid: the expression under test is the last statement
    astroid_tree = astroid.parse(text)
    print(astroid_tree.repr_tree())
    assert get_name(expr=astroid_tree.body[-1].value) == expected
def test_get_contracts():
    """Check that only ``deal.module_load(...)`` arguments are collected.

    The sample module also contains an unrelated assignment, a bare
    expression and a look-alike ``not_a_deal.module_load`` call.
    """
    source = dedent("""
import deal
a = 1
1 / 0
not_a_deal.module_load(something)
deal.module_load(deal.silent)
""")
    module = ast.parse(source)
    print(ast.dump(module))

    found = DealLoader._get_contracts(tree=module)
    names = [get_name(node) for node in found]
    assert names == ['deal.silent']
def get_contracts(decorators: List) -> Iterator[Tuple[str, list]]:
    """Yield ``(short_name, args)`` pairs for recognised contracts in ``decorators``.

    Args:
        decorators: decorator expression nodes to inspect.

    Yields:
        A tuple of the last dotted component of the contract name and its
        arguments: an empty list for attribute-style markers, and
        ``contract.args`` for call-style contracts.
    """
    for contract in decorators:
        # attribute-style decorator without a call: only supported markers count
        if isinstance(contract, TOKENS.ATTR):
            name = get_name(contract)
            if name not in SUPPORTED_MARKERS:
                continue
            yield name.split('.')[-1], []

        # call-style decorator whose callee is an attribute access
        if isinstance(contract, TOKENS.CALL):
            if not isinstance(contract.func, TOKENS.ATTR):
                continue
            name = get_name(contract.func)
            # `deal.chain(...)` wraps other contracts: recurse into its arguments
            if name == 'deal.chain':
                yield from get_contracts(contract.args)
            if name not in SUPPORTED_CONTRACTS:
                continue
            yield name.split('.')[-1], contract.args

        # infer assigned value
        if isinstance(contract, astroid.Name):
            # the second element of the lookup result holds the found assignments
            assigments = contract.lookup(contract.name)[1]
            if not assigments:
                continue
            # use only the closest assignment
            expr = assigments[0]
            # can it be not an assignment? IDK
            if not isinstance(expr, astroid.AssignName):  # pragma: no cover
                continue
            # NOTE(review): `expr` is not used after this point in the visible
            # code; the excerpt appears to be cut off here.
def handle_call(expr, dive: bool = True) -> Optional[Union[Token, Iterator[Token]]]:
    """Detect exceptions that a call expression can raise.

    Args:
        expr: the call node to analyse.
        dive: when true, fall back to inspecting the called function.

    Returns:
        A ``SystemExit`` token for ``exit()`` and ``sys.exit()`` calls,
        the result of ``_exceptions_from_func`` when ``dive`` is set,
        otherwise ``None``.
    """
    position = {'line': expr.lineno, 'col': expr.col_offset}

    # exit()
    if get_name(expr.func) == 'exit':
        return Token(value=SystemExit, **position)

    # sys.exit()
    if isinstance(expr.func, TOKENS.ATTR) and get_name(expr.func) == 'sys.exit':
        return Token(value=SystemExit, **position)

    # infer function call and check the function body for raises
    return _exceptions_from_func(expr=expr) if dive else None
def get_contracts(decorators: List) -> Iterator[Tuple[str, list]]:
    """Yield ``(short_name, args)`` pairs for recognised contracts in ``decorators``.

    Args:
        decorators: decorator expression nodes to inspect.

    Yields:
        A tuple of the last dotted component of the contract name and its
        arguments: an empty list for attribute-style markers, and
        ``contract.args`` for call-style contracts.
    """
    for contract in decorators:
        # attribute-style decorator without a call: only supported markers count
        if isinstance(contract, TOKENS.ATTR):
            name = get_name(contract)
            if name not in SUPPORTED_MARKERS:
                continue
            yield name.split('.')[-1], []

        # call-style decorator whose callee is an attribute access
        if isinstance(contract, TOKENS.CALL):
            if not isinstance(contract.func, TOKENS.ATTR):
                continue
            name = get_name(contract.func)
            # `deal.chain(...)` wraps other contracts: recurse into its arguments
            if name == 'deal.chain':
                yield from get_contracts(contract.args)
            if name not in SUPPORTED_CONTRACTS:
                continue
            yield name.split('.')[-1], contract.args

        # infer assigned value
        if isinstance(contract, astroid.Name):
            # NOTE(review): the excerpt is cut off here; the body of this
            # branch is not visible.
def handle_call(expr) -> Optional[Token]:
    """Detect calls that print or open something for writing.

    Args:
        expr: the call node to analyse.

    Returns:
        A token whose value names the detected target (``print``, the
        standard stream, ``open`` or ``Path.open``), or ``None`` when
        nothing is detected.
    """
    position = {'line': expr.lineno, 'col': expr.col_offset}
    name = get_name(expr.func)

    detected = None
    if name in ('print', 'sys.stdout', 'sys.stderr'):
        detected = name
    elif name in ('sys.stdout.write', 'sys.stderr.write'):
        # report the stream itself: drop the trailing ".write"
        detected = name[:-len('.write')]
    elif name == 'open' and _is_open_to_write(expr):
        detected = 'open'
    elif _is_pathlib_write(expr):
        detected = 'Path.open'

    if detected is None:
        return None
    return Token(value=detected, **position)
# NOTE(review): the enclosing `def` line is not part of this excerpt;
# presumably this is the body of the helper that infers the exceptions
# of a called function from the call node `expr` -- confirm against the file.
for value in infer(expr.func):
    # only exact FunctionDef instances are inspected (subclasses are skipped)
    if type(value) is not astroid.FunctionDef:
        continue
    # recursively infer exceptions from the function body
    # (dive=False is passed to the nested extraction)
    for error in get_exceptions(body=value.body, dive=False):
        # the token is positioned at the call site, not inside the callee
        yield Token(value=error.value, line=expr.lineno, col=expr.col_offset)
    # get explicitly specified exceptions from `@deal.raises`
    if not value.decorators:
        continue
    for category, args in get_contracts(value.decorators.nodes):
        if category != 'raises':
            continue
        for arg in args:
            name = get_name(arg)
            # arguments whose name cannot be resolved are skipped
            if name is None:
                continue
            yield Token(value=name, line=expr.lineno, col=expr.col_offset)
return None
def handle_raise(expr, **kwargs) -> Optional[Token]:
    """Build a token for the exception raised by a ``raise`` statement.

    Args:
        expr: the raise node to analyse.
        **kwargs: accepted and ignored.

    Returns:
        A token holding the builtin exception class when the name
        resolves to an attribute of ``builtins``, otherwise the name
        itself; ``None`` when the raised exception cannot be named.
    """
    position = {'line': expr.lineno, 'col': expr.col_offset}
    raised = expr.exc

    name = get_name(raised)
    if not name and isinstance(raised, TOKENS.CALL):
        # raised an instance of an exception: name the called object instead
        name = get_name(raised.func)
        if name and name[0].islower():
            name = None
    if not name:
        # raised a value, too tricky
        return None

    # point the token at the raised expression rather than the statement
    position.update(col=raised.col_offset)
    return Token(value=getattr(builtins, name, name), **position)
def handle_raise(expr, **kwargs) -> Optional[Token]:
    """Build a token for the exception raised by a ``raise`` statement.

    Args:
        expr: the raise node to analyse.
        **kwargs: accepted and ignored.

    Returns:
        A token holding the builtin exception class when the name
        resolves to an attribute of ``builtins``, otherwise the name
        itself; ``None`` when the raised exception cannot be named.
    """
    position = {'line': expr.lineno, 'col': expr.col_offset}
    raised = expr.exc

    name = get_name(raised)
    if not name and isinstance(raised, TOKENS.CALL):
        # raised an instance of an exception: name the called object instead
        name = get_name(raised.func)
        if name and name[0].islower():
            name = None
    if not name:
        # raised a value, too tricky
        return None

    # point the token at the raised expression rather than the statement
    position.update(col=raised.col_offset)
    return Token(value=getattr(builtins, name, name), **position)
def handle_with(expr) -> Optional[Token]:
    """Detect ``with`` statements that open something for writing.

    Args:
        expr: the with node to analyse.

    Returns:
        A ``Path.open`` or ``open`` token for the first item opened for
        writing, or ``None`` when no item matches.
    """
    position = {'line': expr.lineno, 'col': expr.col_offset}
    for entry in expr.items:
        # stdlib ast wraps the context expression in a `withitem`;
        # otherwise the item is indexable and the expression comes first
        manager = entry.context_expr if isinstance(entry, ast.withitem) else entry[0]

        if _is_pathlib_write(manager):
            return Token(value='Path.open', **position)

        is_open_call = isinstance(manager, TOKENS.CALL) and get_name(manager.func) == 'open'
        if is_open_call and _is_open_to_write(manager):
            return Token(value='open', **position)
    return None