Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def open_file_safe(file_path: str, mode: str = "r+"):
    """Build a ``portalocker.Lock`` for *file_path* that requests an exclusive lock.

    Args:
        file_path: Path of the file to be opened under a lock.
        mode: File mode passed through to ``portalocker.Lock`` (default ``"r+"``).

    Returns:
        The object produced by ``portalocker.Lock(file_path, mode=mode,
        flags=portalocker.LOCK_EX)``.
        NOTE(review): this looks like a lock object that the caller still has
        to acquire (e.g. with ``with``), not an already-open file -- confirm
        against the portalocker documentation.

    Raises:
        SystemExit: with exit code 1 when ``portalocker`` cannot be imported;
            an installation hint is printed first.
    """
    # Use portalocker package for file locking if available,
    # otherwise print a message to install the package.
    # Only the import is guarded, so an ImportError raised from anywhere else
    # is not misreported as "package missing".
    try:
        import portalocker  # type: ignore
    except ImportError:
        # sys.exit() instead of the bare exit() helper: exit() is injected by
        # the ``site`` module and is absent under ``python -S`` / frozen apps.
        import sys

        print(
            "The conversion script is missing a required package: portalocker. Please run "
            "python -m pip install -r requirements.txt to install the missing dependency."
        )
        sys.exit(1)
    return portalocker.Lock(file_path, mode=mode, flags=portalocker.LOCK_EX)
def lock(locks):
    """Open and exclusively lock every file that backs *locks*.

    For each path yielded by ``paths_for_locks(locks)`` the file is opened in
    ``'r+'`` mode, locked with ``portalocker.LOCK_EX`` and the open handle is
    appended to the module-level ``locked_files`` collection.

    Raises:
        Whatever ``open`` or ``portalocker.lock`` raise. Handles that were
        already locked and recorded in ``locked_files`` are left in place.
    """
    for path in paths_for_locks(locks):
        f = open(path, 'r+')
        try:
            portalocker.lock(f, portalocker.LOCK_EX)
        except BaseException:
            # The handle is not tracked in locked_files yet, so nobody else
            # would ever close it: release it here before propagating.
            f.close()
            raise
        locked_files.append(f)
def acquire(self,startup=False):
"""
Try to take the cron master lock.

Returns the time when the lock is acquired, or
None if cron is already running.

The lock is implemented by writing a pickle (start, stop) in cron.master:
start is the time when the cron job starts and stop is the time when it completed;
stop == 0 if the job started but did not yet complete.
If a cron job started less than 60 seconds ago, acquire returns None.
If a cron job started more than 60 seconds ago and did not stop,
a warning is issued: "Stale cron.master detected".
If startup is true, the 60-second check is skipped.

NOTE(review): only the first part of this method is visible in this
excerpt; the code that writes the pickle, releases the lock and returns
is not shown here.
"""
# LOCK_EX being None is treated as "file locking is unavailable": cron is disabled.
if portalocker.LOCK_EX == None:
logger.warning('WEB2PY CRON: Disabled because no file locking')
return None
# Open the master file for binary read/update and keep the handle on self.
self.master = open(self.path,'rb+')
try:
ret = None
# Take an exclusive lock before reading the (start, stop) record.
portalocker.lock(self.master,portalocker.LOCK_EX)
try:
(start, stop) = cPickle.load(self.master)
except:
# Any failure to unpickle falls back to start=0 and a non-zero stop,
# i.e. "no job recorded as still running".
(start, stop) = (0, 1)
# 59.99 rather than 60 -- presumably tolerance for timer jitter; TODO confirm.
if startup or self.now - start > 59.99:
ret = self.now
if not stop:
# this happens if previous cron job longer than 1 minute
logger.warning('WEB2PY CRON: Stale cron.master detected')
logger.debug('WEB2PY CRON: Acquiring lock')
def save_storage(storage, filename):
    """Pickle ``dict(storage)`` into *filename* under an exclusive lock.

    Args:
        storage: Any object accepted by ``dict(...)``.
        filename: Path of the file to (over)write.

    The file is always unlocked and closed, even when pickling fails.

    NOTE(review): opening with ``'wb'`` truncates the file before the lock is
    obtained, so a concurrent reader may briefly see an empty file.
    """
    fp = open(filename, 'wb')
    try:
        portalocker.lock(fp, portalocker.LOCK_EX)
        try:
            cPickle.dump(dict(storage), fp)
            # Push buffered bytes to the file while the lock is still held;
            # otherwise they would only be written by close(), after unlock.
            fp.flush()
        finally:
            portalocker.unlock(fp)
    finally:
        fp.close()
else:
raise
def unlock(file):
    """Release the lock held on *file* by issuing ``fcntl.LOCK_UN`` via ``lockf``.

    *file* is converted to a descriptor with ``_getfd`` before the call.
    """
    # flock() alternative, currently disabled:
    #fcntl.flock(_getfd(file), fcntl.LOCK_UN)
    descriptor = _getfd(file)
    fcntl.lockf(descriptor, fcntl.LOCK_UN)
