Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
table = get_stock_rstb()
rstb_values = {}
for mod in util.get_installed_mods():
rstb_values.update(get_mod_rstb_values(mod))
if (util.get_master_modpack_dir() / 'logs' / 'rstb.log').exists():
rstb_values.update(get_mod_rstb_values(util.get_master_modpack_dir()))
if (util.get_master_modpack_dir() / 'logs' / 'map.log').exists():
rstb_values.update(get_mod_rstb_values(
util.get_master_modpack_dir(), log_name='map.log'))
table, rstb_changes = merge_rstb(table, rstb_values)
for change in rstb_changes:
if not change[1] or (change[1] and verbose):
print(change[0])
for bootup_pack in util.get_master_modpack_dir().glob('content/Pack/Bootup_*.pack'):
lang = util.get_file_language(bootup_pack)
if table.is_in_table(f'Message/Msg_{lang}.product.sarc'):
table.delete_entry(f'Message/Msg_{lang}.product.sarc')
rstb_path = util.get_master_modpack_dir() / 'content' / 'System' / 'Resource' / \
'ResourceSizeTable.product.srsizetable'
if not rstb_path.exists():
rstb_path.parent.mkdir(parents=True, exist_ok=True)
with rstb_path.open('wb') as r_file:
with io.BytesIO() as buf:
table.write(buf, True)
r_file.write(util.compress(buf.getvalue()))
rstb_log = util.get_master_modpack_dir() / 'logs' / 'master-rstb.log'
rstb_log.parent.mkdir(parents=True, exist_ok=True)
with rstb_log.open('w', encoding='utf-8') as r_file:
def merge_events():
""" Merges all installed event info mods """
event_mods = [mod for mod in util.get_installed_mods() \
if (mod.path / 'logs' / 'eventinfo.yml').exists()]
merged_events = util.get_master_modpack_dir() / 'logs' / 'eventinfo.byml'
event_merge_log = util.get_master_modpack_dir() / 'logs' / 'eventinfo.log'
event_mod_hash = str(hash(tuple(event_mods)))
if not event_mods:
print('No event info merging necessary')
if merged_events.exists():
merged_events.unlink()
event_merge_log.unlink()
try:
stock_eventinfo = util.get_nested_file_bytes(
str(util.get_game_file('Pack/Bootup.pack')) + '//Event/EventInfo.product.sbyml',
unyaz=False
)
util.inject_file_into_bootup(
'Event/EventInfo.product.sbyml',
stock_eventinfo
size = guess_bfres_size(file)
canon = util.get_canon_name(file.relative_to(util.get_master_modpack_dir()))
if canon:
diffs[canon] = size
sarc_files = [file for file in files if util.is_file_sarc(str(file)) \
and file.suffix != '.ssarc']
if sarc_files:
num_threads = min(multiprocessing.cpu_count(), len(sarc_files))
pool = original_pool or multiprocessing.Pool(processes=num_threads)
results = pool.map(_get_sizes_in_sarc, sarc_files)
for result in results:
diffs.update(result)
if not original_pool:
pool.close()
pool.join()
with (util.get_master_modpack_dir() / 'logs' / 'rstb.log').open('w', encoding='utf-8') as log:
log.write('name,size,path\n')
for canon, size in diffs.items():
log.write(f'{canon},{size},//\n')
def cache_merged_aamp(file: str, data: bytes):
out = Path(util.get_master_modpack_dir() / 'logs' / 'dm' / file)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_bytes(data)
diffs = consolidate_diff_files(get_deepmerge_diffs(only_these=only_these))
print('Performing deep merge...')
if not diffs:
return
num_threads = min(multiprocessing.cpu_count(), len(diffs))
pool = original_pool or multiprocessing.Pool(processes=num_threads)
pool.map(partial(threaded_merge, verbose=verbose), diffs.items())
if not original_pool:
pool.close()
pool.join()
if not wait_rstb:
bcml.rstable.generate_master_rstb()
(util.get_master_modpack_dir() / 'logs').mkdir(parents=True, exist_ok=True)
with merge_log.open('w', encoding='utf-8') as l_file:
for file_type in diffs:
for file in diffs[file_type]:
l_file.write(f'{file}\n')
if only_these and file in old_merges:
old_merges.remove(file)
if only_these:
for file in old_merges:
l_file.write(f'{file}\n')
def remerge(self, params):
try:
if not util.get_installed_mods():
if util.get_master_modpack_dir().exists():
rmtree(util.get_master_modpack_dir())
install.link_master_mod()
return
if params["name"] == "all":
install.refresh_merges()
else:
[
m()
for m in mergers.get_mergers()
if m().friendly_name == params["name"]
][0].perform_merge()
except Exception as err: # pylint: disable=broad-except
raise Exception(
f"There was an error merging your mods. {str(err)}\n"
"Note that this could leave your game in an unplayable state."
file.unlink()
for sarc_file in sarcs:
try:
sarcs[sarc_file].insert(0, util.get_game_file(sarc_file))
except FileNotFoundError:
continue
if not sarcs:
print('No SARC merging necessary')
return
num_threads = min(cpu_count(), len(sarcs))
pool = self._pool or Pool(processes=num_threads)
print(f'Merging {len(sarcs)} SARC files...')
results = pool.starmap(merge_sarcs, sarcs.items())
for result in results:
file, data = result
output_path = util.get_master_modpack_dir() / file
output_path.parent.mkdir(parents=True, exist_ok=True)
if output_path.suffix.startswith('.s'):
data = util.compress(data)
output_path.write_bytes(data)
if not self._pool:
pool.close()
pool.join()
print('Finished merging SARCs')
verbose: bool = False, original_pool: multiprocessing.Pool = None):
"""
Merges installed text mods and saves the new Bootup_XXxx.pack, fixing the RSTB if needed
:param lang: The game language to use, defaults to USen.
:type lang: str, optional
:param tmp_dir: The temp directory to extract to, defaults to "tmp_text" in BCML's work dir.
:type tmp_dir: class:`pathlib.Path`, optional
:param verbose: Whether to display more detailed output, defaults to False
:type verbose: bool, optional
"""
print(f'Loading text mods for language {lang}...')
text_mods = get_modded_text_entries(lang)
if not text_mods:
print('No text merging necessary.')
old_path = util.get_master_modpack_dir() / 'content' / 'Pack' / \
f'Bootup_{lang}.pack'
if old_path.exists():
old_path.unlink()
return
if verbose:
print(f' Found {len(text_mods)} text mods to be merged')
if tmp_dir.exists():
if verbose:
print('Cleaning temp directory...')
shutil.rmtree(tmp_dir, ignore_errors=True)
print('Extracting clean MSYTs...')
try:
extract_ref_msyts(lang, for_merge=True, tmp_dir=tmp_dir)
except FileNotFoundError:
return
def get_merged_files() -> List[str]:
"""Gets a list of all currently deep merged files"""
log = util.get_master_modpack_dir() / 'logs' / 'deepmerge.log'
if not log.exists():
return []
else:
with log.open('r') as l_file:
return l_file.readlines()
def get_bootup_injection(self):
tmp_sarc = util.get_master_modpack_dir() / 'logs' / 'gamedata.sarc'
if tmp_sarc.exists():
return (
'GameData/gamedata.ssarc',
util.compress(tmp_sarc.read_bytes())
)
else:
return