Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_newline_option(self):
    """Check how the ``newline`` argument of FileWatcher affects line splitting.

    The watched file contains ``A\\nB\\rC\\r\\nD`` and is parsed with
    ``readlines()``. Each supported ``newline`` value must produce the
    line list asserted below, and an unsupported value (``"foo"``) must
    surface as ``WatchedFileNotAvailableError`` when the data is read.
    """

    def parser(f):
        return f.readlines()

    # (extra FileWatcher kwargs, expected readlines() result)
    cases = [
        # No newline argument: every line ending comes back as "\n".
        ({}, ["A\n", "B\n", "C\n", "D"]),
        # Empty string: split on every ending, endings left untouched.
        ({"newline": ""}, ["A\n", "B\r", "C\r\n", "D"]),
        # Explicit terminators: split only on that exact sequence.
        ({"newline": "\n"}, ["A\n", "B\rC\r\n", "D"]),
        ({"newline": "\r"}, ["A\nB\r", "C\r", "\nD"]),
        ({"newline": "\r\n"}, ["A\nB\rC\r\n", "D"]),
    ]

    with tempfile.NamedTemporaryFile() as watched_file:
        watched_file.write(b"A\nB\rC\r\nD")
        watched_file.flush()
        os.utime(watched_file.name, (1, 1))

        for extra_kwargs, expected in cases:
            watcher = file_watcher.FileWatcher(
                watched_file.name, parser=parser, **extra_kwargs
            )
            self.assertEqual(
                watcher.get_data(),
                expected,
                "unexpected lines for kwargs %r" % (extra_kwargs,),
            )

        # An invalid newline value is only detected when the file is
        # actually read, so the read itself must sit inside assertRaises.
        watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="foo")
        with self.assertRaises(file_watcher.WatchedFileNotAvailableError):
            watcher.get_data()
def test_file_reloads_when_changed(self):
    """get_data() returns the new contents after the file is rewritten.

    The file is written and its timestamps set to 1, read through the
    watcher, then overwritten with its timestamps moved to 2; the second
    read must return the updated text.
    """
    original_text = "hello!"
    updated_text = "breaking news: hello again!"

    with tempfile.NamedTemporaryFile() as tmp:
        path = tmp.name

        tmp.write(b"hello!")
        tmp.flush()
        os.utime(path, (1, 1))
        subject = file_watcher.FileWatcher(path, parser=lambda fh: fh.read())

        self.assertEqual(subject.get_data(), original_text)

        # Overwrite from the start of the file and bump its timestamps.
        tmp.seek(0)
        tmp.write(b"breaking news: hello again!")
        tmp.flush()
        os.utime(path, (2, 2))

        self.assertEqual(subject.get_data(), updated_text)
def test_binary_mode(self):
    """Check that ``binary=True`` opens the watched file in ``"rb"`` mode.

    First the arguments handed to ``open`` are verified through a mock,
    then a fresh watcher reads the real temporary file and must return
    the contents as bytes.
    """
    with tempfile.NamedTemporaryFile() as watched_file:
        watched_file.write(b"foo")
        watched_file.flush()
        os.utime(watched_file.name, (1, 1))
        watcher = file_watcher.FileWatcher(
            watched_file.name, parser=lambda f: f.read(), binary=True
        )

        # mock.mock_open does not appear to work with binary read_data, you
        # end up getting the following error:
        # TypeError: 'str' does not support the buffer interface
        # So all we are really checking is the arguments passed to `open`.
        with mock.patch.object(
            builtins, "open", mock.mock_open(read_data="foo"), create=True
        ) as open_mock:
            watcher.get_data()
        open_mock.assert_called_once_with(
            watched_file.name, encoding=None, mode="rb", newline=None
        )

        # Read the real file (no mock) with a fresh watcher so the value
        # returned in binary mode is checked as well.
        watcher = file_watcher.FileWatcher(
            watched_file.name, parser=lambda f: f.read(), binary=True
        )
        self.assertEqual(watcher.get_data(), b"foo")
def test_cant_set_encoding_and_binary(self):
    """Passing both ``binary=True`` and ``encoding`` must raise TypeError."""
    parser_stub = mock.Mock()
    conflicting_options = {"binary": True, "encoding": "utf-8"}

    with self.assertRaises(TypeError):
        file_watcher.FileWatcher("/does_not_exist", parser_stub, **conflicting_options)
