Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def init(ndb, connection, mode, rtnl_log, tid):
    """Create a ``DBSchema`` and attach its message dispatch table.

    All five arguments are forwarded positionally, unchanged, to
    ``DBSchema``.  The schema's ``event_map`` is then set to a dict keyed
    by message class; every value is a one-element list holding the
    callable registered for that class.

    Returns the configured schema object.
    """
    schema = DBSchema(ndb, connection, mode, rtnl_log, tid)
    # (message class, loader) pairs; two classes reuse load_netlink with
    # its first argument pre-bound to a fixed string.
    dispatch = (
        (ifinfmsg, schema.load_ifinfmsg),
        (ifaddrmsg, partial(schema.load_netlink, 'addresses')),
        (ndmsg, schema.load_ndmsg),
        (rtmsg, schema.load_rtmsg),
        (fibmsg, partial(schema.load_netlink, 'rules')),
        (nsinfmsg, schema.load_nsinfmsg),
    )
    schema.event_map = {
        msg_class: [loader] for msg_class, loader in dispatch
    }
    return schema
'br_mcast_membership_intvl': _br_time_check,
'br_mcast_querier_intvl': _br_time_check,
'br_mcast_query_intvl': _br_time_check,
'br_mcast_query_response_intvl': _br_time_check,
'br_mcast_startup_query_intvl': _br_time_check}
# Extra field names; they are appended at the very end of _fields below.
_virtual_fields = ['ipdb_scope',
'ipdb_priority',
'vlans',
'ipaddr',
'ports',
'vlan_flags',
'net_ns_fd',
'net_ns_pid']
# Build the complete field-name list. Start with the nla2name() form of the
# first element of every entry in ifinfmsg.nla_map.
_fields = [ifinfmsg.nla2name(i[0]) for i in ifinfmsg.nla_map]
# For each listed attribute of ifinfmsg.ifinfo, add the names from its
# nla_map, converted with ifinfmsg.nla2name.
for name in ('bridge_slave_data', ):
data = getattr(ifinfmsg.ifinfo, name)
_fields.extend([ifinfmsg.nla2name(i[0]) for i in data.nla_map])
# Fixed names added explicitly.
_fields.append('index')
_fields.append('flags')
_fields.append('mask')
_fields.append('change')
_fields.append('kind')
_fields.append('peer')
_fields.append('vlan_id')
_fields.append('vlan_protocol')
_fields.append('bond_mode')
# Finally whatever _get_data_fields() returns, then the extra names above.
_fields.extend(_get_data_fields())
_fields.extend(_virtual_fields)
def __init__(self, ipdb, mode=None, parent=None, uid=None):
'''
Parameters:
buf = ctypes.create_string_buffer(15000)
buf_len = ctypes.c_ulong(15000)
(ctypes
.windll
.iphlpapi
.GetAdaptersInfo(ctypes.byref(buf),
ctypes.byref(buf_len)))
adapter = IP_ADAPTER_INFO.from_address(ctypes.addressof(buf))
while True:
mac = ':'.join(['%02x' % x for x in adapter.Address][:6])
ifname = ctypes.string_at(ctypes.addressof(adapter.AdapterName))
spec = {'index': adapter.Index,
'attrs': (['IFLA_ADDRESS', mac],
['IFLA_IFNAME', ifname])}
msg = ifinfmsg().load(spec)
del msg['value']
ret['interfaces'].append(msg)
ipaddr = adapter.IpAddressList
while True:
addr = ctypes.string_at(ctypes.addressof(ipaddr.IpAddress))
mask = ctypes.string_at(ctypes.addressof(ipaddr.IpMask))
spec = {'index': adapter.Index,
'family': AF_INET,
'prefixlen': dqn2int(mask),
'attrs': (['IFA_ADDRESS', addr],
['IFA_LOCAL', addr],
['IFA_LABEL', ifname])}
msg = ifaddrmsg().load(spec)
del msg['value']
ret['addresses'].append(msg)
def _get_data_fields():
    """Collect field names for every kind listed in ``supported_kinds``.

    Each kind is looked up in ``ifinfmsg.ifinfo.data_map``; kinds without
    an entry are skipped.  The first element of every ``nla_map`` entry of
    the matching class is converted with the class's own ``nla2name`` when
    the class has a non-None ``prefix`` attribute, and with
    ``ifinfmsg.nla2name`` otherwise.

    Returns a flat list in ``supported_kinds`` order; duplicates are kept.
    """
    names = []
    for kind in supported_kinds:
        info = ifinfmsg.ifinfo.data_map.get(kind)
        if info is None:
            continue
        # Pick which object supplies nla2name for this kind.
        if getattr(info, 'prefix', None) is not None:
            converter = info
        else:
            converter = ifinfmsg
        names.extend(
            [converter.nla2name(entry[0]) for entry in info.nla_map]
        )
    return names
class IPLinkRequest(IPRequest):
    '''
    Translate a human-readable dictionary into an RTNL link request.
    '''

    blacklist = ['carrier', 'carrier_changes', 'info_slave_kind']

    # Names of the common ifinfmsg NLAs: every NLA name from
    # ifinfmsg.nla_map both in full and with the ifinfmsg prefix stripped
    # and lowercased, followed by five fixed names.
    common = []
    for (key, _) in ifinfmsg.nla_map:
        common += [key, key[len(ifinfmsg.prefix):].lower()]
    common += ['family', 'ifi_type', 'index', 'flags', 'change']

    def __init__(self, *argv, **kwarg):
        '''
        Set up per-request state, delegate to ``IPRequest.__init__``
        with the arguments unchanged, then default ``index`` to 0 when
        the request does not contain that key.
        '''
        # Instance state is assigned before delegating to the base
        # initializer; keep that order.
        self.kind = None
        self.linkinfo = None
        self._info_data = None
        self.deferred = []
        self.specific = {}
        IPRequest.__init__(self, *argv, **kwarg)
        if 'index' not in self:
            self['index'] = 0
'br_ageing_time': _br_time_check,
'br_forward_delay': _br_time_check,
'br_mcast_membership_intvl': _br_time_check,
'br_mcast_querier_intvl': _br_time_check,
'br_mcast_query_intvl': _br_time_check,
'br_mcast_query_response_intvl': _br_time_check,
'br_mcast_startup_query_intvl': _br_time_check}
# Extra field names; they are appended to _fields as the last step below.
_virtual_fields = ['ipdb_scope',
'ipdb_priority',
'vlans',
'ipaddr',
'ports',
'vlan_flags',
'net_ns_fd',
'net_ns_pid']
# Assemble the complete field-name list, beginning with the nla2name() form
# of the first element of every entry in ifinfmsg.nla_map.
_fields = [ifinfmsg.nla2name(i[0]) for i in ifinfmsg.nla_map]
# Add the names from the nla_map of both listed ifinfmsg.ifinfo attributes,
# converted with ifinfmsg.nla2name.
for name in ('bridge_data', 'bridge_slave_data'):
data = getattr(ifinfmsg.ifinfo, name)
_fields.extend([ifinfmsg.nla2name(i[0]) for i in data.nla_map])
# Fixed names added explicitly.
_fields.append('index')
_fields.append('flags')
_fields.append('mask')
_fields.append('change')
_fields.append('kind')
_fields.append('peer')
_fields.append('vlan_id')
_fields.append('bond_mode')
# Then the result of _get_data_fields(), and last the extra names above.
_fields.extend(_get_data_fields())
_fields.extend(_virtual_fields)
def __init__(self, ipdb, mode=None, parent=None, uid=None):
'''
def flush_deferred(self):
    '''
    Create the ``IFLA_LINKINFO`` entry and replay the deferred keys.

    Steps, in order:

    1. store ``{'attrs': [...]}`` under ``IFLA_LINKINFO`` (bypassing any
       overridden ``__setitem__``), keep a reference to the attribute
       list in ``self.linkinfo`` and record ``self.kind`` in it as
       ``IFLA_INFO_KIND``;
    2. if ``ifinfmsg.ifinfo.data_map`` has a class for ``self.kind``,
       register each of its NLA names in ``self.specific`` under both
       the full name and the prefix-stripped lowercase name;
    3. hand every deferred ``(key, value)`` pair to ``set_specific``;
       pairs for which it returns a false value are stored directly
       on the request;
    4. reset ``self.deferred`` to an empty list.
    '''
    # create IFLA_LINKINFO
    attrs = []
    self.linkinfo = attrs
    dict.__setitem__(self, 'IFLA_LINKINFO', {'attrs': attrs})
    attrs.append(['IFLA_INFO_KIND', self.kind])

    # load specific NLA names
    info_class = ifinfmsg.ifinfo.data_map.get(self.kind)
    if info_class is not None:
        # classes with a false prefix fall back to the 'IFLA_' prefix
        skip = len(info_class.prefix or 'IFLA_')
        for nla, _ in info_class.nla_map:
            self.specific.update({nla: nla, nla[skip:].lower(): nla})

    # flush deferred NLAs
    for key, value in self.deferred:
        if self.set_specific(key, value):
            continue
        dict.__setitem__(self, key, value)
    self.deferred = []
