Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def validate_ospf3_neighbor_extensive_list(value):
# Pass osp3_neighbor_extensive-entry list of dict in value
if not isinstance(value, list):
raise SchemaTypeError('ospf3-table-entry is not a list')
# Create Arp Entry Schema
entry_schema = Schema({
"activity-timer": str,
"bdr-id": str,
"dr-id": str,
"interface-name": str,
"neighbor-address": str,
Optional("neighbor-adjacency-time"): {
"#text": str
},
"neighbor-id": str,
"neighbor-priority": str,
Optional("neighbor-up-time"): {
"#text": str
},
"options": str,
"ospf-area": str,
"ospf-neighbor-state": str,
def validate_file_information_list(value):
# Pass file-information list as value
if not isinstance(value, list):
raise SchemaTypeError('ospf-interface is not a list')
file_information_schema = Schema({
"file-date": {
Optional("#text"): str,
"@junos:format": str
},
"file-group": str,
"file-links": str,
"file-name": str,
"file-owner": str,
"file-permissions": {
Optional("#text"): str,
"@junos:format": str
},
"file-size": str
})
# Validate each dictionary in list
for item in value:
Optional("ifaf-primary"): bool,
Optional("ifaf-is-default"): bool,
Optional("ifaf-none"): bool,
Optional("ifaf-dest-route-down"): bool,
},
Optional("ifa-local"): str
})
# Validate each dictionary in list
if isinstance(value, dict):
value = [value]
for item in value:
interface_address_schema.validate(item)
return value
af_schema = Schema({
Optional("address-family-flags"): {
Optional("ifff-is-primary"): bool,
Optional("ifff-no-redirects"): bool,
Optional("ifff-none"): bool,
Optional("ifff-sendbcast-pkt-to-re"): bool,
Optional("internal-flags"): bool,
Optional("ifff-primary"): bool,
Optional("ifff-receive-ttl-exceeded"): bool,
Optional("ifff-receive-options"): bool,
Optional("ifff-encapsulation"): str,
},
"address-family-name": str,
Optional("interface-address"): Use(verify_interface_address_list),
Optional("intf-curr-cnt"): str,
Optional("intf-dropcnt"): str,
Optional("intf-unresolved-cnt"): str,
Optional("intf-curr-cnt"): str,
Optional("intf-dropcnt"): str,
Optional("intf-unresolved-cnt"): str,
Optional("generation"): str,
Optional("route-table"): str,
Optional("max-local-cache"): str,
Optional("maximum-labels"): str,
"mtu": str,
Optional("new-hold-limit"): str
})
# Validate each dictionary in list
for item in value:
af_schema.validate(item)
return value
l_i_schema = Schema({
Optional("address-family"): Use(verify_address_family_list),
Optional("encapsulation"): str,
Optional("filter-information"): str,
"if-config-flags": {
"iff-snmp-traps": bool,
"iff-up": bool,
Optional("internal-flags"): str
},
"local-index": str,
Optional("logical-interface-bandwidth"): str,
"name": str,
Optional("policer-overhead"): str,
Optional("snmp-index"): str,
Optional("traffic-statistics"): {
Optional("@junos:style"): str,
"input-packets": str,
def validate_rt_entry_list(value):
    """Check that ``value`` is a list of route-entry dictionaries.

    Args:
        value: Candidate list; every element is validated against the
            route-entry schema assembled below.

    Returns:
        The same ``value`` object, once every element has validated.

    Raises:
        SchemaTypeError: If ``value`` is not a list. Any error raised by
            ``Schema.validate`` for a non-conforming element propagates
            unchanged.
    """
    if not isinstance(value, list):
        raise SchemaTypeError('Route entry is not a list')

    # Next-hop sub-dictionary: "to", "nh-lb-label" and "via" may be absent.
    next_hop_spec = {
        Optional("to"): str,
        "nh-type": str,
        "nh-index": str,
        "nh-reference-count": str,
        Optional("nh-lb-label"): str,
        Optional("via"): str,
    }
    route_spec = {
        "rt-destination": str,
        "destination-type": str,
        "route-reference-count": str,
        "nh": next_hop_spec,
    }
    validator = Schema(route_spec)

    for route in value:
        validator.validate(route)
    return value
def validate_ospf3_neighbor_list(value):
    """Check that ``value`` is a list of ospf3 neighbor entry dictionaries.

    Args:
        value: Candidate list; every element must carry all six string
            fields named in the schema below.

    Returns:
        The same ``value`` object, once every element has validated.

    Raises:
        SchemaTypeError: If ``value`` is not a list. Any error raised by
            ``Schema.validate`` for a non-conforming element propagates
            unchanged.
    """
    if not isinstance(value, list):
        raise SchemaTypeError('ospf3-table-entry is not a list')

    # Schema for a single neighbor entry; every key is required.
    neighbor_spec = {
        "activity-timer": str,
        "interface-name": str,
        "neighbor-address": str,
        "neighbor-id": str,
        "neighbor-priority": str,
        "ospf-neighbor-state": str,
    }
    neighbor_validator = Schema(neighbor_spec)

    for neighbor in value:
        neighbor_validator.validate(neighbor)
    return value
def validate_interface_queue_list(value):
    """Check that ``value`` is a list of interface-queue dictionaries.

    Args:
        value: The interface-queue list; every element must carry all six
            string fields named in the schema below.

    Returns:
        The same ``value`` object, once every element has validated.

    Raises:
        SchemaTypeError: If ``value`` is not a list. Any error raised by
            ``Schema.validate`` for a non-conforming element propagates
            unchanged.
    """
    if not isinstance(value, list):
        # The message previously said 'commit-history', copy-pasted from
        # another validator; name the structure actually being checked.
        raise SchemaTypeError('interface-queue is not a list')

    interface_queue_schema = Schema({
        "max-octets-allowed": str,
        "max-packets-allowed": str,
        "name": str,
        "number-of-queue-drops": str,
        "octets-in-queue": str,
        "packets-in-queue": str,
    })

    # Validate each dictionary in the list.
    for item in value:
        interface_queue_schema.validate(item)
    return value
def validate_bgp_peer_rib_list(value):
    """Check that ``value`` is a list of bgp-rib dictionaries of a bgp-peer.

    Args:
        value: Candidate list; every element must carry all five string
            fields listed below.

    Returns:
        The same ``value`` object, once every element has validated.

    Raises:
        SchemaTypeError: If ``value`` is not a list. Any error raised by
            ``Schema.validate`` for a non-conforming element propagates
            unchanged.
    """
    if not isinstance(value, list):
        raise SchemaTypeError('bgp-rib of bgp-peer is not a list')

    # Every field is required and must be a string, so map each key to str.
    required_fields = (
        'accepted-prefix-count',
        'active-prefix-count',
        'name',
        'received-prefix-count',
        'suppressed-prefix-count',
    )
    rib_validator = Schema(dict.fromkeys(required_fields, str))

    for rib in value:
        rib_validator.validate(rib)
    return value
def validate_ospf3_database_list(value):
if not isinstance(value, list):
raise SchemaTypeError('ospf-database is not a list')
ospf3_database_schema = Schema({
Optional("@heading"): str,
"advertising-router": str,
"age": str,
"checksum": str,
"lsa-id": str,
"lsa-length": str,
"lsa-type": str,
"ospf3-link-lsa": {
"linklocal-address": str,
"ospf3-options": str,
Optional("ospf3-prefix"): str,
Optional("ospf3-prefix-options"): str,
"prefix-count": str,
"router-priority": str
},
Optional("our-entry"): bool,
if 'source' not in clean_data:
task = get_clean_function(section, clean_json, dev)
else:
task = load_class(clean_data, dev)
# Add the stage schema to the base schema
if hasattr(task, 'schema'):
schema.update({task.__name__: task.schema})
if warning_messages:
log.warning('\nWarning Messages')
log.warning('----------------')
log.warning(' - ' + '\n - '.join(warning_messages))
try:
Schema(base_schema).validate(clean)
except Exception as e:
log.error('\nExceptions')
log.error('----------')
pretty_schema_exception(e)