Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_operations_against_stopped_cluster(stopped_cluster):
    """Check that `run-command` and `copy-file` fail on a stopped cluster.

    Each command is invoked through the `flintrock` CLI. The process must
    exit with return code 1 and its stderr (decoded as UTF-8, stripped) must
    equal the string form of the matching `ClusterInvalidState` error.

    `stopped_cluster` is passed to the CLI as the cluster argument
    (presumably a fixture yielding the cluster name -- confirm against the
    fixture definition).
    """
    # (command name, arguments that follow the cluster argument)
    commands = (
        ('run-command', ['ls']),
        ('copy-file', [__file__, '/remote/path']),
    )
    for command, extra_args in commands:
        p = subprocess.run(
            ['flintrock', command, stopped_cluster, *extra_args],
            stderr=subprocess.PIPE)
        expected_error_message = str(
            ClusterInvalidState(
                attempted_command=command,
                state='stopped'))
        assert p.returncode == 1
        assert p.stderr.decode('utf-8').strip() == expected_error_message
def add_slaves_check(self):
    """Raise `ClusterInvalidState` unless `self.state` is 'running'.

    Returns None when the state is 'running'. The raised error records the
    attempted command ('add-slaves') and the current state.
    """
    if self.state != 'running':
        raise ClusterInvalidState(
            attempted_command='add-slaves',
            state=self.state)
def start_check(self):
    """Validate that `self.state` allows the 'start' command.

    Returns None when the state is 'stopped'.

    Raises:
        NothingToDo: if the state is already 'running'.
        ClusterInvalidState: if the state is neither 'running' nor
            'stopped'.
    """
    if self.state == 'running':
        raise NothingToDo("Cluster is already running.")
    elif self.state != 'stopped':
        raise ClusterInvalidState(
            attempted_command='start',
            state=self.state)
def copy_file_check(self):
    """Raise `ClusterInvalidState` unless `self.state` is 'running'.

    Returns None when the state is 'running'. The raised error records the
    attempted command ('copy-file') and the current state.
    """
    if self.state != 'running':
        raise ClusterInvalidState(
            attempted_command='copy-file',
            state=self.state)
def run_command_check(self):
    """Raise `ClusterInvalidState` unless `self.state` is 'running'.

    Returns None when the state is 'running'. The raised error records the
    attempted command ('run-command') and the current state.
    """
    if self.state != 'running':
        raise ClusterInvalidState(
            attempted_command='run-command',
            state=self.state)
def stop_check(self):
    """Validate that `self.state` allows the 'stop' command.

    Returns None when the state is 'running'.

    Raises:
        NothingToDo: if the state is already 'stopped'.
        ClusterInvalidState: if the state is neither 'stopped' nor
            'running'.
    """
    if self.state == 'stopped':
        raise NothingToDo("Cluster is already stopped.")
    elif self.state != 'running':
        raise ClusterInvalidState(
            attempted_command='stop',
            state=self.state)