Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_choice_default():
"""Ensure that choices with a default work correctly."""
class Dumb(Struct):
one = String
class ChoiceDefaultStruct(Struct):
a = Default(Choice("IntOrDumb", [Dumb, Integer]), 28)
b = Integer
class OtherStruct(Struct):
first = ChoiceDefaultStruct
second = String
v = OtherStruct(second="hello")
assert v.check()
assert json.loads(v.json_dumps()) == {"second": "hello"}
w = v(first=ChoiceDefaultStruct())
assert w.check()
assert json.loads(w.json_dumps()) == {'first': {'a': 28}, 'second': 'hello'}
x = v(first=ChoiceDefaultStruct(a=296, b=36))
assert x.check()
assert json.loads(x.json_dumps()) == {'first': {'a': 296, 'b': 36},
'second': 'hello'}
y = v(first=ChoiceDefaultStruct(a=Dumb(one="Oops"), b=37))
def test_simple_config():
job = convert(HELLO_WORLD)
assert job.name() == String('hello_world')
# properly converted defaults
assert job.cluster() == String('smf1-test')
assert job.instances() == Integer(1)
assert job.cron_schedule() == String('')
assert job.cron_collision_policy() == String('KILL_EXISTING')
assert job.daemon() == Integer(0) # Boolean(False)
assert job.constraints() == Map(String, String)({})
assert job.production() == Integer(0)
assert job.priority() == Integer(0)
assert job.max_task_failures() == Integer(1)
assert job.health_check_interval_secs() == Integer(30)
assert job.task() == Task(
name = job.name(),
resources = Resources(cpu = 0.1, ram = 64 * 1048576, disk = 64 * 1048576),
processes = [Process(name = job.name(), cmdline = 'echo hello world')],
)
def test_repr():
class Dumb(Struct):
one = String
class ChoiceDefaultStruct(Struct):
a = Default(Choice("IntOrDumb", [Dumb, Integer]), 28)
b = Integer
class OtherStruct(Struct):
first = ChoiceDefaultStruct
second = String
C = Choice([String, List(Integer)])
testvalone = C("hello")
testvaltwo = C([1, 2, 3])
assert repr(testvalone) == "Choice_String_IntegerList('hello')"
assert repr(testvaltwo) == "Choice_String_IntegerList([1, 2, 3])"
def test_choice_interpolation():
IntFloat = Choice((Integer, Float))
one = IntFloat('{{abc}}')
two = IntFloat('{{a}}{{b}}')
one_int = one.bind(abc=34)
assert isinstance(one_int.interpolate()[0], Integer)
assert one_int.check().ok()
one_fl = one.bind(abc=123.354)
assert isinstance(one_fl.interpolate()[0], Float)
assert one_fl.check().ok()
one_str = one.bind(abc="def")
assert not one_str.check().ok()
assert two.interpolate()[1] == [Ref.from_address('a'), Ref.from_address('b')]
two_one = two.bind(a=12, b=23)
assert two_one.check().ok()
assert two_one.unwrap() == Integer(1223)
two_two = two.bind(a=12, b=".34")
assert two_two.check().ok()
assert two_two.unwrap() == Float(12.34)
def test_config_with_options():
hwc = copy.deepcopy(HELLO_WORLD)
hwc['task']['production'] = True
hwc['task']['priority'] = 200
hwc['task']['daemon'] = True
hwc['task']['health_check_interval_secs'] = 30
hwc['cron_collision_policy'] = 'RUN_OVERLAP'
hwc['constraints'] = {
'dedicated': 'your_mom',
'cpu': 'x86_64'
}
job = convert(hwc)
assert job.production() == Integer(1)
assert job.priority() == Integer(200)
assert job.daemon() == Integer(1)
assert job.cron_collision_policy() == String('RUN_OVERLAP')
assert job.health_check_interval_secs() == Integer(30)
assert 'cpu' in job.constraints()
assert 'dedicated' in job.constraints()
assert job.constraints()['cpu'] == String('x86_64')
assert job.constraints()['dedicated'] == String('your_mom')
IntStr = Choice("IntStrFloat", (Integer, String))
one = IntStr(123)
two = IntStr("123")
three = IntStr("abc")
assert one == IntStr(123)
assert two == IntStr("123")
assert three == IntStr("abc")
assert one == two
assert not one == three
assert one != three
assert not one != two
assert one.unwrap() == Integer(123)
assert two.unwrap() == Integer(123)
assert three.unwrap() == String("abc")
from twitter.aurora.common.auth import make_session_key, SessionKeyError
from twitter.aurora.common.cluster import Cluster
from gen.twitter.aurora import AuroraAdmin
from gen.twitter.aurora.constants import CURRENT_API_VERSION
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from pystachio import Boolean, Default, Integer, String
class SchedulerClientTrait(Cluster.Trait):
zk = String
zk_port = Default(Integer, 2181)
scheduler_zk_path = String
scheduler_uri = String
proxy_url = String
auth_mechanism = Default(String, 'UNAUTHENTICATED')
use_thrift_ssl = Default(Boolean, False)
class SchedulerClient(object):
THRIFT_RETRIES = 5
RETRY_TIMEOUT = Amount(1, Time.SECONDS)
class CouldNotConnect(Exception): pass
# TODO(wickman) Refactor per MESOS-3005 into two separate classes with separate traits:
# ZookeeperClientTrait
# DirectClientTrait
def process_over_failure_limit(proc):
return (proc.max_failures() == Integer(0) or proc.max_failures() >= Integer(100))
for proc in job.task().processes():
user = String
class Resources(Struct):
cpu = Required(Float)
ram = Required(Integer)
disk = Required(Integer)
gpu = Default(Integer, 0)
class Constraint(Struct):
order = List(String)
class RotatePolicy(Struct):
log_size = Default(Integer, 100*MB)
backups = Default(Integer, 5)
LoggerDestination = Enum('file', 'console', 'both', 'none')
LoggerMode = Enum('standard', 'rotate')
class Logger(Struct):
destination = Default(LoggerDestination, LoggerDestination('file'))
mode = Default(LoggerMode, LoggerMode('standard'))
rotate = RotatePolicy
class Process(Struct):
String,
Struct
)
# Define constants for resources
BYTES = 1
KB = 1024 * BYTES
MB = 1024 * KB
GB = 1024 * MB
TB = 1024 * GB
class ThermosContext(Struct):
# TODO(wickman) Move the underlying replacement mechanism to %port% replacements
ports = Map(String, Integer)
# TODO(wickman) Move the underlying replacement mechanism to %task_id%
task_id = String
# TODO(wickman) Move underlying mechanism to %user%
user = String
class Resources(Struct):
cpu = Required(Float)
ram = Required(Integer)
disk = Required(Integer)
class Constraint(Struct):
order = List(String)