Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
scan_directory string before sending it to cache.
"""
directories = {}
files = {}
if os.path.exists(scan_directory) is not True:
return None
norm_dir_name = scan_directory.strip()
self.d_cache[norm_dir_name] = {}
self.d_cache[norm_dir_name]["last_sort"] = None
dirname = os.path.split(scan_directory)[0]
if self.filter_filenames is not None:
utilities.rename_path_to_clean(dirname)
dirname = utilities.clean_path(dirname)
utilities.check_files_in_directory(dirname)
for s_entry in scandir.scandir(scan_directory):
data = DirEntry()
data.st = s_entry.stat()
if s_entry.name.strip().lower() in self.files_to_ignore:
continue
if self.hidden_dot_files and s_entry.name.startswith("."):
continue
data.fq_filename = os.path.join(
os.path.realpath(scan_directory.strip()),
s_entry.name)
data.parentdirectory = os.path.split(scan_directory)[0:-1][0]
data.human_st_mtime = time.asctime(
time.localtime(data.st[stat.ST_MTIME]))
data.is_dir = s_entry.is_dir()
# print (s_entry.is_dir())
# if gallery has chapters divided into sub folders
if len(chapters) != 0:
log_d('Chapters divided in folders..')
for ch in chapters:
chap = chap_container.create_chapter()
chap.title = title_parser(ch)['title']
chap.path = os.path.join(path, ch)
metafile.update(GMetafile(chap.path))
chap.pages = len([x for x in scandir.scandir(chap.path) if x.name.endswith(IMG_FILES)])
else: #else assume that all images are in gallery folder
chap = chap_container.create_chapter()
chap.title = title_parser(os.path.split(path)[1])['title']
chap.path = path
metafile.update(GMetafile(path))
chap.pages = len([x for x in scandir.scandir(path) if x.name.endswith(IMG_FILES)])
except NotADirectoryError:
if path.endswith(ARCHIVE_FILES):
gallery_object.is_archive = 1
log_i("Gallery source is an archive")
archive_g = sorted(check_archive(path))
for g in archive_g:
chap = chap_container.create_chapter()
chap.path = g
chap.in_archive = 1
metafile.update(GMetafile(g, path))
arch = ArchiveFile(path)
chap.pages = len(arch.dir_contents(g))
arch.close()
metafile.apply_gallery(gallery_object)
def process_dir(self, path, st):
""" i_dir should be absolute path
st is the stat object associated with the directory
"""
last_report = MPI.Wtime()
count = 0
try:
with timeout(seconds=10):
entries = scandir(path)
except OSError as e:
self.logger.warn(e, extra=self.d)
self.skipped += 1
except TimeoutError as e:
self.logger.error("%s when scandir() on %s" %
(e, path), extra=self.d)
self.skipped += 1
else:
for entry in entries:
try:
if entry.is_symlink():
self.sym_links += 1
elif entry.stat().st_mode & 0o170000 == S_IFIFO:
self.pipes += 1
elif entry.stat().st_mode & 0o170000 == S_IFSOCK:
self.sockets += 1
agent_type='OTHER',
agent_role='CREATOR',
othertype='SOFTWARE'))
else:
agents = [mets.agent(attributes["organization_name"],
agent_role='CREATOR')]
# Create mets header
metshdr = mets.metshdr(attributes["create_date"],
attributes["last_moddate"],
attributes["record_status"],
agents)
# Collect elements from workspace XML files
elements = []
for entry in scandir(attributes["workspace"]):
if entry.name.endswith(('-amd.xml', 'dmdsec.xml',
'structmap.xml', 'filesec.xml',
'rightsmd.xml')) and entry.is_file():
element = lxml.etree.parse(entry.path).getroot()[0]
elements.append(element)
elements = mets.merge_elements('{%s}amdSec' % NAMESPACES['mets'], elements)
elements.sort(key=mets.order)
# Create METS element
mets_element = mets.mets(METS_PROFILE[attributes["mets_profile"]],
objid=attributes["objid"],
label=attributes["label"],
namespaces=NAMESPACES)
mets_element = mets_extend(mets_element,
METS_CATALOG,
def _iterdir(dirname, dironly):
if not dirname:
if isinstance(dirname, bytes):
dirname = bytes(os.curdir, 'ASCII')
else:
dirname = os.curdir
try:
it = scandir(dirname)
for entry in it:
try:
if not dironly or entry.is_dir():
yield entry.name
except OSError:
pass
except OSError:
return
* can operate on .zip files from a http://URL
(C) 2016 Willem Hengeveld
"""
from __future__ import division, print_function, absolute_import, unicode_literals
import sys
import os
import binascii
import struct
import datetime
import zlib
import itertools
if sys.version_info[0] == 2:
import scandir
os.scandir = scandir.scandir
def zip_decrypt(data, pw):
"""
INPUT: data - an array of bytes
pw - either a tuple of 3 dwords, or a byte array.
OUTPUT: a decrypted array of bytes.
The very weak 'zip' encryption
This encryption can be cracked using tools like pkcrack.
Pkcrack does a known plaintext attack, requiring 13 bytes of plaintext.
"""
def make_crc_tab(poly):
def calcentry(v, poly):
for _ in range(8):
def scan_dirs(self):
    """Gather the entries of every monitored path and run a local fetch on them.

    Each path in ``gui_constants.MONITOR_PATHS`` is listed with
    ``scandir.scandir`` and the ``path`` of every entry is collected.  The
    collected list is assigned to ``series_path`` of a new ``fetch.Fetch``
    instance, whose ``FINISHED`` signal is connected to a callback that
    stores the emitted value in ``self.scanned_data``; ``local()`` is then
    called on the fetch instance.
    """
    def _store_scan_result(result):
        # Keep whatever FINISHED delivers for later use.
        self.scanned_data = result

    found = []
    for monitored in gui_constants.MONITOR_PATHS:
        found.extend(item.path for item in scandir.scandir(monitored))

    fetcher = fetch.Fetch(self)
    fetcher.series_path = found
    fetcher.FINISHED.connect(_store_scan_result)
    fetcher.local()
