Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def main(argv):
valid = set()
invalid_avro = set()
invalid_json = set()
if len(argv) < 3:
print "Give me an avro schema file and a whitespace-separated list of json files to validate against it."
else:
schema = parse(open(argv[1]).read())
for arg in argv[2:]:
try:
json = loads(open(arg, 'r').read())
if validate(schema, json):
valid.add(arg)
else:
invalid_avro.add(arg)
except ValueError:
invalid_json.add(arg)
print 'Valid files:\n\t' + '\n\t'.join(valid)
print 'Invalid avro:\n\t' + '\n\t'.join(invalid_avro)
print 'Invalid json:\n\t' + '\n\t'.join(invalid_json)
def schema_for_item(value, array_schema, tool):
if not array_schema:
return None
opts = array_schema.get('items', [])
if not opts:
return None
if not isinstance(opts, list):
opts = [opts]
opts = [schema_by_name(opt, tool) for opt in opts]
if len(opts) == 1:
return opts[0]
for opt in opts:
sch = avro.schema.parse(json.dumps(opt))
if avro.io.validate(sch, value):
return opt
return None
# are handled properly (see self.UNSET and above binary handling).
if schema_type == 'array':
return (isinstance(datum, list) and
all(self._validate(schema.items, d) for d in datum))
elif schema_type == 'map':
return (isinstance(datum, dict) and
all(isinstance(k, basestring) for k in datum.keys()) and
all(self._validate(schema.values, v) for v in datum.values()))
elif schema_type in ['union', 'error_union']:
return any(self._validate_union(s, datum) for s in schema.schemas)
elif schema_type in ['record', 'error', 'request']:
return (isinstance(datum, dict) and
all(self._validate(f.type, datum.get(f.name, self.UNSET))
for f in schema.fields))
return validate(schema, datum)
def _is_valid_default_value(cls, schema_type, value):
"""Verify whether given value is a valid default value for the
specified schema type. It assumes the given schema_type is a valid
Avro schema. It utilizes `avro.io.validate` function from the Python
Avro library for the validation.
"""
schema_obj = schema.make_avsc_object(schema_type)
field_type = (schema_obj.schemas[0]
if isinstance(schema_obj, schema.UnionSchema)
else schema_obj)
return io.validate(field_type, value)
def _validate(self, schema, datum):
"""
Validate a datum matches a schema.
:param schema: Avro schema to match against the `datum`
:param datum: Data to validate
"""
return validate(schema, datum)
def schema_for_item(value, array_schema, tool):
if not array_schema:
return None
opts = array_schema.get('items', [])
if not opts:
return None
if not isinstance(opts, list):
opts = [opts]
opts = [schema_by_name(opt, tool) for opt in opts]
if len(opts) == 1:
return opts[0]
for opt in opts:
sch = avro.schema.parse(json.dumps(opt))
if avro.io.validate(sch, value):
return opt
return None