Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def emit(self, ctx, modules, fd):
"""Main control function.
"""
for epos, etag, eargs in ctx.errors:
if error.is_error(error.err_level(etag)):
raise error.EmitError("JTOX plugin needs a valid module")
tree = {}
mods = {}
annots = {}
for m,p in unique_prefixes(ctx).items():
mods[m.i_modulename] = [p, m.search_one("namespace").arg]
for module in modules:
for ann in module.search(("ietf-yang-metadata", "annotation")):
typ = ann.search_one("type")
annots[module.arg + ":" + ann.arg] = (
"string" if typ is None else self.base_type(ann, typ))
for module in modules:
self.process_children(module, tree, None)
json.dump({"modules": mods, "tree": tree, "annotations": annots}, fd)
def emit(self, ctx, modules, fd):
"""Main control function.
Set up the top-level parts of the stylesheet, then process
recursively all nodes in all data trees, and finally emit the
serialized stylesheet.
"""
for (epos, etag, eargs) in ctx.errors:
if error.is_error(error.err_level(etag)):
raise error.EmitError("JSONXSL plugin needs a valid module")
self.real_prefix = unique_prefixes(ctx)
self.top_names = []
for m in modules:
self.top_names.extend([c.arg for c in m.i_children if
c.keyword not in ("rpc", "notification")])
tree = ET.ElementTree(ss)
ET.SubElement(ss, "output", method="text")
xsltdir = os.environ.get("PYANG_XSLT_DIR",
"/usr/local/share/yang/xslt")
ET.SubElement(ss, "include", href=xsltdir + "/jsonxsl-templates.xsl")
ET.SubElement(ss, "strip-space", elements="*")
nsmap = ET.SubElement(ss, "template", name="nsuri-to-module")
ET.SubElement(nsmap, "param", name="uri")
choo = ET.SubElement(nsmap, "choose")
for module in self.real_prefix.keys():
def readline(self):
if len(self.lines) == 0:
raise error.Eof
self.buf = self.lines.popleft()
self.pos.line += 1
self.offset = 0
if self.max_line_len is not None:
curlen = len(self.buf)
if curlen >= 1 and self.buf[-1] == '\n':
if curlen >= 2 and self.buf[-2] == '\r':
curlen -= 2
else:
curlen -= 1
if curlen > self.max_line_len:
error.err_add(self.errors, self.pos, 'LONG_LINE',
(curlen, self.max_line_len))
def create_statement(self, e, parent):
if e.ns == yin_namespace:
keywd = e.local_name
try:
(argname, arg_is_elem) = syntax.yin_map[keywd]
except KeyError:
error.err_add(self.ctx.errors, e.pos,
'UNKNOWN_KEYWORD', keywd)
return None
else:
# extension
try:
prefix = self.prefixmap[e.ns]
except KeyError:
error.err_add(self.ctx.errors, e.pos,
'MODULE_NOT_IMPORTED', e.ns)
return None
keywd = (prefix, e.local_name)
keywdstr = util.keyword_to_str(keywd)
if 'no_extensions' in self.extra:
return None
res = self.find_extension(e.ns, e.local_name)
if res is None:
# we got a match
return (spec, canspec)
elif util.is_prefixed(stmt.keyword):
# allow extension statements mixed with these
# set canonical to False in this call to just remove the
# matching stmt from the spec
match_res = _match_stmt(ctx, stmt, (spec[i+1:], canspec), False)
if match_res != None:
return (spec[:i+1] + match_res[0], canspec)
else:
return None
elif keywd == '$cut':
# any non-optional statements left are errors
for (keywd, occurance) in spec[:i]:
if occurance == '1' or occurance == '+':
error.err_add(ctx.errors, stmt.pos, 'UNEXPECTED_KEYWORD_1',
(util.keyword_to_str(stmt.raw_keyword),
util.keyword_to_str(keywd)))
# consume them so we don't report the same error again
spec = spec[i:]
i = 0
elif canonical == True:
if occurance == '1' or occurance == '+':
error.err_add(ctx.errors, stmt.pos,
'UNEXPECTED_KEYWORD_CANONICAL_1',
(util.keyword_to_str(stmt.raw_keyword),
util.keyword_to_str(keywd)))
# consume it so we don't report the same error again
spec = spec[i:]
i = 0
# check next in spec
i += 1
def keyfun(e):
if e[0].ref == filenames[0]:
return 0
else:
return 1
self.ctx.errors.sort(key=lambda e: (e[0].ref, e[0].line))
if len(filenames) > 0:
# first print error for the first filename given
self.ctx.errors.sort(key=keyfun)
error_messages = []
for (epos, etag, eargs) in self.ctx.errors:
elevel = error.err_level(etag)
if error.is_warning(elevel):
logger.warning('%s: %s\n' %
(str(epos), error.err_to_str(etag, eargs)))
else:
err_msg = '%s: %s\n' % (str(epos), error.err_to_str(etag, eargs))
logger.error(err_msg)
error_messages.append(err_msg)
if len(error_messages) > 0:
err_msg = '\n'.join(error_messages)
raise YdkGenException(err_msg)
ctx.max_line_len, ctx.keep_comments,
not ctx.lax_quote_checks)
stmt = self._parse_statement(None)
except error.Abort:
return None
except error.Eof as e:
error.err_add(self.ctx.errors, self.pos, 'EOF_ERROR', ())
return None
try:
# we expect a error.Eof at this point, everything else is an error
self.tokenizer.peek()
except error.Eof:
return stmt
except:
pass
error.err_add(self.ctx.errors, self.pos, 'TRAILING_GARBAGE', ())
return None
if currarg is None:
if argval == '':
argval = None
else:
argval = currarg.replace(old, new)
replace = True
elif spec == '%DELETE':
argval = ''
replace = True
else:
argval += spec
elif spec[0] == '@':
argval += open(spec[1:], 'r').read().rstrip()
return argval, replace
except IOError as e:
raise error.EmitError(str(e))