Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_optional_without_default_value(self):
    """An absent optional querystring key validates to an empty dict."""
    request_schema = RequestSchema(
        querystring={optional('param'): Use(int)},
    )
    validated = request_schema.validate_querystring({})
    assert validated == {}
def test_dict_keys():
    """Dict schemas check keys as well as values; ``Use`` converts both."""
    plain = Schema({str: int}).validate({"a": 1, "b": 2})
    assert plain == {"a": 1, "b": 2}

    # The integer key does not satisfy the ``str`` key schema; the call is
    # expected to fail inside the ``SE`` context (defined elsewhere).
    with SE:
        Schema({str: int}).validate({1: 1, "b": 2})

    converting = Schema({Use(str): Use(int)})
    converted = converting.validate({1: 3.14, 3.14: 1})
    assert converted == {"1": 3, "3.14": 1}
# NOTE(review): the enclosing ``def`` line and the original indentation are
# missing from this excerpt; ``project``, ``structure`` and ``level`` are
# presumably its parameters, and the recursive call below suggests the
# function is ``subscription_schema`` -- confirm against the full file.
#
# Builds a schema from a list of nodes and events.
# :param project: object whose ``_id``, ``title`` and ``url`` attribute types
#     are used to coerce the matching node fields
# :param structure: list of nodes (another list) and events
# :return: ``Schema([node_schema])`` when ``level == 0``, otherwise the plain
#     ``node_schema`` dict
sub_list = []
# Each item of the structure is described by a recursive call one level deeper.
for item in list_or_dict(structure):
sub_list.append(subscription_schema(project, item, level=level+1))
# NOTE(review): with the indentation lost it is not visible here whether the
# event schema is appended once per item or once after the loop.
sub_list.append(event_schema(level))
node_schema = {
'node': {
# Values are coerced by calling the type of the corresponding project
# attribute; the error labels embed the current nesting level.
'id': Use(type(project._id), error='node_id{}'.format(level)),
'title': Use(type(project.title), error='node_title{}'.format(level)),
# NOTE(review): the label below is 'node_{}' while its siblings are
# 'node_id{}' / 'node_title{}' -- possibly meant to be 'node_url{}'.
'url': Use(type(project.url), error='node_{}'.format(level))
},
# NOTE(review): per the ``schema`` library's documented semantics, ``Use``
# replaces the value with the callable's return value and only fails when the
# callable raises.  The membership lambdas below therefore appear to turn the
# value into True/False rather than reject non-members -- confirm whether a
# bare callable (validator) was intended instead of ``Use``.
'kind': And(str, Use(lambda s: s in ('node', 'folder'),
error="kind didn't match node or folder {}".format(level))),
'nodeType': Use(lambda s: s in ('project', 'component'), error='nodeType not project or component'),
'category': Use(lambda s: s in settings.NODE_CATEGORY_MAP, error='category not in settings.NODE_CATEGORY_MAP'),
'permissions': {
'view': Use(lambda s: s in (True, False), error='view permissions is not True/False')
},
# Schemas collected above for the nested items (plus the event schema).
'children': sub_list
}
# Only the outermost call wraps the result in a ``Schema`` over a list of
# nodes; nested calls hand back the raw dict so it can be embedded.
if level == 0:
return Schema([node_schema])
return node_schema
# NOTE(review): these entries continue a dict literal whose opening line (and
# the name it is bound to) is outside this excerpt; the closing brace is the
# last line of this span.
# Optional strings defaulting to '' when the key is absent.
Optional('availabilityZone', default=''): And(str, Regex(r'^[a-z0-9-]+$')),
Optional('subnetId', default=''): And(str, Regex(r'^subnet-[a-z0-9]+$')),
# The only required key here; ``is_valid_instance_type`` is defined elsewhere.
'instanceType': And(str, And(is_valid_instance_type, error='Invalid instance type.')),
Optional('onDemandInstance', default=False): bool,
# ``len`` in the chain rejects empty strings; the regexes constrain the format.
Optional('amiName', default=None): And(str, len, Regex(r'^[\w\(\)\[\]\s\.\/\'@-]{3,128}$')),
Optional('amiId', default=None): And(str, len, Regex(r'^ami-[a-z0-9]+$')),
# Accepts an int or a str, normalises to str so the digits-only regex can run,
# converts to int, then requires a value greater than 0.
# NOTE(review): the default of 0 would not satisfy the ``> 0`` check;
# presumably ``schema`` inserts Optional defaults without validating them, so
# 0 stands for "not specified" -- confirm.
Optional('rootVolumeSize', default=0): And(Or(int, str), Use(str),
Regex(r'^\d+$', error='Incorrect value for "rootVolumeSize".'),
Use(int),
And(lambda x: x > 0,
error='"rootVolumeSize" should be greater than 0 or should '
'not be specified.'),
),
# Same normalise-then-check pattern for the price: the regex allows an integer
# part and at most six decimal places, then the value is converted to float
# and must be greater than 0.
Optional('maxPrice', default=0): And(Or(float, int, str), Use(str),
Regex(r'^\d+(\.\d{1,6})?$', error='Incorrect value for "maxPrice".'),
Use(float),
And(lambda x: x > 0, error='"maxPrice" should be greater than 0 or '
'should not be specified.'),
),
# List of strings; defaults to an empty list.
Optional('managedPolicyArns', default=[]): [str],
}
# Extra validators for the volume list: fewer than 12 entries are accepted.
volumes_checks = [
    And(
        lambda volumes: len(volumes) < 12,
        error='Maximum 11 volumes are supported at the moment.',
    ),
]
# Cross-field validators; each callable receives a mapping and indexes it by
# key, rejecting combinations of options that exclude one another.
instance_checks = [
    And(
        lambda params: not (params['onDemandInstance'] and params['maxPrice']),
        error='"maxPrice" cannot be specified for on-demand instances',
    ),
    And(
        lambda params: not (params['amiName'] and params['amiId']),
        error='"amiName" and "amiId" parameters cannot be used together',
    ),
]
def validator(fn):
    """
    Wrap ``fn`` in ``schema.Use`` so it can serve as a validator with `schema`_

    .. _schema: https://github.com/keleshev/schema
    """
    wrapped = schema.Use(fn)
    return wrapped
