Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_sum_function(self):
    """Check how a ``sum`` FunctionOperator renders and that a field is required."""
    expected_str = '["function", "sum", "memoryfree_mb"]'
    expected_repr = 'Query: ["function", "sum", "memoryfree_mb"]'

    assert str(FunctionOperator('sum', 'memoryfree_mb')) == expected_str
    assert repr(FunctionOperator('sum', 'memoryfree_mb')) == expected_repr
    # The str() rendering is intentionally asserted a second time.
    assert str(FunctionOperator('sum', 'memoryfree_mb')) == expected_str

    # Passing only the function name must raise APIError.
    with pytest.raises(APIError):
        FunctionOperator("sum")
def test_max_function(self):
    """Check how a ``max`` FunctionOperator renders and that a field is required."""
    expected_str = '["function", "max", "facterversion"]'
    expected_repr = 'Query: ["function", "max", "facterversion"]'

    assert str(FunctionOperator('max', 'facterversion')) == expected_str
    assert repr(FunctionOperator('max', 'facterversion')) == expected_repr
    # The str() rendering is intentionally asserted a second time.
    assert str(FunctionOperator('max', 'facterversion')) == expected_str

    # Passing only the function name must raise APIError.
    with pytest.raises(APIError):
        FunctionOperator("max")
def test_min_function(self):
    """Check how a ``min`` FunctionOperator renders and that a field is required."""
    expected_str = '["function", "min", "kernelversion"]'
    expected_repr = 'Query: ["function", "min", "kernelversion"]'

    assert str(FunctionOperator('min', 'kernelversion')) == expected_str
    assert repr(FunctionOperator('min', 'kernelversion')) == expected_repr
    # The str() rendering is intentionally asserted a second time.
    assert str(FunctionOperator('min', 'kernelversion')) == expected_str

    # Passing only the function name must raise APIError.
    with pytest.raises(APIError):
        FunctionOperator("min")
def test_sum_function(self):
    """Verify the string forms of ``sum`` over a field and the missing-field error."""
    as_str = '["function", "sum", "memoryfree_mb"]'
    as_repr = 'Query: ["function", "sum", "memoryfree_mb"]'

    assert str(FunctionOperator('sum', 'memoryfree_mb')) == as_str
    assert repr(FunctionOperator('sum', 'memoryfree_mb')) == as_repr
    # Same str() check repeated on a freshly built operator.
    assert str(FunctionOperator('sum', 'memoryfree_mb')) == as_str

    # Constructing the operator without a field must raise APIError.
    with pytest.raises(APIError):
        FunctionOperator("sum")
def test_avg_function(self):
    """Check how an ``avg`` FunctionOperator renders and that a field is required."""
    expected_str = '["function", "avg", "uptime"]'
    expected_repr = 'Query: ["function", "avg", "uptime"]'

    assert str(FunctionOperator('avg', 'uptime')) == expected_str
    assert repr(FunctionOperator('avg', 'uptime')) == expected_repr
    # The str() rendering is intentionally asserted a second time.
    assert str(FunctionOperator('avg', 'uptime')) == expected_str

    # Passing only the function name must raise APIError.
    with pytest.raises(APIError):
        FunctionOperator("avg")
