Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
one = IntStr(123)
two = IntStr("123")
three = IntStr("abc")
assert one == IntStr(123)
assert two == IntStr("123")
assert three == IntStr("abc")
assert one == two
assert not one == three
assert one != three
assert not one != two
assert one.unwrap() == Integer(123)
assert two.unwrap() == Integer(123)
assert three.unwrap() == String("abc")
def test_choice_default():
"""Ensure that choices with a default work correctly."""
class Dumb(Struct):
one = String
class ChoiceDefaultStruct(Struct):
a = Default(Choice("IntOrDumb", [Dumb, Integer]), 28)
b = Integer
class OtherStruct(Struct):
first = ChoiceDefaultStruct
second = String
v = OtherStruct(second="hello")
assert v.check()
assert json.loads(v.json_dumps()) == {"second": "hello"}
w = v(first=ChoiceDefaultStruct())
assert w.check()
assert json.loads(w.json_dumps()) == {'first': {'a': 28}, 'second': 'hello'}
x = v(first=ChoiceDefaultStruct(a=296, b=36))
assert x.check()
assert json.loads(x.json_dumps()) == {'first': {'a': 296, 'b': 36},
'second': 'hello'}
y = v(first=ChoiceDefaultStruct(a=Dumb(one="Oops"), b=37))
assert y.check()
assert json.loads(y.json_dumps()) == {'first': {'a': {'one': 'Oops'}, 'b': 37},
'second': 'hello'}
def test_config_with_other_replacements():
hwc = copy.deepcopy(HELLO_WORLD)
hwc['task']['start_command'] = 'echo %shard_id% %task_id% %port:http%'
job = convert(hwc)
main_process = [proc for proc in job.task().processes() if proc.name() == job.name()]
assert len(main_process) == 1
main_process = main_process[0]
assert main_process.cmdline() == String(
"echo {{mesos.instance}} {{thermos.task_id}} {{thermos.ports[http]}}")
from twitter.common.zookeeper.kazoo_client import TwitterKazooClient
from twitter.common.zookeeper.serverset import ServerSet
from gen.apache.aurora import AuroraAdmin
from gen.apache.aurora.constants import CURRENT_API_VERSION
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from pystachio import Boolean, Default, Integer, String
class SchedulerClientTrait(Cluster.Trait):
zk = String
zk_port = Default(Integer, 2181)
scheduler_zk_path = String
scheduler_uri = String
proxy_url = String
auth_mechanism = Default(String, 'UNAUTHENTICATED')
use_thrift_ssl = Default(Boolean, False)
class SchedulerClient(object):
THRIFT_RETRIES = 5
RETRY_TIMEOUT = Amount(1, Time.SECONDS)
class CouldNotConnect(Exception): pass
# TODO(wickman) Refactor per MESOS-3005 into two separate classes with separate traits:
# ZookeeperClientTrait
# DirectClientTrait
@classmethod
def get(cls, cluster, **kwargs):
from twitter.aurora.common.auth import make_session_key, SessionKeyError
from twitter.aurora.common.cluster import Cluster
from gen.twitter.aurora import AuroraAdmin
from gen.twitter.aurora.constants import CURRENT_API_VERSION
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from pystachio import Boolean, Default, Integer, String
class SchedulerClientTrait(Cluster.Trait):
zk = String
zk_port = Default(Integer, 2181)
scheduler_zk_path = String
scheduler_uri = String
proxy_url = String
auth_mechanism = Default(String, 'UNAUTHENTICATED')
use_thrift_ssl = Default(Boolean, False)
class SchedulerClient(object):
THRIFT_RETRIES = 5
RETRY_TIMEOUT = Amount(1, Time.SECONDS)
class CouldNotConnect(Exception): pass
# TODO(wickman) Refactor per MESOS-3005 into two separate classes with separate traits:
# ZookeeperClientTrait
# DirectClientTrait
@classmethod
from twitter.aurora.common.cluster import Cluster
from gen.twitter.aurora import AuroraAdmin
from gen.twitter.aurora.constants import CURRENT_API_VERSION
from thrift.protocol import TBinaryProtocol
from thrift.transport import TSocket, TTransport
from pystachio import Boolean, Default, Integer, String
class SchedulerClientTrait(Cluster.Trait):
zk = String
zk_port = Default(Integer, 2181)
scheduler_zk_path = String
scheduler_uri = String
proxy_url = String
auth_mechanism = Default(String, 'UNAUTHENTICATED')
use_thrift_ssl = Default(Boolean, False)
class SchedulerClient(object):
THRIFT_RETRIES = 5
RETRY_TIMEOUT = Amount(1, Time.SECONDS)
class CouldNotConnect(Exception): pass
# TODO(wickman) Refactor per MESOS-3005 into two separate classes with separate traits:
# ZookeeperClientTrait
# DirectClientTrait
@classmethod
def get(cls, cluster, **kwargs):
LoggerDestination = Enum('file', 'console', 'both', 'none')
LoggerMode = Enum('standard', 'rotate')
class Logger(Struct):
destination = Default(LoggerDestination, LoggerDestination('file'))
mode = Default(LoggerMode, LoggerMode('standard'))
rotate = RotatePolicy
class Process(Struct):
cmdline = Required(String)
name = Required(String)
# This is currently unused but reserved for future use by Thermos.
resources = Resources
# optionals
max_failures = Default(Integer, 1) # maximum number of failed process runs
# before process is failed.
daemon = Default(Boolean, False)
ephemeral = Default(Boolean, False)
min_duration = Default(Integer, 5) # integer seconds
final = Default(Boolean, False) # if this process should be a finalizing process
# that should always be run after regular processes
logger = Default(Logger, Empty)
class Task(Struct):