Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
with pytest.raises(APIError):
op = InOperator('certname')
op.add_array(inv1)
with pytest.raises(APIError):
op = InOperator('certname')
op.add_array(inv2)
with pytest.raises(APIError):
op = InOperator('certname')
op.add_array(arr)
op.add_array(arr)
with pytest.raises(APIError):
op = InOperator('certname')
op.add_array(arr)
ex = ExtractOperator()
ex.add_field("certname")
op.add_query(ex)
def test_events_endpoint(self):
    """Check the string forms of an ``in`` operator, bare and with a subquery.

    ``str()`` of a freshly built operator yields only the operator name and
    field; ``repr()`` after attaching an extract subquery yields the full
    query prefixed with ``Query: ``.
    """
    # Nothing attached yet: just the operator name and the field.
    bare_form = str(InOperator('certname'))
    assert bare_form == '["in", "certname"]'

    in_op = InOperator('certname')
    extract = ExtractOperator()
    extract.add_field("certname")
    in_op.add_query(extract)

    expected = (
        'Query: ["in", "certname", '
        '["extract", ["certname"]]]'
    )
    assert repr(in_op) == expected
def test_fromoperator(self):
    """Check the repr of an ``in`` operator wrapping a ``from`` subquery.

    The ``from`` subquery carries an extract of two fields and an offset
    of 10; the whole tree is attached to the ``in`` operator.
    """
    in_op = InOperator('certname')
    extract = ExtractOperator()
    extract.add_field(["certname", "facts"])

    from_facts = FromOperator("facts")
    from_facts.add_query(extract)
    from_facts.add_offset(10)
    in_op.add_query(from_facts)

    expected = (
        'Query: ["in", "certname", '
        '["from", "facts", ["extract", '
        '["certname", "facts"]], ["offset", 10]]]'
    )
    assert repr(in_op) == expected

    # last example on page
    # https://puppet.com/docs/puppetdb/5.1/api/query/v4/ast.html
    # NOTE(review): only the two constructors below are visible for this
    # example; the rest of its setup and assertion are not shown here.
    in_op = InOperator('certname')
    extract = ExtractOperator()
# Exercises InOperator with an array operand and with nested "from"
# subqueries, comparing the rendered query text against expected strings.
# NOTE(review): this excerpt has lost its indentation and stops inside the
# final assertion's expected string; the code lines are left verbatim.
def test_add_array(self):
# A flat list of scalars is rendered as an "array" clause.
arr = [1, "2", 3]
op = InOperator('certname')
op.add_array(arr)
assert repr(op) == 'Query: ["in", "certname", ' \
'["array", [1, "2", 3]]]'
# A "from" subquery holding a two-field extract and an offset of 10.
op = InOperator('certname')
ex = ExtractOperator()
ex.add_field(["certname", "facts"])
fr = FromOperator("facts")
fr.add_query(ex)
fr.add_offset(10)
op.add_query(fr)
assert repr(op) == 'Query: ["in", "certname", ' \
'["from", "facts", ["extract", ' \
'["certname", "facts"]], ["offset", 10]]]'
# last example on page
# https://puppet.com/docs/puppetdb/5.1/api/query/v4/ast.html
op = InOperator('certname')
ex = ExtractOperator()
ex.add_field('certname')
fr = FromOperator('fact_contents')
# Both conditions are combined under a single "and" operator, which
# becomes the extract's subquery.
nd = AndOperator()
nd.add(EqualsOperator("path",
["networking", "eth0", "macaddresses", 0]))
nd.add(EqualsOperator("value", "aa:bb:cc:dd:ee:00"))
ex.add_query(nd)
fr.add_query(ex)
op.add_query(fr)
# The expected string below is incomplete in this excerpt.
assert str(op) == '["in", "certname", ' \
'["from", "fact_contents", ' \
def test_events_endpoint(self):
    """Verify ``str()`` and ``repr()`` output of an ``in`` operator.

    First the operator is rendered with no operand attached, then again
    after an extract subquery on ``certname`` has been added.
    """
    rendered = str(InOperator('certname'))
    assert rendered == '["in", "certname"]'

    operator = InOperator('certname')
    subquery = ExtractOperator()
    subquery.add_field("certname")
    operator.add_query(subquery)

    # repr() prefixes the rendered query with "Query: ".
    wanted = (
        'Query: ["in", "certname", '
        '["extract", ["certname"]]]'
    )
    assert repr(operator) == wanted
def test_multiple_add_query(self):
    """Expect ``APIError`` when two subqueries are added to one ``in`` operator."""
    with pytest.raises(APIError):
        in_op = InOperator('certname')
        first = ExtractOperator()
        in_op.add_query(first)
        # Adding another subquery to the same operator is the step
        # expected to fail.
        second = ExtractOperator()
        in_op.add_query(second)
def test_invalid_add_array(self):
    """Check that ``InOperator`` rejects invalid or conflicting operands.

    Each ``with`` block is expected to raise ``APIError``:

    * an array containing a nested list,
    * an empty array,
    * a second array added after a first one,
    * a subquery added after an array.
    """
    arr = [1, 2, 3]
    inv1 = [1, [2, 3]]
    inv2 = []

    with pytest.raises(APIError):
        op = InOperator('certname')
        op.add_array(inv1)

    with pytest.raises(APIError):
        op = InOperator('certname')
        op.add_array(inv2)

    with pytest.raises(APIError):
        op = InOperator('certname')
        op.add_array(arr)
        op.add_array(arr)

    with pytest.raises(APIError):
        op = InOperator('certname')
        op.add_array(arr)
        ex = ExtractOperator()
        # Without attaching the subquery nothing in this block could
        # raise, so the case would fail with "DID NOT RAISE".
        ex.add_field("certname")
        op.add_query(ex)
def add(self, query):
    """Append one or more sub-queries to ``self.operations``.

    :param query: a JSON-encoded query string, an operator object
        (``BinaryOperator``, ``InOperator`` or ``BooleanOperator``), or a
        list of any of these. Lists are walked recursively, so nested
        lists are flattened into ``self.operations``.
    :raises APIError: if ``query`` is none of the accepted types.
    """
    # isinstance (rather than an exact type comparison) also accepts
    # subclasses of list and str.
    if isinstance(query, list):
        for item in query:
            self.add(item)
    elif isinstance(query, str):
        # Fixed-string queries are stored in parsed form; a string that
        # is not valid JSON propagates the error raised by json.loads.
        self.operations.append(json.loads(query))
    elif isinstance(query, (BinaryOperator, InOperator,
                            BooleanOperator)):
        self.operations.append(query.json_data())
    else:
        raise APIError("Can only accept fixed-string queries, arrays " +
                       "or operator objects")