Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class _WindowsLock(_InterProcessLock):
    """Windows flavour of the interprocess lock, built on ``msvcrt.locking``."""

    @staticmethod
    def _trylock(lockfile):
        """Try to lock one byte of *lockfile* using mode ``LK_NBLCK``.

        *lockfile* must expose ``fileno()``. Whatever ``msvcrt.locking``
        raises is passed straight through to the caller.
        """
        descriptor = lockfile.fileno()
        msvcrt.locking(descriptor, msvcrt.LK_NBLCK, 1)

    @staticmethod
    def _unlock(lockfile):
        """Unlock one byte of *lockfile* using mode ``LK_UNLCK``."""
        descriptor = lockfile.fileno()
        msvcrt.locking(descriptor, msvcrt.LK_UNLCK, 1)
class _FcntlLock(_InterProcessLock):
    """POSIX flavour of the interprocess lock, built on ``fcntl.lockf``."""

    @staticmethod
    def _trylock(lockfile):
        """Request an exclusive, non-blocking lock on *lockfile*.

        Whatever ``fcntl.lockf`` raises is passed straight through to the
        caller.
        """
        exclusive_nonblocking = fcntl.LOCK_EX | fcntl.LOCK_NB
        fcntl.lockf(lockfile, exclusive_nonblocking)

    @staticmethod
    def _unlock(lockfile):
        """Drop the lock held on *lockfile* (``LOCK_UN``)."""
        release = fcntl.LOCK_UN
        fcntl.lockf(lockfile, release)
# Pick the platform-specific implementation and publish it under the public
# name ``InterProcessLock``. The locking module is imported inside each branch
# because ``msvcrt`` exists only on Windows and ``fcntl`` only on POSIX.
if os.name == 'nt':
    import msvcrt
    InterProcessLock = _WindowsLock
else:
    import fcntl
    # Bind the alias here too; otherwise ``InterProcessLock`` would be
    # undefined on every non-Windows platform.
    InterProcessLock = _FcntlLock
def trylock(self):
    """Hand ``self.lockfile`` to the ``_trylock`` hook.

    Returns ``None``; any exception raised by the hook propagates.
    """
    attempt = self._trylock
    attempt(self.lockfile)
def unlock(self):
    """Hand ``self.lockfile`` to the ``_unlock`` hook.

    Returns ``None``; any exception raised by the hook propagates.
    """
    release = self._unlock
    release(self.lockfile)
@staticmethod
def _trylock(lockfile=None):
    """Platform hook for acquiring the lock; must be overridden.

    The signature accepts *lockfile* so that it matches the way the hook is
    invoked (``self._trylock(self.lockfile)``). Without the parameter, an
    un-overridden hook would fail with ``TypeError`` rather than the intended
    ``NotImplementedError``. The default keeps zero-argument calls working.

    Args:
        lockfile: The lock file object passed by the caller (unused here).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()
@staticmethod
def _unlock(lockfile=None):
    """Platform hook for releasing the lock; must be overridden.

    The signature accepts *lockfile* so that it matches the way the hook is
    invoked (``self._unlock(self.lockfile)``). Without the parameter, an
    un-overridden hook would fail with ``TypeError`` rather than the intended
    ``NotImplementedError``. The default keeps zero-argument calls working.

    Args:
        lockfile: The lock file object passed by the caller (unused here).

    Raises:
        NotImplementedError: Always.
    """
    raise NotImplementedError()
class _WindowsLock(_InterProcessLock):
    """Lock implementation for Windows hosts, backed by ``msvcrt.locking``."""

    @staticmethod
    def _trylock(lockfile):
        """Apply ``LK_NBLCK`` to a single byte of *lockfile*.

        The file descriptor is obtained from ``lockfile.fileno()``; errors
        from ``msvcrt.locking`` are not caught here.
        """
        fd = lockfile.fileno()
        msvcrt.locking(fd, msvcrt.LK_NBLCK, 1)

    @staticmethod
    def _unlock(lockfile):
        """Apply ``LK_UNLCK`` to a single byte of *lockfile*."""
        fd = lockfile.fileno()
        msvcrt.locking(fd, msvcrt.LK_UNLCK, 1)
class _FcntlLock(_InterProcessLock):
"""Interprocess lock implementation that works on posix systems."""