Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _get_rendition(self, output_scale=1, **kwargs):
    """ Implements get_rendition.

    Works out where the rendition of this image lives under the static
    folder and, if no file exists there yet, submits a render job to the
    LocalImage thread pool.

    Returns a tuple of (out_rel_path, size, pending), where pending is True
    when a render was scheduled by this call and False when the output file
    was already on disk.
    """
    source_name = os.path.basename(self._record.file_path)
    stem, suffix = os.path.splitext(source_name)
    stem = slugify.slugify(stem)

    # An explicitly requested format replaces the source file's extension
    if kwargs.get('format'):
        suffix = '.' + kwargs['format']

    pipeline, out_args, size = self._build_pipeline(stem, suffix, output_scale, kwargs)

    # The output filename is made of every named pipeline stage, in order
    out_basename = '_'.join(str(label) for label, _ in pipeline if label) + suffix

    # Output files are bucketed into subdirectories by checksum prefix
    checksum = self._record.checksum
    out_rel_path = os.path.join(
        config.image_output_subdir,
        checksum[0:2],
        checksum[2:6],
        out_basename)
    out_fullpath = os.path.join(config.static_folder, out_rel_path)

    if os.path.isfile(out_fullpath):
        LOGGER.debug("rendition %s already exists", out_fullpath)
        # Refresh the file's access/modification times to "now"
        # NOTE(review): presumably this keeps a cache cleaner from expiring
        # renditions that are still in use -- confirm
        os.utime(out_fullpath)
        return out_rel_path, size, False

    LOGGER.debug("scheduling %s for render", out_fullpath)
    pool = LocalImage.thread_pool()
    # Only stages that carry an actual operation are handed to the renderer
    operations = [op for _, op in pipeline if op]
    pool.submit(self._render, out_fullpath, operations, out_args)
    return out_rel_path, size, True
def known_users(days=None):
    """ Get the users known to the system, as a list of (user, last_seen).

    If days is given (and non-zero), only records whose last_seen falls
    within that many days before the current UTC time are returned. Each
    last_seen value is converted to config.timezone.
    """
    records = model.KnownUser.select()

    if days:
        cutoff = (arrow.utcnow() - datetime.timedelta(days=days)).datetime
        records = orm.select(e for e in records if e.last_seen >= cutoff)

    users = []
    for row in records:
        seen = arrow.get(row.last_seen).to(config.timezone)
        users.append((_get_user(row.user), seen))
    return users
def get_counters(text, args):
    """ Count the number of stateful items in Markdown text.

    Runs the text through misaka using an ItemCounter as the renderer and
    returns that counter. The extension list is taken from
    args['markdown_extensions'] when present, else config.markdown_extensions.
    """
    counter = ItemCounter()
    extensions = args.get('markdown_extensions', config.markdown_extensions)

    # The output of the processing pass is discarded; only the counter is kept
    misaka.Markdown(counter, extensions)(text)
    return counter
def load_groups() -> typing.DefaultDict[str, typing.Set[str]]:
    """ Load the user list file named by config.user_list.

    Returns a mapping from each member name to the set of section (group)
    names under which that member is listed.
    """
    # Every line should be a bare key with no value, so the only delimiter is
    # \000, which is unlikely to turn up in a well-formed text file
    parser = configparser.ConfigParser(
        delimiters=('\000',),
        allow_no_value=True,
        interpolation=None)

    # Turn off configparser's key lowercasing; usernames should only be
    # case-desensitized by the auth backend
    parser.optionxform = lambda option: option  # type:ignore

    parser.read(config.user_list)

    memberships: typing.DefaultDict[str, typing.Set[str]] = collections.defaultdict(set)

    # Invert section -> members into member -> sections
    for group, members in parser.items():
        for member in members:
            memberships[member].add(group)

    return memberships
def secure_link(endpoint: str, *args, **kwargs) -> str:
    """ Like flask.url_for, except that the generated link is forced to be an
    external https URL when AUTH_FORCE_HTTPS is set in config.auth and the
    current request did not itself arrive over https. """
    if not config.auth.get('AUTH_FORCE_HTTPS') or flask.request.scheme == 'https':
        # Nothing to upgrade; defer to flask unchanged
        return flask.url_for(endpoint, *args, **kwargs)

    # These two settings take precedence over any caller-supplied values
    secure_kwargs = dict(kwargs, _external=True, _scheme='https')
    return flask.url_for(endpoint, *args, **secure_kwargs)
def _get_image(path: str, search_path: typing.Tuple[str, ...]) -> Image:
    """ Build the image object appropriate for an image path.

    '@'-prefixed paths become a StaticImage (with the '@' removed), paths
    that start with '//' or contain '://' become a RemoteImage, and anything
    else is looked up on disk: absolute paths relative to
    config.content_folder, all others within search_path. A failed lookup
    gives ImageNotFound; otherwise the asset record decides between
    FileAsset and LocalImage.
    """
    if path.startswith('@'):
        return StaticImage(path[1:], search_path)

    if path.startswith('//') or '://' in path:
        return RemoteImage(path, search_path)

    if os.path.isabs(path):
        # Strip the leading root so the path resolves inside the content folder
        lookup_name, lookup_dirs = os.path.relpath(path, '/'), config.content_folder
    else:
        lookup_name, lookup_dirs = path, search_path

    file_path = utils.find_file(lookup_name, lookup_dirs)
    if not file_path:
        return ImageNotFound(path, search_path)

    record = _get_asset(file_path)
    image_class = FileAsset if record.is_asset else LocalImage
    return image_class(record, search_path)
def _set_counter(self, section, args, counter: markdown.ItemCounter):
    """ Register the counts that we already know.

    The counter is stored in self._counters under (section, footnotes),
    where footnotes says whether 'footnotes' is among the Markdown
    extensions in effect (args['markdown_extensions'] if present, else
    config.markdown_extensions).
    """
    extensions = args.get('markdown_extensions', config.markdown_extensions)
    has_footnotes = 'footnotes' in extensions
    key = (section, has_footnotes)

    LOGGER.debug("Registering %s:%s -> %s", section, has_footnotes, counter)
    self._counters[key] = counter
def thread_pool():
    """ Get the rendition threadpool, starting it on first use.

    The executor is kept on LocalImage._thread_pool and sized by
    config.image_render_threads.
    """
    if LocalImage._thread_pool:
        return LocalImage._thread_pool

    LOGGER.info("Starting LocalImage threadpool")
    LocalImage._thread_pool = concurrent.futures.ThreadPoolExecutor(
        thread_name_prefix="Renderer",
        max_workers=config.image_render_threads)
    return LocalImage._thread_pool