Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _HandleAuthPacket(self, pkt):
    """Answer an authentication packet carrying EAP with an Access-Challenge.

    The EAP payload of the request (attribute 79, EAP-Message) is handed to
    ``self.eap_handler`` together with ``self.ctx``.  Whatever the handler
    returns is placed in the reply as one or more EAP-Message attributes,
    and an HMAC-MD5 over the reply is computed for the
    Message-Authenticator attribute (attribute 80).

    :param pkt: the received request; must contain at least one
                EAP-Message attribute
    :raise KeyError: if the request carries no EAP-Message attribute
    """
    # Function-scope import so this block does not depend on the file's
    # import section.
    import hashlib

    pyrad.server.Server._HandleAuthPacket(self, pkt)

    # An EAP packet longer than one attribute value is split over several
    # EAP-Message attributes; the pieces are concatenated in order.
    fragments = pkt[79]
    if len(fragments) > 1:
        logger.info("Multiple EAP-Message attributes")
    eap = b"".join(fragments)

    eap_req = self.eap_handler(self.ctx, eap)
    reply = self.CreateReplyPacket(pkt)
    if eap_req:
        if len(eap_req) > 253:
            logger.info("Need to fragment EAP-Message")
        # A single attribute value holds at most 253 octets, so longer EAP
        # requests are emitted as consecutive EAP-Message attributes.
        for offset in range(0, len(eap_req), 253):
            reply.AddAttribute("EAP-Message", eap_req[offset:offset + 253])
    else:
        logger.info("No EAP request available")
    reply.code = pyrad.packet.AccessChallenge

    # Message-Authenticator is HMAC-MD5.  The digest must be named
    # explicitly: hmac.new() without digestmod fails on Python >= 3.8.
    hmac_obj = hmac.new(reply.secret, digestmod=hashlib.md5)
    hmac_obj.update(struct.pack("B", reply.code))
    hmac_obj.update(struct.pack("B", reply.id))

    # reply attributes: the HMAC is computed with a 16-octet all-zero
    # placeholder in the Message-Authenticator position.
    reply.AddAttribute("Message-Authenticator", b"\x00" * 16)
    attrs = reply._PktEncodeAttributes()

    # Length: 4 octets of code/id/length, 16 octets of authenticator, then
    # the encoded attributes.
    flen = 4 + 16 + len(attrs)
    hmac_obj.update(struct.pack(">H", flen))
    hmac_obj.update(pkt.authenticator)
    hmac_obj.update(attrs)

    # Drop the zero placeholder again.
    # NOTE(review): this excerpt ends here; storing hmac_obj.digest() in the
    # reply and sending it are presumably done by code that follows - confirm.
    del reply[80]
# Request with a fixed Event-Timestamp (123456789) instead of the current
# time: no reply is expected, so the client timeout is the success path and
# any reply at all is an error.
req = radius_das.DisconnectPacket(
    dict=dict,
    secret=b"secret",
    User_Name="psk.user@example.com",
    Event_Timestamp=123456789,
)
logger.debug(req)
try:
    srv.SendPacket(req)
except pyrad.client.Timeout:
    logger.info("Disconnect-Request with non-matching Event-Timestamp properly ignored")
else:
    raise Exception("Unexpected response to Disconnect-Request")

# Requests that must be answered with Disconnect-NAK.  Each entry is
# (log line, attributes sent besides User-Name "foo", last argument given to
# send_and_check_reply - presumably the expected Error-Cause; confirm).
nak_cases = (
    ("Disconnect-Request with unsupported attribute",
     {"User_Password": "foo"}, 401),
    ("Disconnect-Request with invalid Calling-Station-Id",
     {"Calling_Station_Id": "foo"}, 407),
    ("Disconnect-Request with mismatching User-Name",
     {}, 503),
)
for description, extra_attrs, expected in nak_cases:
    logger.info(description)
    # Same keyword order as a hand-written call: User-Name, the
    # case-specific attribute, then a timestamp taken right before sending.
    das_kwargs = {"User_Name": "foo"}
    das_kwargs.update(extra_attrs)
    das_kwargs["Event_Timestamp"] = int(time.time())
    req = radius_das.DisconnectPacket(dict=dict, secret=b"secret",
                                      **das_kwargs)
    send_and_check_reply(srv, req, pyrad.packet.DisconnectNAK, expected)

