Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_stop_conn_error():
def my_generator():
yield 1
raise ConnectionError
yield 2
gen = utils.yield_or_stop(my_generator())
for val in gen:
assert 1 == val
def test_iter():
test_list = (0, 1, 2, 3)
def my_generator():
for i in test_list:
yield i
gen = utils.yield_or_stop(my_generator())
assert isinstance(gen, GeneratorType)
i = 0
for val in gen:
assert i == val
i = i + 1
def test_stop_empty():
def my_generator():
yield 1
raise EmptyResponseError
yield 2
gen = utils.yield_or_stop(my_generator())
for val in gen:
assert 1 == val
def test_stop_http_error():
def my_generator():
yield 1
raise HTTPError
yield 2
gen = utils.yield_or_stop(my_generator())
for val in gen:
assert 1 == val
unrep_query = OrOperator()
unrep_query.add(NullOperator('report_timestamp', True))
unrep_query.add(LessEqualOperator('report_timestamp', unreported))
query.add(unrep_query)
if len(query.operations) == 0:
query = None
nodelist = puppetdb.nodes(
query=query,
unreported=app.config['UNRESPONSIVE_HOURS'],
with_status=True,
with_event_numbers=app.config['WITH_EVENT_NUMBERS'])
nodes = []
for node in yield_or_stop(nodelist):
if status_arg:
if node.status == status_arg:
nodes.append(node)
else:
nodes.append(node)
return Response(stream_with_context(
stream_template('nodes.html',
nodes=nodes,
envs=envs,
current_env=env)))
query.add(EqualsOperator("certname", node_name))
query.add(report_id_query)
reports = puppetdb.reports(query=query)
try:
report = next(reports)
except StopIteration:
abort(404)
report.version = commonmark.commonmark(report.version)
return render_template(
'report.html',
report=report,
events=yield_or_stop(report.events()),
logs=report.logs,
metrics=report.metrics,
envs=envs,
current_env=env)