log.warning('this compatibility hack will be removed soon')
return self.vlan_filter(command[5:], **kwarg)
flags_dump = NLM_F_REQUEST | NLM_F_DUMP
flags_req = NLM_F_REQUEST | NLM_F_ACK
flags_create = flags_req | NLM_F_CREATE | NLM_F_EXCL
commands = {'set': (RTM_NEWLINK, flags_req),
'update': (RTM_SETLINK, flags_create),
'add': (RTM_NEWLINK, flags_create),
'del': (RTM_DELLINK, flags_create),
'remove': (RTM_DELLINK, flags_create),
'delete': (RTM_DELLINK, flags_create),
'dump': (RTM_GETLINK, flags_dump),
'get': (RTM_GETLINK, NLM_F_REQUEST)}
msg = ifinfmsg()
# ifinfmsg fields
#
# ifi_family
# ifi_type
# ifi_index
# ifi_flags
# ifi_change
#
msg['family'] = kwarg.pop('family', 0)
lrq = kwarg.pop('kwarg_filter', IPLinkRequest)
(command, msg_flags) = commands.get(command, command)
# index
msg['index'] = kwarg.pop('index', 0)
# flags
flags = kwarg.pop('flags', 0) or 0
# change
def __init__(self, *argv, **kwarg):
    '''
    Preset defaults, validate a creation spec, then run the parent
    initializer.

    ``kwarg['iclass']`` is forced to ``ifinfmsg`` and ``event_map`` maps
    ``ifinfmsg`` to ``"load_rtnlmsg"``.  The ``flags`` and ``state`` keys
    are stored with ``dict.__setitem__`` as 0 and ``'unknown'``.

    The second positional argument is inspected: if it is a dict with a
    truthy ``create`` entry, it must also contain ``ifname``.

    Raises:
        Exception: the creation spec has no ``ifname``.
        IndexError: fewer than two positional arguments were given.
    '''
    kwarg['iclass'] = ifinfmsg
    self.event_map = {ifinfmsg: "load_rtnlmsg"}
    for field, initial in (('flags', 0), ('state', 'unknown')):
        dict.__setitem__(self, field, initial)
    spec = argv[1]
    wants_create = isinstance(spec, dict) and spec.get('create')
    if wants_create and 'ifname' not in spec:
        raise Exception('specify at least ifname')
    super(Interface, self).__init__(*argv, **kwarg)
def _flush_mnl(self):
if self.mnl is not None:
# terminate the main loop
for t in range(3):
try:
msg = ifinfmsg()
msg['index'] = 1
msg.reset()
self.mnl.put(msg, RTM_GETLINK)
except Exception as e:
log.error("shutdown error: %s", e)
# Just give up.