Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_init(self):
    """A negative pid must mark the resulting process object as single-user."""
    single_user_proc = PostmasterProcess(-123)
    self.assertTrue(single_user_proc.is_single_user)
@patch.object(PostmasterProcess, 'from_pidfile')
def test_is_running(self, mock_frompidfile):
    """Walk is_running() through its cached, stale and rediscovered states."""
    # A cached postmaster that reports itself as running is handed back as-is.
    cached = MockPostmaster()
    self.p._postmaster_proc = cached
    self.assertEqual(self.p.is_running(), cached)

    # The cached postmaster is no longer running and the pid file lookup
    # finds nothing: the call yields None and the cache is cleared.
    cached.is_running.return_value = False
    mock_frompidfile.return_value = None
    self.assertEqual(self.p.is_running(), None)
    self.assertEqual(self.p._postmaster_proc, None)

    # Nothing is cached, but the pid file lookup finds a postmaster: it is
    # returned and becomes the new cached process.
    rediscovered = MockPostmaster()
    mock_frompidfile.return_value = rediscovered
    self.assertEqual(self.p.is_running(), rediscovered)
    self.assertEqual(self.p._postmaster_proc, rediscovered)
def test_start(self, mock_is_running, mock_wait_for_port_open, mock_wait_for_startup, mock_popen):
# Exercises self.p.start() under several mocked outcomes.
# NOTE(review): the @patch decorators that supply the four mock arguments are not
# visible in this view, and the original indentation has been lost -- the extent of
# the two `with` blocks below must be confirmed against the real file.
# First scenario: with these mocked return values start() is expected to be truthy.
mock_is_running.return_value = MockPostmaster()
mock_wait_for_port_open.return_value = True
mock_wait_for_startup.return_value = False
mock_popen.return_value.stdout.readline.return_value = '123'
self.assertTrue(self.p.start())
# From here on the is_running mock yields None.
mock_is_running.return_value = None
mock_postmaster = MockPostmaster()
# Replace PostmasterProcess.start so that it returns the mock postmaster.
with patch.object(PostmasterProcess, 'start', return_value=mock_postmaster):
# Create (or truncate to empty) postgresql.conf inside the data directory.
pg_conf = os.path.join(self.p.data_dir, 'postgresql.conf')
open(pg_conf, 'w').close()
self.assertFalse(self.p.start(task=CriticalTask()))
# The file was emptied above, so this line is presumably written during start().
with open(pg_conf) as f:
lines = f.readlines()
self.assertTrue("f.oo = 'bar'\n" in lines)
# wait_for_startup yielding None: start(10) is falsy and start() is exactly None.
mock_wait_for_startup.return_value = None
self.assertFalse(self.p.start(10))
self.assertIsNone(self.p.start())
# wait_for_port_open yielding False: start() is falsy.
mock_wait_for_port_open.return_value = False
self.assertFalse(self.p.start())
# NOTE(review): a cancelled task is prepared here but never used within the visible
# lines -- presumably the assertion that consumes it has been cut off.
task = CriticalTask()
task.cancel()
def test_from_pidfile(self, mock_read, mock_init, mock_create_time):
# Checks which pid-file contents make PostmasterProcess.from_pidfile('') return a
# process object and which make it return None.
# NOTE(review): the @patch decorators supplying the three mocks are not visible in
# this view and the original indentation has been lost. Presumably mock_read stands
# in for the pid-file reader, mock_init for psutil.Process.__init__ and
# mock_create_time for psutil.Process.create_time -- confirm against the real file.
# With mock_init raising NoSuchProcess, each of the following contents must yield None:
mock_init.side_effect = psutil.NoSuchProcess(123)
# ... an empty mapping,
mock_read.return_value = {}
self.assertIsNone(PostmasterProcess.from_pidfile(''))
# ... a non-numeric pid,
mock_read.return_value = {"pid": "foo"}
self.assertIsNone(PostmasterProcess.from_pidfile(''))
# ... and a numeric pid.
mock_read.return_value = {"pid": "123"}
self.assertIsNone(PostmasterProcess.from_pidfile(''))
# mock_init no longer raises from here on.
mock_init.side_effect = None
# Pin psutil.Process pid/ppid (123/124) and os.getpid/os.getppid (125/126) to
# four distinct values.
with patch.object(psutil.Process, 'pid', 123), \
patch.object(psutil.Process, 'ppid', return_value=124), \
patch('os.getpid', return_value=125) as mock_ospid, \
patch('os.getppid', return_value=126):
# A numeric pid with the patches above in place yields a process object.
self.assertIsNotNone(PostmasterProcess.from_pidfile(''))
# A recorded start_time (200000) that differs from the mocked create time (100000)
# must yield None.
mock_create_time.return_value = 100000
mock_read.return_value = {"pid": "123", "start_time": "200000"}
self.assertIsNone(PostmasterProcess.from_pidfile(''))
# NOTE(review): a non-numeric start_time is set up here, but the assertion that uses
# it is not within the visible lines; mock_ospid is likewise unused in what is shown.
mock_read.return_value = {"pid": "123", "start_time": "foobar"}
# NOTE(review): this is a headerless fragment. The names used (pgcommand, data_dir,
# conf, options) match the arguments passed to PostmasterProcess.start elsewhere in
# this file, so it is presumably the tail of that function -- confirm. The `pass`
# below is presumably the end of an exception handler whose `try` is not visible;
# `env` is likewise defined outside the visible lines.
pass
# Assemble the postgres command line: binary, data directory, config file, then the
# caller-supplied options.
cmdline = [pgcommand, '-D', data_dir, '--config-file={}'.format(conf)] + options
logger.debug("Starting postgres: %s", " ".join(cmdline))
# Pipe(False) creates a one-way pipe: parent_conn can only receive, child_conn can
# only send.
parent_conn, child_conn = multiprocessing.Pipe(False)
# Run pg_ctl_start in a separate process, handing it the sending end of the pipe
# together with the command line and environment.
proc = multiprocessing.Process(target=pg_ctl_start, args=(child_conn, cmdline, env))
proc.start()
# Block until the helper sends a value (used below as the postmaster pid), then
# wait for the helper process to finish.
pid = parent_conn.recv()
proc.join()
# Nothing usable was reported: return None.
if pid is None:
return
logger.info('postmaster pid=%s', pid)
# TODO: In an extremely unlikely case, the process could have exited and the pid reassigned. The start
# initiation time is not accurate enough to compare to create time as start time would also likely
# be relatively close. We need the subprocess extract pid+start_time in a race free manner.
return PostmasterProcess.from_pid(pid)
def is_running(self):
    """Return the postmaster running on the data directory, or None.

    While the most recently seen process is still running it is returned
    unchanged. If it is found to be NOT running, the cached process is
    dropped, the slots handler is scheduled, and the cache is refreshed
    from the pid file. With nothing cached the pid file is consulted
    directly.

    :returns: the cached process while it is running, otherwise whatever
        ``PostmasterProcess.from_pidfile`` yields for the data directory
        (which also becomes the new cached value).
    """
    cached = self._postmaster_proc
    if cached:
        if cached.is_running():
            return cached
        self._postmaster_proc = None
        # we noticed that postgres was restarted, force syncing of replication
        self.slots_handler.schedule()

    # Nothing cached (any more): look the postmaster up via the pid file.
    self._postmaster_proc = PostmasterProcess.from_pidfile(self._data_dir)
    return self._postmaster_proc
