How to use the pywatchman.compat module in pywatchman

To help you get started, we’ve selected a few pywatchman examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github facebook / watchman / tests / integration / test_bser_cli.py View on Github external
"--no-local",
            "-j",
        ]
        proc = subprocess.Popen(
            cli_cmd,
            stdin=subprocess.PIPE,
            stderr=subprocess.PIPE,
            stdout=subprocess.PIPE,
        )

        stdout, stderr = proc.communicate(input=watchman_cmd)
        self.assertEqual(proc.poll(), 0, stderr)
        # the response should be bser to match our input
        result = bser.loads(stdout)
        result_sockname = result["sockname"]
        if compat.PYTHON3:
            result_sockname = encoding.decode_local(result_sockname)
        self.assertEqual(
            result_sockname, sockname, binascii.hexlify(stdout).decode("ascii")
        )
github facebook / watchman / tests / integration / path_utils.py View on Github external
def get_canonical_filesystem_path(name):
        """Return the path reported for ``name`` by ``getpath_fcntl`` / ``F_GETPATH``.

        ``name`` is opened read-only and the resulting descriptor is passed to
        ``getpath_fcntl`` together with a 1024-byte buffer; the buffer's
        contents are returned.

        The result is ``bytes`` unless ``name`` is an instance of
        ``compat.UNICODE``, in which case it is decoded with ``os.fsdecode``
        so the return type mirrors the argument type.

        Raises:
            OSError: if ``getpath_fcntl`` returns non-zero. The exception
                carries the errno read from ``ctypes.get_errno()``, its
                message, and ``name`` as the filename. ``os.open`` may also
                raise ``OSError`` if ``name`` cannot be opened.
        """
        fd = os.open(name, os.O_RDONLY, 0)
        try:
            numchars = 1024  # MAXPATHLEN
            # The kernel caps this routine to MAXPATHLEN, so there is no
            # point in over-allocating or trying again with a larger buffer
            buf = ctypes.create_string_buffer(numchars)
            # Clear any stale errno so the value read below belongs to this call.
            ctypes.set_errno(0)
            result = getpath_fcntl(fd, F_GETPATH, buf)
            if result != 0:
                err = ctypes.get_errno()
                # Use the (errno, strerror, filename) form: a single-argument
                # OSError leaves e.errno / e.strerror / e.filename unset.
                raise OSError(err, os.strerror(err), name)
            # buf is a bytes buffer, so normalize it if necessary
            ret = buf.value
            if isinstance(name, compat.UNICODE):
                ret = os.fsdecode(ret)
            return ret
        finally:
            # Always release the descriptor, on success and on error.
            os.close(fd)
github facebook / watchman / tests / integration / WatchmanTestCase.py View on Github external
import tempfile
import time

import Interrupt
import pywatchman
import TempDir
import WatchmanInstance
from path_utils import norm_absolute_path, norm_relative_path


try:
    import unittest2 as unittest
except ImportError:
    import unittest

# Types treated as strings: (str, bytes) when pywatchman.compat.PYTHON3 is
# true, otherwise (str, unicode).
STRING_TYPES = (
    (str, bytes)
    if pywatchman.compat.PYTHON3
    else (str, unicode)  # noqa: F821
)


if os.name == "nt":
    # monkey patch to hopefully minimize test flakiness
    def wrap_with_backoff(fn):
        def wrapper(*args, **kwargs):
            delay = 0.01
            attempts = 10
            while True:
                try:
                    return fn(*args, **kwargs)
                except WindowsError as e:
                    if attempts == 0: