Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
#!/usr/bin/env python
# encoding: utf-8
"""
flow.py
Created by Thomas Mangin on 2010-01-14.
Copyright (c) 2009-2015 Exa Networks. All rights reserved.
"""
import unittest
from exabgp.configuration.environment import environment
env = environment.setup('')
from exabgp.bgp.message.update.nlri.flow import *
from exabgp.protocol.ip.inet import *
from exabgp.bgp.message.update.attribute.community import *
class TestFlow (unittest.TestCase):
def setUp(self):
pass
def test_rule (self):
components = {
'destination': Destination("192.0.2.0",24),
'source' : Source("10.1.2.0",24),
'anyport_1' : AnyPort(NumericOperator.EQ,25),#!/usr/bin/env python
# encoding: utf-8
"""
protocol.py
Created by Thomas Mangin on 2009-08-27.
Copyright (c) 2009-2015 Exa Networks. All rights reserved.
"""
import unittest
from exabgp.configuration.environment import environment
env = environment.setup('')
from exabgp.bgp.message.open import Open
from exabgp.bgp.message.open import Capabilities
from exabgp.bgp.message.open import new_Open
from exabgp.bgp.message.notification import Notification
from exabgp.bgp.message.keepalive import KeepAlive
from exabgp.bgp.message.keepalive import new_KeepAlive
from exabgp.bgp.message.update import Update
from exabgp.bgp.message.update import Attributes
from exabgp.rib.table import Table
from exabgp.rib.delta import Delta
from exabgp.reactor.protocol import Protocol
from exabgp.bgp.neighbor import Neighbor
from StringIO import StringIO


def api_eor(self, command):
    """Parse the address family named in an end-of-RIB API command.

    The command is normalised with ``formated`` and split on single spaces;
    the first two words are discarded.  What remains must be either nothing
    or exactly two words: an AFI name followed by a SAFI name.

    Returns:
        ``Family(1,1)`` when no family is given, ``Family(afi,safi)`` when
        both names are recognised, ``False`` for any other input.
    """
    tokens = formated(command).split(' ')[2:]

    # No explicit family: fall back to Family(1,1)
    # (presumably IPv4 unicast - confirm against the AFI/SAFI definitions).
    if not tokens:
        return Family(1,1)

    if len(tokens) != 2:
        return False

    afi = AFI.fromString(tokens[0])
    if afi == AFI.undefined:
        return False

    safi = SAFI.fromString(tokens[1])
    if safi == SAFI.undefined:
        return False

    return Family(afi,safi)


def fromElements(cls, prefix, suffix):
    """Build an instance from the two halves of a route distinguisher.

    Args:
        prefix: first half, as text - either dotted (contains '.') or a
            decimal number.
        suffix: second half, as an integer.

    Returns:
        ``cls(distinguisher)``, where ``distinguisher`` is the packed value.

    Raises:
        ValueError: 'invalid route-distinguisher <prefix>:<suffix>' whenever
            a part cannot be converted or the pair fits neither numeric
            layout.
    """
    try:
        if '.' in prefix:
            # Leading bytes 0,1, then one byte per dotted component, then
            # the suffix as two bytes (high byte first).
            # NOTE(review): presumably the RFC 4364 type-1 layout - confirm.
            data = [character(0),character(1)]
            data.extend(character(int(part)) for part in prefix.split('.'))
            data.extend((character(suffix >> 8),character(suffix & 0xFF)))
            distinguisher = concat_bytes_i(data)
        else:
            number = int(prefix)
            if number < 2 ** 16 and suffix < 2 ** 32:
                # leading bytes 0,0: 16-bit number, 32-bit suffix
                distinguisher = character(0) + character(0) + pack('!H',number) + pack('!L',suffix)
            elif number < 2 ** 32 and suffix < 2 ** 16:
                # leading bytes 0,2: 32-bit number, 16-bit suffix
                distinguisher = character(0) + character(2) + pack('!L',number) + pack('!H',suffix)
            else:
                # Caught just below and re-raised with the full prefix:suffix text.
                raise ValueError('invalid route-distinguisher %s' % number)
        return cls(distinguisher)
    except ValueError:
        raise ValueError('invalid route-distinguisher %s:%s' % (prefix,suffix))


def create(cls, string, afi):
    """Build a netmask object from its textual form for the given family.

    Args:
        string: the mask as text - a decimal length, or (IPv4 only) a key of
            ``cls.codes``.
        afi: ``AFI.ipv4`` or ``AFI.ipv6``.

    Raises:
        ValueError: for an unknown address family, a ``cls.codes`` key used
            with IPv6, or a mask that is not made of digits.
    """
    if afi == AFI.ipv4:
        # Membership test, matching the IPv6 branch below.  The previous
        # identity test (`string is cls.codes`) compared the mask text with
        # the container itself, so this lookup branch was never taken.
        if string in cls.codes:
            klass = cls(cls.codes[string],32)
            klass.maximum = 32
            return klass
        maximum = 32
    elif afi == AFI.ipv6:
        if string in cls.codes:
            raise ValueError('IPv4 mask used with an IPv6 address')
        maximum = 128
    else:
        raise ValueError('invalid address family')

    if not string.isdigit():
        raise ValueError('invalid netmask %s' % string)

    value = long(string)
    # NOTE(review): the rest of create() is missing from this excerpt;
    # `maximum` and `value` are presumably consumed by the missing part.


# --- start of the next excerpt (its opening lines are not present) ---
ip,mask = ip.split('/')
mask = int(mask)
except ValueError:
mask = 32
try:
if 'rd' in tokens:
klass = MPLS
elif 'route-distinguisher' in tokens:
klass = MPLS
elif 'label' in tokens:
klass = MPLS
else:
klass = INET
# nexthop must be false and its str return nothing .. an empty string does that
update = Change(klass(afi=IP.toafi(ip),safi=IP.tosafi(ip),packed=IP.pton(ip),mask=mask,nexthop=None,action=OUT.ANNOUNCE),Attributes())
except ValueError:
return self.error.set(self.syntax)
if 'announce' not in self.scope.content[-1]:
self.scope.content[-1]['announce'] = []
self.scope.content[-1]['announce'].append(update)
return Truedef prefix (tokeniser):
# XXX: could raise
ip = tokeniser()
try:
ip,mask = ip.split('/')
except ValueError:
mask = '32'
if ':' in ip:
mask = '128'
tokeniser.afi = IP.toafi(ip)
return IPRange.create(ip,mask)self.peer = peer
self.neighbor = peer.neighbor
self.negotiated = Negotiated(self.neighbor)
self.connection = None
if self.neighbor.connect:
self.port = self.neighbor.connect
elif os.environ.get('exabgp.tcp.port','').isdigit():
self.port = int(os.environ.get('exabgp.tcp.port'))
elif os.environ.get('exabgp_tcp_port','').isdigit():
self.port = int(os.environ.get('exabgp_tcp_port'))
else:
self.port = 179
from exabgp.configuration.environment import environment
# --- last statement of the preceding excerpt ---
self.log_routes = peer.neighbor.adj_rib_in or environment.settings().log.routes


def _newASN(self, value):
    """Convert an AS number given as text into an ``ASN``.

    Accepts plain decimal ('65000') or dotted ``high.low`` form ('1.10'),
    in which case the number is ``(high << 16) + low``.  ``int()`` raises
    ValueError for parts that are not decimal integers.
    """
    if '.' in value:
        high, low = value.split('.', 1)
        return ASN((int(high) << 16) + int(low))
    return ASN(int(value))


# Second copy of the same helper; the excerpt contains it twice.
def _newASN(self, value):
    """Convert an AS number given as text into an ``ASN``.

    Accepts plain decimal ('65000') or dotted ``high.low`` form ('1.10'),
    in which case the number is ``(high << 16) + low``.  ``int()`` raises
    ValueError for parts that are not decimal integers.
    """
    if '.' in value:
        high, low = value.split('.', 1)
        return ASN((int(high) << 16) + int(low))
    return ASN(int(value))