def validate(kwargs):
    """Check ``kwargs`` against the expected credential/VLAN layout.

    ``username``, ``hostname`` and ``password`` must be ``basestring``
    instances.  ``dummy_vlan`` is coerced with ``int``, must lie in
    1..4093, and is then coerced with ``str``.  The validated result is
    not returned; the call matters only for the error ``schema`` raises
    on invalid input.
    """
    vlan_rule = schema.And(
        schema.Use(int),
        lambda vlan: 0 < vlan <= 4093,
        schema.Use(str),
    )
    kwargs_schema = schema.Schema({
        'username': basestring,
        'hostname': basestring,
        'password': basestring,
        'dummy_vlan': vlan_rule,
    })
    kwargs_schema.validate(kwargs)
def string_is_log_level(option):
    """Validate ``option`` as a log level name, ignoring case.

    The option is lower-cased and must then equal one of the accepted
    level names; the result of the validation is returned.
    """
    accepted_levels = Or(
        'debug', 'info', 'warn', 'warning', 'error', 'critical', 'fatal',
    )
    level_rule = And(Use(str.lower), accepted_levels)
    return level_rule.validate(option)
upper_day : string formatted datetime
"""
# NOTE(review): this is the body of a request handler whose ``def`` line and
# indentation are outside this excerpt; the final ``except`` below has no
# visible handler body either.
# Attach the current thread to the Lucene VM.
lucene.getVMEnv().attachCurrentThread()
# Default for ``upper_day``: yesterday's UTC date formatted as YYYY-MM-DD.
yesterday = datetime.utcnow().date() - timedelta(days=1)
yesterday = yesterday.strftime('%Y-%m-%d')
q_top_article_schema = Schema({
# Must look like YYYY-MM-DD, then is parsed with dateutil.
# NOTE(review): the pattern is a non-raw string containing ``\d``; newer
# Python versions warn about such escape sequences.  The error message also
# contains the typo 'shoul' (runtime text, left untouched here).
Optional('upper_day', default=yesterday):
And(Regex('^\d{4}-\d{2}-\d{2}$'),
Use(dateutil.parser.parse),
error='Invalid date, shoul be yyyy-mm-dd format'),
# A string that is lower-cased, must be 'true' or 'false', and is then
# converted to the matching bool.
Optional('most_recent', default=True):
And(str,
Use(lambda s: s.lower()), lambda s: s in ('true', 'false'),
Use(lambda s: True if s == 'true' else False)),
# SECURITY NOTE(review): ``Use(eval)`` evaluates the supplied value as a Python
# expression, and the validated data below comes from ``request.args``.  That
# allows arbitrary code execution from request input; consider a literal-only
# parser such as ``ast.literal_eval`` instead.
Optional('exclude_tags', default=[]):
And(Use(eval), error='Invalid exclude_tags input format'),
})
# Query arguments are copied from the incoming request before validation.
q_kwargs = copy_req_args(request.args)
try:
# Validation returns the converted arguments, which are passed straight to the
# database query as keyword arguments.
q_kwargs = q_top_article_schema.validate(q_kwargs)
df = db_query_top_articles(engine, **q_kwargs)
# An empty result is reported through the dedicated exception handled below.
if len(df) == 0:
raise APINoResultError('No top article found!')
response = dict(
status='OK',
num_of_entries=len(df),
articles=flask.json.loads(df.to_json(**TO_JSON_KWARGS)))
# Each failure mode is mapped to a response dict carrying a status and the
# exception text.
except SchemaError as e:
response = dict(status='ERROR', error=str(e))
except APINoResultError as e:
response = dict(status='No result error', error=str(e))
except Exception as e:
# NOTE(review): these entries continue a ``Schema({...})`` call whose opening
# line is outside this excerpt; ``load_config`` later in the file validates
# against a module-level name ``schema``, presumably this object -- confirm.
# ``And(str, len)`` accepts only non-empty strings (``len`` must be truthy).
# Connection settings; ``port_range`` is a validator defined elsewhere.
'influxdb': {
'host': And(str, len),
'port': And(int, port_range),
Optional('username'): And(str, len),
Optional('password'): And(str, len),
'database': And(str, len),
Optional('ssl'): bool
},
# Optional section; ``source`` is additionally passed through
# ``str_or_jsonPath`` (defined elsewhere), whose result replaces the string.
Optional("base64decode"): {
'source': And(str, len, Use(str_or_jsonPath)),
'target': And(str, len)
},
# A list of point definitions; ``measurement`` and ``topic`` are required in
# each entry.
'points': [{
'measurement': And(str, len, Use(str_or_jsonPath)),
'topic': And(str, len),
Optional('httpcontent'): {str: And(str, len, Use(str_or_jsonPath))},
# ``fields`` may be a mapping of names to values or a single value.
Optional('fields'): Or({str: And(str, len, Use(str_or_jsonPath))}, And(str, len, Use(str_or_jsonPath))),
Optional('tags'): {str: And(str, len, Use(str_or_jsonPath))},
# A point may also carry its own ``database`` value.
Optional('database'): And(str, len)
}]
})
# Read the YAML file at ``config_filename`` and validate the parsed content
# against the module-level ``schema``; on success the validated configuration
# is returned.
def load_config(config_filename):
with open(config_filename, 'r') as f:
# NOTE(review): ``FullLoader`` can construct more than plain data types; if the
# configuration file may come from an untrusted source, ``yaml.safe_load``
# would be the safer choice -- confirm the intent.
config = yaml.load(f, Loader=yaml.FullLoader)
try:
return schema.validate(config)
except SchemaError as e:
# Better error format: split the message into lines and drop the second one.
error = str(e).splitlines()
# NOTE(review): this raises IndexError when the message has fewer than two
# lines.  Nothing visible uses ``error`` afterwards, so as shown the function
# returns None on a schema error; the excerpt appears to be truncated here.
del error[1]
def get_instance_parameters_schema(instance_parameters: dict, default_volume_type: str,
instance_checks: list = None, volumes_checks: list = None):
if not instance_checks:
instance_checks = []
if not volumes_checks:
volumes_checks = []
schema = Schema(And(
{
**instance_parameters,
Optional('dockerDataRoot', default=''): And(
str,
And(os.path.isabs, error='Use an absolute path when specifying a Docker data root directory'),
Use(lambda x: x.rstrip('/')),
),
Optional('volumes', default=[]): And(
[{
'name': And(Or(int, str), Use(str), Regex(r'^[\w-]+$')),
Optional('type', default=default_volume_type): str,
Optional('parameters', default={}): {
Optional('mountDir', default=''): And(
str,
And(os.path.isabs, error='Use absolute paths for mount directories'),
Use(lambda x: x.rstrip('/'))
),
And(str, Regex(r'^[\w]+$')): object,
},
}],
And(lambda x: is_unique_value(x, 'name'), error='Each instance volume must have a unique name.'),
And(lambda x: not has_prefix([(volume['parameters']['mountDir'] + '/') for volume in x