Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_suffix_expr(self):
    """Exercise the ``suffix`` query term against a small watched tree.

    Creates ``foo.c`` and ``subdir/bar.txt`` under a fresh temporary root,
    checks that a ``["suffix", "c"]`` query reports only ``foo.c``, and
    checks that passing the bare string ``"suffix"`` as the expression
    raises ``pywatchman.WatchmanError`` with the expected message.
    """
    root = self.mkdtemp()
    subdir = os.path.join(root, "subdir")

    # Fixture layout: one C file at the top level, one text file below it.
    self.touchRelative(root, "foo.c")
    os.mkdir(subdir)
    self.touchRelative(root, "subdir", "bar.txt")

    self.watchmanCommand("watch", root)

    # Well-formed term: only the file ending in ".c" should be reported.
    suffix_query = {"expression": ["suffix", "c"], "fields": ["name"]}
    matched = self.watchmanCommand("query", root, suffix_query)["files"]
    self.assertFileListsEqual(matched, ["foo.c"])

    # Malformed term: the expression is a plain string, not an array.
    malformed_query = {"expression": "suffix"}
    with self.assertRaises(pywatchman.WatchmanError) as ctx:
        self.watchmanCommand("query", root, malformed_query)
    self.assertRegex(str(ctx.exception), "Expected array for 'suffix' term")
