Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __get_timeout(self, name, default):
    """Return a timeout as a float, read from a file when one is present.

    The candidate file is ``self.path.join(name, abs=1)``. If that path
    passes ``check()``, its contents are read, stripped of surrounding
    whitespace and converted with ``float``. Otherwise ``default`` is
    converted with ``float`` and returned.

    :param name: file name joined onto ``self.path``; also used in the
        debug messages.
    :param default: fallback value used when the file is absent; it must
        be acceptable to ``float()``.
    :returns: the timeout as a ``float``.
    :raises ValueError: propagated from ``float()`` when the file content
        (or ``default``) is a string that does not parse as a number.
    """
    override_file = self.path.join(name, abs=1)

    # Guard clause: no override file, so fall back to the supplied default.
    if not override_file.check():
        debug('%s doesn\'t exist', name)
        return float(default)

    debug('%s exists', name)
    raw_value = override_file.read()
    return float(raw_value.strip())
def flock(self):
    """Yield a lock for ``self.path``, reusing one handed down by a parent.

    This is a generator that yields exactly once.
    NOTE(review): presumably wrapped by ``contextlib.contextmanager`` at a
    decorator line outside this view -- confirm.

    Two paths:

    * Inherited lock: ``PGCTL_SERVICE`` and ``PGCTL_SERVICE_LOCK`` are
      popped from ``os.environ``. If a lock value is present and the
      service equals ``self.path``, the integer lock is yielded and then
      closed with ``os.close`` once the caller is done. If the service
      does not match, the inherited lock is handed to
      ``.flock.release`` and a fresh lock is taken instead.
    * Fresh lock: ``flock(self.path.strpath)`` is entered, the directory
      structure is ensured, and the lock is yielded with ``self.path``
      as the current working directory.

    :returns: generator yielding the lock (formatted with ``%i`` in the
        debug output, so presumably an integer file descriptor).
    """
    # Both variables are removed from the environment unconditionally.
    owning_service = os.environ.pop('PGCTL_SERVICE', None)
    inherited = os.environ.pop('PGCTL_SERVICE_LOCK', None)
    debug('parentlock: %r', owning_service)

    if inherited:
        inherited_fd = int(inherited)
        if owning_service == self.path:
            # The parent process already holds the lock for this service.
            debug('retrieved parent lock! %i', inherited_fd)
            try:
                yield inherited_fd
            finally:
                os.close(inherited_fd)
            return

        # The inherited lock belongs to a different service: give it up
        # before acquiring our own.
        from .flock import release
        release(inherited_fd)

    # NOTE(review): ``flock`` here is looked up in the enclosing scope,
    # not on ``self`` -- presumably the helper from the ``.flock``
    # module; verify it is imported where this method lives.
    with flock(self.path.strpath) as acquired_fd:
        debug('LOCK: %i', acquired_fd)
        self.ensure_directory_structure()
        with self.path.as_cwd():
            yield acquired_fd
debug('parentlock: %r', parent_service)
if lock:
lock = int(lock)
if parent_service == self.path:
debug('retrieved parent lock! %i', lock)
try:
yield lock
finally:
os.close(lock)
return
else:
from .flock import release
release(lock)
with flock(self.path.strpath) as lock:
debug('LOCK: %i', lock)
self.ensure_directory_structure()
with self.path.as_cwd():
yield lock
def __get_timeout(self, name, default):
    """Return a timeout as a float, read from a file when one is present.

    The candidate file is ``self.path.join(name, abs=1)``. If that path
    passes ``check()``, its contents are read, stripped of surrounding
    whitespace and converted with ``float``. Otherwise ``default`` is
    converted with ``float`` and returned.

    :param name: file name joined onto ``self.path``; also used in the
        debug messages.
    :param default: fallback value used when the file is absent; it must
        be acceptable to ``float()``.
    :returns: the timeout as a ``float``.
    :raises ValueError: propagated from ``float()`` when the file content
        (or ``default``) is a string that does not parse as a number.
    """
    override_file = self.path.join(name, abs=1)

    # Guard clause: no override file, so fall back to the supplied default.
    if not override_file.check():
        debug('%s doesn\'t exist', name)
        return float(default)

    debug('%s exists', name)
    raw_value = override_file.read()
    return float(raw_value.strip())
def flock(self):
# if we already have the lock, from a parent process, use it.
parent_service = os.environ.pop('PGCTL_SERVICE', None)
lock = os.environ.pop('PGCTL_SERVICE_LOCK', None)
debug('parentlock: %r', parent_service)
if lock:
lock = int(lock)
if parent_service == self.path:
debug('retrieved parent lock! %i', lock)
try:
yield lock
finally:
os.close(lock)
return
else:
from .flock import release
release(lock)
with flock(self.path.strpath) as lock:
debug('LOCK: %i', lock)
self.ensure_directory_structure()