def genAugmentedStatements(ctx, augments, definitions, paths):
    new_definitions = definitions.copy()
    path = '/'
    for augment in augments:
        #apis = OrderedDict()
        apis = paths
        parent_augments = augment.i_target_node.top.search('augment')
        genAugmentedStatements(ctx, parent_augments, new_definitions, apis)
        for _def in new_definitions:
            if _def not in definitions:
                definitions[_def] = new_definitions[_def]
        chs = [ch for ch in augment.i_target_node.top.i_children
               if ch.keyword in (statements.data_definition_keywords + ['rpc', 'notification'])]
        gen_apis(chs, path, apis, new_definitions)
        for api in apis:
            path = '/'
            api_match = False
            if api.split('/')[-3] == augment.arg.split('/')[-1].split(':')[1]:
                api_match = True
                if api.split('/')[2:-3]:
                    path += '/'.join(api.split('/')[2:-3]) + '/'
            elif api.split('/')[-2] == augment.arg.split('/')[-1].split(':')[1]:
                api_match = True
                if api.split('/')[2:-2]:
                    path += '/'.join(api.split('/')[2:-2]) + '/'
            if api_match:
                for child in augment.i_target_node.i_children:

def emit(self, ctx, modules, fd):
    module = modules[0]
    # cannot do XSD unless everything is ok for our module
    for (epos, etag, eargs) in ctx.errors:
        if (epos.top == module and
                error.is_error(error.err_level(etag))):
            raise error.EmitError("XSD translation needs a valid module")
    # we also need to have all other modules found
    for pre in module.i_prefixes:
        (modname, revision) = module.i_prefixes[pre]
        mod = statements.modulename_to_module(module, modname, revision)
        if mod is None:
            raise error.EmitError("cannot find module %s, needed by XSD"
                                  " translation" % modname)
    emit_xsd(ctx, module, fd)
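
# The emit(self, ctx, modules, fd) hook above is only called once it is wired
# into pyang's plugin machinery. A minimal sketch of that boilerplate, assuming
# a hypothetical plugin class and format name ('myfmt') that are not part of
# the XSD translator itself:
from pyang import plugin


def pyang_plugin_init():
    plugin.register_plugin(MyFormatPlugin())


class MyFormatPlugin(plugin.PyangPlugin):
    def add_output_format(self, fmts):
        # lets 'pyang -f myfmt' select this plugin
        self.multiple_modules = True
        fmts['myfmt'] = self

    def emit(self, ctx, modules, fd):
        # pyang passes the validated modules; write the output to fd
        for module in modules:
            fd.write("module: %s\n" % module.arg)
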
def _parse_statement(self, parent):
    # modification: when the --keep-comments flag is provided,
    # we would like to see if a statement is a comment, and if so
    # treat it differently than we treat keywords further down
    if self.ctx.keep_comments:
        cmt, is_line_end, is_multi_line = self.tokenizer.get_comment(self.last_line)
        if cmt is not None:
            stmt = statements.new_statement(self.top,
                                            parent,
                                            self.pos,
                                            '_comment',
                                            cmt)
            stmt.is_line_end = is_line_end
            stmt.is_multi_line = is_multi_line
            return stmt

    keywd = self.tokenizer.get_keyword()
    # check for argument
    tok = self.tokenizer.peek()
    if tok == '{' or tok == ';':
        arg = None
        argstrs = None
    else:
        argstrs = self.tokenizer.get_strings()

# Build the identities and typedefs (these are added to the class_map which
# is globally referenced).
build_identities(ctx, defn["identity"])
build_typedefs(ctx, defn["typedef"])

# Iterate through the tree which pyang has built, solely for the modules
# that pyang was asked to build
for modname in pyang_called_modules:
    module = module_d[modname]
    mods = [module]
    for i in module.search("include"):
        subm = ctx.get_module(i.arg)
        if subm is not None:
            mods.append(subm)
    for m in mods:
        children = [ch for ch in module.i_children if ch.keyword in statements.data_definition_keywords]
        get_children(ctx, fd, children, m, m)

        if ctx.opts.build_rpcs:
            rpcs = [ch for ch in module.i_children if ch.keyword == "rpc"]
            # Build RPCs specifically under the module name, since this
            # can be used as a proxy for the namespace.
            if len(rpcs):
                get_children(
                    ctx, fd, rpcs, module, module, register_paths=False, path="/%s_rpc" % (safe_name(module.arg))
                )

        if ctx.opts.build_notifications:
            notifications = [ch for ch in module.i_children if ch.keyword == "notification"]
            # Build notifications specifically under the module name,
            # since this can be used as a proxy for the namespace.
            if len(notifications):

fd.write(' xmlns:ncn="urn:ietf:params:xml:ns:' \
         'netconf:notification:1.0"\n')
fd.write(' targetNamespace="%s"\n' % module.i_xsd_namespace)
fd.write(' xmlns="%s"\n' % module.i_xsd_namespace)
fd.write(' elementFormDefault="qualified"\n')
fd.write(' attributeFormDefault="unqualified"\n')
if len(module.search('revision')) > 0:
    fd.write(' version="%s"\n' %
             module.search('revision')[0].arg)
fd.write(' xml:lang="en"')

handled_modules = []
for m in mods:
    for pre in m.i_prefixes:
        (modname, revision) = m.i_prefixes[pre]
        mod = statements.modulename_to_module(m, modname, revision)
        if mod in handled_modules or mod.keyword == 'submodule':
            continue
        handled_modules.append(mod)
        if pre in ['xs', 'yin', 'nc', 'ncn']:
            # someone uses one of our prefixes
            # generate a new prefix for that module
            i = 0
            pre = "p" + str(i)
            while pre in prefixes:
                i = i + 1
                pre = "p" + str(i)
        prefixes.append(pre)
        mod.i_xsd_prefix = pre
        if mod == module:
            uri = mod.i_xsd_namespace
        else:

for module in modules:
    module_annotations = module.search(('opencpe-annotations', 'annotate'))
    for annotation in module_annotations:
        annotations[annotation.arg] = annotation

actions = []
# action-fields will be collected from everywhere, but should be positioned
# in the ocpe-actiontable.yang file
for module in modules:
    module_actions = module.search(('opencpe-actiontable', 'action-field'))
    actions += module_actions

# MAIN LOOP
for module in modules:
    chs = [ch for ch in module.i_children if ch.keyword in statements.data_definition_keywords]
    if len(chs) > 0:
        print_children(chs, module, typedefs, groupings, augments, deviations, annotations, fd)

# now write the header file
fd = open('p_table.h', 'w')
make_license(fd)
fd.write("""
#ifndef __P_TABLE_H
#define __P_TABLE_H
#define dm__system 1
\n""")
for newLine in header_collector:
    fd.write(newLine)
fd.write("\n\n#endif\n")

""" Emits the complete JSON Schema specification for the yang file."""
model = OrderedDict()
if ctx.opts.schema_path is not None:
    global NAMESPACE
    NAMESPACE += ctx.opts.schema_path

# Go through all modules and extend the model.
for module in modules:
    global MODEL_ID
    MODEL_ID = module.arg
    print_header(model, module)

    # extract children which contain data definition keywords
    chs = [ch for ch in module.i_children
           if ch.keyword in statements.data_definition_keywords]

    typdefs = [module.i_typedefs[element] for element in module.i_typedefs]
    models = list(module.i_groupings.values())

    # The typedef definitions are processed and stored in the "typedefs"
    # data structure for further use.
    gen_typedefs(typdefs)
    for element in typdefs:
        models.append(element)

    # Print the JSON Schema definitions of the YANG groupings.
    gen_model(models, model)

    # If a model depended on another model that had not been encountered yet,
    # it is generated afterwards ('a posteriori').
    if pending_models:
        gen_model(pending_models, model)
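
# Emit-style functions like the one above receive 'ctx' and 'modules' from
# pyang. A rough sketch of building them programmatically; the file names are
# placeholders, and in recent pyang releases Context and FileRepository live in
# pyang.context and pyang.repository (older releases export them from pyang
# directly):
from pyang import context, repository


def load_modules(filenames):
    repo = repository.FileRepository(".")
    ctx = context.Context(repo)
    modules = []
    for filename in filenames:
        with open(filename) as f:
            module = ctx.add_module(filename, f.read())
        if module is not None:
            modules.append(module)
    ctx.validate()
    return ctx, modules
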
    lambda ctx, s: v_chk_recommended_substmt(ctx, s))

if ctx.opts.lint_ensure_hyphenated_names:
    statements.add_validation_fun(
        'grammar', ['*'],
        lambda ctx, s: v_chk_hyphenated_names(ctx, s))

statements.add_validation_fun(
    'grammar', ['namespace'],
    lambda ctx, s: v_chk_namespace(ctx, s, self.namespace_prefixes))

statements.add_validation_fun(
    'grammar', ['module', 'submodule'],
    lambda ctx, s: v_chk_module_name(ctx, s, self.modulename_prefixes))

statements.add_validation_fun(
    'strict', ['include'],
    lambda ctx, s: v_chk_include(ctx, s))

statements.add_validation_fun(
    'strict', ['module'],
    lambda ctx, s: v_chk_mandatory_top_level(ctx, s))

# register our error codes
error.add_error_code(
    'LINT_EXPLICIT_DEFAULT', 4,
    'RFC 8407: 4.4: '
    + 'statement "%s" is given with its default value "%s"')

error.add_error_code(
    'LINT_MISSING_REQUIRED_SUBSTMT', 3,
    '%s: '
    + 'statement "%s" must have a "%s" substatement')
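
# Each function registered with add_validation_fun() is called with the context
# and the statement to check; a sketch (v_chk_example and its check are
# illustrative, not part of the lint plugin) of how such a callback reports one
# of the error codes registered above via error.err_add():
from pyang import error  # the same pyang.error module used above


def v_chk_example(ctx, stmt):
    # flag a 'config true' statement that merely restates the default
    if stmt.keyword == 'config' and stmt.arg == 'true':
        error.err_add(ctx.errors, stmt.pos,
                      'LINT_EXPLICIT_DEFAULT', (stmt.keyword, stmt.arg))
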
        e.remove_child(arg_elem)
elif arg_is_elem == False:
    arg = e.find_attribute(argname)
    if arg is None:
        error.err_add(self.ctx.errors, e.pos,
                      'MISSING_ARGUMENT_ATTRIBUTE', (argname, keywdstr))
    else:
        e.remove_attribute(argname)
else:
    # no arguments
    arg = None
self.check_attr(e.pos, e.attrs)
if parent is not None:
    stmt = statements.new_statement(self.top, parent, e.pos, keywd, arg)
    parent.substmts.append(stmt)
else:
    stmt = self.top
for ch in e.children:
    self.create_statement(ch, stmt)

# Add our special argument syntax checkers
syntax.add_arg_type('smi-oid', _chk_smi_oid)
syntax.add_arg_type('smi-max-access', _chk_smi_max_access)

# Register that we handle extensions from the YANG module 'ietf-yang-smiv2'
grammar.register_extension_module(smi_module_name)

# Register the special grammar
for stmt, occurence, (arg, rules), add_to_stmts in smi_stmts:
    grammar.add_stmt((smi_module_name, stmt), (arg, rules))
    grammar.add_to_stmts_rules(add_to_stmts,
                               [((smi_module_name, stmt), occurence)])
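
# Each smi_stmts entry consumed by the loop above carries the extension
# keyword, its occurrence specifier, an (argument type, substatement rules)
# pair, and the statements it may be added to. An illustrative entry (not the
# plugin's actual table):
smi_stmts_example = [
    ('oid',                # extension statement keyword
     '?',                  # occurrence: at most once
     ('smi-oid', []),      # argument type registered above, no substatements
     ['leaf', 'list']),    # keywords under which the extension may appear
]
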
# Add validation step
statements.add_validation_phase('smi_set_oid', after='inherit_properties')
statements.add_validation_fun('smi_set_oid',
                              [(smi_module_name, 'oid')],
                              v_set_oid)
statements.add_validation_fun('smi_set_oid',
                              [(smi_module_name, 'subid')],
                              v_set_subid)

# Register special error codes
error.add_error_code('SMIv2_BAD_SUBID', 1,
                     "subid needs an oid or subid statement in an ancestor")
error.add_error_code('SMIv2_SUBID_AND_OID', 1,
                     "subid and oid cannot be given at the same time")