def perturb():
    global Y
    Y = np.zeros(n)
    for i in range(n):
        v = X[i]
        x = (xxhash.xxh32(str(v), seed=i).intdigest() % g)
        y = x
        p_sample = np.random.random_sample()
        # the following two are equivalent
        # if p_sample > p:
        #     while not y == x:
        #         y = np.random.randint(0, g)
        if p_sample > p - q:
            # perturb
            y = np.random.randint(0, g)
        Y[i] = y
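For reference, here is a minimal self-contained sketch of the same hash-then-perturb idea written as a pure function. The names X, g, p and q are taken from the globals the snippet uses; the signature and return value are assumptions of mine, not the original API.

import numpy as np
import xxhash

def perturb_values(X, g, p, q):
    # Hash each value into [0, g) with a per-index seed, then replace the
    # bucket with a uniform random one with probability 1 - (p - q).
    Y = np.zeros(len(X))
    for i, v in enumerate(X):
        y = xxhash.xxh32(str(v), seed=i).intdigest() % g
        if np.random.random_sample() > p - q:
            y = np.random.randint(0, g)
        Y[i] = y
    return Y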
    f'Msg_{lang}_hashes.csv'  # tail of a truncated `hash_table = ...` assignment
if hash_table.exists():
    get_msbt_hashes.texthashes[lang] = {}
    with hash_table.open('r') as h_file:
        csv_loop = csv.reader(h_file)
        for row in csv_loop:
            get_msbt_hashes.texthashes[lang][row[0]] = row[1]
elif util.get_game_file(f'Pack/Bootup_{lang}.pack').exists():
    get_msbt_hashes.texthashes[lang] = {}
    with util.get_game_file(f'Pack/Bootup_{lang}.pack').open('rb') as b_file:
        bootup_pack = sarc.read_file_and_make_sarc(b_file)
    msg_bytes = util.decompress(
        bootup_pack.get_file_data(f'Message/Msg_{lang}.product.ssarc').tobytes())
    msg_pack = sarc.SARC(msg_bytes)
    for msbt in msg_pack.list_files():
        get_msbt_hashes.texthashes[lang][msbt] = xxhash.xxh32(
            msg_pack.get_file_data(msbt)).hexdigest()
return get_msbt_hashes.texthashes[lang]
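The CSV branch above expects rows of the form name,hash. Here is a small sketch of how such a table could be produced from loose files on disk; the folder layout, the .msbt glob and the function name are assumptions of mine, and only the xxh32 hexdigest format comes from the snippet.

import csv
import xxhash
from pathlib import Path

def build_hash_table(folder, out_csv):
    # Write one `name,xxh32 hexdigest` row per file, matching the layout
    # that the CSV branch above reads back into its cache.
    with open(out_csv, 'w', newline='') as h_file:
        writer = csv.writer(h_file)
        for path in Path(folder).rglob('*.msbt'):
            writer.writerow([path.name, xxhash.xxh32(path.read_bytes()).hexdigest()])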
from contextlib import closing
from filecmp import cmp as filecmp
from math import ceil
from multiprocessing.pool import ThreadPool
from os.path import abspath
from stat import S_IFMT, S_ISLNK
import os
import xxhash

from .structs import Cache, DupInfo, FileInfo, FilterType, SkipException
from .utils.fs import (blksize, checksum, fsdecode, is_archived, is_hidden,
                       is_os64, is_system, remove, sidesum, signature,
                       splitpaths, walk)
_xxhash_xxh = xxhash.xxh64 if is_os64 else xxhash.xxh32
_LINKSIZE = 900 if os.name == 'nt' else 60 #: bytes
_LITTLESIZE = 100 << 10 #: bytes
_BIGSIZE = 100 << 20 #: bytes
_SIZERATE = 10 #: percentage
_BLKSIZE = 4 << 10
_XXHSIZE = _xxhash_xxh().block_size << 11
CACHE = Cache()
def _iterdups(dupinfo):
    for key, value in dupinfo.dups.items():
        if isinstance(value, DupInfo):
            for subobj, subkey, subvalue in _iterdups(value):
                yield subobj, subkey, subvalue
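As a hedged illustration of what the platform-dependent hasher and the _XXHSIZE chunk size above are set up for, here is a sketch of hashing a file in fixed-size reads; the helper name and the read loop are mine, only _xxhash_xxh and _XXHSIZE come from the module constants.

def _file_hash(path):
    # Feed the file to the platform-appropriate hasher (xxh64 on 64-bit
    # builds, otherwise xxh32) in _XXHSIZE-sized reads.
    hasher = _xxhash_xxh()
    with open(path, 'rb') as stream:
        for block in iter(lambda: stream.read(_XXHSIZE), b''):
            hasher.update(block)
    return hasher.hexdigest()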
def generate_location_hash_by_seed(authticket, lat, lng, acc=5):
    first_hash = xxhash.xxh32(authticket, seed=HASH_SEED).intdigest()
    location_bytes = d2h(lat) + d2h(lng) + d2h(acc)
    loc_hash = xxhash.xxh32(location_bytes, seed=first_hash).intdigest()
    return ctypes.c_int32(loc_hash).value
def _seed(self, val):
    """Returns a unique seed for val and the (optional) namespace."""
    if self._namespace:
        return xxhash.xxh32(
            self._namespace.encode('utf-8') +
            Magnitude.RARE_CHAR +
            val.encode('utf-8')).intdigest()
    else:
        return xxhash.xxh32(val.encode('utf-8')).intdigest()
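Below is a standalone sketch of the same namespacing trick outside the class; the separator constant and the function name are placeholders of mine, since the value of Magnitude.RARE_CHAR is not shown in the snippet.

import xxhash

SEPARATOR = b'\x1f'  # placeholder stand-in for Magnitude.RARE_CHAR

def namespaced_seed(val, namespace=None):
    # Join namespace and value around a separator so plain concatenation of
    # different (namespace, value) pairs is less likely to collide.
    if namespace:
        data = namespace.encode('utf-8') + SEPARATOR + val.encode('utf-8')
    else:
        data = val.encode('utf-8')
    return xxhash.xxh32(data).intdigest()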
end_pos = len(merged_entries[data_type])
buf = BytesIO()
byml.Writer(
    {data_type: merged_entries[data_type][i*4096:end_pos]}, be=True).write(buf)
new_gamedata.add_file(f'/{data_type}_{i}.bgdata', buf.getvalue())

bootup_rstb = inject_gamedata_into_bootup(new_gamedata)
(util.get_master_modpack_dir() / 'logs').mkdir(parents=True, exist_ok=True)
with (util.get_master_modpack_dir() / 'logs' / 'gamedata.sarc').open('wb') as g_file:
    new_gamedata.write(g_file)

print('Updating RSTB...')
rstable.set_size('GameData/gamedata.sarc', bootup_rstb)

glog_path.parent.mkdir(parents=True, exist_ok=True)
with glog_path.open('w', encoding='utf-8') as l_file:
    l_file.write(xxhash.xxh32(str(mods)).hexdigest())
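The last two lines store an xxh32 digest of the mod list as a merge log. A plausible counterpart, sketched here under my own function name and semantics rather than the project's, is to compare that stored digest against the current mod list to decide whether a remerge is needed.

import xxhash

def gamedata_needs_remerge(glog_path, mods):
    # Stale if no log exists or the stored digest no longer matches
    # the xxh32 of the current mod list.
    if not glog_path.exists():
        return True
    return glog_path.read_text(encoding='utf-8') != xxhash.xxh32(str(mods)).hexdigest()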
def generateLocation2(lat, lng, alt):
    locationBytes = d2h(lat) + d2h(lng) + d2h(alt)
    return xxhash.xxh32(locationBytes, seed=static_seed).intdigest()

def generateLocation1(authticket, lat, lng, alt):
    firstHash = xxhash.xxh32(authticket, seed=static_seed).intdigest()
    locationBytes = d2h(lat) + d2h(lng) + d2h(alt)
    return xxhash.xxh32(locationBytes, seed=firstHash).intdigest()
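Both location helpers rely on a d2h function and a static_seed constant that are not shown. Below is a self-contained sketch of the chained-seed pattern with a hypothetical d2h stand-in that packs a double into raw bytes and a placeholder seed value.

import struct
import xxhash

STATIC_SEED = 0x1234ABCD  # placeholder, not the project's real constant

def d2h_stub(value):
    # Hypothetical stand-in for d2h: the 8 raw bytes of an IEEE-754 double.
    return struct.pack('<d', value)

def chained_location_hash(authticket, lat, lng, alt):
    # Seed the location hash with the hash of the auth ticket, as above.
    first_hash = xxhash.xxh32(authticket, seed=STATIC_SEED).intdigest()
    location_bytes = d2h_stub(lat) + d2h_stub(lng) + d2h_stub(alt)
    return xxhash.xxh32(location_bytes, seed=first_hash).intdigest()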
def sos_hash_output(values, jobs=1):
    '''
    Parallel hash
    FIXME: parallel not implemented for now
    '''
    return [xxh(value).hexdigest() for value in values]
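One way the unused jobs argument could be honoured, sketched with a thread pool and xxh64 assumed as the xxh alias; whether threads actually help depends on input sizes and the GIL, so treat this as a direction rather than a drop-in fix for the FIXME.

import xxhash
from multiprocessing.pool import ThreadPool

def _hexdigest(value):
    return xxhash.xxh64(value).hexdigest()

def sos_hash_output_threaded(values, jobs=1):
    # Keep the sequential path for a single job.
    if jobs <= 1:
        return [_hexdigest(value) for value in values]
    with ThreadPool(jobs) as pool:
        return pool.map(_hexdigest, values)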
def is_file_modded(name, data):
    fhash = xxhash.xxh32(data).hexdigest()
    return not (fhash == hashtable[name])
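A short usage sketch, assuming hashtable is a module-level dict of stock-file digests; the example key and byte strings are mine.

import xxhash

hashtable = {'Pack/Bootup.pack': xxhash.xxh32(b'original bytes').hexdigest()}

print(is_file_modded('Pack/Bootup.pack', b'original bytes'))  # False
print(is_file_modded('Pack/Bootup.pack', b'edited bytes'))    # True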