Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _HandleAuthPacket(self, pkt):
pyrad.server.Server._HandleAuthPacket(self, pkt)
logger.info("Received authentication request")
reply = self.CreateReplyPacket(pkt)
reply.code = pyrad.packet.AccessAccept
if self.t_events['invalid_code']:
reply.code = pyrad.packet.AccessRequest
if self.t_events['reject']:
reply.code = pyrad.packet.AccessReject
data = build_tunnel_password(reply.secret, pkt.authenticator,
self.t_events['psk'])
reply.AddAttribute("Tunnel-Password", data)
if self.t_events['acct_interim_interval']:
reply.AddAttribute("Acct-Interim-Interval",
self.t_events['acct_interim_interval'])
if self.t_events['session_timeout']:
reply.AddAttribute("Session-Timeout",
self.t_events['session_timeout'])
def _HandleAuthPacket(self, pkt):
pyrad.server.Server._HandleAuthPacket(self, pkt)
if len(pkt[79]) > 1:
logger.info("Multiple EAP-Message attributes")
# TODO: reassemble
eap = pkt[79][0]
eap_req = self.eap_handler(self.ctx, eap)
reply = self.CreateReplyPacket(pkt)
if eap_req:
if len(eap_req) > 253:
logger.info("Need to fragment EAP-Message")
# TODO: fragment
reply.AddAttribute("EAP-Message", eap_req)
else:
logger.info("No EAP request available")
reply.code = pyrad.packet.AccessChallenge
hmac_obj = hmac.new(reply.secret)
def start_radius_server(eap_handler):
try:
import pyrad.server
import pyrad.packet
import pyrad.dictionary
except ImportError:
return None
class TestServer(pyrad.server.Server):
def _HandleAuthPacket(self, pkt):
pyrad.server.Server._HandleAuthPacket(self, pkt)
if len(pkt[79]) > 1:
logger.info("Multiple EAP-Message attributes")
# TODO: reassemble
eap = pkt[79][0]
eap_req = self.eap_handler(self.ctx, eap)
reply = self.CreateReplyPacket(pkt)
if eap_req:
if len(eap_req) > 253:
logger.info("Need to fragment EAP-Message")
# TODO: fragment
reply.AddAttribute("EAP-Message", eap_req)
else:
logger.info("No EAP request available")
reply.code = pyrad.packet.AccessChallenge
def test_ap_vlan_wpa2_psk_radius_required(dev, apdev):
"""AP VLAN with WPA2-PSK and RADIUS attributes required"""
try:
import pyrad.server
import pyrad.packet
import pyrad.dictionary
except ImportError:
raise HwsimSkip("No pyrad modules available")
class TestServer(pyrad.server.Server):
def _HandleAuthPacket(self, pkt):
pyrad.server.Server._HandleAuthPacket(self, pkt)
logger.info("Received authentication request")
reply = self.CreateReplyPacket(pkt)
reply.code = pyrad.packet.AccessAccept
secret = reply.secret
if self.t_events['extra'].is_set():
reply.AddAttribute("Chargeable-User-Identity", "test-cui")
reply.AddAttribute("User-Name", "test-user")
if self.t_events['long'].is_set():
reply.AddAttribute("Tunnel-Type", 13)
reply.AddAttribute("Tunnel-Medium-Type", 6)
reply.AddAttribute("Tunnel-Private-Group-ID", "1")
self.SendReplyPacket(pkt.fd, reply)
def RunWithStop(self, t_events):
try:
from pyrad import dictionary, packet, server
except ImportError:
sys.stderr.write("pyrad not found!\n")
sys.exit(0)
# We could use a dictionary file, but since we need
# such few attributes, we'll just include them here
DICTIONARY = """
ATTRIBUTE\tUser-Name\t1\tstring
ATTRIBUTE\tUser-Password\t2\toctets
ATTRIBUTE\tNAS-Identifier\t32\tstring
"""
class TestServer(server.Server):
def _HandleAuthPacket(self, pkt):
server.Server._HandleAuthPacket(self, pkt)
passwd = []
for key in pkt.keys():
if key == "User-Password":
passwd = map(pkt.PwDecrypt, pkt[key])
reply = self.CreateReplyPacket(pkt)
if passwd == ['accept']:
reply.code = packet.AccessAccept
else:
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
def _HandleAuthPacket(self, pkt):
server.Server._HandleAuthPacket(self, pkt)
passwd = []
for key in pkt.keys():
if key == "User-Password":
passwd = map(pkt.PwDecrypt, pkt[key])
reply = self.CreateReplyPacket(pkt)
if passwd == ['accept']:
reply.code = packet.AccessAccept
else:
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
This function calls either :obj:`HandleAuthPacket`,
:obj:`HandleAcctPacket` or :obj:`_HandleProxyPacket` depending on
which socket is being processed.
:param fd: socket to read packet from
:type fd: socket class instance
:param pkt: packet to process
:type pkt: Packet class instance
"""
if fd.fileno() == self._proxyfd.fileno():
pkt = self._GrabPacket(
lambda data, s=self: s.CreatePacket(packet=data), fd)
self._HandleProxyPacket(pkt)
else:
Server._ProcessInput(self, fd)
#!/usr/bin/python
from __future__ import print_function
from pyrad import dictionary, packet, server
import logging
logging.basicConfig(filename="pyrad.log", level="DEBUG",
format="%(asctime)s [%(levelname)-8s] %(message)s")
class FakeServer(server.Server):
def HandleAuthPacket(self, pkt):
print("Received an authentication request")
print("Attributes: ")
for attr in pkt.keys():
print("%s: %s" % (attr, pkt[attr]))
reply = self.CreateReplyPacket(pkt, **{
"Service-Type": "Framed-User",
"Framed-IP-Address": '192.168.0.1',
"Framed-IPv6-Prefix": "fc66::1/64"
})
reply.code = packet.AccessAccept
self.SendReplyPacket(pkt.fd, reply)
# proxy.py
#
# Copyright 2005,2007 Wichert Akkerman
#
# A RADIUS proxy as defined in RFC 2138
from pyrad.server import ServerPacketError
from pyrad.server import Server
from pyrad import packet
import select
import socket
class Proxy(Server):
"""Base class for RADIUS proxies.
This class extends tha RADIUS server class with the capability to
handle communication with other RADIUS servers as well.
:ivar _proxyfd: network socket used to communicate with other servers
:type _proxyfd: socket class instance
"""
def _PrepareSockets(self):
Server._PrepareSockets(self)
self._proxyfd = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self._fdmap[self._proxyfd.fileno()] = self._proxyfd
self._poll.register(self._proxyfd.fileno(),
(select.POLLIN | select.POLLPRI | select.POLLERR))
def _HandleProxyPacket(self, pkt):