Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_bserInput(self):
    """Check that the watchman CLI answers a BSER request with a BSER reply.

    A BSER-encoded ``get-sockname`` command is written to the CLI's stdin.
    The test asserts that the process exits with status 0 and that the
    BSER-decoded response reports the same socket path that was passed on
    the command line.
    """
    sockname = self.getSockPath()
    watchman_cmd = bser.dumps(["get-sockname"])
    cli_cmd = [
        # The binary can be overridden through the WATCHMAN_BINARY variable.
        os.environ.get("WATCHMAN_BINARY", "watchman"),
        "--sockname={0}".format(sockname),
        "--logfile=/BOGUS",
        "--statefile=/BOGUS",
        "--no-spawn",
        "--no-local",
        # NOTE(review): presumably "-j" makes the CLI read the command from
        # stdin -- confirm against the watchman CLI documentation.
        "-j",
    ]
    proc = subprocess.Popen(
        cli_cmd,
        stdin=subprocess.PIPE,
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    stdout, stderr = proc.communicate(input=watchman_cmd)
    # stderr is used as the assertion message so a failure shows the
    # CLI's own diagnostics.
    self.assertEqual(proc.poll(), 0, stderr)
    # the response should be bser to match our input
    result = bser.loads(stdout)
    result_sockname = result["sockname"]
    if compat.PYTHON3:
        result_sockname = encoding.decode_local(result_sockname)
    # On mismatch, report the raw response as hex to ease debugging.
    self.assertEqual(
        result_sockname, sockname, binascii.hexlify(stdout).decode("ascii")
    )
async def send(self, *args):
    """Serialize ``args`` with BSER and write the bytes to the transport.

    The positional arguments are forwarded unchanged to ``bser.dumps``,
    which is called with ``version=2`` and ``capabilities=0``. The encoded
    payload is then awaited through ``self.transport.write``.
    """
    payload = bser.dumps(*args, version=2, capabilities=0)
    await self.transport.write(payload)
def encode_result(values, diagnostics, profile):
    """Build the result dictionary and return it encoded with ``bser.dumps``.

    The dictionary always carries ``'values'``. ``'diagnostics'`` is added
    when ``diagnostics`` is truthy, with each entry reduced to its
    ``message``, ``level`` and ``source`` attributes. ``'profile'`` is
    added when ``profile`` is not ``None``.

    If encoding raises, the values are replaced by an empty list, a fatal
    ``'parse'`` diagnostic holding the formatted traceback is appended, and
    the encoding is attempted once more (a second failure propagates).
    """
    payload = {'values': values}
    if diagnostics:
        payload['diagnostics'] = [
            {
                'message': diag.message,
                'level': diag.level,
                'source': diag.source,
            }
            for diag in diagnostics
        ]
    if profile is not None:
        payload['profile'] = profile
    try:
        return bser.dumps(payload)
    except Exception:
        # Try again without the values, reporting the failure itself as a
        # diagnostic so the caller still receives a well-formed reply.
        payload['values'] = []
        payload.setdefault('diagnostics', []).append({
            'message': format_traceback_and_exception(),
            'level': 'fatal',
            'source': 'parse',
        })
        return bser.dumps(payload)
# Fragment: runs `cmd`, decodes its stdout with bser.loads and returns the
# "sockname" entry. The enclosing function header and the opening `try:`
# are not visible here.
# Spawn the command with stdout and stderr captured through pipes.
p = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
# close_fds is enabled everywhere except when os.name is "nt".
close_fds=os.name != "nt",
)
except OSError as e:
# NOTE(review): the message contains a %s placeholder, but `e` is passed as
# a second positional argument rather than interpolated with `%` -- confirm
# against WatchmanError's constructor signature.
raise WatchmanError('"watchman" executable not in PATH (%s)', e)
# Wait for the process to finish and collect everything it wrote.
stdout, stderr = p.communicate()
exitcode = p.poll()
# Any non-zero exit status is reported as a WatchmanError.
if exitcode:
raise WatchmanError("watchman exited with code %d" % exitcode)
# The captured stdout is decoded as BSER.
result = bser.loads(stdout)
# An "error" key in the decoded response is surfaced as a WatchmanError.
if "error" in result:
raise WatchmanError(str(result["error"]))
return result["sockname"]
source='mercurial'
)
to_parent.write(encode_result([], [d], None))
to_parent.flush()
# Flattened configuration values, keyed by (section, field) tuples.
configs = {}
if options.config is not None:
# The config file is read as raw bytes and decoded with bser.loads.
with open(options.config, 'rb') as f:
# NOTE(review): .iteritems() is the Python 2 dict API -- presumably this
# code targets Python 2; confirm before running it under Python 3.
for section, contents in bser.loads(f.read()).iteritems():
for field, value in contents.iteritems():
configs[(section, field)] = value
# Ignore patterns; stays empty when no ignore-paths file is given.
ignore_paths = []
if options.ignore_paths is not None:
# Each BSER-decoded entry is passed through make_glob.
with open(options.ignore_paths, 'rb') as f:
ignore_paths = [make_glob(i) for i in bser.loads(f.read())]
buildFileProcessor = BuildFileProcessor(
project_root,
cell_roots,
options.build_file_name,
options.allow_empty_globs,
options.ignore_buck_autodeps_files,
options.no_autodeps_signatures,
watchman_client,
options.watchman_glob_stat_results,
options.watchman_use_glob_generator,
use_mercurial_glob,
options.enable_build_file_sandboxing,
project_import_whitelist=options.build_file_import_whitelist or [],
implicit_includes=options.include or [],
configs=configs,