Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
class vf_mac(nla):
fields = (('vf', 'I'),
('mac', '32B'))
def decode(self):
nla.decode(self)
self['mac'] = ':'.join(['%02x' % x for x
in self['mac'][:6]])
def encode(self):
self['mac'] = ([int(x, 16) for x
in self['mac'].split(':')] +
[0] * 26)
nla.encode(self)
class vf_vlan(nla):
fields = (('vf', 'I'),
('vlan', 'I'),
('qos', 'I'))
class vf_tx_rate(nla):
fields = (('vf', 'I'),
('tx_rate', 'I'))
class vf_spoofchk(nla):
fields = (('vf', 'I'),
('spoofchk', 'I'))
class vf_link_state(nla):
fields = (('vf', 'I'),
('link_state', 'I'))
for key, value in kwargs.items():
if isinstance(value, NFCTAttr):
value = {'attrs': value.attrs()}
if value is not None:
self['attrs'].append([self.name2nla(key), value])
return self
class cta_tuple(nla):
nla_map = (
('CTA_TUPLE_UNSPEC', 'none'),
('CTA_TUPLE_IP', 'cta_ip'),
('CTA_TUPLE_PROTO', 'cta_proto'),
)
class cta_ip(nla):
nla_map = (
('CTA_IP_UNSPEC', 'none'),
('CTA_IP_V4_SRC', 'ip4addr'),
('CTA_IP_V4_DST', 'ip4addr'),
('CTA_IP_V6_SRC', 'ip6addr'),
('CTA_IP_V6_DST', 'ip6addr'),
)
class cta_proto(nla):
nla_map = (
('CTA_PROTO_UNSPEC', 'none'),
('CTA_PROTO_NUM', 'uint8'),
('CTA_PROTO_SRC_PORT', 'be16'),
('CTA_PROTO_DST_PORT', 'be16'),
('CTA_PROTO_ICMP_ID', 'be16'),
('CTA_PROTO_ICMP_TYPE', 'uint8'),
'''correlation'''
fields = (('delay_corr', 'I'),
('loss_corr', 'I'),
('dup_corr', 'I'))
class netem_reorder(nla):
'''reorder has probability and correlation'''
fields = (('prob_reorder', 'I'),
('corr_reorder', 'I'))
class netem_corrupt(nla):
'''corruption has probability and correlation'''
fields = (('prob_corrupt', 'I'),
('corr_corrupt', 'I'))
class options_fw(nla, nla_plus_police):
nla_map = (('TCA_FW_UNSPEC', 'none'),
('TCA_FW_CLASSID', 'uint32'),
('TCA_FW_POLICE', 'police'), # TODO string?
('TCA_FW_INDEV', 'hex'), # TODO string
('TCA_FW_ACT', 'hex'), # TODO
('TCA_FW_MASK', 'uint32'))
class options_bpf(nla, nla_plus_police):
nla_map = (('TCA_BPF_UNSPEC', 'none'),
('TCA_BPF_ACT', 'bpf_act'),
('TCA_BPF_POLICE', 'police'),
('TCA_BPF_CLASSID', 'uint32'),
('TCA_BPF_OPS_LEN', 'uint32'),
('TCA_BPF_OPS', 'uint32'),
('TCA_BPF_FD', 'uint32'),
('TCA_BPF_NAME', 'asciiz'))
elif k == 'flow_mode':
r = convert_flowmode(r)
elif k == 'diffserv_mode':
r = convert_diffserv(r)
elif k == 'ack_filter':
r = convert_ackfilter(r)
elif k == 'mpu':
check_range(k, r, 0, 256)
elif k == 'overhead':
check_range(k, r, -64, 256)
ret['attrs'].append([v, r])
return ret
class options(nla):
nla_map = (('TCA_CAKE_UNSPEC', 'none'),
('TCA_CAKE_PAD', 'uint64'),
('TCA_CAKE_BASE_RATE64', 'uint64'),
('TCA_CAKE_DIFFSERV_MODE', 'uint32'),
('TCA_CAKE_ATM', 'uint32'),
('TCA_CAKE_FLOW_MODE', 'uint32'),
('TCA_CAKE_OVERHEAD', 'int32'),
('TCA_CAKE_RTT', 'uint32'),
('TCA_CAKE_TARGET', 'uint32'),
('TCA_CAKE_AUTORATE', 'uint32'),
('TCA_CAKE_MEMORY', 'uint32'),
('TCA_CAKE_NAT', 'uint32'),
('TCA_CAKE_RAW', 'uint32'),
('TCA_CAKE_WASH', 'uint32'),
('TCA_CAKE_MPU', 'uint32'),
('TCA_CAKE_INGRESS', 'uint32'),
self['value'] = struct.pack('H', family) + addr
nla.encode(self)
def decode(self):
nla.decode(self)
family = struct.unpack('H', self['value'][:2])[0]
addr = self['value'][2:]
if len(addr):
if (family == AF_INET and len(addr) == 4) or \
(family == AF_INET6 and len(addr) == 16):
addr = inet_ntop(family, addr)
else:
addr = hexdump(addr)
self.value = {'family': family, 'addr': addr}
class cacheinfo(nla):
__slots__ = ()
fields = (('rta_clntref', 'I'),
('rta_lastuse', 'I'),
('rta_expires', 'i'),
('rta_error', 'I'),
('rta_used', 'I'),
('rta_id', 'I'),
('rta_ts', 'I'),
('rta_tsage', 'I'))
class rtmsg(rtmsg_base, nlmsg):
__slots__ = ()
('RTA_IIF', 'asciiz'),
('RTA_OIF', 'asciiz'),
('RTA_GATEWAY', 'ipaddr'),
('RTA_PRIORITY', 'uint32'),
('RTA_PREFSRC', 'ipaddr'),
('RTA_METRICS', 'metrics'),
('RTA_MULTIPATH', 'hex'),
('RTA_PROTOINFO', 'uint32'),
('RTA_FLOW', 'hex'),
('RTA_CACHEINFO', 'cacheinfo'),
('RTA_SESSION', 'hex'),
('RTA_MP_ALGO', 'hex'),
('RTA_TABLE', 'uint32'),
('RTA_MARK', 'uint32'))
class metrics(nla):
prefix = 'RTAX_'
nla_map = (('RTAX_UNSPEC', 'none'),
('RTAX_LOCK', 'uint32'),
('RTAX_MTU', 'uint32'),
('RTAX_WINDOW', 'uint32'),
('RTAX_RTT', 'uint32'),
('RTAX_RTTVAR', 'uint32'),
('RTAX_SSTHRESH', 'uint32'),
('RTAX_CWND', 'uint32'),
('RTAX_ADVMSS', 'uint32'),
('RTAX_REORDERING', 'uint32'),
('RTAX_HOPLIMIT', 'uint32'),
('RTAX_INITCWND', 'uint32'),
('RTAX_FEATURES', 'uint32'),
('RTAX_RTO_MIN', 'uint32'),
('RTAX_INITRWND', 'uint32'),
('index', 'i'),
('handle', 'I'),
('parent', 'I'),
('info', 'I'))
nla_map = (('TCA_UNSPEC', 'none'),
('TCA_KIND', 'asciiz'),
('TCA_OPTIONS', 'get_options'),
('TCA_STATS', 'stats'),
('TCA_XSTATS', 'get_xstats'),
('TCA_RATE', 'hex'),
('TCA_FCNT', 'hex'),
('TCA_STATS2', 'get_stats2'),
('TCA_STAB', 'hex'))
class stats(nla):
fields = (('bytes', 'Q'),
('packets', 'I'),
('drop', 'I'),
('overlimits', 'I'),
('bps', 'I'),
('pps', 'I'),
('qlen', 'I'),
('backlog', 'I'))
def get_plugin(self, plug, *argv, **kwarg):
# get the plugin name
kind = self.get_attr('TCA_KIND')
# get the plugin implementation or the default one
p = plugins.get(kind, sched_template)
# get the interface
interface = getattr(p,
('TCA_HFSC_FSC', 'hfsc_curve'), # link-share curve
('TCA_HFSC_USC', 'hfsc_curve')) # upper-limit curve
class hfsc_curve(nla):
fields = (('m1', 'I'), # slope of the first segment in bps
('d', 'I'), # x-projection of the first segment in us
('m2', 'I')) # slope of the second segment in bps
class options_htb(nla_plus_rtab):
nla_map = (('TCA_HTB_UNSPEC', 'none'),
('TCA_HTB_PARMS', 'htb_parms'),
('TCA_HTB_INIT', 'htb_glob'),
('TCA_HTB_CTAB', 'ctab'),
('TCA_HTB_RTAB', 'rtab'))
class htb_glob(nla):
fields = (('version', 'I'),
('rate2quantum', 'I'),
('defcls', 'I'),
('debug', 'I'),
('direct_pkts', 'I'))
class htb_parms(nla_plus_rtab.parms):
fields = (('rate_cell_log', 'B'),
('rate___reserved', 'B'),
('rate_overhead', 'H'),
('rate_cell_align', 'h'),
('rate_mpu', 'H'),
('rate', 'I'),
('ceil_cell_log', 'B'),
('ceil___reserved', 'B'),
('ceil_overhead', 'H'),
nla_map = (
('CTA_PROTOINFO_DCCP_UNSPEC', 'none'),
('CTA_PROTOINFO_DCCP_STATE', 'uint8'),
('CTA_PROTOINFO_DCCP_ROLE', 'uint8'),
('CTA_PROTOINFO_DCCP_HANDSHAKE_SEQ', 'be64'),
)
class cta_protoinfo_sctp(nla):
nla_map = (
('CTA_PROTOINFO_SCTP_UNSPEC', 'none'),
('CTA_PROTOINFO_SCTP_STATE', 'uint8'),
('CTA_PROTOINFO_SCTP_VTAG_ORIGINAL', 'be32'),
('CTA_PROTOINFO_SCTP_VTAG_REPLY', 'be32'),
)
class cta_nat(nla):
nla_map = (
('CTA_NAT_UNSPEC', 'none'),
('CTA_NAT_V4_MINIP', 'ip4addr'),
('CTA_NAT_V4_MAXIP', 'ip4addr'),
('CTA_NAT_PROTO', 'cta_protonat'),
('CTA_NAT_V6_MINIP', 'ip6addr'),
('CTA_NAT_V6_MAXIP', 'ip6addr'),
)
class cta_protonat(nla):
nla_map = (
('CTA_PROTONAT_UNSPEC', 'none'),
('CTA_PROTONAT_PORT_MIN', 'be16'),
('CTA_PROTONAT_PORT_MAX', 'be16'),
)
class parse_handshake_time(nla):
fields = (('tv_sec', 'Q'),
('tv_nsec', 'Q'))
def decode(self):
nla.decode(self)
self['latest handshake'] = ctime(self['tv_sec'])
class wgpeer_a_allowedips(nla):
nla_flags = NLA_F_NESTED
nla_map = tuple([('WGPEER_A_ALLOWEDIPS_%i' % x,
'wgpeer_allowedip') for x
in range(WG_MAX_ALLOWEDIPS)])
class wgpeer_allowedip(nla):
prefix = 'WGALLOWEDIP_A_'
nla_flags = NLA_F_NESTED
nla_map = (('WGALLOWEDIP_A_UNSPEC', 'none'),
('WGALLOWEDIP_A_FAMILY', 'uint16'),
('WGALLOWEDIP_A_IPADDR', 'hex'),
('WGALLOWEDIP_A_CIDR_MASK', 'uint8'))
def decode(self):
nla.decode(self)
if self.get_attr('WGALLOWEDIP_A_FAMILY') == AF_INET:
pre = (self
.get_attr('WGALLOWEDIP_A_IPADDR')
.replace(':', ''))
self['addr'] = inet_ntoa(a2b_hex(pre))
else: