Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def process_sing_file(self, replay_file_name):
archive = mpyq.MPQArchive(replay_file_name)
contents = archive.header['user_data_header']['content']
header = versions.latest().decode_replay_header(contents)
base_build = header['m_version']['m_baseBuild']
try:
protocol = versions.build(base_build)
except Exception as e:
self.fail('Unsupported base build: {0} ({1!s})'.format(base_build, e))
required_internal_files = ['replay.details',
'replay.details.backup',
'replay.initData',
'replay.game.events',
'replay.message.events',
'replay.tracker.events'
]
for internal_file_name in required_internal_files:
contents = archive.read_file(internal_file_name)
self.assertIsNotNone(contents)
# just decode init data
def process_sing_file(self, replay_file_name):
archive = mpyq.MPQArchive(replay_file_name)
contents = archive.header['user_data_header']['content']
header = versions.latest().decode_replay_header(contents)
base_build = header['m_version']['m_baseBuild']
try:
protocol = versions.build(base_build)
except Exception as e:
self.fail('Unsupported base build: {0} ({1!s})'.format(base_build, e))
required_internal_files = ['replay.details',
'replay.details.backup',
'replay.initData',
'replay.game.events',
'replay.message.events',
'replay.tracker.events'
]
for internal_file_name in required_internal_files:
def test_latest(self):
p = _versions.latest()
self.assertIsNotNone(p)
def test_specific(self):
p = _versions.build(58400)
self.assertIsNotNone(p)
self.assertRaises(ImportError, lambda: _versions.build(42))
filters.insert(0, StatCollectionFilter())
def process_event(event):
for f in filters:
event = f.process(event)
# Read the protocol header, this can be read with any protocol
contents = archive.header['user_data_header']['content']
header = latest().decode_replay_header(contents)
if args.header:
process_event(header)
# The header's baseBuild determines which protocol to use
baseBuild = header['m_version']['m_baseBuild']
try:
protocol = build(baseBuild)
except Exception as e:
print('Unsupported base build: {0} ({1!s})'.format(baseBuild, e),
file=sys.stderr)
sys.exit(1)
# Process game metadata
if args.all or args.metadata:
contents = read_contents(archive, 'replay.gamemetadata.json')
process_event(json.loads(contents))
# Print protocol details
if args.all or args.details:
contents = read_contents(archive, 'replay.details')
details = protocol.decode_replay_details(contents)
details = process_details_data(details)
process_event(details)
"""Download the battle.net cache files needed by replays."""
test_looks_like_battle_net(bnet_base)
downloaded = 0
failed = set()
for replay_path in replays:
try:
archive = mpyq.MPQArchive(replay_path)
except ValueError:
print("Failed to parse replay:", replay_path)
continue
extracted = archive.extract()
contents = archive.header["user_data_header"]["content"]
header = s2versions.latest().decode_replay_header(contents)
base_build = header["m_version"]["m_baseBuild"]
prot = s2versions.build(base_build)
details_bytes = (extracted.get(b"replay.details") or
extracted.get(b"replay.details.backup"))
details = prot.decode_replay_details(details_bytes)
for map_handle in details["m_cacheHandles"]:
# server = map_handle[4:8].decode("utf-8").strip("\x00 ")
map_hash = binascii.b2a_hex(map_handle[8:]).decode("utf8")
file_type = map_handle[0:4].decode("utf8")
cache_path = os.path.join(
bnet_base, "Cache", map_hash[0:2], map_hash[2:4],
"%s.%s" % (map_hash, file_type))
url = DEPOT_URL_TEMPLATE.format(hash=map_hash, type=file_type)
if not os.path.exists(cache_path) and url not in failed:
elif not args.quiet:
filters.insert(0, PrettyPrintFilter(sys.stdout))
if args.types:
filters.insert(0, TypeDumpFilter())
if args.stats:
filters.insert(0, StatCollectionFilter())
def process_event(event):
for f in filters:
event = f.process(event)
# Read the protocol header, this can be read with any protocol
contents = archive.header['user_data_header']['content']
header = latest().decode_replay_header(contents)
if args.header:
process_event(header)
# The header's baseBuild determines which protocol to use
baseBuild = header['m_version']['m_baseBuild']
try:
protocol = build(baseBuild)
except Exception as e:
print('Unsupported base build: {0} ({1!s})'.format(baseBuild, e),
file=sys.stderr)
sys.exit(1)
# Process game metadata
if args.all or args.metadata:
contents = read_contents(archive, 'replay.gamemetadata.json')
process_event(json.loads(contents))
def update_battle_net_cache(replays, bnet_base):
"""Download the battle.net cache files needed by replays."""
test_looks_like_battle_net(bnet_base)
downloaded = 0
failed = set()
for replay_path in replays:
try:
archive = mpyq.MPQArchive(replay_path)
except ValueError:
print("Failed to parse replay:", replay_path)
continue
extracted = archive.extract()
contents = archive.header["user_data_header"]["content"]
header = s2versions.latest().decode_replay_header(contents)
base_build = header["m_version"]["m_baseBuild"]
prot = s2versions.build(base_build)
details_bytes = (extracted.get(b"replay.details") or
extracted.get(b"replay.details.backup"))
details = prot.decode_replay_details(details_bytes)
for map_handle in details["m_cacheHandles"]:
# server = map_handle[4:8].decode("utf-8").strip("\x00 ")
map_hash = binascii.b2a_hex(map_handle[8:]).decode("utf8")
file_type = map_handle[0:4].decode("utf8")
cache_path = os.path.join(
bnet_base, "Cache", map_hash[0:2], map_hash[2:4],
"%s.%s" % (map_hash, file_type))
file=sys.stderr)
sys.exit(1)
diff(version_list[0], version_list[1])
return
# Check/test the replay file
if args.replay_file is None:
print(".S2Replay file not specified", file=sys.stderr)
sys.exit(1)
archive = MPQArchive(args.replay_file)
filters = []
if args.json:
filters.insert(0, JSONOutputFilter(sys.stdout))
elif args.ndjson:
filters.insert(0, NDJSONOutputFilter(sys.stdout))
elif not args.quiet:
filters.insert(0, PrettyPrintFilter(sys.stdout))
if args.types:
filters.insert(0, TypeDumpFilter())
if args.stats:
filters.insert(0, StatCollectionFilter())
def process_event(event):
for f in filters:
event = f.process(event)
# Read the protocol header, this can be read with any protocol