Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def api_eor (self, command):
    """Parse the address family named in an ``eor`` API command.

    The command is normalised with ``formated`` and split on single
    spaces; the first two words are dropped and whatever follows is
    read as ``<afi> <safi>``.

    Returns:
        ``Family(1,1)`` when no family is given (presumably ipv4
        unicast - confirm against the AFI/SAFI definitions),
        ``Family(afi,safi)`` when both names are recognised, and
        ``False`` when the argument count is wrong or either name maps
        to the ``undefined`` AFI/SAFI.
    """
    arguments = formated(command).split(' ')[2:]

    # no family given: fall back to the default one
    if not arguments:
        return Family(1,1)

    # anything other than exactly "<afi> <safi>" is rejected
    if len(arguments) != 2:
        return False

    afi_name, safi_name = arguments

    afi = AFI.fromString(afi_name)
    if afi == AFI.undefined:
        return False

    safi = SAFI.fromString(safi_name)
    if safi == SAFI.undefined:
        return False

    return Family(afi,safi)
# -- Reading length of next-hop
# NOTE(review): fragment - `data`, `offset`, `afi` and `safi` come from the
# enclosing code, which is not visible here.
len_nh = ord(data[offset])
offset += 1

# stays 0 unless the family is mpls_vpn, where it becomes 8 (presumably the
# size of a route distinguisher in front of the next-hop - confirm where
# `rd` is consumed)
rd = 0

# check next-hop size; afi/safi combinations not listed here are not validated
if afi == AFI.ipv4:
    if safi in (SAFI.unicast,SAFI.multicast):
        if len_nh != 4:
            raise Notify(3,0,'invalid ipv4 unicast/multicast next-hop length %d expected 4' % len_nh)
    elif safi in (SAFI.mpls_vpn,):
        if len_nh != 12:
            raise Notify(3,0,'invalid ipv4 mpls_vpn next-hop length %d expected 12' % len_nh)
        rd = 8
    elif safi in (SAFI.flow_ip,):
        # a zero-length next-hop is accepted, so the message lists it too
        if len_nh not in (0,4):
            raise Notify(3,0,'invalid ipv4 flow_ip next-hop length %d expected 0 or 4' % len_nh)
    elif safi in (SAFI.flow_vpn,):
        # a zero-length next-hop is accepted, so the message lists it too
        if len_nh not in (0,4):
            raise Notify(3,0,'invalid ipv4 flow_vpn next-hop length %d expected 0 or 4' % len_nh)
elif afi == AFI.ipv6:
    if safi in (SAFI.unicast,):
        if len_nh not in (16,32):
            raise Notify(3,0,'invalid ipv6 unicast next-hop length %d expected 16 or 32' % len_nh)
    elif safi in (SAFI.mpls_vpn,):
        if len_nh not in (24,40):
            raise Notify(3,0,'invalid ipv6 mpls_vpn next-hop length %d expected 24 or 40' % len_nh)
        rd = 8
    elif safi in (SAFI.flow_ip,):
        if len_nh not in (0,16,32):
            raise Notify(3,0,'invalid ipv6 flow_ip next-hop length %d expected 0, 16 or 32' % len_nh)
try:
safi = tokens.pop(0)
except IndexError:
self._error = 'missing family safi'
if self.debug: raise Exception() # noqa
return False
if safi == 'unicast':
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.unicast)))
elif safi == 'multicast':
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.multicast)))
elif safi == 'nlri-mpls':
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.nlri_mpls)))
elif safi == 'mpls-vpn':
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.mpls_vpn)))
elif safi in ('flow'):
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.flow_ip)))
elif safi == 'flow-vpn':
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.flow_vpn)))
else:
return False
return True
def _set_family_ipv4 (self, scope, tokens):
if self._family:
self._error = 'ipv4 can not be used with all or minimal'
if self.debug: raise Exception() # noqa
return False
try:
safi = tokens.pop(0)
except IndexError:
self._error = 'missing family safi'
if self.debug: raise Exception() # noqa
return False
if safi == 'unicast':
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.unicast)))
elif safi == 'multicast':
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.multicast)))
elif safi == 'nlri-mpls':
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.nlri_mpls)))
elif safi == 'mpls-vpn':
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.mpls_vpn)))
elif safi in ('flow'):
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.flow_ip)))
elif safi == 'flow-vpn':
scope[-1]['families'].append((AFI(AFI.ipv4),SAFI(SAFI.flow_vpn)))
else:
return False
return True
def register_nlri (klass):
    """Record ``klass`` as the NLRI implementation for the enclosing afi/safi.

    ``cls``, ``afi``, ``safi`` and ``force`` are taken from the enclosing
    scope (not visible in this block). The class is stored in
    ``cls.registered_nlri`` under the string key ``'<afi>/<safi>'`` and
    the ``(afi, safi)`` pair is appended to ``cls.registered_families``
    the first time that key is seen.

    Returns:
        ``klass`` unchanged, so the function can be used as a decorator.

    Raises:
        RuntimeError: when the family is already registered and ``force``
            is false.
    """
    new = (AFI.create(afi),SAFI.create(safi))
    # python has a bug and does not allow %ld/%ld (pypy does)
    key = '%s/%s' % new

    # The registry is keyed by the formatted string, so the duplicate check
    # has to use that key; testing the tuple itself never matched.
    # NOTE(review): assumes registered_nlri is a plain mapping - confirm.
    already_registered = key in cls.registered_nlri
    if already_registered and not force:
        raise RuntimeError('Tried to register %s/%s twice' % new)

    cls.registered_nlri[key] = klass
    if not already_registered:
        cls.registered_families.append(new)
    return klass