def test_with_add_function_operator(self):
# Builds an ExtractOperator with two function fields (to_string over
# producer_timestamp with the 'FMDAY' format argument, and count) and a
# group_by on the same to_string function, then checks its str()/repr().
op = ExtractOperator()
op.add_field(FunctionOperator('to_string',
'producer_timestamp',
'FMDAY'))
op.add_field(FunctionOperator('count'))
op.add_group_by(FunctionOperator('to_string',
'producer_timestamp',
'FMDAY'))
# str() yields the bare query: fields list first, then the group_by clause.
assert str(op) == '["extract", '\
'[["function", "to_string", "producer_timestamp", "FMDAY"], '\
'["function", "count"]], '\
'["group_by", '\
'["function", "to_string", "producer_timestamp", "FMDAY"]]]'
# repr() yields the same query prefixed with 'Query: '.
assert repr(op) == 'Query: ["extract", '\
'[["function", "to_string", "producer_timestamp", "FMDAY"], '\
'["function", "count"]], '\
'["group_by", '\
'["function", "to_string", "producer_timestamp", "FMDAY"]]]'
# NOTE(review): the assertion below is cut off after its second line in this
# excerpt; the remainder of the expected string is not visible here.
assert str(op) == '["extract", ' \
'[["function", "to_string", "producer_timestamp", "FMDAY"], '\
def test_with_add_function_operator(self):
# Adds a to_string(producer_timestamp, 'FMDAY') field and a count field to
# an ExtractOperator, groups by the same to_string function, and asserts
# the resulting str()/repr() output.
op = ExtractOperator()
op.add_field(FunctionOperator('to_string',
'producer_timestamp',
'FMDAY'))
op.add_field(FunctionOperator('count'))
op.add_group_by(FunctionOperator('to_string',
'producer_timestamp',
'FMDAY'))
# Expected str(): ["extract", [<fields>], ["group_by", <function>]].
assert str(op) == '["extract", '\
'[["function", "to_string", "producer_timestamp", "FMDAY"], '\
'["function", "count"]], '\
'["group_by", '\
'["function", "to_string", "producer_timestamp", "FMDAY"]]]'
# Expected repr(): the str() form with a leading 'Query: '.
assert repr(op) == 'Query: ["extract", '\
'[["function", "to_string", "producer_timestamp", "FMDAY"], '\
'["function", "count"]], '\
'["group_by", '\
'["function", "to_string", "producer_timestamp", "FMDAY"]]]'
# NOTE(review): this last assertion is truncated in the excerpt (it ends on
# a line continuation); the rest of the expected value is not shown.
assert str(op) == '["extract", ' \
'[["function", "to_string", "producer_timestamp", "FMDAY"], '\
def test_min_function(self):
    """Verify the string forms of ``min`` over a field and the missing-field error."""
    as_str = '["function", "min", "kernelversion"]'
    as_repr = 'Query: ["function", "min", "kernelversion"]'

    assert str(FunctionOperator('min', 'kernelversion')) == as_str
    assert repr(FunctionOperator('min', 'kernelversion')) == as_repr
    # Same str() check repeated on a freshly built operator.
    assert str(FunctionOperator('min', 'kernelversion')) == as_str

    # Constructing the operator without a field must raise APIError.
    with pytest.raises(APIError):
        FunctionOperator("min")
def test_max_function(self):
    """Verify the string forms of ``max`` over a field and the missing-field error."""
    as_str = '["function", "max", "facterversion"]'
    as_repr = 'Query: ["function", "max", "facterversion"]'

    assert str(FunctionOperator('max', 'facterversion')) == as_str
    assert repr(FunctionOperator('max', 'facterversion')) == as_repr
    # Same str() check repeated on a freshly built operator.
    assert str(FunctionOperator('max', 'facterversion')) == as_str

    # Constructing the operator without a field must raise APIError.
    with pytest.raises(APIError):
        FunctionOperator("max")
def _build_query(env, start, end, certname=None):
    """Build an extract query with optional certname and environment filters.

    The query extracts a ``count`` function field and the ``status`` field,
    grouped by ``status``, and is restricted by an AND of these clauses:

    * ``producer_timestamp`` compared to ``start`` with GreaterEqualOperator
    * ``producer_timestamp`` compared to ``end`` with LessOperator
    * ``certname`` equal to ``certname`` (only when it is not ``None``)
    * ``environment`` equal to ``env`` (skipped when ``env`` is ``'*'``)

    Returns the assembled ExtractOperator.
    """
    # Gather the filter clauses in the order they are added to the AND.
    clauses = [
        GreaterEqualOperator('producer_timestamp', start),
        LessOperator('producer_timestamp', end),
    ]
    if certname is not None:
        clauses.append(EqualsOperator('certname', certname))
    if env != '*':
        # '*' acts as a wildcard: no environment restriction is applied.
        clauses.append(EqualsOperator('environment', env))

    conditions = AndOperator()
    for clause in clauses:
        conditions.add(clause)

    extract = ExtractOperator()
    extract.add_field(FunctionOperator('count'))
    extract.add_field('status')
    extract.add_query(conditions)
    extract.add_group_by("status")
    return extract