Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
destination=destination,
packet_transmit=packet_transmit,
packet_receive=packet_receive,
duplicates=duplicates,
icmp_replies=icmp_replies,
)
rtt_pattern = (
pp.Literal("Minimum = ")
+ pp.Word(pp.nums)
+ pp.Literal("ms, Maximum = ")
+ pp.Word(pp.nums)
+ pp.Literal("ms, Average = ")
+ pp.Word(pp.nums)
)
parse_list = rtt_pattern.parseString(_to_unicode(rtt_line))
return PingStats(
destination=destination,
packet_transmit=packet_transmit,
packet_receive=packet_receive,
duplicates=duplicates,
rtt_min=float(parse_list[1]),
rtt_avg=float(parse_list[5]),
rtt_max=float(parse_list[3]),
icmp_replies=icmp_replies,
)
packet_transmit=packet_transmit,
packet_receive=packet_receive,
duplicates=duplicates,
icmp_replies=icmp_replies,
)
rtt_pattern = (
pp.Literal("round-trip min/avg/max =")
+ pp.Word(pp.nums + ".")
+ "/"
+ pp.Word(pp.nums + ".")
+ "/"
+ pp.Word(pp.nums + ".")
+ pp.Word(pp.nums + "ms")
)
parse_list = rtt_pattern.parseString(_to_unicode(rtt_line))
return PingStats(
destination=destination,
packet_transmit=packet_transmit,
packet_receive=packet_receive,
duplicates=duplicates,
rtt_min=float(parse_list[1]),
rtt_avg=float(parse_list[3]),
rtt_max=float(parse_list[5]),
icmp_replies=icmp_replies,
)
if typepy.is_not_null_string(ping_message.stdout):
ping_message = ping_message.stdout
except AttributeError:
pass
logger.debug("parsing ping result: {}".format(ping_message))
self.__parser = NullPingParser()
if typepy.is_null_string(ping_message):
logger.debug("ping_message is empty")
self.__stats = PingStats()
return self.__stats
ping_lines = _to_unicode(ping_message).splitlines()
parser_class_list = (
LinuxPingParser,
WindowsPingParser,
MacOsPingParser,
AlpineLinuxPingParser,
)
for parser_class in parser_class_list:
self.__parser = parser_class()
try:
self.__stats = self.__parser.parse(ping_lines)
return self.__stats
except ParseError as e:
if e.reason != ParseErrorReason.HEADER_NOT_FOUND:
raise e
except pp.ParseException:
def parse(self, ping_message):
icmp_replies = self._parse_icmp_reply(ping_message)
stats_headline, packet_info_line, body_line_list = self._preprocess_parse_stats(
lines=ping_message
)
packet_pattern = (
pp.Word(pp.nums)
+ pp.Literal("packets transmitted,")
+ pp.Word(pp.nums)
+ pp.Literal("packets received,")
)
destination = self._parse_destination(stats_headline)
duplicates = self._parse_duplicate(packet_info_line)
parse_list = packet_pattern.parseString(_to_unicode(packet_info_line))
packet_transmit = int(parse_list[0])
packet_receive = int(parse_list[2])
is_valid_data = True
try:
rtt_line = body_line_list[1]
except IndexError:
is_valid_data = False
if not is_valid_data or typepy.is_null_string(rtt_line):
return PingStats(
destination=destination,
packet_transmit=packet_transmit,
packet_receive=packet_receive,
duplicates=duplicates,
icmp_replies=icmp_replies,
def parse(self, ping_message):
icmp_replies = self._parse_icmp_reply(ping_message)
stats_headline, packet_info_line, body_line_list = self._preprocess_parse_stats(
lines=ping_message
)
packet_pattern = (
pp.Word(pp.nums)
+ pp.Literal("packets transmitted,")
+ pp.Word(pp.nums)
+ pp.Literal("packets received,")
)
destination = self._parse_destination(stats_headline)
duplicates = self._parse_duplicate(packet_info_line)
parse_list = packet_pattern.parseString(_to_unicode(packet_info_line))
packet_transmit = int(parse_list[0])
packet_receive = int(parse_list[2])
is_valid_data = True
try:
rtt_line = body_line_list[1]
except IndexError:
is_valid_data = False
if not is_valid_data or typepy.is_null_string(rtt_line):
return PingStats(
destination=destination,
packet_transmit=packet_transmit,
packet_receive=packet_receive,
duplicates=duplicates,
icmp_replies=icmp_replies,
def _parse_duplicate(self, line):
    """Return the duplicate-packet count reported in a ping summary line.

    The line is scanned for a run of digits immediately followed by the
    literal text ``duplicates,``; that number is returned as an ``int``.
    When the line contains no such pair, ``0`` is returned instead of
    propagating the parse error.
    """
    count_token = pp.Word(pp.nums)
    marker_token = pp.Literal("duplicates,")

    # Skip whatever precedes the "<digits> duplicates," pair, then consume it.
    grammar = pp.SkipTo(count_token + marker_token) + count_token + marker_token

    try:
        tokens = grammar.parseString(_to_unicode(line))
    except pp.ParseException:
        # No duplicates field on this line.
        return 0
    else:
        # Tokens end with [..., <count>, "duplicates,"]; take the count.
        return int(tokens[-2])