Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
from pyroute2.netlink.rtnl.nsidmsg import nsidmsg
from pyroute2.netlink.rtnl.fibmsg import fibmsg
from pyroute2.netlink.rtnl.ifinfmsg import ifinfmsg
from pyroute2.netlink.rtnl.ifaddrmsg import ifaddrmsg
# Marshal subclass whose ``msg_map`` pairs rtnetlink message type constants
# (``rtnl.RTM_*``) with message classes.
# NOTE(review): the dict literal is not closed within this excerpt, so more
# entries may follow; the original indentation is also missing here.
class MarshalRtnl(Marshal):
# link messages (NEW/DEL/GET/SET) -> ifinfmsg
msg_map = {rtnl.RTM_NEWLINK: ifinfmsg,
rtnl.RTM_DELLINK: ifinfmsg,
rtnl.RTM_GETLINK: ifinfmsg,
rtnl.RTM_SETLINK: ifinfmsg,
# address messages -> ifaddrmsg
rtnl.RTM_NEWADDR: ifaddrmsg,
rtnl.RTM_DELADDR: ifaddrmsg,
rtnl.RTM_GETADDR: ifaddrmsg,
# route messages -> rtmsg
rtnl.RTM_NEWROUTE: rtmsg,
rtnl.RTM_DELROUTE: rtmsg,
rtnl.RTM_GETROUTE: rtmsg,
# rule messages -> fibmsg
rtnl.RTM_NEWRULE: fibmsg,
rtnl.RTM_DELRULE: fibmsg,
rtnl.RTM_GETRULE: fibmsg,
# neighbour messages -> ndmsg
rtnl.RTM_NEWNEIGH: ndmsg,
rtnl.RTM_DELNEIGH: ndmsg,
rtnl.RTM_GETNEIGH: ndmsg,
# qdisc, traffic class and filter messages all share tcmsg
rtnl.RTM_NEWQDISC: tcmsg,
rtnl.RTM_DELQDISC: tcmsg,
rtnl.RTM_GETQDISC: tcmsg,
rtnl.RTM_NEWTCLASS: tcmsg,
rtnl.RTM_DELTCLASS: tcmsg,
rtnl.RTM_GETTCLASS: tcmsg,
rtnl.RTM_NEWTFILTER: tcmsg,
rtnl.RTM_DELTFILTER: tcmsg,
rtnl.RTM_GETTFILTER: tcmsg,
# Fragment of a route request builder; the enclosing ``def`` (which supplies
# ``command`` and ``kwarg``) is not visible in this excerpt.
# Optional 'match' and 'callback' options are removed from kwarg (None if absent).
match = kwarg.pop('match', None)
callback = kwarg.pop('callback', None)
# Command name -> (message type, netlink flags). Several names are aliases
# for the same pair (e.g. 'set'/'replace', 'del'/'remove'/'delete',
# 'show'/'dump').
commands = {'add': (RTM_NEWROUTE, flags_make),
'set': (RTM_NEWROUTE, flags_replace),
'replace': (RTM_NEWROUTE, flags_replace),
'change': (RTM_NEWROUTE, flags_change),
'append': (RTM_NEWROUTE, flags_append),
'del': (RTM_DELROUTE, flags_make),
'remove': (RTM_DELROUTE, flags_make),
'delete': (RTM_DELROUTE, flags_make),
'get': (RTM_GETROUTE, NLM_F_REQUEST),
'show': (RTM_GETROUTE, flags_dump),
'dump': (RTM_GETROUTE, flags_dump)}
# A value that is not a known name is used as-is, so it must itself unpack
# into a (message type, flags) pair.
(command, flags) = commands.get(command, command)
msg = rtmsg()
# table is mandatory; by default == 254
# if table is not defined in kwarg, save it there
# also for nla_attr:
# NOTE: .get() (not .pop()) is used, so an explicit 'table' stays in kwarg.
table = kwarg.get('table', 254)
# table ids above 255 are replaced by 252 in this header field
# NOTE(review): presumably because the header field is a single byte and the
# full id travels as an attribute -- confirm against the rest of the method.
msg['table'] = table if table <= 255 else 252
# The remaining header fields are popped from kwarg with the defaults below.
msg['family'] = kwarg.pop('family', AF_INET)
msg['scope'] = kwarg.pop('scope', rt_scope['universe'])
# 'mask' is consulted (and popped) only when 'dst_len' is missing or falsy,
# which includes an explicit dst_len of 0.
msg['dst_len'] = kwarg.pop('dst_len', None) or kwarg.pop('mask', 0)
msg['src_len'] = kwarg.pop('src_len', 0)
msg['tos'] = kwarg.pop('tos', 0)
msg['flags'] = kwarg.pop('flags', 0)
msg['type'] = kwarg.pop('type', rt_type['unspec'])
msg['proto'] = kwarg.pop('proto', rt_proto['unspec'])
# start with an empty attribute list
msg['attrs'] = []
# Convert a routing message keyed by 'DST' / 'NETMASK' / 'GATEWAY' into an
# rtmsg.
# NOTE(review): no ``return ret`` is visible in this excerpt and the original
# indentation is missing, so the function appears to be truncated here.
def convert_rt_msg(msg):
ret = rtmsg()
# RTM_ADD becomes RTNL_NEWROUTE; any other header type becomes RTNL_DELROUTE
ret['header']['type'] = RTNL_NEWROUTE if \
msg['header']['type'] == RTM_ADD else \
RTNL_DELROUTE
# the address family is taken from the destination record
ret['family'] = msg['DST']['header']['family']
ret['attrs'] = []
if 'address' in msg['DST']:
ret['attrs'].append(['RTA_DST', msg['DST']['address']])
# the netmask is only used when it belongs to the same family as the
# destination; dqn2int turns it into the value stored as dst_len
if 'NETMASK' in msg and \
msg['NETMASK']['header']['family'] == ret['family']:
ret['dst_len'] = dqn2int(msg['NETMASK']['address'], ret['family'])
if 'GATEWAY' in msg:
# gateways outside AF_INET / AF_INET6 make the function return None
if msg['GATEWAY']['header']['family'] not in (AF_INET, AF_INET6):
# interface routes, table 255
# discard for now
return None
ret['attrs'].append(['RTA_GATEWAY', msg['GATEWAY']['address']])
# Schema holder: ``spec`` maps table names to ordered column specifications
# and ``classes`` maps table names to message classes.
# NOTE(review): the ``classes`` dict literal is not closed within this
# excerpt, and the original indentation is missing.
class DBSchema(object):
# class-level attributes, all initialised to None here
connection = None
thread = None
event_map = None
key_defaults = None
snapshots = None # :
# table name -> OrderedDict built from the message class' sql_schema()
spec = OrderedDict()
# main tables
spec['interfaces'] = OrderedDict(ifinfmsg.sql_schema())
spec['addresses'] = OrderedDict(ifaddrmsg.sql_schema())
spec['neighbours'] = OrderedDict(ndmsg.sql_schema())
# routes: the rtmsg schema plus two extra columns, route_id and gc_mark
spec['routes'] = OrderedDict(rtmsg.sql_schema() +
[(('route_id', ), 'TEXT UNIQUE'),
(('gc_mark', ), 'INTEGER')])
# nh: the nh schema plus two extra columns, route_id and nh_id
spec['nh'] = OrderedDict(nh.sql_schema() +
[(('route_id', ), 'TEXT'),
(('nh_id', ), 'INTEGER')])
spec['rules'] = OrderedDict(fibmsg.sql_schema())
spec['netns'] = OrderedDict(nsinfmsg.sql_schema())
# additional tables
spec['p2p'] = OrderedDict(p2pmsg.sql_schema())
# table name -> message class
classes = {'interfaces': ifinfmsg,
'addresses': ifaddrmsg,
'neighbours': ndmsg,
'routes': rtmsg,
'nh': nh,
'rules': fibmsg,
def get_routes(self, *argv, **kwarg):
    """
    Return the parsed routes as a list of rtmsg objects.

    The outputs of ``self._ifc`` and ``self._route`` are run and parsed;
    routes whose ``ifname`` is not among the parsed links are skipped.
    Each remaining route gets an ``RTA_OIF`` attribute holding the link
    index, is loaded into an rtmsg marked as ``RTM_NEWROUTE``, and has
    its ``value`` key removed.

    ``argv`` and ``kwarg`` are accepted but not used.
    """
    interfaces = self._ifc.parse(self._ifc.run())
    parsed_routes = self._route.parse(self._route.run())
    messages = []
    for route in parsed_routes:
        ifname = route['ifname']
        known_links = interfaces['links']
        if ifname in known_links:
            # bind the route to its output interface by index
            oif = known_links[ifname]['index']
            route['attrs'].append(['RTA_OIF', oif])
            message = rtmsg().load(route)
            message['header']['type'] = RTM_NEWROUTE
            del message['value']
            messages.append(message)
    return messages
# Fragment of a method that loads a netlink route message ``msg`` into
# ``self``; the enclosing ``def`` is not visible and the original indentation
# is missing. NOTE(review): presumably this is ``load_netlink`` (it is called
# recursively on nexthop records below) -- confirm.
# Tail of an earlier branch: register the nexthop and stop.
self.add_nh(nh)
return
# copy every top-level field of the message onto this object
for (key, value) in msg.items():
self[key] = value
# cleanup multipath NH
if clean_mp:
for nh in self['multipath']:
self.del_nh(nh)
# each cell is an (NLA name, value) pair
for cell in msg['attrs']:
#
# Parse on demand
#
norm = rtmsg.nla2name(cell[0])
# attributes listed in self.cleanup are skipped
if norm in self.cleanup:
continue
value = cell[1]
# normalize RTAX
if norm == 'metrics':
with self['metrics']._direct_state:
# iterate over a tuple snapshot of the keys so that entries can be
# deleted while looping
for metric in tuple(self['metrics'].keys()):
del self['metrics'][metric]
# repopulate the metrics from the message, using normalized names
for (rtax, rtax_value) in value['attrs']:
rtax_norm = rtmsg.metrics.nla2name(rtax)
self['metrics'][rtax_norm] = rtax_value
elif norm == 'multipath':
# every multipath record becomes a child object of the same type as self
for record in value:
nh = type(self)(ipdb=self.ipdb, parent=self)
nh.load_netlink(record)
# NOTE(review): the body of this ``with`` is not visible in this excerpt
with nh._direct_state:
def rtmsg_from_route(route, table):
    """
    Build an rtmsg object from a Route.

    The header fields are copied from the route attributes of the same
    name, the route's attributes become the message attributes, and an
    ``RTA_TABLE`` attribute carrying ``table`` is appended last.

    :param Route route: Route object
    :param int table: route table
    :return: the populated rtmsg
    """
    message = rtmsg.rtmsg()
    # header fields share their names with the Route attributes
    for field in ('family', 'dst_len', 'src_len', 'tos',
                  'proto', 'scope', 'type', 'flags'):
        message[field] = getattr(route, field)
    # table ids of 256 and above are replaced by RT_TABLE_UNSPEC in the
    # header field; the RTA_TABLE attribute below always carries the id
    if table < 256:
        message['table'] = table
    else:
        message['table'] = RT_TABLE_UNSPEC
    message['attrs'] = [[name, unfreeze(value)]
                        for name, value in route.attributes.items()]
    message['attrs'].append(['RTA_TABLE', table])
    return message
from pyroute2.ndb.objects import RTNL_Object
from pyroute2.common import basestring
from pyroute2.netlink.rtnl.rtmsg import rtmsg
from pyroute2.netlink.rtnl.rtmsg import nh
# Column name lists built from the rtmsg and nh SQL schemas: the first item
# of every schema entry, prefixed with "rt.f_" / "nh.f_", with the last two
# entries dropped.
# NOTE(review): x[0] is passed straight to the % operator; if sql_schema()
# yields its keys as 1-tuples, % unpacks them, so str.format() or an f-string
# would not be a drop-in replacement -- confirm before modernising.
_dump_rt = ['rt.f_%s' % x[0] for x in rtmsg.sql_schema()][:-2]
_dump_nh = ['nh.f_%s' % x[0] for x in nh.sql_schema()][:-2]
class Route(RTNL_Object):
table = 'routes'
msg_class = rtmsg
api = 'route'
summary = '''
SELECT
rt.f_target, rt.f_tflags, rt.f_RTA_TABLE, rt.f_RTA_DST,
rt.f_dst_len, rt.f_RTA_GATEWAY, nh.f_RTA_GATEWAY
FROM
routes AS rt
LEFT JOIN nh
ON
rt.f_route_id = nh.f_route_id
AND rt.f_target = nh.f_target
'''
table_alias = 'rt'
summary_header = ('target', 'tflags', 'table', 'dst',
'dst_len', 'gateway', 'nexthop')
dump = '''