#contents = []
#for g in self.scanned_data:
# contents.append(g)
#paths = sorted(paths)
#new_galleries = []
#for x in contents:
##append ..
if(( ("limitSelection" in self.options
and self.options["limitSelection"] )
and os.path.normpath(self.options["selectionPath"]) == os.path.normpath(self.currentPath))
or ("hideFolders" in self.options and self.options["hideFolders"]) ):
pass
##dont add ..
else:
entry = {}
entry["name"] = ".."
entry["isFolder"] = True
entry["text"] = self.entryFont.render("..", True, self.textColor)
self.entryList.append(entry)
scan = scandir.scandir(os.path.normpath(self.currentPath))
try:
hideFolders = self.options["hideFolders"] if "hideFolders" in self.options else False
for f in scan:
if(f.is_dir() and not hideFolders ):
entry = {}
entry["name"] = f.name
entry["isFolder"] = True
entry["text"] = None
folderList.append(entry)
elif ( not self.selectFolder and self.filterFile(f.name)):
entry = {}
entry["name"] = f.name
entry["isFolder"] = False
def walk(path, pattern=None):
try:
for f in scandir.scandir(path):
#print f.path, f.is_dir(), f.is_symlink(), f.is_file()
if f.is_symlink(): continue
if f.is_dir() and not f.path in MOUNT_POINTS:
#print f.path, f.is_dir()
#yield from walk(f.path) #yield bug
for path, size, mtime in walk(f.path, pattern):
yield (path, size, mtime)
else:
if pattern is None or re.match(pattern, f.path):
#get file info
path = f.path
stat = f.stat()
size = stat.st_blocks*stat.st_blksize/1024/8
mtime = int(stat.st_mtime)
def get_tree_size(path):
    """Return total size of all files in directory tree at path.

    The total is the sum of ``st_size`` over every non-directory entry,
    descending recursively into sub-directories.  Symbolic links are
    skipped entirely (neither counted nor followed).

    The walk is best-effort: a directory that cannot be listed contributes
    nothing further, and an entry whose ``is_symlink()``/``is_dir()``/
    ``stat()`` call raises OSError is skipped on its own, so the remaining
    entries of that directory are still counted.
    """
    size = 0
    try:
        for entry in scandir.scandir(path):
            try:
                if entry.is_symlink():
                    continue
                if entry.is_dir():
                    size += get_tree_size(os.path.join(path, entry.name))
                else:
                    size += entry.stat().st_size
            except OSError:
                # Only this entry is unreadable (e.g. it vanished or access
                # was denied); keep accounting for its siblings.
                continue
    except OSError:
        # The directory itself could not be opened or iterated.
        pass
    return size