Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_rename_plus_ipv6(self):
require_user('root')
mtu = 1280 # mtu must be >= 1280 if you plan to use IPv6
txqlen = 2000
nsid = str(uuid4())
ipdb_main = IPDB()
ipdb_test = IPDB(nl=NetNS(nsid))
if1 = uifname()
if2 = uifname()
if3 = uifname()
# create
ipdb_main.create(kind='veth',
ifname=if1,
peer=if2,
mtu=mtu,
txqlen=txqlen).commit()
# move
with ipdb_main.interfaces[if2] as veth:
veth.net_ns_fd = nsid
# set it up
self.called.add('bind')
assert async_cache in (True, False)
assert isinstance(groups, int)
raise NotImplementedError('mock thee')
def clone(self):
self.called.add('clone')
self.mnl = type(self)()
return self.mnl
def close(self):
self.called.add('close')
mock = MockNL()
try:
IPDB(nl=mock)
except NotImplementedError:
pass
assert mock.called == set(('clone', ))
assert mock.mnl.called == set(('bind', 'close'))
def _generate_device_name(prefix):
    """Pick an unused interface name of the form ``<prefix><index>``.

    Indexes 0 through 99 are probed in ascending order against the
    interfaces currently known to a temporary IPDB instance; the first
    name that is not present is returned.

    :param prefix: leading part of the interface name.
    :returns: the first free ``prefix + index`` name.
    :raises RuntimeError: if all 100 candidate names are already taken.
    """
    with pyroute2.IPDB() as ipdb:
        known = ipdb.interfaces
        candidates = ('{}{}'.format(prefix, number) for number in range(100))
        # Lazily walk the candidates so probing stops at the first free name.
        free_name = next(
            (candidate for candidate in candidates if candidate not in known),
            None,
        )
    if free_name is None:
        raise RuntimeError('Unable to allocate a {} interface'.format(prefix))
    return free_name
if action == 'RTM_NEWLINK' and \
msg.get_attr('IFLA_IFNAME', '') == p0:
# get corresponding interface -- in the case of
# post-callbacks it is created already
interface = ipdb.interfaces[msg['index']]
# add it as a port to the bridge
ipdb.interfaces[br0].add_port(interface)
try:
ipdb.interfaces[br0].commit()
except Exception:
pass
# create IPDB instance
with IPDB() as ip:
# create watchdogs
wd0 = ip.watchdog(ifname=br0)
wd1 = ip.watchdog(ifname=p0)
# create bridge
ip.create(kind='bridge', ifname=br0).commit()
# wait the bridge to be created
wd0.wait()
# register callback
cuid = ip.register_callback(cb)
# create ports
ip.create(kind='dummy', ifname=p0).commit()
# sleep for interfaces
wd1.wait()
ip.unregister_callback(cuid)
def _create_bridge(ifname, slaves=(), netns=None, mtu=1500):
    """Create a bridge, attach ports to it, set its MTU and bring it up.

    :param ifname: name of the bridge to create; passed through
        ``check_not_null`` first.
    :param slaves: iterable of interfaces handed to ``add_port`` one by
        one. Defaults to an empty (immutable) tuple.
    :param netns: network namespace to operate in, or ``None`` to use
        the current namespace.
    :param mtu: MTU for the bridge; converted with ``int()``.

    The IPDB instance is always released, and a ``NetNS`` object opened
    here is always closed, even when creation or configuration fails.
    """
    check_not_null(ifname, "the interface name cannot be null")
    # IPDB leaves a caller-supplied netlink object open on release, so keep
    # a handle on the NetNS in order to close it ourselves.
    ns = NetNS(netns) if netns is not None else None
    try:
        ip = IPDB() if ns is None else IPDB(nl=ns)
        try:
            ip.create(kind="bridge", ifname=ifname).commit()
            with ip.interfaces[ifname] as bridge:
                for intf in slaves:
                    bridge.add_port(intf)
                bridge.set_mtu(int(mtu))
                bridge.up()
        finally:
            # Release on every path; otherwise a failed commit leaks IPDB.
            ip.release()
    finally:
        if ns is not None:
            ns.close()