logger.info("Disconnect-Request with mismatching Calling-Station-Id")
req = radius_das.DisconnectPacket(dict=dict, secret=b"secret",
# Client used to send the CoA / Disconnect request.
client = Client(
    server=ADDRESS,
    secret=SECRET,
    dict=dictionary.Dictionary("dictionary"),
)

# Timeout applied by the client while waiting for the reply.
client.timeout = 30

# Keyword arguments cannot contain "-", so every attribute name is passed
# with "_" in its place.
attributes = {
    name.replace("-", "_"): value for name, value in ATTRIBUTES.items()
}

# First command-line argument selects the request type; anything other than
# "coa" or "dis" ends the script with exit status 1.
mode = sys.argv[1]
if mode not in ("coa", "dis"):
    sys.exit(1)

if mode == "coa":
    # Plain CoA request.
    request = client.CreateCoAPacket(**attributes)
else:
    # Disconnect request: same packet factory, different code.
    request = client.CreateCoAPacket(code=packet.DisconnectRequest,
                                     **attributes)

# Send the request and show the reply followed by its code.
result = client.SendPacket(request)
print(result)
print(result.code)
def send_accept(self, req, nas, **args):
    """Build an Access-Accept for ``req`` and send it to the request's source.

    The accept counter is bumped through ``service.incr_stat``, the reply is
    created from the request, optionally filled from ``args``, and its
    encoded form is written to ``req.sock``.

    :param req: request being answered; ``CreateReply()``, ``source``,
                ``sock`` and ``get_username()`` are used
    :param nas: sender of the request; ``vendor_id`` and ``ip_addr`` are read
    :param args: optional reply settings.  If any are given, the keys
                 ``ipaddr``, ``bandcode``, ``domain_code``,
                 ``input_max_limit``, ``output_max_limit``,
                 ``input_rate_code`` and ``output_rate_code`` are looked up;
                 a missing key is passed on as ``None``.
    """
    # NOTE(review): block boundaries were reconstructed from a listing
    # without indentation; sending and the debug log are assumed to run
    # whether or not ``args`` is empty - confirm.
    service.incr_stat(service.STAT_AUTH_ACCEPT)

    reply = req.CreateReply()
    reply.source = req.source
    reply.code = packet.AccessAccept

    if args:
        option = args.get
        reply.set_framed_ip_addr(option("ipaddr"))
        reply.set_filter_id(nas.vendor_id, option("bandcode"))
        reply.set_special_str(nas.vendor_id, "context", option("domain_code"))
        # The remaining vendor attributes share their name with the option
        # they are taken from; only the setter differs.
        for setter_name, key in (
            ("set_special_int", "input_max_limit"),
            ("set_special_int", "output_max_limit"),
            ("set_special_str", "input_rate_code"),
            ("set_special_str", "output_rate_code"),
        ):
            getattr(reply, setter_name)(nas.vendor_id, key, option(key))

    req.sock.sendto(reply.ReplyPacket(), reply.source)

    if is_debug():
        radiuslog.debug(
            "[Auth] send an authentication accept,user[%s],nas[%s]"
            % (req.get_username(), nas.ip_addr)
        )
def _HandleAcctPacket(self, pkt):
    """Check a packet from the accounting port and pass it on.

    ``self._AddSecret(pkt)`` runs first.  Packets whose code is
    AccountingRequest or AccountingResponse are then given to
    ``self.HandleAcctPacket``; every other code is refused by raising
    ServerPacketError, which is how a packet gets dropped (the main loop
    drops it and logs the reason).

    :param pkt: packet to process
    :type  pkt: Packet class instance
    :raise ServerPacketError: the packet is not an accounting packet
    """
    self._AddSecret(pkt)

    if pkt.code in (packet.AccountingRequest, packet.AccountingResponse):
        self.HandleAcctPacket(pkt)
        return

    raise ServerPacketError(
        'Received non-accounting packet on accounting port')
def _SendPacket(self, pkt, port):
"""Send a packet to a RADIUS server.
The request is transmitted up to ``self.retries`` times.  Every
transmission starts a poll loop whose deadline lies ``self.timeout``
seconds (``time.time()`` based) after the send.
:param pkt: the packet to send
:type pkt: pyrad.packet.Packet
:param port: UDP port to send packet to
:type port: integer
:return: the reply packet received
:rtype: pyrad.packet.Packet
:raise Timeout: RADIUS server does not reply
"""
self._SocketOpen()
for attempt in range(self.retries):
# Retransmission (attempt > 0) of an Accounting-Request: add one timeout
# period to Acct-Delay-Time, or create the attribute with that value.
if attempt and pkt.code == packet.AccountingRequest:
if "Acct-Delay-Time" in pkt:
pkt["Acct-Delay-Time"] = \
pkt["Acct-Delay-Time"][0] + self.timeout
else:
pkt["Acct-Delay-Time"] = self.timeout
# Deadline for this attempt, then send the encoded request.
now = time.time()
waitto = now + self.timeout
self._socket.sendto(pkt.RequestPacket(), (self.server, port))
while now < waitto:
# The remaining seconds are scaled by 1000 for the poll call -
# presumably self._poll is a select.poll object (milliseconds); confirm.
ready = self._poll.poll((waitto - now) * 1000)
if ready:
# Read at most 4096 bytes of reply data.
rawreply = self._socket.recv(4096)
# NOTE(review): decoding of rawreply and the refresh of `now` are not part
# of this excerpt.
def processPacket(self, pkt):
    """Refuse any packet that is not an Accounting-Request.

    :param pkt: received packet; only ``pkt.code`` is inspected
    :raise PacketError: ``pkt.code`` is not ``packet.AccountingRequest``
    """
    if pkt.code != packet.AccountingRequest:
        # The check is for accounting traffic, so the message names the
        # accounting socket (it used to say "authentication socket").
        raise PacketError(
            'non-AccountingRequest packet on accounting socket')