Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# add
stub.add(func='fname', contract=Category.RAISES, value='TypeError')
with pytest.raises(ValueError, match='unsupported contract'):
stub.add(func='fname', contract=Category.POST, value='SyntaxError')
assert stub._content == {'fname': {'raises': ['TypeError']}}
# do not add twice
stub.add(func='fname', contract=Category.RAISES, value='TypeError')
assert stub._content == {'fname': {'raises': ['TypeError']}}
# get
assert stub.get(func='fname', contract=Category.RAISES) == frozenset({'TypeError'})
with pytest.raises(ValueError, match='unsupported contract'):
stub.get(func='fname', contract=Category.POST)
assert stub.get(func='unknown', contract=Category.RAISES) == frozenset()
# dump
stub.dump()
content = json.loads(path.read_text(encoding='utf8'))
assert content == {'fname': {'raises': ['TypeError']}}
# load
stub2 = StubFile(path=path)
stub2.load()
assert stub2._content == {'fname': {'raises': ['TypeError']}}
def test_stub_file(tmp_path: Path):
    """Exercise StubFile.add, StubFile.get and StubFile.dump on a temp file.

    Only `Category.RAISES` is accepted by `add` and `get`; any other
    category must raise `ValueError('unsupported contract')`.
    """
    json_path = tmp_path / 'example.json'
    stub = StubFile(path=json_path)
    # the content expected after the first successful `add`
    expected = {'fname': {'raises': ['TypeError']}}

    # add: a supported category is stored, an unsupported one is rejected
    stub.add(func='fname', contract=Category.RAISES, value='TypeError')
    with pytest.raises(ValueError, match='unsupported contract'):
        stub.add(func='fname', contract=Category.POST, value='SyntaxError')
    assert stub._content == expected

    # adding the same value again must not create a duplicate
    stub.add(func='fname', contract=Category.RAISES, value='TypeError')
    assert stub._content == expected

    # get: known function, unsupported category, unknown function
    found = stub.get(func='fname', contract=Category.RAISES)
    assert found == frozenset({'TypeError'})
    with pytest.raises(ValueError, match='unsupported contract'):
        stub.get(func='fname', contract=Category.POST)
    missing = stub.get(func='unknown', contract=Category.RAISES)
    assert missing == frozenset()

    # dump
    stub.dump()
def __call__(self, func: Func, stubs: StubsManager = None) -> Iterator[Error]:
    """Yield the errors found for every `raises` contract of `func`.

    Contracts of any other category are ignored. `stubs` is passed
    through unchanged to `self._check` for each matching contract.
    """
    # lazy filter, so contracts are still examined one at a time
    raises_contracts = (
        item for item in func.contracts
        if item.category == Category.RAISES
    )
    for item in raises_contracts:
        yield from self._check(func=func, contract=item, stubs=stubs)
def get(self, func: str, contract: Category) -> FrozenSet[str]:
    """Return the values stored for `func` under the given contract.

    Only `Category.RAISES` is supported; any other category raises
    `ValueError('unsupported contract')`. A function or contract with
    no stored entry produces an empty frozenset.
    """
    if contract != Category.RAISES:
        raise ValueError('unsupported contract')
    func_entry = self._content.get(func, {})
    stored = func_entry.get(contract.value, [])
    return frozenset(stored)
def add(self, func: str, contract: Category, value: str) -> None:
    """Record `value` for `func` under the given contract.

    Only `Category.RAISES` is supported; any other category raises
    `ValueError('unsupported contract')`. A value that is already
    stored is not added again, and the stored list is kept sorted.
    """
    if contract != Category.RAISES:
        raise ValueError('unsupported contract')
    func_entry = self._content.setdefault(func, dict())
    stored = func_entry.setdefault(contract.value, [])
    if value not in stored:
        stored.append(value)
        stored.sort()
def __call__(self, func: Func, stubs: StubsManager = None) -> Iterator[Error]:
    """Yield the errors found for every `post` contract of `func`.

    Contracts of any other category are ignored. `stubs` is accepted
    but not used by this method.
    """
    for item in func.contracts:
        if item.category == Category.POST:
            yield from self._check(func=func, contract=item)
def from_astroid(cls, tree: astroid.Module) -> List['Func']:
    """Build one instance per function definition in the body of `tree`.

    Only nodes directly in `tree.body` are considered. For each function,
    the pairs produced by `get_contracts` for its decorators are wrapped
    into `Contract` objects; a function without decorators gets an empty
    contract list.
    """
    collected = []
    for node in tree.body:
        if isinstance(node, astroid.FunctionDef):
            # no decorators means there is nothing to extract contracts from
            pairs = get_contracts(node.decorators.nodes) if node.decorators else ()
            node_contracts = [
                Contract(args=args, category=Category(category))
                for category, args in pairs
            ]
            collected.append(cls(
                name=node.name,
                body=node.body,
                contracts=node_contracts,
                line=node.lineno,
                col=node.col_offset,
            ))
    return collected
def __call__(self, func: Func, stubs: StubsManager = None) -> Iterator[Error]:
    """Yield the errors from `self._check` if `func` has a `pure` contract.

    The function is checked at most once, however many `pure` contracts
    it carries. `stubs` is accepted but not used by this method.
    """
    # any() stops at the first match, so later contracts are not inspected
    has_pure = any(
        item.category == Category.PURE for item in func.contracts
    )
    if has_pure:
        yield from self._check(func=func)
def __call__(self, func: Func, stubs: StubsManager = None) -> Iterator[Error]:
    """Yield the errors from `self._check` if `func` has a `silent` contract.

    `stubs` is accepted but not used by this method.
    """
    for item in func.contracts:
        if item.category == Category.SILENT:
            # if `@deal.silent` is duplicated, check the function only once
            break
    else:
        # no silent contract on this function: nothing to check
        return
    yield from self._check(func=func)
for func in infer(expr.func):
if type(func) is not astroid.FunctionDef:
continue
if not func.decorators:
continue
code = 'def f({}):0'.format(func.args.as_string())
func_args = ast.parse(code).body[0].args # type: ignore
for category, contract_args in get_contracts(func.decorators.nodes):
if category != 'pre':
continue
contract = Contract(
args=contract_args,
category=Category.PRE,
func_args=func_args,
)
try:
result = contract.run(*args, **kwargs)
except NameError:
continue
if result is False or type(result) is str:
msg = _format_message(args, kwargs)
yield Token(value=msg, line=expr.lineno, col=expr.col_offset)