Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
params = hostapd.wpa2_eap_params(ssid="radius-das")
params['radius_das_port'] = "3799"
params['radius_das_client'] = "127.0.0.1 secret"
params['radius_das_require_event_timestamp'] = "1"
params['own_ip_addr'] = "127.0.0.1"
params['nas_identifier'] = "nas.example.com"
hapd = hostapd.add_ap(apdev[0], params)
connect(dev[0], "radius-das")
addr = dev[0].p2p_interface_addr()
sta = hapd.get_sta(addr)
id = sta['dot1xAuthSessionId']
dict = pyrad.dictionary.Dictionary("dictionary.radius")
srv = pyrad.client.Client(server="127.0.0.1", acctport=3799,
secret=b"secret", dict=dict)
srv.retries = 1
srv.timeout = 1
logger.info("Disconnect-Request with incorrect secret")
req = radius_das.DisconnectPacket(dict=dict, secret=b"incorrect",
User_Name="foo",
NAS_Identifier="localhost",
Event_Timestamp=int(time.time()))
logger.debug(req)
try:
reply = srv.SendPacket(req)
raise Exception("Unexpected response to Disconnect-Request")
except pyrad.client.Timeout:
logger.info("Disconnect-Request with incorrect secret properly ignored")
params['radius_das_port'] = "3799"
params['radius_das_client'] = "127.0.0.1 secret"
params['radius_das_require_event_timestamp'] = "1"
params['radius_das_require_message_authenticator'] = "1"
params['radius_das_time_window'] = "10"
params['own_ip_addr'] = "127.0.0.1"
params['nas_identifier'] = "nas.example.com"
hapd = hostapd.add_ap(apdev[0], params)
connect(dev[0], "radius-das")
addr = dev[0].own_addr()
sta = hapd.get_sta(addr)
id = sta['dot1xAuthSessionId']
dict = pyrad.dictionary.Dictionary("dictionary.radius")
srv = pyrad.client.Client(server="127.0.0.1", acctport=3799,
secret=b"secret", dict=dict)
srv.retries = 1
srv.timeout = 1
logger.info("Disconnect-Request with unsupported attribute")
req = radius_das.DisconnectPacket(dict=dict, secret=b"secret",
NAS_IP_Address="127.0.0.1",
NAS_Identifier="nas.example.com",
Calling_Station_Id=addr,
Event_Timestamp=int(time.time()) - 50)
add_message_auth_req(req)
logger.debug(req)
try:
reply = srv.SendPacket(req)
raise Exception("Unexpected response to Disconnect-Request")
except pyrad.client.Timeout:
except ImportError:
raise HwsimSkip("No pyrad modules available")
params = hostapd.wpa2_eap_params(ssid="radius-das")
params['radius_das_port'] = "3799"
params['radius_das_client'] = "127.0.0.1 secret"
params['radius_das_require_event_timestamp'] = "1"
hapd = hostapd.add_ap(apdev[0], params)
connect(dev[0], "radius-das")
addr = dev[0].p2p_interface_addr()
sta = hapd.get_sta(addr)
id = sta['dot1xAuthSessionId']
dict = pyrad.dictionary.Dictionary("dictionary.radius")
srv = pyrad.client.Client(server="127.0.0.1", acctport=3799,
secret=b"secret", dict=dict)
srv.retries = 1
srv.timeout = 1
# hostapd does not currently support CoA-Request, so NAK is expected
logger.info("CoA-Request with matching Acct-Session-Id")
req = radius_das.CoAPacket(dict=dict, secret=b"secret",
Acct_Session_Id=id,
Event_Timestamp=int(time.time()))
send_and_check_reply(srv, req, pyrad.packet.CoANAK, error_cause=405)
def test_radius_server_failures(dev, apdev):
"""RADIUS server failure cases"""
try:
import pyrad.client
import pyrad.packet
import pyrad.dictionary
except ImportError:
raise HwsimSkip("No pyrad modules available")
dict = pyrad.dictionary.Dictionary("dictionary.radius")
client = pyrad.client.Client(server="127.0.0.1", authport=1812,
secret=b"radius", dict=dict)
client.retries = 1
client.timeout = 1
# unexpected State
req = client.CreateAuthPacket(code=pyrad.packet.AccessRequest,
User_Name="foo")
req['State'] = b'foo-state'
add_message_auth(req)
reply = client.SendPacket(req)
if reply.code != pyrad.packet.AccessReject:
raise Exception("Unexpected RADIUS response code " + str(reply.code))
# no EAP-Message
req = client.CreateAuthPacket(code=pyrad.packet.AccessRequest,
User_Name="foo")
def __init__(self):
self.handler = RadiusAccounting()
def start(self):
self.handler.start()
def do_accounting(self, stats):
self.handler.put(stats)
def stop(self):
self.handler.stop()
self.handler.join()
class RadiusAccounting(EventQueue, pyrad.client.Client):
def __init__(self):
main_config_file = process.configuration.file(RadiusConfig.config_file)
if main_config_file is None:
raise RuntimeError('Cannot find the radius configuration file: %r' % RadiusConfig.config_file)
try:
config = dict(line.rstrip('\n').split(None, 1) for line in open(main_config_file) if len(line.split(None, 1)) == 2 and not line.startswith('#'))
secrets = dict(line.rstrip('\n').split(None, 1) for line in open(config['servers']) if len(line.split(None, 1)) == 2 and not line.startswith('#'))
server = config['acctserver']
try:
server, acctport = server.split(':')
acctport = int(acctport)
except ValueError:
acctport = 1813
secret = secrets[server]
dicts = [RadiusDictionaryFile(config['dictionary'])]
def authenticate(self, user=None, password=None, **kwargs):
radius_server = self.service.config.radius_server
radius_secret = self.service.config.radius_secret
client = Client(
server=radius_server,
secret=radius_secret,
dict=self.RADIUS_DICT
)
req = client.CreateAuthPacket(
code=packet.AccessRequest,
User_Name=user,
NAS_Identifier="noc"
)
req["User-Password"] = req.PwCrypt(password)
try:
reply = client.SendPacket(req)
except client.Timeout:
raise self.LoginError("Timed out")
if reply.code != AccessAccept:
raise self.LoginError(
def peers_alive(config):
from pyrad.client import Client
from pyrad.dictionary import Dictionary
radconfig = config['radius']
localconfig = config['local']
auth_OK = True
dictionary = Dictionary(localconfig['raddict'])
for server in radconfig['servers']:
srv = Client(server=server, secret=str(radconfig['secret']), dict=dictionary)
req = srv.CreateAuthPacket(code=pyrad.packet.AccessRequest,
User_Name=radconfig['username'],
NAS_Identifier="localhost")
req["User-Password"] = req.PwCrypt(radconfig['password'])
try:
reply = srv.SendPacket(req)
except pyrad.client.Timeout:
print "Could not contact:", server
auth_OK = False
if reply.code != 2:
print "Auth failed to", server
auth_OK = False
return auth_OK
def authenticate_using_radius_server(self, request, callback):
"""Authenticate a subject using a RADIUS server."""
try:
root_dir = os.path.abspath(os.path.join(_conf.install_location(), '..'))
client = Client(server=self.sec_conf['authentication']['server_ip'],
secret=bytes(self.sec_conf['authentication']['secret']),
dict=Dictionary(os.path.join(root_dir, "calvinextras", "pyrad_dicts", "dictionary"),
os.path.join(root_dir, "calvinextras", "pyrad_dicts", "dictionary.acc")))
req = client.CreateAuthPacket(code=pyrad.packet.AccessRequest,
User_Name=request['subject']['user'],
NAS_Identifier=self.node.id)
req["User-Password"] = req.PwCrypt(request['subject']['password'])
# FIXME: should not block here (use a callback instead)
reply = client.SendPacket(req)
if reply.code == pyrad.packet.AccessAccept:
_log.debug("Security: access accepted")
# Save attributes returned by server.
self._return_authentication_decision(True, json.loads(reply["Reply-Message"][0]),
callback)
return
_log.debug("Security: access denied")
def can_auth(config, username, password):
"""
Performs authentication against a RADIUS server.
"""
log = logging.getLogger('radius_source.can_auth')
log.debug("Attempting RADIUS auth to %s for user %s" % (config['rad_server'], username,))
processed_user = ldap_source.get_auth_username(config, username)
# Create a RADIUS client to communicate with the server.
srv = Client(
server = config['rad_server'],
secret = config['rad_secret'],
dict = Dictionary(config['rad_dictionary']),)
req = srv.CreateAuthPacket(
code = pyrad.packet.AccessRequest,
User_Name = processed_user,
NAS_Identifier = gethostname(),)
req['User-Password'] = req.PwCrypt(password)
try:
reply = srv.SendPacket(req)
except Exception:
log.exception("Problem contacting RADIUS server")
return False
acctport = 1813
secret = secrets[server]
dicts = [RadiusDictionaryFile(config['dictionary'])]
if RadiusConfig.additional_dictionary:
additional_dictionary = process.configuration.file(RadiusConfig.additional_dictionary)
if additional_dictionary:
dicts.append(RadiusDictionaryFile(additional_dictionary))
else:
log.warning('Could not load additional RADIUS dictionary file: %r' % RadiusConfig.additional_dictionary)
raddict = pyrad.dictionary.Dictionary(*dicts)
timeout = int(config['radius_timeout'])
retries = int(config['radius_retries'])
except Exception:
log.critical('cannot read the RADIUS configuration file')
raise
pyrad.client.Client.__init__(self, server, 1812, acctport, 3799, secret, raddict)
self.timeout = timeout
self.retries = retries
if 'bindaddr' in config and config['bindaddr'] != '*':
self.bind((config['bindaddr'], 0))
EventQueue.__init__(self, self.do_accounting)