Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def add_urls_to_index(self, stream, params, filename, length):
    """Index ``stream`` and push every resulting line into redis.

    The index is produced by ``write_cdx_index`` (called with ``cdxj=True``
    and ``append_post=True``) into an in-memory buffer, then split on
    newlines. Each non-empty line is passed to ``self.redis.zadd`` under
    the key resolved from ``self.redis_key_template``.

    :param stream: input handed straight to ``write_cdx_index``.
    :param params: template parameters; ``params.get('writer_cls')`` is
        forwarded to ``write_cdx_index`` as ``writer_cls``.
    :param filename: file name, reduced via ``_get_rel_or_base_name``
        before being recorded in the index.
    :param length: not used by this method.
    :return: list of index lines (bytes) as split from the buffer; may
        contain an empty element when nothing was written.
    """
    name_for_index = self._get_rel_or_base_name(filename, params)

    buff = BytesIO()
    write_cdx_index(buff, stream, name_for_index,
                    cdxj=True,
                    append_post=True,
                    writer_cls=params.get('writer_cls'))

    target_key = res_template(self.redis_key_template, params)

    # Trailing whitespace is stripped first so the final newline does not
    # produce a dangling empty entry.
    raw_index = buff.getvalue().rstrip()
    cdx_list = raw_index.split(b'\n')

    # Blank lines are skipped for redis but still returned to the caller.
    for line in filter(None, cdx_list):
        self.redis.zadd(target_key, 0, line)

    return cdx_list
def _get_rel_or_base_name(self, filename, params):
    """Return the name under which ``filename`` should be recorded.

    The preferred result is ``filename`` expressed relative to the
    directory resolved from ``self.rel_path_template``. If that relative
    path cannot be computed, is empty, or contains ``'..'``, the plain
    basename of ``filename`` is returned instead.

    :param filename: path of the file being named.
    :param params: template parameters used to resolve
        ``self.rel_path_template``.
    :return: relative path or basename, as described above.
    """
    rel_path = res_template(self.rel_path_template, params)

    try:
        base_name = os.path.relpath(filename, rel_path)
    except (ValueError, TypeError, OSError):
        # relpath can fail on an empty path, mismatched drives or
        # str/bytes mixes, or an unavailable working directory; fall
        # back to the basename in all of those cases.
        base_name = None

    # Explicit check instead of ``assert`` so the guard is still enforced
    # when Python runs with -O (which strips assert statements).
    if not base_name or '..' in base_name:
        base_name = os.path.basename(filename)

    return base_name
def scan_keys(self, match_templ, params, member_key=None):
    """Expand ``match_templ`` into concrete keys.

    With no member key template (neither the ``member_key`` argument nor
    ``self.member_key_template``), the result of
    ``self.redis.scan_iter(match=match_templ)`` is returned directly.

    Otherwise the member key template is resolved against ``params``,
    the members are fetched through ``self._load_key_set`` and every
    ``'*'`` in ``match_templ`` is replaced by each member in turn.

    :param match_templ: key pattern containing ``'*'`` placeholders.
    :param params: template parameters; also used to stash the loaded
        members under ``'scan:' + <resolved member key>``.
    :param member_key: optional template overriding
        ``self.member_key_template``.
    :return: the ``scan_iter`` result, or a list of expanded keys.
    """
    member_templ = member_key or self.member_key_template
    if not member_templ:
        return self.redis.scan_iter(match=match_templ)

    resolved = res_template(member_templ, params)
    cache_slot = 'scan:' + resolved

    # Reuse members already stored in params to avoid an extra redis call.
    # An empty stored value is treated like a miss and reloaded.
    members = params.get(cache_slot)
    if not members:
        members = self._load_key_set(resolved)
        params[cache_slot] = members

    expanded = []
    for member in members:
        expanded.append(match_templ.replace('*', member))
    return expanded
def _get_api_url(self, params):
    """Resolve ``self.api_url`` against ``params``.

    When ``params`` contains a ``'closest'`` entry and
    ``self.closest_limit`` is truthy, ``'&limit=<closest_limit>'`` is
    appended to the resolved URL.

    :param params: template parameters for the request.
    :return: the resolved URL string.
    """
    resolved = res_template(self.api_url, params)

    # No 'closest' lookup, or no configured limit: use the URL unchanged.
    if not ('closest' in params and self.closest_limit):
        return resolved

    return resolved + '&limit=' + str(self.closest_limit)
