Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def _HandleAuthPacket(self, pkt):
    """Answer an authentication request as directed by ``self.t_events``.

    The base ``pyrad.server.Server`` handler runs first. The reply starts
    out as Access-Accept; a truthy ``invalid_code`` event changes the code
    to Access-Request and a truthy ``reject`` event, checked afterwards,
    changes it to Access-Reject. A Tunnel-Password attribute built from the
    ``psk`` event is always added, while Acct-Interim-Interval and
    Session-Timeout are added only when their events are truthy.

    :param pkt: the received authentication packet
    """
    pyrad.server.Server._HandleAuthPacket(self, pkt)
    logger.info("Received authentication request")

    events = self.t_events
    response = self.CreateReplyPacket(pkt)
    response.code = pyrad.packet.AccessAccept

    # Later entries override earlier ones, so 'reject' wins over
    # 'invalid_code' when both events are set.
    for event_name, forced_code in (
            ('invalid_code', pyrad.packet.AccessRequest),
            ('reject', pyrad.packet.AccessReject)):
        if events[event_name]:
            response.code = forced_code

    tunnel_password = build_tunnel_password(response.secret,
                                            pkt.authenticator,
                                            events['psk'])
    response.AddAttribute("Tunnel-Password", tunnel_password)

    # Optional attributes: only sent when the matching event holds a value.
    for attribute, event_name in (
            ("Acct-Interim-Interval", 'acct_interim_interval'),
            ("Session-Timeout", 'session_timeout')):
        if events[event_name]:
            response.AddAttribute(attribute, events[event_name])

    self.SendReplyPacket(pkt.fd, response)
# Add the identifying attributes to the request; ``req`` and ``srv`` are
# created before this span.
req["NAS-Identifier"] = "trillian"
req["Called-Station-Id"] = "00-04-5F-00-0F-D1"
req["Calling-Station-Id"] = "00-01-24-80-B3-9C"
req["Framed-IP-Address"] = "10.0.0.100"

# Send the request; a timeout or a socket-level failure ends the script
# with exit status 1.
try:
    print("Sending authentication request")
    reply = srv.SendPacket(req)
except pyrad.client.Timeout:
    print("RADIUS server does not reply")
    sys.exit(1)
except socket.error as error:
    # Exceptions cannot be subscripted on Python 3, so ``error[1]`` would
    # raise TypeError here and hide the real failure. Use ``strerror`` and
    # fall back to the exception text when it is unset.
    print("Network error: %s" % (error.strerror or error))
    sys.exit(1)

if reply.code == pyrad.packet.AccessAccept:
    print("Access accepted")
else:
    print("Access denied")

# Dump every attribute the server sent back.
print("Attributes returned by server:")
for i in reply.keys():
    print("%s: %s" % (i, reply[i]))
# Authenticate `user` against a RADIUS server and return `user` on success.
# The enclosing definition starts before this span; `radius_server`,
# `radius_secret`, `user` and `password` are presumably its parameters or
# locals -- TODO confirm against the full file.
client = Client(
server=radius_server,
secret=radius_secret,
dict=self.RADIUS_DICT
)
# Build an Access-Request carrying the user name and a fixed NAS identifier.
req = client.CreateAuthPacket(
code=packet.AccessRequest,
User_Name=user,
NAS_Identifier="noc"
)
# The password is stored as whatever req.PwCrypt returns for it.
req["User-Password"] = req.PwCrypt(password)
try:
reply = client.SendPacket(req)
# NOTE(review): `client` is the Client instance created above, not a module;
# verify that it exposes a `Timeout` attribute, otherwise evaluating this
# clause fails as soon as SendPacket raises anything.
except client.Timeout:
raise self.LoginError("Timed out")
# Any reply code other than AccessAccept is reported as a login failure.
if reply.code != AccessAccept:
# NOTE(review): reply.code is passed as a second positional argument rather
# than %-interpolated into the message -- confirm LoginError formats its
# arguments.
raise self.LoginError(
"RADIUS Authentication failed. Code=%s", reply.code
)
return user
def _HandleAuthPacket(self, pkt):
    """Accept the request only if its User-Password decrypts to 'accept'.

    The base ``server.Server`` handler runs first. Every value of the
    ``User-Password`` attribute is decrypted with ``pkt.PwDecrypt``; the
    reply is Access-Accept when the result is exactly ``['accept']`` and
    Access-Reject otherwise, including when the attribute is absent.

    :param pkt: the received authentication packet
    """
    server.Server._HandleAuthPacket(self, pkt)

    passwd = []
    if "User-Password" in pkt.keys():
        # Build a real list. On Python 3 ``map`` returns a lazy iterator,
        # which never compares equal to ``['accept']``, so every request
        # used to be rejected.
        passwd = [pkt.PwDecrypt(value) for value in pkt["User-Password"]]

    reply = self.CreateReplyPacket(pkt)
    if passwd == ['accept']:
        reply.code = packet.AccessAccept
    else:
        reply.code = packet.AccessReject
    self.SendReplyPacket(pkt.fd, reply)
