Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def __init__(self, code=AccessRequest, id=None, secret=six.b(''),
authenticator=None, **attributes):
AuthPacket.__init__(self, code, id, secret, authenticator, **attributes)
def CreateAuthPacket(self, **args):
"""Create a new RADIUS packet.
This utility function creates a new RADIUS packet which can
be used to communicate with the RADIUS server this client
talks to. This is initializing the new packet with the
dictionary and secret used for the client.
:return: a new empty packet instance
:rtype: pyrad.packet.Packet
"""
if not self.protocol_auth:
raise Exception('Transport not initialized')
return AuthPacket(dict=self.dict,
id=self.protocol_auth.create_id(),
secret=self.secret, **args)
def CreateAuthPacket(self, **args):
"""Create a new authentication RADIUS packet.
This utility function creates a new RADIUS authentication
packet which can be used to communicate with the RADIUS server
this client talks to. This is initializing the new packet with
the dictionary and secret used for the client.
:return: a new empty packet instance
:rtype: pyrad.packet.AuthPacket
"""
return packet.AuthPacket(dict=self.dict, **args)
def CreateReply(self, **attributes):
"""Create a new packet as a reply to this one. This method
makes sure the authenticator and secret are copied over
to the new instance.
"""
return AuthPacket(AccessAccept, self.id,
self.secret, self.authenticator, dict=self.dict,
**attributes)
try:
self.logger.debug('[%s:%d] Received from %s packet: %s', self.ip, self.port, addr, data.hex())
req = Packet(packet=data, dict=self.server.dict)
except Exception as exc:
self.logger.error('[%s:%d] Error on decode packet: %s', self.ip, self.port, exc)
return
try:
if req.code in (AccountingResponse, AccessAccept, AccessReject, CoANAK, CoAACK, DisconnectNAK, DisconnectACK):
raise ServerPacketError('Invalid response packet %d' % req.code)
elif self.server_type == ServerType.Auth:
if req.code != AccessRequest:
raise ServerPacketError('Received non-auth packet on auth port')
req = AuthPacket(secret=remote_host.secret,
dict=self.server.dict,
packet=data)
if self.server.enable_pkt_verify:
if req.VerifyAuthRequest():
raise PacketError('Packet verification failed')
elif self.server_type == ServerType.Coa:
if req.code != DisconnectRequest and req.code != CoARequest:
raise ServerPacketError('Received non-coa packet on coa port')
req = CoAPacket(secret=remote_host.secret,
dict=self.server.dict,
packet=data)
if self.server.enable_pkt_verify:
if req.VerifyCoARequest():
raise PacketError('Packet verification failed')
def SendPacket(self, pkt):
"""Send a packet to a RADIUS server.
:param pkt: the packet to send
:type pkt: pyrad.packet.Packet
:return: Future related with packet to send
:rtype: asyncio.Future
"""
ans = asyncio.Future(loop=self.loop)
if isinstance(pkt, AuthPacket):
if not self.protocol_auth:
raise Exception('Transport not initialized')
self.protocol_auth.send_packet(pkt, ans)
elif isinstance(pkt, AcctPacket):
if not self.protocol_acct:
raise Exception('Transport not initialized')
elif isinstance(pkt, CoAPacket):
if not self.protocol_coa:
raise Exception('Transport not initialized')
else:
raise Exception('Unsupported packet')
return ans
return binascii.hexlify(result)
def decrypt(x):
if not x or len(x)%16 > 0 :return ''
x = binascii.unhexlify(str(x))
return AES.new(_key, AES.MODE_CBC).decrypt(x).strip()
def is_valid_date(dstr1,dstr2):
d1 = datetime.datetime.strptime("%s 00:00:00"%dstr1,"%Y-%m-%d %H:%M:%S")
d2 = datetime.datetime.strptime("%s 23:59:59"%dstr2,"%Y-%m-%d %H:%M:%S")
now = datetime.datetime.now()
return now >= d1 and now <= d2
class AuthPacket2(AuthPacket):
def __init__(self, code=AccessRequest, id=None, secret=six.b(''),
authenticator=None, **attributes):
AuthPacket.__init__(self, code, id, secret, authenticator, **attributes)
def CreateReply(self, msg=None,**attributes):
reply = AuthPacket2(AccessAccept, self.id,
self.secret, self.authenticator, dict=self.dict,
**attributes)
if msg:
reply.set_reply_msg(tools.EncodeString(msg))
return reply
def set_reply_msg(self,msg):
if msg:self.AddAttribute(18,msg)
def SendPacket(self, pkt):
"""Send a packet to a RADIUS server.
:param pkt: the packet to send
:type pkt: pyrad.packet.Packet
:return: the reply packet received
:rtype: pyrad.packet.Packet
:raise Timeout: RADIUS server does not reply
"""
if isinstance(pkt, packet.AuthPacket):
return self._SendPacket(pkt, self.authport)
elif isinstance(pkt, packet.CoAPacket):
return self._SendPacket(pkt, self.coaport)
else:
return self._SendPacket(pkt, self.acctport)