def _iter_sources(self, params):
    """List the sources found under the directory resolved from ``params``.

    ``self.base_dir`` is resolved against ``params`` and joined onto
    ``self.base_prefix``; the result is passed to ``self._load_files``.

    :param params: template parameters used to resolve ``self.base_dir``.
    :return: list of everything yielded by ``self._load_files``.
    :raises NotFoundException: carrying the directory path, if loading
        fails for any reason.
    """
    resolved_dir = os.path.join(self.base_prefix,
                                res_template(self.base_dir, params))

    # Materialise inside the try so errors raised lazily during iteration
    # are also reported as "not found".
    try:
        return list(self._load_files(resolved_dir))
    except Exception:
        raise NotFoundException(resolved_dir)
def handle_timemap(self, params):
    """Fetch the timemap for ``params`` through ``self.sesh``.

    The URL is resolved from ``self.timemap_url`` and requested with the
    headers from ``self._get_headers`` and the timeout stored under
    ``params['_timeout']`` (if any).

    NOTE(review): the visible code ends after binding the response text
    to ``links`` and returns nothing; the remainder of this method looks
    truncated in this excerpt -- confirm against the full source.

    :param params: template/request parameters.
    :raises NotFoundException: carrying the URL, when the request fails,
        returns an error status, or has an empty body.
    """
    url = res_template(self.timemap_url, params)
    headers = self._get_headers(params)
    res = None
    try:
        res = self.sesh.get(url,
                            headers=headers,
                            timeout=params.get('_timeout'))

        res.raise_for_status()

        # Explicit check instead of ``assert`` so an empty body is still
        # rejected when Python runs with -O (which strips asserts).
        if not res.text:
            raise ValueError('empty timemap response')

    except Exception as e:
        # Release the (possibly None) response before reporting failure.
        no_except_close(res)
        self.logger.debug('FAILED: ' + str(e))
        raise NotFoundException(url)

    links = res.text
def get_new_filename(self, dir_, params):
    """Build a fresh file name inside ``dir_``.

    ``self.filename_template`` is resolved with ``params`` plus three
    extra fields: ``hostname`` (``self.hostname``), ``timestamp`` (from
    ``timestamp20_now()``) and ``random`` (5 random bytes, base32-encoded
    to an 8-character string).

    :param dir_: prefix prepended verbatim to the resolved name; no path
        separator is inserted.
    :param params: template parameters.
    :return: ``dir_`` concatenated with the resolved file name.
    """
    now_stamp = timestamp20_now()

    # 5 bytes encode to exactly 8 base32 characters, so no '=' padding.
    random_token = base64.b32encode(os.urandom(5)).decode('utf-8')

    resolved_name = res_template(self.filename_template, params,
                                 hostname=self.hostname,
                                 timestamp=now_stamp,
                                 random=random_token)
    return dir_ + resolved_name
def load_index(self, params):
    """Open the index resolved from ``params`` and return an iterator over it.

    The file is opened immediately via ``self._do_open``; the objects
    themselves are produced lazily by ``self._do_iter``.

    :param params: template parameters used to resolve
        ``self.filename_template`` and forwarded to ``self._do_iter``.
    :return: a generator yielding each object from ``self._do_iter``.
    """
    path = res_template(self.filename_template, params)
    handle = self._do_open(path)

    def _records():
        # The handle is used as a context manager so it is released once
        # iteration finishes or the generator is closed.
        with handle:
            for entry in self._do_iter(handle, params):
                yield entry

    return _records()
def _write_to_file(self, params, write_callback):
full_dir = res_template(self.dir_template, params)
dir_key = self.get_dir_key(params)
result = self.fh_cache.get(dir_key)
close_file = False
new_size = start = 0
if result:
out, filename = result
is_new = False
else:
filename = self.get_new_filename(full_dir, params)
if not self.allow_new_file(filename, params):
return False