Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_and_with_no_operations(self):
    """An ``AndOperator`` with no operations added cannot be rendered.

    Both ``repr()`` and ``str()`` of the empty operator are expected to
    raise ``APIError``.
    """
    empty_op = AndOperator()

    # Each rendering function gets its own ``raises`` context: inside a
    # shared context the first exception would prevent the second call
    # from ever running.
    for render in (repr, str):
        with pytest.raises(APIError):
            render(empty_op)
fr.add_query(ex)
fr.add_offset(10)
op.add_query(fr)
assert repr(op) == 'Query: ["in", "certname", ' \
'["from", "facts", ["extract", ' \
'["certname", "facts"]], ["offset", 10]]]'
# last example on page
# https://puppet.com/docs/puppetdb/5.1/api/query/v4/ast.html
op = InOperator('certname')
ex = ExtractOperator()
ex.add_field('certname')
fr = FromOperator('fact_contents')
nd = AndOperator()
nd.add(EqualsOperator("path",
["networking", "eth0", "macaddresses", 0]))
nd.add(EqualsOperator("value", "aa:bb:cc:dd:ee:00"))
ex.add_query(nd)
fr.add_query(ex)
op.add_query(fr)
assert str(op) == '["in", "certname", ' \
'["from", "fact_contents", ' \
def test_and_operator(self):
op = AndOperator()
op.add(EqualsOperator("operatingsystem", "CentOS"))
op.add([EqualsOperator("architecture", "x86_64"),
GreaterOperator("operatingsystemmajrelease", 6)])
assert str(op) == '["and", ["=", "operatingsystem", "CentOS"], '\
'["=", "architecture", "x86_64"], '\
'[">", "operatingsystemmajrelease", 6]]'
assert repr(op) == 'Query: ["and", '\
'["=", "operatingsystem", "CentOS"], '\
'["=", "architecture", "x86_64"], '\
'[">", "operatingsystemmajrelease", 6]]'
assert str(op) == '["and", ["=", "operatingsystem", "CentOS"], ' \
'["=", "architecture", "x86_64"], '\
'[">", "operatingsystemmajrelease", 6]]'
with pytest.raises(APIError):
def _build_query(env, start, end, certname=None):
    """Build an extract query counting entries per status.

    The query extracts ``count()`` and ``status``, grouped by ``status``,
    and is restricted by an "and" of the following conditions:

    * ``producer_timestamp`` greater than or equal to ``start``
    * ``producer_timestamp`` less than ``end``
    * ``certname`` equal to ``certname`` (only when it is not ``None``)
    * ``environment`` equal to ``env`` (only when ``env`` is not ``'*'``)

    :param env: environment to filter on; ``'*'`` adds no environment filter
    :param start: lower bound compared against ``producer_timestamp``
    :param end: upper bound compared against ``producer_timestamp``
    :param certname: optional certname to filter on
    :returns: the populated ``ExtractOperator``
    """
    # Collect the filter conditions first, in the order they are added
    # to the "and" sub-query below.
    conditions = [
        GreaterEqualOperator('producer_timestamp', start),
        LessOperator('producer_timestamp', end),
    ]
    if certname is not None:
        conditions.append(EqualsOperator('certname', certname))
    if env != '*':
        conditions.append(EqualsOperator('environment', env))

    filters = AndOperator()
    for condition in conditions:
        filters.add(condition)

    status_counts = ExtractOperator()
    for field in (FunctionOperator('count'), 'status'):
        status_counts.add_field(field)
    status_counts.add_query(filters)
    status_counts.add_group_by("status")
    return status_counts
def __init__(self):
    """Initialise the operator, passing ``"and"`` to the parent initialiser."""
    operator_keyword = "and"
    super(AndOperator, self).__init__(operator_keyword)