Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _parse_suboption_packet(self, ts, data):
if " errorParam=" in data:
sre = tokens.OPTIONS_SUBOPTION_ERROR_RE.match(data)
if not sre:
raise RegexParsingError(data)
optype, id, entity, error, error_param = sre.groups()
error, error_param = clean_option_errors(error, error_param)
else:
sre = tokens.OPTIONS_SUBOPTION_RE.match(data)
if not sre:
raise RegexParsingError(data)
optype, id, entity = sre.groups()
error, error_param = None, None
id = int(id)
if not entity:
raise ParsingError("SubOption / target got an empty entity: %r" % (data))
entity = self.parse_entity_or_player(entity)
packet = packets.Option(ts, entity, id, None, optype, error, error_param)
if optype == "subOption":
self._suboption_packet = packet
node = self._option_packet
elif optype == "target":
node = self._suboption_packet or self._option_packet
node.options.append(packet)
def handle_send_option(self, ts, data):
if data.startswith("selectedOption="):
sre = tokens.SEND_OPTION_RE.match(data)
if not sre:
raise RegexParsingError(data)
option, suboption, target, position = sre.groups()
packet = packets.SendOption(ts, int(option), int(suboption), int(target), int(position))
self.current_block.packets.append(packet)
return packet
raise NotImplementedError("Unhandled send option: %r" % (data))
def handle_entity_choices_old(self, ts, data):
if data.startswith("id="):
sre = tokens.CHOICES_CHOICE_OLD_1_RE.match(data)
if sre:
self.register_choices_old_1(ts, *sre.groups())
else:
sre = tokens.CHOICES_CHOICE_OLD_2_RE.match(data)
if not sre:
raise RegexParsingError(data)
self.register_choices_old_2(ts, *sre.groups())
else:
return self.handle_entity_choices(ts, data)
def read_line(self, line):
sre = tokens.TIMESTAMP_RE.match(line)
if not sre:
raise RegexParsingError(line)
level, ts, line = sre.groups()
if line.startswith(tokens.SPECTATOR_MODE_TOKEN):
line = line.replace(tokens.SPECTATOR_MODE_TOKEN, "").strip()
return self.process_spectator_mode(line)
sre = self.line_regex.match(line)
if not sre:
return
method, msg = sre.groups()
msg = msg.strip()
if not self.current_block and "CREATE_GAME" not in msg:
# Ignore messages before the first CREATE_GAME packet
return
for handler in PowerHandler, ChoicesHandler, OptionsHandler:
callback = handler.find_callback(self, method)
def handle_power(self, ts, opcode, data):
self.flush()
if opcode == "CREATE_GAME":
regex, callback = tokens.CREATE_GAME_RE, self.create_game
elif opcode in ("ACTION_START", "BLOCK_START"):
sre = tokens.BLOCK_START_RE.match(data)
if sre is None:
sre = tokens.ACTION_START_OLD_RE.match(data)
if not sre:
raise RegexParsingError(data)
entity, type, index, target = sre.groups()
effectid, effectindex = None, None
else:
type, entity, effectid, effectindex, target = sre.groups()
index = None
self.block_start(ts, entity, type, index, effectid, effectindex, target)
return
elif opcode in ("ACTION_END", "BLOCK_END"):
regex, callback = tokens.BLOCK_END_RE, self.block_end
elif opcode == "FULL_ENTITY":
if data.startswith("FULL_ENTITY - Updating"):
regex, callback = tokens.FULL_ENTITY_UPDATE_RE, self.full_entity_update
else:
regex, callback = tokens.FULL_ENTITY_CREATE_RE, self.full_entity
elif opcode == "SHOW_ENTITY":
regex, callback = tokens.SHOW_ENTITY_RE, self.show_entity
def handle_entities_chosen(self, ts, data):
if data.startswith("id="):
sre = tokens.ENTITIES_CHOSEN_RE.match(data)
if not sre:
raise RegexParsingError(data)
id, player, count = sre.groups()
id = int(id)
player = self.parse_entity_or_player(player)
self._chosen_packet_count = int(count)
self._chosen_packet = packets.ChosenEntities(ts, player, id)
self.current_block.packets.append(self._chosen_packet)
return self._chosen_packet
elif data.startswith("Entities["):
sre = tokens.ENTITIES_CHOSEN_ENTITIES_RE.match(data)
if not sre:
raise RegexParsingError(data)
idx, entity = sre.groups()
id = self.parse_entity_or_player(entity)
if not id:
raise ParsingError("Missing entity chosen %r (%r)" % (id, entity))
if not self._chosen_packet:
raise ParsingError("Entity Chosen outside of choice packet: %r" % (data))
self._chosen_packet.choices.append(id)
if len(self._chosen_packet.choices) > self._chosen_packet_count:
raise ParsingError("Too many choices (expected %r)" % (self._chosen_packet_count))
return id
raise NotImplementedError("Unhandled entities chosen: %r" % (data))
raise RegexParsingError(data)
return self.register_choices(ts, *sre.groups())
elif data.startswith("Source="):
sre = tokens.CHOICES_SOURCE_RE.match(data)
if not sre:
raise RegexParsingError(data)
entity, = sre.groups()
id = self.parse_entity_or_player(entity)
if not self._choice_packet:
raise ParsingError("Source Choice Entity outside of choie packet: %r" % (data))
self._choice_packet.source = id
return id
elif data.startswith("Entities["):
sre = tokens.CHOICES_ENTITIES_RE.match(data)
if not sre:
raise RegexParsingError(data)
idx, entity = sre.groups()
id = self.parse_entity_or_player(entity)
if not id:
raise ParsingError("Missing choice entity %r (%r)" % (id, entity))
if not self._choice_packet:
raise ParsingError("Choice Entity outside of choice packet: %r" % (data))
self._choice_packet.choices.append(id)
return id
raise NotImplementedError("Unhandled entity choice: %r" % (data))
def _parse_option_packet(self, ts, data):
if " errorParam=" in data:
sre = tokens.OPTIONS_OPTION_ERROR_RE.match(data)
optype, id, type, entity, error, error_param = sre.groups()
if not sre:
raise RegexParsingError(data)
error, error_param = clean_option_errors(error, error_param)
else:
sre = tokens.OPTIONS_OPTION_RE.match(data)
if not sre:
raise RegexParsingError(data)
optype, id, type, entity = sre.groups()
error, error_param = None, None
id = int(id)
type = parse_enum(enums.OptionType, type)
if entity:
entity = self.parse_entity_or_player(entity)
self._option_packet = packets.Option(ts, entity, id, type, optype, error, error_param)
if not self._options_packet:
raise ParsingError("Option without a parent option group: %r" % (data))
self._options_packet.options.append(self._option_packet)
self._suboption_packet = None
return self._option_packet
def handle_send_choices(self, ts, data):
if data.startswith("id="):
sre = tokens.SEND_CHOICES_CHOICE_RE.match(data)
if not sre:
raise RegexParsingError(data)
id, type = sre.groups()
id = int(id)
type = parse_enum(enums.ChoiceType, type)
self._send_choice_packet = packets.SendChoices(ts, id, type)
self.current_block.packets.append(self._send_choice_packet)
return self._send_choice_packet
elif data.startswith("m_chosenEntities"):
sre = tokens.SEND_CHOICES_ENTITIES_RE.match(data)
if not sre:
raise RegexParsingError(data)
idx, entity = sre.groups()
id = self.parse_entity_or_player(entity)
if not id:
raise ParsingError("Missing chosen entity %r (%r)" % (id, entity))
if not self._send_choice_packet:
raise ParsingError("Chosen Entity outside of choice packet: %r" % (data))