return register_nlri
# NOTE(review): fragment - the enclosing definition and the `if` that the
# `else:` below belongs to are not visible here.
# attach the unpacked next-hop to the nlri, then collect it
nlri.nexthop = NextHop.unpack(nexthop)
nlris.append(nlri)
else:
# unpack one nlri from data; `left` is the second value returned by
# unpack_nlri (presumably the not-yet-parsed remainder - confirm)
nlri,left = NLRI.unpack_nlri(afi,safi,data,IN.ANNOUNCED,addpath)
# allow unpack_nlri to return none for "treat as withdraw" controlled by NLRI.unpack_nlri
if nlri:
nlris.append(nlri)
# unpack_nlri handed back exactly what it was given: fail loudly
if left == data:
raise RuntimeError("sub-calls should consume data")
# carry on from what unpack_nlri returned as `left`
data = left
# build the result from the family and every nlri collected
return cls(afi,safi,nlris)
# Module-level MPRNLRI built with the undefined AFI/SAFI and an empty nlri list.
EMPTY_MPRNLRI = MPRNLRI(AFI.undefined,SAFI.undefined,[])
def _single_line (self, scope, name, tokens, valid):
command = tokens[0]
if valid and command not in valid:
self._error = 'invalid keyword "%s"' % command
if self.debug: raise Exception() # noqa
return False
elif name == 'route':
if command in self._dispatch_route_cfg:
if command in ('rd','route-distinguisher'):
return self._dispatch_route_cfg[command](scope,tokens[1:],SAFI.mpls_vpn)
else:
return self._dispatch_route_cfg[command](scope,tokens[1:])
elif name == 'l2vpn':
if command in self._dispatch_vpls_cfg:
if command in ('rd','route-distinguisher'):
return self._dispatch_vpls_cfg[command](scope,tokens[1:],SAFI.vpls)
else:
return self._dispatch_vpls_cfg[command](scope,tokens[1:])
elif name == 'flow-route':
if command in self._dispatch_flow_cfg:
if command in ('rd','route-distinguisher'):
return self._dispatch_flow_cfg[command](scope,tokens[1:],SAFI.flow_vpn)
else:
return self._dispatch_flow_cfg[command](scope,tokens[1:])
from exabgp.bgp.message import OUT
from exabgp.bgp.message.update.nlri.nlri import NLRI
from exabgp.bgp.message.update.nlri.cidr import CIDR
from exabgp.bgp.message.update.nlri.label import Label
from exabgp.bgp.message.update.nlri.qualifier import RouteDistinguisher
from exabgp.bgp.message.update.nlri.qualifier import PathInfo
from exabgp.protocol.ip import IP
from exabgp.protocol.ip import NoNextHop
# ====================================================== IPVPN
# RFC 4364
@NLRI.register(AFI.ipv4,SAFI.mpls_vpn)
@NLRI.register(AFI.ipv6,SAFI.mpls_vpn)
class IPVPN (Label):
__slots__ = ['rd']
def __init__ (self, afi, safi, action=OUT.UNSET):
    """Initialise the Label part and start without a route distinguisher.

    ``afi``, ``safi`` and ``action`` are forwarded unchanged to
    ``Label.__init__``; ``rd`` is then set to ``RouteDistinguisher.NORD``.
    """
    Label.__init__(self, afi, safi, action)
    # placeholder value until a real route distinguisher is assigned
    self.rd = RouteDistinguisher.NORD
def feedback (self, action):
    """Report what prevents this nlri from being used with ``action``.

    Returns:
        'ip-vpn nlri next-hop missing' when ``self.nexthop`` is None and
        ``action`` equals ``OUT.ANNOUNCE``; an empty string otherwise.
    """
    # the action is only compared once the next-hop is known to be missing
    nexthop_required = self.nexthop is None and action == OUT.ANNOUNCE
    return 'ip-vpn nlri next-hop missing' if nexthop_required else ''
@classmethod
def new (cls, afi, safi, packed, mask, labels, rd, nexthop=None, action=OUT.UNSET):
instance = cls(afi,safi,action)