Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# --- tail of a link-validation routine (the start of the function is not
# visible in this chunk; original indentation was stripped by extraction).
"Field `%s` contains undefined reference to `%s`"
% (field, link))
# A sequence of links: validate every element, collecting per-element
# errors so that all failures are reported together rather than
# stopping at the first bad entry.
elif isinstance(link, CommentedSeq):
errors = []
for n, i in enumerate(link):
try:
# The element may be rewritten by validation, so store the result back.
link[n] = self.validate_link(field, i, docid, all_doc_ids)
except validate.ValidationException as v:
errors.append(v)
if bool(errors):
raise validate.ValidationException(
"\n".join([six.text_type(e) for e in errors]))
# A mapping: recurse into the sub-document's links.
elif isinstance(link, CommentedMap):
self.validate_links(link, docid, all_doc_ids)
else:
# Any other type is not a valid link value.
raise validate.ValidationException(
"`%s` field is %s, expected string, list, or a dict."
% (field, type(link).__name__))
return link
# --- fragment of schema type-extension logic: fields/symbols inherited
# from base types are merged into the derived record/enum (the enclosing
# function is not visible; original indentation was stripped).
if "inherited_from" not in f:
# Record which base type this field came from.
f["inherited_from"] = ex
exfields.extend(basetype.get("fields", []))
elif t["type"] == "enum":
exsym.extend(basetype.get("symbols", []))
if t["type"] == "record":
# Copy before mutating so the original type dict is untouched.
t = copy.copy(t)
exfields.extend(t.get("fields", []))
t["fields"] = exfields
# Reject duplicate field names within the combined record.
fieldnames = set()  # type: Set[Text]
for field in t["fields"]:
if field["name"] in fieldnames:
raise validate.ValidationException(
"Field name %s appears twice in %s" % (field["name"], t["name"]))
else:
fieldnames.add(field["name"])
elif t["type"] == "enum":
t = copy.copy(t)
exsym.extend(t.get("symbols", []))
# NOTE(review): the key "symbol" (singular) looks like a typo for
# "symbols" -- enum symbols are read via get("symbols") above, so this
# assignment would be ignored by that reader.  Verify against
# consumers before changing.
t["symbol"] = exsym
types[t["name"]] = t
n.append(t)
# Build a name -> type lookup for the extended types.
ex_types = {}
for t in n:
ex_types[t["name"]] = t
break
# --- tail of per-item validation error aggregation (the enclosing loop
# and function are not visible; original indentation was stripped).
except validate.ValidationException as e:
errors.append(sl.makeError(u"tried `%s` but\n%s" % (
name, validate.indent(str(e), nolead=False))))
# Prefer an error message that names the object's identifier, if the
# item carries any of the loader's identifier fields.
objerr = sl.makeError(u"Invalid")
for ident in loader.identifiers:
if ident in item:
objerr = sl.makeError(
u"Object `%s` is not valid because"
% (relname(item[ident])))
break
anyerrors.append(u"%s\n%s" %
(objerr, validate.indent(bullets(errors, "- "))))
# Raise one combined exception listing every failed item.
if len(anyerrors) > 0:
raise validate.ValidationException(
strip_dup_lineno(bullets(anyerrors, "* ")))
def validate_doc(schema_names,  # type: Names
doc,  # type: Union[Dict[Text, Any], List[Dict[Text, Any]], Text, None]
loader,  # type: Loader
strict,  # type: bool
source=None
):
# type: (...) -> None
# Validate a parsed document against the schema.  Raises
# validate.ValidationException when the schema declares no document
# roots or when `doc` is neither a dict nor a list.
# NOTE(review): this definition is truncated in this chunk -- the body
# continues beyond the visible lines.
has_root = False
# A schema with no documentRoot types can never validate any document.
for r in schema_names.names.values():
if ((hasattr(r, 'get_prop') and r.get_prop(u"documentRoot")) or (
u"documentRoot" in r.props)):
has_root = True
break
if not has_root:
raise validate.ValidationException(
"No document roots defined in the schema")
if isinstance(doc, list):
validate_doc = doc
elif isinstance(doc, CommentedMap):
# Wrap a single mapping in a one-element sequence so later code can
# treat both shapes uniformly, preserving line/column metadata.
validate_doc = CommentedSeq([doc])
validate_doc.lc.add_kv_line_col(0, [doc.lc.line, doc.lc.col])
validate_doc.lc.filename = doc.lc.filename
else:
raise validate.ValidationException("Document must be dict or list")
# Collect every type marked as a documentRoot; items may validate
# against any of them.
roots = []
for r in schema_names.names.values():
if ((hasattr(r, "get_prop") and r.get_prop(u"documentRoot")) or (
r.props.get(u"documentRoot"))):
roots.append(r)
def fetch(self, url, inject_ids=True):  # type: (Text, bool) -> Any
    """Fetch, parse, and cache the YAML document at `url`.

    Returns the cached parse from ``self.idx`` when present.  Otherwise
    the text is retrieved via ``self.fetch_text()``, parsed with the
    round-trip YAML loader (preserving line/column information for
    later error reporting), optionally stamped with identifier fields,
    indexed, and returned.

    :param url: document URL; also used as the cache key and as the
        default value injected for missing identifier fields.
    :param inject_ids: when True, missing identifier fields on a
        mapping result are filled in with `url`.
    :raises validate.ValidationException: on YAML syntax errors.
    """
    if url in self.idx:
        return self.idx[url]
    try:
        text = self.fetch_text(url)
        if isinstance(text, bytes):
            textIO = StringIO(text.decode('utf-8'))
        else:
            textIO = StringIO(text)
        textIO.name = url    # type: ignore
        result = yaml.round_trip_load(textIO)
        add_lc_filename(result, url)
    except (yaml.parser.ParserError, yaml.scanner.ScannerError) as e:
        # ScannerError (e.g. tab characters or invalid tokens) was
        # previously unhandled and escaped as a raw YAML error instead
        # of a ValidationException.
        raise validate.ValidationException("Syntax error %s" % (e))
    if (isinstance(result, CommentedMap) and inject_ids
            and bool(self.identifiers)):
        for identifier in self.identifiers:
            if identifier not in result:
                result[identifier] = url
            # Index under the fully-expanded identifier, not the raw URL.
            self.idx[self.expand_url(result[identifier], url)] = result
    else:
        self.idx[url] = result
    return result
