Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
transmitter.timeout = timeout
transmitter.is_quiet = not is_parse_icmp_reply
transmitter.timestamp = timestamp != TimestampFormat.NONE
try:
result = transmitter.ping()
except CommandError as e:
logger.error(e)
sys.exit(e.errno)
ping_result_text = result.stdout
if result.returncode != 0:
if result.stderr:
logger.error(result.stderr)
ping_parser = PingParsing()
stats = ping_parser.parse(ping_result_text)
output = stats.as_dict()
if is_parse_icmp_reply:
output["icmp_replies"] = stats.icmp_replies
return (dest_or_file, output)
deadline,
timeout,
options.icmp_reply,
options.timestamp,
)
)
for future in futures.as_completed(future_list):
key, ping_data = future.result()
output[key] = ping_data
finally:
logger.debug("shutdown ProcessPoolExecutor")
executor.shutdown()
else:
ping_result_text = sys.stdin.read()
ping_parser = PingParsing()
stats = ping_parser.parse(ping_result_text)
output = stats.as_dict()
if options.icmp_reply:
output["icmp_replies"] = stats.icmp_replies
print_result(dumps_dict(output, timestamp_format=options.timestamp, indent=options.indent))
return 0