How to use the baseplate.lib.file_watcher.FileWatcher class in baseplate

To help you get started, we’ve selected a few baseplate examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github reddit / baseplate.py / tests / unit / lib / file_watcher_tests.py View on Github external
# Checks how FileWatcher's `newline` argument changes what the parser sees
# when it reads a file containing a mix of "\n", "\r" and "\r\n" endings.
def test_newline_option(self):
        # The parser returns the list of lines, so each assertion below shows
        # exactly where lines were split and whether endings were translated.
        def parser(f):
            return f.readlines()

        with tempfile.NamedTemporaryFile() as watched_file:
            # One sample of every line-ending style, plus an unterminated tail.
            watched_file.write(b"A\nB\rC\r\nD")
            watched_file.flush()
            # Pin the file's access/modification times to a fixed value.
            os.utime(watched_file.name, (1, 1))

            # No newline argument: every ending comes back as "\n".
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser)
            self.assertEqual(watcher.get_data(), ["A\n", "B\n", "C\n", "D"])

            # newline="": lines split on every ending, endings left untouched.
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="")
            self.assertEqual(watcher.get_data(), ["A\n", "B\r", "C\r\n", "D"])

            # newline="\n": only "\n" terminates a line; "\r" stays in the text.
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="\n")
            self.assertEqual(watcher.get_data(), ["A\n", "B\rC\r\n", "D"])

            # newline="\r": only "\r" terminates a line, which splits "\r\n" in two.
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="\r")
            self.assertEqual(watcher.get_data(), ["A\nB\r", "C\r", "\nD"])

            # newline="\r\n": only the two-character sequence terminates a line.
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="\r\n")
            self.assertEqual(watcher.get_data(), ["A\nB\rC\r\n", "D"])

            # An unsupported newline value is expected to raise
            # WatchedFileNotAvailableError; the statement under the `with`
            # below is cut off in this excerpt.
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="foo")
            with self.assertRaises(file_watcher.WatchedFileNotAvailableError):
github reddit / baseplate.py / tests / unit / lib / file_watcher_tests.py View on Github external
def test_file_reloads_when_changed(self):
        """Verify get_data() returns fresh contents after the file is rewritten.

        The watched file is written twice. After each write its timestamps
        are set explicitly with os.utime (first to 1, then to 2), and the
        watcher must return the contents that were on disk at that point.
        """

        def read_all(handle):
            return handle.read()

        with tempfile.NamedTemporaryFile() as tmp:
            path = tmp.name

            # First version of the file, stamped with timestamp 1.
            tmp.write(b"hello!")
            tmp.flush()
            os.utime(path, (1, 1))

            watcher = file_watcher.FileWatcher(path, parser=read_all)
            first_read = watcher.get_data()
            self.assertEqual(first_read, "hello!")

            # Overwrite from the start of the file and move the timestamp to 2.
            # NOTE(review): presumably the watcher keys its reload off the
            # changed mtime -- confirm against FileWatcher.
            tmp.seek(0)
            tmp.write(b"breaking news: hello again!")
            tmp.flush()
            os.utime(path, (2, 2))

            second_read = watcher.get_data()
            self.assertEqual(second_read, "breaking news: hello again!")
github reddit / baseplate.py / tests / unit / lib / file_watcher_tests.py View on Github external
# Checks which arguments FileWatcher passes to `open` when constructed with
# binary=True.
def test_binary_mode(self):
        with tempfile.NamedTemporaryFile() as watched_file:
            watched_file.write(b"foo")
            watched_file.flush()
            # Pin the file's access/modification times to a fixed value.
            os.utime(watched_file.name, (1, 1))

            watcher = file_watcher.FileWatcher(
                watched_file.name, parser=lambda f: f.read(), binary=True
            )

            # mock.mock_open does not appear to work with binary read_data, you
            # end up getting the following error:
            # TypeError: 'str' does not support the buffer interface
            # So all we are really checking is the arguments passed to `open`.
            with mock.patch.object(
                builtins, "open", mock.mock_open(read_data="foo"), create=True
            ) as open_mock:
                watcher.get_data()
            # Binary mode must open with mode "rb" and with no encoding or
            # newline handling.
            open_mock.assert_called_once_with(
                watched_file.name, encoding=None, mode="rb", newline=None
            )

            # The remainder of this test is cut off in this excerpt.
            watcher = file_watcher.FileWatcher(
github reddit / baseplate.py / tests / unit / lib / file_watcher_tests.py View on Github external
def test_cant_set_encoding_and_binary(self):
        """Constructing a FileWatcher with both binary=True and an encoding raises TypeError."""
        parser_stub = mock.Mock()
        # Callable form of assertRaises: the constructor call itself must raise.
        self.assertRaises(
            TypeError,
            file_watcher.FileWatcher,
            "/does_not_exist",
            parser_stub,
            binary=True,
            encoding="utf-8",
        )
github reddit / baseplate.py / tests / unit / lib / file_watcher_tests.py View on Github external
# Excerpt from a newline-handling test; the enclosing `def` and the `parser`
# helper are outside this snippet (presumably parser returns f.readlines(),
# given the list-of-lines expectations below -- confirm against the full test).
with tempfile.NamedTemporaryFile() as watched_file:
            # One sample of every line-ending style, plus an unterminated tail.
            watched_file.write(b"A\nB\rC\r\nD")
            watched_file.flush()
            # Pin the file's access/modification times to a fixed value.
            os.utime(watched_file.name, (1, 1))

            # No newline argument: every ending comes back as "\n".
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser)
            self.assertEqual(watcher.get_data(), ["A\n", "B\n", "C\n", "D"])

            # newline="": lines split on every ending, endings left untouched.
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="")
            self.assertEqual(watcher.get_data(), ["A\n", "B\r", "C\r\n", "D"])

            # newline="\n": only "\n" terminates a line; "\r" stays in the text.
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="\n")
            self.assertEqual(watcher.get_data(), ["A\n", "B\rC\r\n", "D"])

            # newline="\r": only "\r" terminates a line, which splits "\r\n" in two.
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="\r")
            self.assertEqual(watcher.get_data(), ["A\nB\r", "C\r", "\nD"])

            # newline="\r\n": only the two-character sequence terminates a line.
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="\r\n")
            self.assertEqual(watcher.get_data(), ["A\nB\rC\r\n", "D"])

            # An unsupported newline value does not fail at construction; the
            # error surfaces from get_data() as WatchedFileNotAvailableError.
            watcher = file_watcher.FileWatcher(watched_file.name, parser=parser, newline="foo")
            with self.assertRaises(file_watcher.WatchedFileNotAvailableError):
                watcher.get_data()