# NOTE(review): the enclosing test definition is not visible in this
# excerpt; `self` and `root` are bound outside it.
#
# Create a huge query. We're shooting for more than 2MB;
# the server buffer size is 1MB and we want to make sure
# we need more than 2 chunks, and we want to tickle some
# buffer boundary conditions.
base = 2 * 1024 * 1024
# Sweep name lengths from 256 below `base` up to (but not including)
# 2048 above it, in steps of 63.
for size in range(base - 256, base + 2048, 63):
try:
res = self.watchmanCommand(
"query", root, {"expression": ["name", "a" * size]}
)
# The query is expected to succeed and match no files.
self.assertEqual([], res["files"])
except pywatchman.WatchmanError as e:
# We don't want to print the real command, as
# it is too long; instead, replace it with
# a summary of the size that we picked.
e.cmd = "big query with size %d" % size
if self.transport == "cli":
# For the "cli" transport, append the output of getLogSample()
# to the summary.
e.cmd = "%s\n%s" % (e.cmd, self.getLogSample())
raise
# NOTE(review): this excerpt opens partway through a call; the start of the
# enclosing assertion is not visible here. The visible tail compares the
# "files" of a "name"/"wholename" query (relative_root "subdir") with [].
root,
{
"expression": ["name", "foo.c", "wholename"],
"relative_root": "subdir",
"fields": ["name"],
},
)["files"],
[],
)
# A bare string expression (rather than an array) must raise WatchmanError.
with self.assertRaises(pywatchman.WatchmanError) as ctx:
self.watchmanCommand("query", root, {"expression": "name"})
# The pattern is a regex: "i?name" accepts both "name" and "iname".
self.assertRegex(str(ctx.exception), "Expected array for 'i?name' term")
# A "name" term followed by three operands must raise WatchmanError.
with self.assertRaises(pywatchman.WatchmanError) as ctx:
self.watchmanCommand(
"query", root, {"expression": ["name", "one", "two", "three"]}
)
self.assertRegex(
str(ctx.exception), "Invalid number of arguments for 'i?name' term"
)
# An integer operand (neither a string nor an array) must raise WatchmanError.
with self.assertRaises(pywatchman.WatchmanError) as ctx:
self.watchmanCommand("query", root, {"expression": ["name", 2]})
self.assertRegex(
str(ctx.exception),
("Argument 2 to 'i?name' must be either a string " "or an array of string"),
)
# NOTE(review): the head of this list literal is cut off in this excerpt;
# judging by the loop below it is presumably `broken_args` - confirm.
# Each visible entry pairs the arguments for "flush-subscriptions" with a
# substring expected in the resulting error message.
(
(root, {"subscriptions": ["sub1", False], "sync_timeout": 2000}),
"expected 'subscriptions' to be an array of subscription names",
),
(
(root, {"subscriptions": ["sub1", "notsub"], "sync_timeout": 2000}),
"this client does not have a subscription named 'notsub'",
),
(
(root, {"subscriptions": ["sub1", "sub-other"], "sync_timeout": 2000}),
"subscription 'sub-other' is on root",
),
]
# Every malformed argument set must raise WatchmanError carrying its message.
for args, err_msg in broken_args:
with self.assertRaises(pywatchman.WatchmanError) as ctx:
self.watchmanCommand("flush-subscriptions", *args)
self.assertIn(err_msg, str(ctx.exception))
# A well-formed flush naming "sub1" and "sub2".
ret = self.watchmanCommand(
"flush-subscriptions",
root,
{"sync_timeout": 1000, "subscriptions": ["sub1", "sub2"]},
)
# NOTE(review): `version` is not used again within this excerpt.
version = ret["version"]
# Both subscriptions must be reported as synced, none as "no_sync_needed".
self.assertEqual([], ret["no_sync_needed"])
self.assertCountEqual(["sub1", "sub2"], ret["synced"])
# Do not wait for subscription results -- instead, make sure they've
# shown up immediately.
sub1_data = self.getSubscription("sub1", root)
sub2_data = self.getSubscription("sub2", root)
# NOTE(review): the enclosing test definition and the setup of `root` are
# not visible in this excerpt.
# ["type", "f"] is expected to report the two files by name.
self.assertFileListsEqual(
self.watchmanCommand(
"query", root, {"expression": ["type", "f"], "fields": ["name"]}
)["files"],
["foo.c", "subdir/bar.txt"],
)
# ["type", "d"] is expected to report only "subdir", with its type field.
self.assertEqual(
self.watchmanCommand(
"query", root, {"expression": ["type", "d"], "fields": ["name", "type"]}
)["files"],
[{"name": "subdir", "type": "d"}],
)
# The type string "x" must raise WatchmanError.
with self.assertRaises(pywatchman.WatchmanError) as ctx:
self.watchmanCommand("query", root, {"expression": ["type", "x"]})
self.assertIn("invalid type string 'x'", str(ctx.exception))
# A bare "type" string with no parameter must raise WatchmanError.
with self.assertRaises(pywatchman.WatchmanError) as ctx:
self.watchmanCommand("query", root, {"expression": "type"})
self.assertIn(
'"type" term requires a type string parameter', str(ctx.exception)
)
# An integer parameter must raise WatchmanError.
with self.assertRaises(pywatchman.WatchmanError) as ctx:
self.watchmanCommand("query", root, {"expression": ["type", 123]})
self.assertIn(
'First parameter to "type" term must be a type string', str(ctx.exception)
# NOTE(review): the closing parenthesis of the call above is missing from
# this excerpt.
def assertWatchProjectIsRestricted(self, inst, client, path):
    """Assert that ``watch-project`` on *path* fails with the root_files error.

    Issues ``watch-project`` for *path* through *client*, requires that it
    raises ``pywatchman.WatchmanError``, and then checks that the error text
    mentions *path*, the config file named by ``inst.cfg_file``, and the
    list of root files.
    """
    with self.assertRaises(pywatchman.WatchmanError) as ctx:
        client.query("watch-project", path)
    failure_text = str(ctx.exception)

    # The error must say that no root file was found at or above `path`.
    not_found_template = (
        "None of the files listed in global config root_files are "
        "present in path `{0}` or any of its parent directories."
    )
    self.assertIn(not_found_template.format(path), failure_text)

    # It must also name the config file that defines root_files.
    origin_template = "root_files is defined by the `{0}` config file"
    self.assertIn(origin_template.format(inst.cfg_file), failure_text)

    # And it must list the files that root_files includes.
    self.assertIn(
        "config file and includes `.watchmanconfig`, `.git`, and `.foo`.",
        failure_text,
    )
def test_unsupportedStorageType(self):
    """A saved-state spec naming an unknown storage type must be rejected.

    Takes the skeleton query, sets its ``since.scm.saved-state`` entry to
    use storage ``"foo"``, and checks that running the query raises
    ``pywatchman.WatchmanError`` mentioning the invalid storage type.
    """
    saved_state_spec = {"storage": "foo", "config": {}}
    query = self.get_skeleton_query()
    query["since"]["scm"]["saved-state"] = saved_state_spec

    with self.assertRaises(pywatchman.WatchmanError) as raised:
        self.watchmanCommand("query", self.root, query)

    self.assertIn("invalid storage type 'foo'", str(raised.exception))
# NOTE(review): this excerpt opens on the closing parenthesis of a call
# whose start is not visible; `res`, `root` and `inc_dir` are bound outside
# the excerpt.
)
self.assertFileListsEqual(["includes/a.h", "includes/b.h"], res["files"])
# The glob "*/*.h" is expected to report both headers.
res = self.watchmanCommand(
"query", root, {"glob": ["*/*.h"], "fields": ["name"]}
)
self.assertFileListsEqual(["includes/a.h", "includes/b.h"], res["files"])
# After a.h is removed, the same glob is expected to report only b.h.
os.unlink(os.path.join(inc_dir, "a.h"))
res = self.watchmanCommand(
"query", root, {"glob": ["*/*.h"], "fields": ["name"]}
)
self.assertFileListsEqual(["includes/b.h"], res["files"])
# The relative_root "bogus" must raise WatchmanError with a hint about it.
with self.assertRaises(pywatchman.WatchmanError) as ctx:
self.watchmanCommand(
"query", root, {"glob": ["*/*.h"], "relative_root": "bogus"}
)
self.assertIn("check your relative_root", str(ctx.exception))
# An integer glob entry must raise WatchmanError.
with self.assertRaises(pywatchman.WatchmanError) as ctx:
self.watchmanCommand("query", root, {"glob": [12345]})
self.assertIn("expected json string object", str(ctx.exception))
# NOTE(review): the enclosing function is not visible in this excerpt;
# `watch_root`, `project_prefix`, `build_file`, `package_implicit_load`,
# `build_file_processor` and `to_parent` are bound outside it.
#
# Process one build file and always send the outcome through
# java_process_send_result(): the produced values on success, or a "fatal"
# diagnostic describing the failure. Any exception is re-raised after the
# diagnostic has been recorded.
project_prefix = cygwin_adjusted_path(project_prefix)
diagnostics = []
values = []
try:
    values = build_file_processor.process(
        watch_root,
        project_prefix,
        build_file,
        diagnostics=diagnostics,
        package_implicit_load=package_implicit_load,
    )
