Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for testing the update_service_address command."""
import unittest
if __name__ == "__main__":
from broker import utils
utils.import_depends()
from broker.brokertest import TestBrokerCommand
class TestUpdateServiceAddress(TestBrokerCommand):
    """Runs update_service_address on unittest20 and inspects show host."""

    def test_100_remove_eth1(self):
        # Update the "hostname" service address so that only eth0 is listed
        # under --interfaces; the command is run through noouttest.
        command = [
            "update_service_address",
            "--hostname", "unittest20.aqd-unittest.ms.com",
            "--name", "hostname",
            "--interfaces", "eth0",
        ]
        self.noouttest(command)

    def test_101_verify_eth1_removed(self):
        # NOTE(review): `ip` is looked up but not used by the checks visible
        # here; the lookup is kept because it can still raise.
        ip = self.net["zebra_vip"].usable[2]
        eth0_ip = self.net["zebra_eth0"].usable[0]
        eth1_ip = self.net["zebra_eth1"].usable[0]

        command = [
            "show", "host",
            "--hostname", "unittest20.aqd-unittest.ms.com",
        ]
        out = self.commandtest(command)

        # Each pattern is a regex: the MAC is interpolated and the flag list
        # is matched with escaped square brackets.
        eth0_pattern = r"Interface: eth0 %s \[boot, default_route\]" % eth0_ip.mac
        self.searchoutput(out, eth0_pattern, command)
        eth1_pattern = r"Interface: eth1 %s \[default_route\]" % eth1_ip.mac
        self.searchoutput(out, eth1_pattern, command)
# limitations under the License.
"""Module for testing the add personality --copy_from command."""
import unittest
if __name__ == "__main__":
from broker import utils
utils.import_depends()
from broker.brokertest import TestBrokerCommand
from broker.grntest import VerifyGrnsMixin
GRN = "grn:/ms/ei/aquilon/aqd"
class TestCopyPersonality(VerifyGrnsMixin, TestBrokerCommand):
    """Tests for add_personality --copy_from with aquilon/utunused/dev."""

    def test_100_restore_utunused_comments(self):
        # Set the comments on utunused/dev; run through noouttest.
        command = [
            "update_personality",
            "--archetype", "aquilon",
            "--personality", "utunused/dev",
            "--comments", "New personality comments",
        ]
        self.noouttest(command)

    def test_110_add_utunused_clone(self):
        # Create utunused-clone/dev as a copy of utunused/dev and check the
        # config_override message in the status output.
        command = [
            "add_personality",
            "--personality", "utunused-clone/dev",
            "--archetype", "aquilon",
            "--copy_from", "utunused/dev",
        ]
        expected = ("Personality aquilon/utunused/dev has "
                    "config_override set")
        out = self.statustest(command)
        self.matchoutput(out, expected, command)

    def test_111_verify_utunused_clone(self):
        # NOTE(review): the command is only built here and never executed;
        # the remainder of this test appears to be missing from this excerpt.
        command = [
            "show_personality",
            "--personality", "utunused-clone/dev",
            "--archetype", "aquilon",
        ]
# Matches one line of raw search_audit output, in multi-line mode:
#   <date> <time><utc offset> <user>[@<realm>] <number or "-"> aq <command><args>
# The `datetime` group (date, time and offset together) is what the tests
# below read.  NOTE(review): the other group names are descriptive choices;
# confirm them against any other code that reads groups from this pattern.
AUDIT_RAW_RE = re.compile(
    r'^(?P<datetime>(?P<date>'
    r'(?P<year>\d{4,})-(?P<month>\d{2})-(?P<day>\d{2})) '
    r'(?P<hour>\d{2}):(?P<minute>\d{2}):'
    r'(?P<second>\d{2})(?P<offset>[-\+]\d{4})) '
    r'(?P<principal>(?P<user>\w+)'
    r'(?:@(?P<realm>[\w\.]+))?) '
    r'(?P<returncode>\d+|-) '
    r'aq (?P<command>\w+)\b(?P<args>.*)$', re.M)
# Global variables used to pass data between testcases
start_time = None
end_time = None
midpoint = None
class TestAudit(TestBrokerCommand):
    """ Test the search_audit features """

    def test_100_get_start(self):
        """ get the oldest row in the xtn table """
        global start_time
        command = ["search_audit", "--command", "all", "--limit", "1",
                   '--reverse_order']
        out = self.commandtest(command)
        m = self.searchoutput(out, AUDIT_RAW_RE, command)
        # Stored at module level so later test cases can use it.
        start_time = parse(m.group('datetime'))
        # assertIsInstance reports the actual value and type on failure,
        # unlike assertTrue(isinstance(...)).
        self.assertIsInstance(start_time, datetime)

    def test_101_get_end(self):
        """ get the newest row of xtn table, calculate midpoint """
        # NOTE(review): only the command is built in this excerpt; the code
        # that runs it and assigns midpoint/end_time is not visible here.
        global midpoint, end_time
        command = ["search_audit", "--command", "all", "--limit", "1"]
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
if __name__ == "__main__":
import utils
utils.import_depends()
from broker.brokertest import TestBrokerCommand
class TestParameterConstraints(TestBrokerCommand):
    """Checks that deleting parameter definitions still in use is rejected."""

    def test_100_archetype_validation(self):
        # Deleting an archetype parameter definition is expected to be a bad
        # request that carries the "cannot be deleted" message.
        cmd = [
            "del_parameter_definition",
            "--archetype", "aquilon",
            "--path=espinfo/function",
        ]
        expected = "Parameter with path espinfo/function used by following and cannot be deleted"
        out = self.badrequesttest(cmd)
        self.matchoutput(out, expected, cmd)

    def test_110_feature_validation(self):
        # Same check for a parameter definition attached to a host feature.
        cmd = [
            "del_parameter_definition",
            "--feature", "pre_host",
            "--type=host",
            "--path=teststring",
        ]
        expected = "Parameter with path teststring used by following and cannot be deleted"
        out = self.badrequesttest(cmd)
        self.matchoutput(out, expected, cmd)

    def test_120_update_breaks_json_schema(self):
        # NOTE(review): the command is only built here and never executed;
        # the remainder of this test appears to be missing from this excerpt.
        command = [
            "del_parameter",
            "--personality", "utpers-dev",
            "--archetype", "aquilon",
            "--path", "actions/testaction/user",
        ]
import unittest
if __name__ == "__main__":
import utils
utils.import_depends()
from broker.brokertest import TestBrokerCommand
from broker.personalitytest import PersonalityTestMixin
GRN = "grn:/ms/ei/aquilon/aqd"
PPROD = "justify-prod"
QPROD = "justify-qa"
class TestJustification(PersonalityTestMixin, TestBrokerCommand):
def test_100_setup(self):
    # Add the two features one after the other; apart from the feature name
    # both "add feature" commands use exactly the same options.
    for feature_name in ("testfeature", "testclusterfeature"):
        command = [
            "add", "feature",
            "--feature", feature_name,
            "--type", "host",
            "--grn", GRN,
            "--visibility", "public",
            "--activation", "reboot",
            "--deactivation", "reboot",
        ]
        self.noouttest(command)
def test_110_host_setup(self):
host_list = ["aquilon91.aqd-unittest.ms.com", "unittest26.aqd-unittest.ms.com"]
for host in host_list:
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for testing parameter support."""
if __name__ == "__main__":
import utils
utils.import_depends()
import unittest2 as unittest
from broker.brokertest import TestBrokerCommand
FEATURE = 'myfeature'
class TestParameterDefinitionFeature(TestBrokerCommand):
    """Tests for add_parameter_definition on the host feature FEATURE."""

    def test_00_add_feature(self):
        # Create the host feature that the other tests in this class refer to.
        self.noouttest(["add_feature", "--feature", FEATURE, "--type=host"])

    def test_100_add(self):
        # Define a required string parameter with a default value.
        cmd = [
            "add_parameter_definition",
            "--feature", FEATURE,
            "--type=host",
            "--path=testpath",
            "--value_type=string",
            "--description=blaah",
            "--required",
            "--default=default",
        ]
        self.noouttest(cmd)

    def test_110_add_existing(self):
        # NOTE(review): the command (same arguments as in test_100_add) is
        # only built here and never executed; the remainder of this test
        # appears to be missing from this excerpt.
        cmd = [
            "add_parameter_definition",
            "--feature", FEATURE,
            "--type=host",
            "--path=testpath",
            "--value_type=string",
            "--description=blaah",
            "--required",
            "--default=default",
        ]
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for testing the update personality command."""
if __name__ == "__main__":
from broker import utils
utils.import_depends()
import unittest2 as unittest
from broker.brokertest import TestBrokerCommand
from broker.grntest import VerifyGrnsMixin
class TestUpdatePersonality(VerifyGrnsMixin, TestBrokerCommand):
    """Negative tests for update_personality --vmhost_capacity_function."""

    def test_200_invalid_function(self):
        """ Verify that the list of built-in functions is restricted """
        command = [
            "update_personality",
            "--personality", "vulcan-1g-desktop-prod",
            "--archetype", "esx_cluster",
            "--vmhost_capacity_function", "locals()",
        ]
        expected = "name 'locals' is not defined"
        out = self.badrequesttest(command)
        self.matchoutput(out, expected, command)

    def test_200_invalid_type(self):
        # The expression "memory - 100" is expected to be rejected with the
        # message below, which is matched exactly as written here.
        command = [
            "update_personality",
            "--personality", "vulcan-1g-desktop-prod",
            "--archetype", "esx_cluster",
            "--vmhost_capacity_function", "memory - 100",
        ]
        expected = "The function should return a dictonary."
        out = self.badrequesttest(command)
        self.matchoutput(out, expected, command)
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for testing the update parameter command."""
import unittest
if __name__ == "__main__":
from broker import utils
utils.import_depends()
from broker.brokertest import TestBrokerCommand
class TestUpdateParameter(TestBrokerCommand):
    """Tests for update_parameter on the aquilon/utpers-dev personality."""

    def check_match(self, out, expected, command):
        # Collapse every run of whitespace (newlines included) into a single
        # space so that `expected` can be written on one line.
        collapsed = ' '.join(out.split())
        self.matchoutput(collapsed, expected, command)

    def test_100_update_existing_leaf_path(self):
        # Change the value at an existing leaf path...
        update_cmd = [
            "update_parameter",
            "--personality", "utpers-dev",
            "--archetype", "aquilon",
            "--path", "actions/testaction/user",
            "--value", "user2",
        ]
        self.noouttest(update_cmd)

        # ...then show the "next" stage and look for the new value.
        command = [
            "show_parameter",
            "--personality", "utpers-dev",
            "--archetype", "aquilon",
            "--personality_stage", "next",
        ]
        expected = 'testaction: { "command": "/bin/testaction", "user": "user2" }'
        out = self.commandtest(command)
        self.check_match(out, expected, command)
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for testing the reset advertised status command."""
import re
import unittest
if __name__ == "__main__":
import broker.utils
broker.utils.import_depends()
from broker.brokertest import TestBrokerCommand
class TestResetAdvertisedStatus(TestBrokerCommand):
""" test reset advertised status """
def test_100_aquilon(self):
""" test reset advertised status on various build status """
hostname = "unittest02.one-nyp.ms.com"
# we start off as "ready", so each of these transitions (in order)
# should be valid
for status in ['failed', 'rebuild', 'ready']:
self.successtest(["change_status", "--hostname", hostname,
"--buildstatus", status])
# reset advertised state to build
command = ["reset_advertised_status", "--hostname", hostname]
if status == "ready":
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Module for testing the update personality command."""
if __name__ == "__main__":
from broker import utils
utils.import_depends()
import unittest2 as unittest
from broker.brokertest import TestBrokerCommand
from broker.grntest import VerifyGrnsMixin
class TestUpdatePersonality(VerifyGrnsMixin, TestBrokerCommand):
    """Negative test for update_personality --vmhost_capacity_function."""

    def test_200_invalid_function(self):
        """ Verify that the list of built-in functions is restricted """
        command = [
            "update_personality",
            "--personality", "vulcan-1g-desktop-prod",
            "--archetype", "esx_cluster",
            "--vmhost_capacity_function", "locals()",
        ]
        expected = "name 'locals' is not defined"
        out = self.badrequesttest(command)
        self.matchoutput(out, expected, command)