github reddit / baseplate.py / tests / unit / lib / file_watcher_tests.py View on Github external
def test_cant_set_newline_and_binary(self):
        """Constructing a FileWatcher with both binary=True and a newline raises TypeError."""
        parser_stub = mock.Mock()
        # The two options that must not be combined.
        conflicting_options = {"binary": True, "newline": "\n"}
        with self.assertRaises(TypeError):
            file_watcher.FileWatcher("/does_not_exist", parser_stub, **conflicting_options)
github reddit / baseplate.py / tests / unit / clients / kombu_tests.py View on Github external
def secrets():
    """Return a SecretsStore whose file watcher is replaced by a canned mock.

    The store is constructed for "/secrets", then its private `_filewatcher`
    is swapped for a Mock (spec'd to FileWatcher) whose get_data() returns a
    fixed payload: one RabbitMQ credential plus vault connection details.
    """
    rabbitmq_credential = {
        "type": "credential",
        "username": "spez",
        "password": "hunter2",
    }
    canned_payload = {
        "secrets": {"secret/rabbitmq/account": rabbitmq_credential},
        "vault": {"token": "test", "url": "http://vault.example.com:8200/"},
    }

    watcher_double = mock.Mock(spec=FileWatcher)
    watcher_double.get_data.return_value = canned_payload

    store = SecretsStore("/secrets")
    # Replace the store's watcher so get_data() yields the canned payload.
    store._filewatcher = watcher_double
    return store
github reddit / baseplate.py / tests / unit / lib / secrets / store_tests.py View on Github external
def setUp(self):
        """Create a SecretsStore for "/whatever" with its file watcher mocked.

        The mock is kept on `self.mock_filewatcher` and the store on
        `self.store`, so both are available after setup.
        """
        watcher_double = mock.Mock(spec=FileWatcher)
        store = SecretsStore("/whatever")
        # Swap the store's private watcher for the spec'd mock.
        store._filewatcher = watcher_double

        self.mock_filewatcher = watcher_double
        self.store = store
github reddit / baseplate.py / tests / unit / lib / file_watcher_tests.py View on Github external
# Excerpt from a text-mode (binary=False) test; the enclosing `def` and the
# code that creates and writes `watched_file` are outside this snippet.
watched_file.flush()
            # Pin the file's access/modification times to a fixed value.
            os.utime(watched_file.name, (1, 1))

            watcher = file_watcher.FileWatcher(
                watched_file.name, parser=lambda f: f.read(), binary=False
            )

            # Patch the builtin `open` so the arguments FileWatcher passes to
            # it can be inspected.
            with mock.patch.object(
                builtins, "open", mock.mock_open(read_data="foo"), create=True
            ) as open_mock:
                watcher.get_data()
            # Text mode must open with mode "r" and leave encoding and newline
            # as None.
            open_mock.assert_called_once_with(
                watched_file.name, encoding=None, mode="r", newline=None
            )

            # A fresh watcher with `open` no longer patched reads the real
            # file (presumably written with b"foo" earlier in the test -- the
            # write is not visible in this excerpt).
            watcher = file_watcher.FileWatcher(
                watched_file.name, parser=lambda f: f.read(), binary=False
            )
            self.assertEqual(watcher.get_data(), "foo")
github reddit / baseplate.py / tests / unit / lib / file_watcher_tests.py View on Github external
def test_file_failing_to_parse_on_first_load_raises(self):
        """A file that json.load cannot parse makes the first get_data() raise.

        The file contains only "!", which is not valid JSON, and the expected
        error is WatchedFileNotAvailableError.
        """
        with tempfile.NamedTemporaryFile() as tmp:
            path = tmp.name

            # Contents that json.load cannot parse.
            tmp.write(b"!")
            tmp.flush()
            os.utime(path, (1, 1))

            watcher = file_watcher.FileWatcher(path, parser=json.load)

            # Callable form of assertRaises: the first load must fail.
            self.assertRaises(file_watcher.WatchedFileNotAvailableError, watcher.get_data)