# NOTE(review): these statements have no enclosing `def` in this span and use
# `parser`, which is not defined here. They repeat the body of
# test_newline_option line for line, so this looks like a headless duplicate
# of that test -- confirm before relying on it.
with tempfile.NamedTemporaryFile() as watched_file:
# File contents mix all three line-ending styles: "\n", "\r" and "\r\n".
watched_file.write(b"A\nB\rC\r\nD")
watched_file.flush()
os.utime(watched_file.name, (1, 1))
# No newline argument: every line ending is expected back as "\n".
watcher = file_watcher.FileWatcher(watched_file.name, parser=parser)
self.assertEqual(watcher.get_data(), ["A\n", "B\n", "C\n", "D"])
# newline="": lines split on every ending, endings expected unchanged.
watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="")
self.assertEqual(watcher.get_data(), ["A\n", "B\r", "C\r\n", "D"])
# Explicit terminator: lines are expected to split only on that sequence.
watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="\n")
self.assertEqual(watcher.get_data(), ["A\n", "B\rC\r\n", "D"])
watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="\r")
self.assertEqual(watcher.get_data(), ["A\nB\r", "C\r", "\nD"])
watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="\r\n")
self.assertEqual(watcher.get_data(), ["A\nB\rC\r\n", "D"])
# An unsupported newline value is expected to fail when the data is read.
watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="foo")
with self.assertRaises(file_watcher.WatchedFileNotAvailableError):
watcher.get_data()
def test_cant_set_newline_and_binary(self):
    """Passing both ``binary=True`` and ``newline`` must raise TypeError."""
    parser_stub = mock.Mock()
    conflicting_options = {"binary": True, "newline": "\n"}

    with self.assertRaises(TypeError):
        file_watcher.FileWatcher("/does_not_exist", parser_stub, **conflicting_options)
def secrets():
    """Return a SecretsStore whose file watcher is a mock serving canned data.

    The mock's ``get_data()`` returns one RabbitMQ credential secret plus
    Vault connection details.
    """
    rabbitmq_account = {
        "type": "credential",
        "username": "spez",
        "password": "hunter2",
    }
    vault_details = {"token": "test", "url": "http://vault.example.com:8200/"}

    watcher_stub = mock.Mock(spec=FileWatcher)
    watcher_stub.get_data.return_value = {
        "secrets": {"secret/rabbitmq/account": rabbitmq_account},
        "vault": vault_details,
    }

    # Swap the store's watcher for the stub so reads use the canned data.
    store = SecretsStore("/secrets")
    store._filewatcher = watcher_stub
    return store
def setUp(self):
    """Create a SecretsStore and replace its file watcher with a mock."""
    watcher_stub = mock.Mock(spec=FileWatcher)
    self.mock_filewatcher = watcher_stub

    store = SecretsStore("/whatever")
    self.store = store
    store._filewatcher = watcher_stub
# NOTE(review): these statements have no enclosing `def` in this span and use
# `watched_file`, which is not defined here. They look like the tail of a
# text-mode (binary=False) test whose opening lines are missing -- confirm.
watched_file.flush()
os.utime(watched_file.name, (1, 1))
watcher = file_watcher.FileWatcher(
watched_file.name, parser=lambda f: f.read(), binary=False
)
# With `open` replaced by a mock, check the arguments it is called with:
# binary=False is expected to yield mode="r".
with mock.patch.object(
builtins, "open", mock.mock_open(read_data="foo"), create=True
) as open_mock:
watcher.get_data()
open_mock.assert_called_once_with(
watched_file.name, encoding=None, mode="r", newline=None
)
# A second watcher reads without the mock and must return the str "foo".
watcher = file_watcher.FileWatcher(
watched_file.name, parser=lambda f: f.read(), binary=False
)
self.assertEqual(watcher.get_data(), "foo")
def test_file_failing_to_parse_on_first_load_raises(self):
    """A file json.load cannot parse makes the first get_data() raise.

    The file holds only ``!``, which is not valid JSON, so the first read
    through the watcher must raise ``WatchedFileNotAvailableError``.
    """
    with tempfile.NamedTemporaryFile() as tmp:
        tmp.write(b"!")
        tmp.flush()
        os.utime(tmp.name, (1, 1))

        subject = file_watcher.FileWatcher(tmp.name, parser=json.load)
        self.assertRaises(file_watcher.WatchedFileNotAvailableError, subject.get_data)