def _create_bridge(ifname, slaves=(), netns=None, mtu=1500):
    """Create a bridge, attach ports to it, set its MTU and bring it up.

    :param ifname: name of the bridge to create; passed through
        ``check_not_null`` first.
    :param slaves: iterable of interfaces handed to ``add_port`` one by
        one. Defaults to an empty (immutable) tuple.
    :param netns: network namespace to operate in, or ``None`` to use
        the current namespace.
    :param mtu: MTU for the bridge; converted with ``int()``.

    The IPDB instance is always released, and a ``NetNS`` object opened
    here is always closed, even when creation or configuration fails.
    """
    check_not_null(ifname, "the interface name cannot be null")
    # IPDB leaves a caller-supplied netlink object open on release, so keep
    # a handle on the NetNS in order to close it ourselves.
    ns = NetNS(netns) if netns is not None else None
    try:
        ip = IPDB() if ns is None else IPDB(nl=ns)
        try:
            ip.create(kind="bridge", ifname=ifname).commit()
            with ip.interfaces[ifname] as bridge:
                for intf in slaves:
                    bridge.add_port(intf)
                bridge.set_mtu(int(mtu))
                bridge.up()
        finally:
            # Release on every path; otherwise a failed commit leaks IPDB.
            ip.release()
    finally:
        if ns is not None:
            ns.close()
def _create_pair(ifname, peer, netns=None, mtu=1500):
    """Create a veth pair, set the MTU on both ends and bring them up.

    :param ifname: name of the first veth end; passed through
        ``check_not_null``.
    :param peer: name of the second veth end; passed through
        ``check_not_null`` (the original validated ``ifname`` twice and
        never looked at ``peer``).
    :param netns: network namespace to operate in, or ``None`` to use
        the current namespace.
    :param mtu: MTU applied to both ends; converted with ``int()``.

    The IPDB instance is always released, and a ``NetNS`` object opened
    here is always closed, even when creation or configuration fails.
    """
    check_not_null(ifname, "the interface name cannot be null")
    check_not_null(peer, "the peer interface name cannot be null")
    # IPDB leaves a caller-supplied netlink object open on release, so keep
    # a handle on the NetNS in order to close it ourselves.
    ns = NetNS(netns) if netns is not None else None
    try:
        ip = IPDB() if ns is None else IPDB(nl=ns)
        try:
            ip.create(ifname=ifname, kind="veth", peer=peer).commit()
            with ip.interfaces[ifname] as veth:
                veth.set_mtu(int(mtu))
                veth.up()
            # Guard kept from the original in case check_not_null does not
            # abort on a missing peer.
            if peer is not None:
                with ip.interfaces[peer] as veth_peer:
                    veth_peer.set_mtu(int(mtu))
                    veth_peer.up()
        finally:
            # Release on every path; otherwise a failed commit leaks IPDB.
            ip.release()
    finally:
        if ns is not None:
            ns.close()
def _is_if_on_bridge(self, ifname):
    """Tell whether ``ifname`` is currently a port of ``self.bridge_name``.

    A short-lived IPDB instance (loading only the ``interfaces`` plugin)
    is used for the lookup.

    :param ifname: interface name to look for among the bridge ports.
    :returns: ``True`` if one of the bridge's ports has that name;
        ``False`` otherwise, including when the bridge or one of its
        ports cannot be found in IPDB (``KeyError``).
    """
    with pyroute2.IPDB(plugins=('interfaces',)) as ipdb:
        interfaces = ipdb.interfaces
        try:
            member_ids = interfaces[self.bridge_name].ports
            return any(
                interfaces[member_id].ifname == ifname
                for member_id in member_ids
            )
        except KeyError:
            # Bridge (or a listed port) is unknown to IPDB.
            return False
def _switch_on(ifname, netns=None):
    """Bring the interface ``ifname`` up.

    :param ifname: name of the interface; passed through
        ``check_not_null`` first.
    :param netns: network namespace to operate in, or ``None`` to use
        the current namespace.

    The IPDB instance is always released, and a ``NetNS`` object opened
    here is always closed, even when the interface lookup or the
    transaction fails.
    """
    check_not_null(ifname, "the interface name cannot be null")
    # IPDB leaves a caller-supplied netlink object open on release, so keep
    # a handle on the NetNS in order to close it ourselves.
    ns = NetNS(netns) if netns is not None else None
    try:
        ip = IPDB() if ns is None else IPDB(nl=ns)
        try:
            with ip.interfaces[ifname] as interface:
                interface.up()
        finally:
            # Release on every path; e.g. an unknown ifname raises KeyError.
            ip.release()
    finally:
        if ns is not None:
            ns.close()
def setup(self):
'''
Set up daemon runtime state.
'''
try:
self.set_settingup()
self._gre_keys = {}
self._interface_names = set()
self.mesh_links = set()
self.root_ipdb = pyroute2.IPDB() if not self.dry_run else None
except Exception as e:
if self.logger.is_running():
self.logger.exception(e)
raise
for o in self.sorted_overlays:
try:
o.setup(self)
except Exception as e:
if o.logger.is_running():
o.logger.exception(e)
raise
try:
self.ipsec_process = ipsec_process.create(self)