Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from pyroute2.common import map_namespace
from pyroute2.netlink import nlmsg
from pyroute2.netlink import nla
FR_ACT_UNSPEC = 0
FR_ACT_TO_TBL = 1
FR_ACT_GOTO = 2
FR_ACT_NOP = 3
FR_ACT_BLACKHOLE = 6
FR_ACT_UNREACHABLE = 7
FR_ACT_PROHIBIT = 8
(FR_ACT_NAMES, FR_ACT_VALUES) = map_namespace('FR_ACT', globals())
class fibmsg(nlmsg):
'''
IP rule message
C structure::
struct fib_rule_hdr {
__u8 family;
__u8 dst_len;
__u8 src_len;
__u8 tos;
__u8 table;
__u8 res1; /* reserved */
__u8 res2; /* reserved */
__u8 action;
__u32 flags;
};
def test_register_policy(self):
self.ip.register_policy(100, nlmsg)
self.ip.register_policy({101: nlmsg})
self.ip.register_policy(102, nlmsg)
assert self.ip.get_policy_map()[100] == nlmsg
assert self.ip.get_policy_map(101)[101] == nlmsg
assert self.ip.get_policy_map([102])[102] == nlmsg
self.ip.unregister_policy(100)
self.ip.unregister_policy([101])
self.ip.unregister_policy({102: nlmsg})
assert 100 not in self.ip.get_policy_map()
assert 101 not in self.ip.get_policy_map()
assert 102 not in self.ip.get_policy_map()
'ect0pkts',
'cepkts')
fields = [(i, 'Q') for i in field_names]
class icmp6_stats(nla):
# ./include/uapi/linux/snmp.h
field_names = ('num',
'inmsgs',
'inerrors',
'outmsgs',
'outerrors',
'csumerrors')
fields = [(i, 'Q') for i in field_names]
class ifinfmsg(ifinfbase, nlmsg):
def decode(self):
nlmsg.decode(self)
if self['flags'] & 1:
self['state'] = 'up'
else:
self['state'] = 'down'
class ifinfveth(ifinfbase, nla):
pass
'tbf': sched_tbf,
'netem': sched_netem,
'fw': cls_fw,
'u32': cls_u32,
'matchall': cls_matchall,
'basic': cls_basic,
'flow': cls_flow,
'ingress': sched_ingress,
'pfifo_fast': sched_pfifo_fast,
'choke': sched_choke,
'drr': sched_drr,
'prio': sched_pfifo_fast,
'cake': sched_cake}
class tcmsg(nlmsg):
prefix = 'TCA_'
fields = (('family', 'B'),
('pad1', 'B'),
('pad2', 'H'),
('index', 'i'),
('handle', 'I'),
('parent', 'I'),
('info', 'I'))
nla_map = (('TCA_UNSPEC', 'none'),
('TCA_KIND', 'asciiz'),
('TCA_OPTIONS', 'get_options'),
('TCA_STATS', 'stats'),
('TCA_XSTATS', 'get_xstats'),
class sock_diag_req(nlmsg):
fields = (('sdiag_family', 'B'),
('sdiag_protocol', 'B'))
UDIAG_SHOW_NAME = 0x01
UDIAG_SHOW_VFS = 0x02
UDIAG_SHOW_PEER = 0x04
UDIAG_SHOW_ICONS = 0x08
UDIAG_SHOW_RQLEN = 0x10
UDIAG_SHOW_MEMINFO = 0x20
class inet_addr_codec(nlmsg):
def encode(self):
# FIXME: add human-friendly API to specify IP addresses as str
# (see also decode())
if self['idiag_src'] == 0:
self['idiag_src'] = (0, 0, 0, 0)
if self['idiag_dst'] == 0:
self['idiag_dst'] = (0, 0, 0, 0)
nlmsg.encode(self)
def decode(self):
nlmsg.decode(self)
if self[self.ffname] == AF_INET:
self['idiag_dst'] = inet_ntop(AF_INET,
pack('>I', self['idiag_dst'][0]))
self['idiag_src'] = inet_ntop(AF_INET,
self['tcpi_delivery_rate_app_limited'] = \
self['tcpi_delivery_rate_app_limited'] & 0x80 >> 7
class unix_diag_req(nlmsg):
fields = (('sdiag_family', 'B'),
('sdiag_protocol', 'B'),
('__pad', 'H'),
('udiag_states', 'I'),
('udiag_ino', 'I'),
('udiag_show', 'I'),
('udiag_cookie', 'Q'))
class unix_diag_msg(nlmsg):
fields = (('udiag_family', 'B'),
('udiag_type', 'B'),
('udiag_state', 'B'),
('__pad', 'B'),
('udiag_ino', 'I'),
('udiag_cookie', 'Q'))
nla_map = (('UNIX_DIAG_NAME', 'asciiz'),
('UNIX_DIAG_VFS', 'unix_diag_vfs'),
('UNIX_DIAG_PEER', 'uint32'),
('UNIX_DIAG_ICONS', 'hex'),
('UNIX_DIAG_RQLEN', 'unix_diag_rqlen'),
('UNIX_DIAG_MEMINFO', 'hex'),
('UNIX_DIAG_SHUTDOWN', 'uint8'))
s.bind()
s.put({'index': 1}, RTM_GETLINK)
s.get()
s.close()
Please notice, that the return value of `s.get()` can be
not the result of `s.put()`, but any broadcast message.
To fix that, use `msg_seq` -- the response must contain the
same `msg['header']['sequence_number']` value.
'''
if msg_seq != 0:
self.lock[msg_seq].acquire()
try:
if msg_seq not in self.backlog:
self.backlog[msg_seq] = []
if not isinstance(msg, nlmsg):
msg_class = self.marshal.msg_map[msg_type]
msg = msg_class(msg)
if msg_pid is None:
msg_pid = self.epid or os.getpid()
msg['header']['type'] = msg_type
msg['header']['flags'] = msg_flags
msg['header']['sequence_number'] = msg_seq
msg['header']['pid'] = msg_pid
self.sendto_gate(msg, addr)
except:
raise
finally:
if msg_seq != 0:
self.lock[msg_seq].release()
from pyroute2.netlink import nlmsg
class rtgenmsg(nlmsg):
fields = (('rtgen_family', 'B'),
('__pad', '3x'))
# 'IHHII' == 16 bytes
while offset <= len(data) - 16:
# pick type and length
length, = struct.unpack_from('I', data, offset)
if length == 0:
break
error = None
msg_type, = struct.unpack_from(self.type_format,
data,
offset + self.type_offset)
if msg_type == self.error_type:
code = abs(struct.unpack_from('i', data, offset + 16)[0])
if code > 0:
error = NetlinkError(code)
msg_class = self.msg_map.get(msg_type, nlmsg)
msg = msg_class(data, offset=offset)
try:
msg.decode()
msg['header']['error'] = error
# try to decode encapsulated error message
if error is not None:
enc_type = struct.unpack_from('H', data, offset + 24)[0]
enc_class = self.msg_map.get(enc_type, nlmsg)
enc = enc_class(data, offset=offset + 20)
enc.decode()
msg['header']['errmsg'] = enc
if callback and seq == msg['header']['sequence_number']:
if callback(msg):
offset += msg.length
continue
def decode(self):
nlmsg.decode(self)
if self['flags'] & 1:
self['state'] = 'up'
else:
self['state'] = 'down'