# --- fragment of document resolution: splicing resolved list items back
# into the document and indexing identifier fields (the enclosing
# function and loop are not visible; original indentation was stripped).
document.lc.data[
j - 1] = document.lc.data[j - llen]
# Insert resolved items into the document, carrying the line/column
# metadata along with each inserted entry.
for item in l:
document.insert(i, item)
document.lc.data[i] = lc
i += 1
else:
document[i] = l
i += 1
else:
# Recursively resolve the value; link checking is deferred to a
# single pass at the end (checklinks=False here).
document[i], _ = loader.resolve_all(
val, base_url, file_base=file_base, checklinks=False)
i += 1
except validate.ValidationException as v:
# NOTE(review): Logger.warn is deprecated in favour of warning().
_logger.warn("failed", exc_info=True)
# Re-raise with positional context prepended for the caller.
raise validate.ValidationException("(%s) (%s) Validation error in position %i:\n%s" % (
id(loader), file_base, i, validate.indent(six.text_type(v))))
# Expand any scoped identifier fields found in the metadata and index
# the document under each expanded identifier.
for identifer in loader.identity_links:
if identifer in metadata:
if isinstance(metadata[identifer], (str, six.text_type)):
metadata[identifer] = loader.expand_url(
metadata[identifer], base_url, scoped_id=True)
loader.idx[metadata[identifer]] = document
if checklinks:
all_doc_ids={} # type: Dict[Text, Text]
self.validate_links(document, u"", all_doc_ids)
return document, metadata
except validate.ValidationException as v:
# Strip duplicated line-number prefixes accumulated during recursion.
raise validate.ValidationException(strip_dup_lineno(str(v)))
# --- tail of a load-and-validate routine (head not visible; original
# indentation was stripped).
validationErrors = u""
# Link validation and schema validation are run independently so that
# one pass can report both kinds of error together.
try:
document_loader.validate_links(data, u"", {})
except validate.ValidationException as v:
validationErrors = six.text_type(v) + "\n"
try:
validate_doc(avsc_names, data, document_loader, strict, source=source)
except validate.ValidationException as v:
validationErrors += six.text_type(v)
# Raise a single combined exception if either pass failed.
if validationErrors != u"":
raise validate.ValidationException(validationErrors)
return data, metadata
# --- fragment of identifier-map ("idmap") normalisation: a mapping of
# key -> value is converted into a CommentedSeq of dicts with the key
# folded back in as a field (the enclosing function is not visible;
# original indentation was stripped).
and "$import" not in idmapFieldValue
and "$include" not in idmapFieldValue):
ls = CommentedSeq()
# Keys are processed in sorted order for deterministic output.
for k in sorted(idmapFieldValue.keys()):
val = idmapFieldValue[k]
v = None  # type: Optional[CommentedMap]
if not isinstance(val, CommentedMap):
if idmapField in loader.mapPredicate:
# A scalar value is wrapped into a one-entry dict under the
# field's mapPredicate, carrying the source line/column info.
v = CommentedMap(
((loader.mapPredicate[idmapField], val),))
v.lc.add_kv_line_col(
loader.mapPredicate[idmapField],
document[idmapField].lc.data[k])
v.lc.filename = document.lc.filename
else:
# NOTE(review): the %-style arguments here are passed to the
# exception constructor instead of being formatted into the
# message, and the two adjacent string literals concatenate
# without a separating space ("dictand").  Verify and fix.
raise validate.ValidationException(
"mapSubject '%s' value '%s' is not a dict"
"and does not have a mapPredicate", k, v)
else:
v = val
# Fold the mapping key back in as the idmap subject field.
v[loader.idmap[idmapField]] = k
v.lc.add_kv_line_col(loader.idmap[idmapField],
document[idmapField].lc.data[k])
v.lc.filename = document.lc.filename
# Preserve the original entry's line/column on the new list slot.
ls.lc.add_kv_line_col(
len(ls), document[idmapField].lc.data[k])
ls.lc.filename = document.lc.filename
ls.append(v)
# --- second copy/continuation of validate_doc-style logic; the opening
# of the conditional on the first line is cut off in this chunk and the
# final loop is truncated at the end (original indentation stripped).
u"documentRoot" in r.props)):
has_root = True
break
# A schema with no documentRoot types can never validate any document.
if not has_root:
raise validate.ValidationException(
"No document roots defined in the schema")
if isinstance(doc, list):
validate_doc = doc
elif isinstance(doc, CommentedMap):
# Wrap a single mapping in a one-element sequence so the loop below
# handles both shapes, preserving line/column metadata.
validate_doc = CommentedSeq([doc])
validate_doc.lc.add_kv_line_col(0, [doc.lc.line, doc.lc.col])
validate_doc.lc.filename = doc.lc.filename
else:
raise validate.ValidationException("Document must be dict or list")
# Collect every type marked as a documentRoot.
roots = []
for r in schema_names.names.values():
if ((hasattr(r, "get_prop") and r.get_prop(u"documentRoot")) or (
r.props.get(u"documentRoot"))):
roots.append(r)
anyerrors = []
for pos, item in enumerate(validate_doc):
sl = SourceLine(validate_doc, pos, six.text_type)
success = False
# An item is accepted if it validates against any root type
# (raise_ex=False: collect errors instead of raising immediately).
for r in roots:
success = validate.validate_ex(
r, item, loader.identifiers, strict,
foreign_properties=loader.foreign_properties, raise_ex=False)
if success: