Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Exercise creating a veth pair whose peer is placed into another network
# namespace, then check that removing one end removes both.
#
# `foo` and `bar` are random namespace names; `ifA`/`ifB` are the names of
# the two veth ends.
foo = str(uuid4())
bar = str(uuid4())
ifA = uifname()
ifB = uifname()

# Track what was actually created so the cleanup below only removes
# namespaces that exist, even when a later step fails.
created_namespaces = []
try:
    for ns_name in (foo, bar):
        netnsmod.create(ns_name)
        created_namespaces.append(ns_name)

    # Create the pair from inside `foo`, requesting the peer in `bar`.
    with IPDB(nl=NetNS(foo)) as ip:
        ip.create(ifname=ifA,
                  kind='veth',
                  peer={'ifname': ifB,
                        'net_ns_fd': bar}).commit()
        assert ifA in ip.interfaces.keys()
        assert ifB not in ip.interfaces.keys()

    # The peer must be visible in `bar` only; removing it there is
    # expected to take the whole pair down.
    with IPDB(nl=NetNS(bar)) as ip:
        assert ifA not in ip.interfaces.keys()
        assert ifB in ip.interfaces.keys()
        ip.interfaces[ifB].remove().commit()
        assert ifA not in ip.interfaces.keys()
        assert ifB not in ip.interfaces.keys()

    # Back in `foo`, neither end may remain.
    with IPDB(nl=NetNS(foo)) as ip:
        assert ifA not in ip.interfaces.keys()
        assert ifB not in ip.interfaces.keys()
finally:
    # Previously a failing assertion or commit skipped this cleanup and
    # left both namespaces behind. Removal order (foo, then bar) matches
    # the original success path.
    for ns_name in created_namespaces:
        netnsmod.remove(ns_name)
def test_veth_peer_attrs(self):
    """Check that per-end attributes are applied when creating a veth pair.

    A veth pair is created through ``self.ip`` with an explicit MAC
    address on each end, and with the peer end placed into a freshly
    named network namespace. The test then asserts that each end reports
    the address it was given, removes the pair, and removes the
    namespace.

    Requires root (enforced via ``require_user``).
    """
    require_user('root')
    ifA = self.get_ifname()
    ns = str(uuid.uuid4())
    addr_ext = '06:00:00:00:02:02'
    addr_int = '06:00:00:00:02:03'

    # Open the namespace socket before the guarded section, so that a
    # failure to open it does not trigger the cleanup below.
    # NOTE(review): presumably NetNS() creates the namespace when it is
    # missing -- confirm against the pyroute2 version in use.
    ns_socket = NetNS(ns)
    try:
        with IPDB(nl=ns_socket) as nsdb:
            veth = self.ip.create(ifname='x' + ifA,
                                  kind='veth',
                                  address=addr_ext,
                                  peer={'ifname': ifA,
                                        'address': addr_int,
                                        'net_ns_fd': ns})
            veth.commit()
            # The peer is looked up through the namespace's IPDB, the
            # outer end through the test's own IPDB.
            assert nsdb.interfaces[ifA]['address'] == addr_int
            assert self.ip.interfaces['x' + ifA]['address'] == addr_ext

            with veth:
                veth.remove()
    finally:
        # Previously skipped when an assertion or commit raised, which
        # left the namespace behind.
        netns.remove(ns)
# Either tear down or build a small topology, depending on `args.remove`:
# `args.ifnum` veth pairs named `<args.ifname><n>`, one tap device named
# `tap<args.ifname>`, and a bridge named by `brige_name`.
#
# Module-level names (ip, ifn, iface, i, netns) are kept as-is because
# code outside this fragment may still refer to them.
ip = IPDB()
if args.remove:
    # cleanup interfaces
    for ifn in range(args.ifnum):
        iface = args.ifname + str(ifn)
        if iface in ip.interfaces:
            with ip.interfaces[iface] as i:
                i.remove()

    # The tap device and the bridge exist once each, so they are handled
    # after the per-veth loop; each is removed only when present.
    if 'tap' + args.ifname in ip.interfaces:
        with ip.interfaces['tap' + args.ifname] as i:
            i.remove()

    if brige_name in ip.interfaces:
        with ip.interfaces[brige_name] as i:
            i.remove()

    # Close and remove one `node<n>` namespace per veth index.
    for ifn in range(args.ifnum):
        netns = NetNS('node' + str(ifn))
        netns.close()
        netns.remove()
else:
    # Create only the veth pairs that do not exist yet; the peer end is
    # named `<iface>.1`.
    for ifn in range(args.ifnum):
        iface = args.ifname + str(ifn)
        if iface not in ip.interfaces:
            ip.create(kind='veth', ifname=iface, peer=iface + '.1').commit()

    ip.create(kind='tuntap', ifname='tap' + args.ifname, mode='tap').commit()

    # Create the bridge and enslave every veth plus the tap device to it.
    with ip.create(kind='bridge', ifname=brige_name) as i:
        for ifn in range(args.ifnum):
            iface = args.ifname + str(ifn)
            i.add_port(ip.interfaces[iface])
        i.add_port(ip.interfaces['tap' + args.ifname])
if args.ipv4:
def _ns_add_ifc(self, name, ns_ifc, ifc_base_name=None, in_ifc=None,
out_ifc=None, ipaddr=None, macaddr=None, fn=None, cmd=None,
action="ok", disable_ipv6=False):
if name in self.ipdbs:
ns_ipdb = self.ipdbs[name]
else:
try:
nl=NetNS(name)
self.namespaces.append(nl)
except KeyboardInterrupt:
# remove the namespace if it has been created
pyroute2.netns.remove(name)
raise
ns_ipdb = IPDB(nl)
self.ipdbs[nl.netns] = ns_ipdb
if disable_ipv6:
cmd1 = ["sysctl", "-q", "-w",
"net.ipv6.conf.default.disable_ipv6=1"]
nsp = NSPopen(ns_ipdb.nl.netns, cmd1)
nsp.wait(); nsp.release()
ns_ipdb.interfaces.lo.up().commit()
if in_ifc:
in_ifname = in_ifc.ifname
with in_ifc as v:
def get_net_namespace_for_pid(pid):
    """Return a ``NetNS`` handle for the network namespace of process *pid*.

    The namespace path obtained from ``net_namespace_path(pid)`` is
    symlinked as ``/var/run/netns/<pid>`` and then opened by that name.

    :param pid: process id; must be an integer, since it is formatted
        with ``%d``.
    :returns: ``NetNS`` opened with the name ``str(pid)``.
    :raises RuntimeError: if the namespace path is not an existing file.
    """
    net_ns_path = net_namespace_path(pid)
    if not os.path.isfile(net_ns_path):
        raise RuntimeError("No such net namespace %s" % net_ns_path)

    # `0o766` is the same mode value as the former `0766`, written in the
    # octal syntax accepted by both Python 2.6+ and Python 3 (the bare
    # leading-zero form is a syntax error on Python 3).
    create_directory_ignore_exists("/var/run/netns", 0o766)
    create_symlink_ignore_exists(net_ns_path, "/var/run/netns/%d" % pid)
    return NetNS(str(pid))
def _netns_interface_exists(self, mac_address):
    """Tell whether any link in the amphora namespace has *mac_address*.

    Every attribute of every link returned by ``get_links()`` is
    inspected; the search stops at the first ``IFLA_ADDRESS`` attribute
    equal to *mac_address*.

    :param mac_address: value compared against each ``IFLA_ADDRESS``.
    :returns: ``True`` on the first match, ``False`` when none matches.
    """
    # The namespace is opened with os.O_CREAT -- presumably so that it is
    # created when it does not exist yet (confirm against pyroute2).
    with pyroute2.NetNS(consts.AMPHORA_NAMESPACE,
                        flags=os.O_CREAT) as namespace:
        return any(
            nla[0] == 'IFLA_ADDRESS' and nla[1] == mac_address
            for link in namespace.get_links()
            for nla in link['attrs']
        )
def _switch_on(ifname, netns=None):
    """Bring the interface *ifname* up.

    :param ifname: name of the interface; validated with
        ``check_not_null``.
    :param netns: optional network namespace name. When ``None`` the
        IPDB is opened without a namespace socket; otherwise it is opened
        on ``NetNS(netns)``.
    """
    check_not_null(ifname, "the interface name cannot be null")

    if netns is None:
        ip = IPDB()
    else:
        ip = IPDB(nl=NetNS(netns))

    try:
        # The change is made inside the interface's `with` block, the
        # same form used elsewhere for IPDB interface changes.
        with ip.interfaces[ifname] as interface:
            interface.up()
    finally:
        # Release the IPDB even when the interface lookup or the change
        # itself raises; previously it was leaked on any such error.
        ip.release()
def _get_networks(self):
    """Collect byte counters for the ``eth*`` links of the amphora namespace.

    :returns: dict keyed by interface name; each value is a dict with
        ``network_tx`` and ``network_rx`` taken from the ``tx_bytes`` and
        ``rx_bytes`` fields of the link's ``IFLA_STATS64`` attribute.
    """
    stats_by_name = {}
    with pyroute2.NetNS(consts.AMPHORA_NAMESPACE) as namespace:
        for link in namespace.get_links():
            name = None
            # NOTE: attributes are handled in the order they arrive. A
            # non-"eth" name stops the scan of that link, and counters
            # seen before any IFLA_IFNAME are stored under the key None.
            for nla in link['attrs']:
                tag = nla[0]
                if tag == 'IFLA_IFNAME':
                    if not nla[1].startswith('eth'):
                        break
                    name = nla[1]
                elif tag == 'IFLA_STATS64':
                    stats_by_name[name] = {
                        'network_tx': nla[1]['tx_bytes'],
                        'network_rx': nla[1]['rx_bytes'],
                    }
    return stats_by_name
def _bridge_add_port(master, slaves=None, netns=None):
    """Attach each interface in *slaves* to the bridge *master*.

    :param master: name of the bridge interface; validated with
        ``check_not_null``.
    :param slaves: iterable of interfaces handed one by one to the
        bridge's ``add_node_port``. ``None`` or an empty collection means
        nothing is attached.
    :param netns: optional network namespace name. When ``None`` the
        IPDB is opened without a namespace socket; otherwise it is opened
        on ``NetNS(netns)``.
    """
    check_not_null(master, "the master bridge name cannot be null")

    if netns is None:
        ip = IPDB()
    else:
        ip = IPDB(nl=NetNS(netns))

    try:
        with ip.interfaces[master] as bridge:
            # `slaves` defaults to None rather than a shared mutable
            # list; iterating an empty tuple covers both "not given" and
            # "given but empty" without a separate length check.
            for interface in slaves or ():
                bridge.add_node_port(interface)
    finally:
        # Release the IPDB even when the bridge lookup or a port
        # addition raises; previously it was leaked on any such error.
        ip.release()