if __name__ == '__main__':
    # Manual demo: append a timestamp to log.txt while holding an exclusive
    # lock, then keep the lock until the user presses enter.
    from time import time, strftime, localtime
    import sys
    import portalocker
    log = open('log.txt', "a+")
    try:
        portalocker.lock(log, portalocker.LOCK_EX)
        timestamp = strftime("%m/%d/%Y %H:%M:%S\n", localtime(time()))
        log.write(timestamp)
        # Parenthesised single-argument form prints the same text on
        # Python 2 and is valid syntax on Python 3.
        print("Wrote lines. Hit enter to release lock.")
        sys.stdin.readline()
    finally:
        # Close the file even if locking or writing failed.
        log.close()
def _try_store_in_file(self, request, response):
    """Write the pickled session to its file, if there is anything to store.

    Args:
        request: Not used by this method.
        response: Object carrying the session state read here
            (``session_id``, ``session_new``, ``session_file``,
            ``session_filename``, ``session_pickled``).

    Returns:
        False when nothing is written (no session id, the session is marked
        to be forgotten, or ``self._unchanged(response)`` is true); True
        otherwise.

    In every case ``self._close(response)`` and
    ``self.save_session_id_cookie()`` are called before returning.
    """
    try:
        if (not response.session_id or self._forget
                or self._unchanged(response)):
            # self.clear_session_cookies()
            self.save_session_id_cookie()
            return False
        if response.session_new or not response.session_file:
            # Make sure the session sub-folder exists. Create-then-check
            # (instead of check-then-create) so that two requests racing to
            # create the same folder do not make one of them fail.
            session_folder = os.path.dirname(response.session_filename)
            try:
                os.mkdir(session_folder)
            except OSError:
                if not os.path.isdir(session_folder):
                    raise
            response.session_file = open(response.session_filename, 'wb')
            portalocker.lock(response.session_file, portalocker.LOCK_EX)
            response.session_locked = True
        if response.session_file:
            # Reuse an already-pickled payload when the response carries one.
            session_pickled = response.session_pickled or cPickle.dumps(self)
            response.session_file.write(session_pickled)
            # Drop anything left over after the current write position.
            response.session_file.truncate()
    finally:
        self._close(response)
    self.save_session_id_cookie()
    return True
def write_file(filename, obj):
    """Store *obj* in *filename* once the file no longer holds pending data.

    The file is repeatedly opened under an exclusive ``portalocker`` lock.
    When it is empty, or its pickled content is ``None``, the file is cleared
    with ``clear_file`` and *obj* is pickled into it, which ends the loop.
    Otherwise the lock is released and the check is repeated.

    Args:
        filename: Path of the shared file.
        obj: Picklable object to store.

    NOTE(review): the retry loop does not sleep between attempts, and
    ``pickle.load`` must only ever be used on files from a trusted source.
    """
    file_data = 0  # any non-None value, so the loop body runs at least once
    while file_data is not None:
        lock = portalocker.Lock(filename, mode='a+b', flags=portalocker.LOCK_EX)
        lock.acquire()
        try:
            fh = lock.fh
            fh.seek(0)
            # Emptiness test by value; the previous ``len(...) is 0`` relied
            # on int identity, which is an implementation detail.
            if not fh.read():
                file_data = None
            else:
                fh.seek(0)
                file_data = pickle.load(fh)
            if file_data is None:
                clear_file(fh)
                pickle.dump(obj, fh)
        finally:
            # Always give the lock back, even if (un)pickling failed.
            lock.release()
def lock(file, flags):
    """Lock *file* by passing its descriptor and *flags* to ``lockno``."""
    descriptor = file.fileno()
    lockno(descriptor, flags)
def unlock(file):
    """Unlock *file* by passing its descriptor to ``unlockno``."""
    descriptor = file.fileno()
    unlockno(descriptor)
if __name__ == '__main__':
    # Manual demo: append a timestamp to log.txt while holding an exclusive
    # lock, then keep the lock until the user presses enter.
    from time import time, strftime, localtime
    import sys
    import portalocker
    log = open('log.txt', "a+")
    try:
        portalocker.lock(log, portalocker.LOCK_EX)
        timestamp = strftime("%m/%d/%Y %H:%M:%S\n", localtime(time()))
        log.write(timestamp)
        # Parenthesised single-argument form prints the same text on
        # Python 2 and is valid syntax on Python 3.
        print("Wrote lines. Hit enter to release lock.")
        sys.stdin.readline()
    finally:
        # Close the file even if locking or writing failed.
        log.close()
del self.__hash
if __hash == hashlib.md5(str(self)).digest():
self._close(response)
return
if not response.session_id or self._forget:
self._close(response)
return
if response.session_new:
# Tests if the session sub-folder exists, if not, create it
session_folder = os.path.dirname(response.session_filename)
if not os.path.exists(session_folder):
os.mkdir(session_folder)
response.session_file = open(response.session_filename, 'wb')
portalocker.lock(response.session_file, portalocker.LOCK_EX)
response.session_locked = True
if response.session_file:
cPickle.dump(dict(self), response.session_file)
response.session_file.truncate()
self._close(response)