except BaseException as e:
    # sys.exit() must not emit a diagnostic. `e` is an exception instance,
    # so this has to be an isinstance() check: the previous identity
    # comparison against the SystemExit class was always true and emitted a
    # diagnostic for sys.exit() as well.
    if not isinstance(e, SystemExit):
        if isinstance(e, WatchmanError):
            source = "watchman"
            message = e.msg
        else:
            source = "parse"
            message = str(e)
        diagnostics.append(
            Diagnostic(
                message=message,
                level="fatal",
                source=source,
                exception=sys.exc_info(),
            )
        )
    # Re-raise everything, SystemExit included.
    raise
finally:
    # Runs on success and on failure alike.
    java_process_send_result(to_parent, values, diagnostics, None)
# NOTE(review): the enclosing generator function and the loop that the
# final `break` leaves are not visible in this excerpt; `cmd_buf` is bound
# outside it.
if cmd_buf:
# Take one pending command off the buffer and try to send it.
item = cmd_buf.pop()
try:
self.sendConn.send(item)
except pywatchman.SocketTimeout:
# The send timed out: put the command back so it is not lost, then yield.
cmd_buf.append(item)
yield
try:
result = self.recvConn.receive()
except pywatchman.SocketTimeout:
# Socket timeout - yield runtime context.
yield
else:
if 'error' in result:
# An error reported in the response is surfaced as WatchmanError.
raise pywatchman.WatchmanError('error from watchman: {}'.format(result['error']))
elif self.isUnilateralResponse(result) or 'subscribe' in result:
# Unilateral and 'subscribe' responses are yielded without leaving the loop.
yield result
else:
# Any other response is yielded and then the loop is left.
yield result
break