def CreateReply(self, msg=None, **attributes):
    """Build an Access-Accept ``AuthPacket2`` in answer to this packet.

    The reply is constructed with this packet's id, secret, authenticator
    and dictionary; extra keyword arguments are forwarded to the
    ``AuthPacket2`` constructor. When ``msg`` is truthy it is encoded with
    ``tools.EncodeString`` and handed to the reply's ``set_reply_msg``.

    :param msg: optional reply message
    :return: the newly created reply packet
    """
    response = AuthPacket2(
        AccessAccept,
        self.id,
        self.secret,
        self.authenticator,
        dict=self.dict,
        **attributes
    )
    if not msg:
        return response
    response.set_reply_msg(tools.EncodeString(msg))
    return response
"""
# Send the request. A timeout or any other exception is logged and reported
# to the caller as None. (`client` and `packet` are defined before this span.)
try:
reply = client.SendPacket(packet)
except Timeout as e:
logging.error("RADIUS timeout occurred contacting %s:%s" % (
client.server, client.authport))
return None
except Exception as e:
logging.error("RADIUS error: %s" % e)
return None
# An explicit reject is logged as a warning, any other non-accept code as an
# error; both return None.
if reply.code == AccessReject:
logging.warning("RADIUS access rejected for user '%s'" % (
packet['User-Name']))
return None
elif reply.code != AccessAccept:
logging.error("RADIUS access error for user '%s' (code %s)" % (
packet['User-Name'], reply.code))
return None
logging.info("RADIUS access granted for user '%s'" % (
packet['User-Name']))
# Without a "Class" attribute in the reply, return an empty list and two
# False flags.
if not "Class" in reply.keys():
return [], False, False
# Initial values; nothing in the visible lines updates or returns them, so
# the code that does presumably follows this span -- TODO confirm.
groups = []
is_staff = False
is_superuser = False
# Optional prefix read from the settings object (empty string when unset),
# prepended to the literal "group=".
app_class_prefix = getattr(settings, 'RADIUS_CLASS_APP_PREFIX', '')
group_class_prefix = app_class_prefix + "group="
def HandleAuthPacket(self, pkt):
    """Print the request's attributes and answer it with Access-Accept.

    Each attribute of ``pkt`` is printed as ``name: value``. The reply is
    created with a fixed Service-Type, Framed-IP-Address and
    Framed-IPv6-Prefix, marked as Access-Accept and sent back on the
    descriptor the request arrived on.

    :param pkt: the received authentication packet
    """
    print("Received an authentication request")
    print("Attributes: ")
    for name in pkt.keys():
        print("%s: %s" % (name, pkt[name]))

    # Attribute names contain dashes, so they are passed as an unpacked
    # dict instead of literal keyword arguments.
    reply_attributes = {
        "Service-Type": "Framed-User",
        "Framed-IP-Address": '192.168.0.1',
        "Framed-IPv6-Prefix": "fc66::1/64",
    }
    answer = self.CreateReplyPacket(pkt, **reply_attributes)
    answer.code = packet.AccessAccept
    self.SendReplyPacket(pkt.fd, answer)
def _HandleProxyPacket(self, pkt):
    """Validate a packet that arrived on the reply socket.

    The sender's address must be a key of ``self.hosts``; that host's
    secret is then stored on the packet. Only Access-Accept, Access-Reject
    and Accounting-Response codes are allowed through. A packet that fails
    either check is refused by raising :obj:`ServerPacketError`, which the
    main loop uses to drop the packet and log the reason.

    :param pkt: packet to process
    :type  pkt: Packet class instance
    :raises ServerPacketError: if the sender is unknown or the packet is
        not one of the accepted response types
    """
    sender = pkt.source[0]
    known_hosts = self.hosts
    if sender not in known_hosts:
        raise ServerPacketError('Received packet from unknown host')
    pkt.secret = known_hosts[sender].secret

    response_codes = (
        packet.AccessAccept,
        packet.AccessReject,
        packet.AccountingResponse,
    )
    if pkt.code in response_codes:
        return
    raise ServerPacketError('Received non-response on proxy socket')
remote_host = self.hosts[addr[0]]
elif '0.0.0.0' in self.hosts:
remote_host = self.hosts['0.0.0.0'].secret
else:
self.logger.warn('[%s:%d] Drop package from unknown source %s', self.ip, self.port, addr)
return
try:
self.logger.debug('[%s:%d] Received from %s packet: %s', self.ip, self.port, addr, data.hex())
req = Packet(packet=data, dict=self.server.dict)
except Exception as exc:
self.logger.error('[%s:%d] Error on decode packet: %s', self.ip, self.port, exc)
return
try:
if req.code in (AccountingResponse, AccessAccept, AccessReject, CoANAK, CoAACK, DisconnectNAK, DisconnectACK):
raise ServerPacketError('Invalid response packet %d' % req.code)
elif self.server_type == ServerType.Auth:
if req.code != AccessRequest:
raise ServerPacketError('Received non-auth packet on auth port')
req = AuthPacket(secret=remote_host.secret,
dict=self.server.dict,
packet=data)
if self.server.enable_pkt_verify:
if req.VerifyAuthRequest():
raise PacketError('Packet verification failed')
elif self.server_type == ServerType.Coa:
if req.code != DisconnectRequest and req.code != CoARequest:
raise ServerPacketError('Received non-coa packet on coa port')
req = CoAPacket(secret=remote_host.secret,