Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
data_dir = os.path.join('data', 'test0')
self.p = Postgresql({'name': 'postgresql0', 'scope': 'batman', 'data_dir': data_dir,
'config_dir': data_dir, 'retry_timeout': 10,
'krbsrvname': 'postgres', 'pgpass': os.path.join(data_dir, 'pgpass0'),
'listen': '127.0.0.2, 127.0.0.3:5432', 'connect_address': '127.0.0.2:5432',
'authentication': {'superuser': {'username': 'foo', 'password': 'test'},
'replication': {'username': '', 'password': 'rep-pass'}},
'remove_data_directory_on_rewind_failure': True,
'use_pg_rewind': True, 'pg_ctl_timeout': 'bla',
'parameters': self._PARAMETERS,
'recovery_conf': {'foo': 'bar'},
'pg_hba': ['host all all 0.0.0.0/0 md5'],
'pg_ident': ['krb realm postgres'],
'callbacks': {'on_start': 'true', 'on_stop': 'true', 'on_reload': 'true',
'on_restart': 'true', 'on_role_change': 'true'}})
@patch.object(Postgresql, 'get_major_version', Mock(return_value=120000))
@patch.object(Postgresql, 'is_running', Mock(return_value=True))
def setUp(self):
super(TestPostgresql, self).setUp()
self.p.config.write_postgresql_conf()
self.p._callback_executor = Mock()
@patch.object(Postgresql, 'remove_data_directory', Mock(return_value=True))
@patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False))
@patch.object(Bootstrap, '_post_restore', Mock(side_effect=OSError))
def test_create_replica(self, mock_cancellable_subprocess_call):
self.p.config._config['create_replica_methods'] = ['pgBackRest']
self.p.config._config['pgBackRest'] = {'command': 'pgBackRest', 'keep_data': True, 'no_params': True}
mock_cancellable_subprocess_call.return_value = 0
self.assertEqual(self.b.create_replica(self.leader), 0)
self.p.config._config['create_replica_methods'] = ['basebackup']
self.p.config._config['basebackup'] = [{'max_rate': '100M'}, 'no-sync']
self.assertEqual(self.b.create_replica(self.leader), 0)
self.p.config._config['basebackup'] = [{'max_rate': '100M', 'compress': '9'}]
with patch('patroni.postgresql.bootstrap.logger.error', new_callable=Mock()) as mock_logger:
self.b.create_replica(self.leader)
mock_logger.assert_called_once()
self.b.post_bootstrap({}, task)
self.assertTrue(task.result)
self.b.bootstrap(config)
with patch.object(Postgresql, 'pending_restart', PropertyMock(return_value=True)), \
patch.object(Postgresql, 'restart', Mock()) as mock_restart:
self.b.post_bootstrap({}, task)
mock_restart.assert_called_once()
self.b.bootstrap(config)
self.p.set_state('stopped')
self.p.reload_config({'authentication': {'superuser': {'username': 'p', 'password': 'p'},
'replication': {'username': 'r', 'password': 'r'},
'rewind': {'username': 'rw', 'password': 'rw'}},
'listen': '*', 'retry_timeout': 10, 'parameters': {'wal_level': '', 'hba_file': 'foo'}})
with patch.object(Postgresql, 'major_version', PropertyMock(return_value=110000)), \
patch.object(Postgresql, 'restart', Mock()) as mock_restart:
self.b.post_bootstrap({}, task)
mock_restart.assert_called_once()
@patch.object(Postgresql, 'state', PropertyMock(return_value='running'))
@patch.object(Postgresql, 'data_directory_empty', Mock(return_value=False))
def test_run(self):
self.p.postgresql.set_role('replica')
self.p.sighup_handler()
self.p.ha.dcs.watch = Mock(side_effect=SleepException)
self.p.api.start = Mock()
self.p.logger.start = Mock()
self.p.config._dynamic_configuration = {}
self.assertRaises(SleepException, self.p.run)
with patch('patroni.config.Config.reload_local_configuration', Mock(return_value=False)):
self.p.sighup_handler()
self.assertRaises(SleepException, self.p.run)
with patch('patroni.config.Config.set_dynamic_configuration', Mock(return_value=True)):
self.assertRaises(SleepException, self.p.run)
with patch('patroni.postgresql.Postgresql.data_directory_empty', Mock(return_value=False)):
self.assertRaises(SleepException, self.p.run)
@patch.object(Postgresql, 'get_postgres_role_from_data_directory', Mock(return_value='master'))
def setUp(self):
data_dir = os.path.join('data', 'test0')
self.p = Postgresql({'name': 'postgresql0', 'scope': 'batman', 'data_dir': data_dir,
'config_dir': data_dir, 'retry_timeout': 10,
'krbsrvname': 'postgres', 'pgpass': os.path.join(data_dir, 'pgpass0'),
'listen': '127.0.0.2, 127.0.0.3:5432', 'connect_address': '127.0.0.2:5432',
'authentication': {'superuser': {'username': 'foo', 'password': 'test'},
'replication': {'username': '', 'password': 'rep-pass'}},
'remove_data_directory_on_rewind_failure': True,
'use_pg_rewind': True, 'pg_ctl_timeout': 'bla',
'parameters': self._PARAMETERS,
'recovery_conf': {'foo': 'bar'},
'pg_hba': ['host all all 0.0.0.0/0 md5'],
'pg_ident': ['krb realm postgres'],
'callbacks': {'on_start': 'true', 'on_stop': 'true', 'on_reload': 'true',
'on_restart': 'true', 'on_role_change': 'true'}})
def test__get_local_timeline_lsn(self):
self.r.trigger_check_diverged_lsn()
with patch.object(Postgresql, 'controldata',
Mock(return_value={'Database cluster state': 'shut down in recovery',
'Minimum recovery ending location': '0/0',
"Min recovery ending loc's timeline": '0'})):
self.r.rewind_or_reinitialize_needed_and_possible(self.leader)
with patch.object(Postgresql, 'is_running', Mock(return_value=True)):
with patch.object(MockCursor, 'fetchone', Mock(side_effect=[(False, ), Exception])):
self.r.rewind_or_reinitialize_needed_and_possible(self.leader)
import logging
import os
import shutil
import subprocess
import re
import psutil
from patroni.postgresql import Postgresql
from patroni.postgresql.connection import get_connection_cursor
logger = logging.getLogger(__name__)
class PostgresqlUpgrade(Postgresql):
def adjust_shared_preload_libraries(self, version):
shared_preload_libraries = self.config.get('parameters').get('shared_preload_libraries')
self._old_config_values['shared_preload_libraries'] = shared_preload_libraries
extensions = {
'timescaledb': (9.6, 11),
'pg_cron': (9.5, 12),
'pg_stat_kcache': (9.4, 12),
'pg_partman': (9.4, 12)
}
filtered = []
for value in shared_preload_libraries.split(','):
value = value.strip()
if value not in extensions or version >= extensions[value][0] and version <= extensions[value][1]:
def parse_history(data):
for line in data.split('\n'):
values = line.strip().split('\t')
if len(values) == 3:
try:
values[0] = int(values[0])
values[1] = Postgresql.parse_lsn(values[1])
yield values
except (IndexError, ValueError):
logger.exception('Exception when parsing timeline history line "%s"', values)
def touch_member(config, dcs):
''' Rip-off of the ha.touch_member without inter-class dependencies '''
p = Postgresql(config['postgresql'])
p.set_state('running')
p.set_role('master')
def restapi_connection_string(config):
protocol = 'https' if config.get('certfile') else 'http'
connect_address = config.get('connect_address')
listen = config['listen']
return '{0}://{1}/patroni'.format(protocol, connect_address or listen)
data = {
'conn_url': p.connection_string,
'api_url': restapi_connection_string(config['restapi']),
'state': p.state,
'role': p.role
}