def _from_pidfile(cls, data_dir):
    """Build a ``cls`` instance from the pid recorded for ``data_dir``.

    The mapping returned by ``PostmasterProcess._read_postmaster_pidfile`` is
    attached to the new instance as ``_postmaster_pid``.

    :param data_dir: passed through to the pid file reader.
    :returns: the new instance, or None when the 'pid' entry is missing,
        zero, or a ValueError is raised while building the instance.
    """
    # NOTE(review): takes ``cls`` but no @classmethod decorator is visible in
    # this view -- presumably it sits just above this definition; confirm.
    pidfile_data = PostmasterProcess._read_postmaster_pidfile(data_dir)
    try:
        pid = int(pidfile_data.get('pid', 0))
        if not pid:
            return None
        instance = cls(pid)
        instance._postmaster_pid = pidfile_data
        return instance
    except ValueError:
        return None
def __init__(self, pid):
    """Initialise the process wrapper for ``pid``.

    A negative ``pid`` marks the process as single-user; the negated
    (positive) value is what gets passed to the parent initialiser.
    """
    if pid < 0:
        self.is_single_user, pid = True, -pid
    else:
        self.is_single_user = False
    super(PostmasterProcess, self).__init__(pid)
self.config.resolve_connection_addresses()
self.config.replace_pg_hba()
self.config.replace_pg_ident()
options = ['--{0}={1}'.format(p, configuration[p]) for p in self.config.CMDLINE_OPTIONS
if p in configuration and p != 'wal_keep_segments']
if self.cancellable.is_cancelled:
return False
with task or null_context():
if task and task.is_cancelled:
logger.info("PostgreSQL start cancelled.")
return False
self._postmaster_proc = PostmasterProcess.start(self.pgcommand('postgres'),
self._data_dir,
self.config.postgresql_conf,
options)
if task:
task.complete(self._postmaster_proc)
start_timeout = timeout
if not start_timeout:
try:
start_timeout = float(self.config.get('pg_ctl_timeout', 60))
except ValueError:
start_timeout = 60
# We want postmaster to open ports before we continue
if not self._postmaster_proc or not self.wait_for_port_open(self._postmaster_proc, start_timeout):