Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
while not t_stop.is_set():
for (fd, event) in self._poll.poll(1000):
if event == select.POLLIN:
try:
fdo = self._fdmap[fd]
self._ProcessInput(fdo)
except pyrad.server.ServerPacketError as err:
logger.info("pyrad server dropping packet: " + str(err))
except pyrad.packet.PacketError as err:
logger.info("pyrad server received invalid packet: " + str(err))
else:
logger.error("Unexpected event in pyrad server main loop")
srv = TestServer(dict=pyrad.dictionary.Dictionary("dictionary.radius"),
authport=18138, acctport=18139)
srv.hosts["127.0.0.1"] = pyrad.server.RemoteHost("127.0.0.1",
"radius",
"localhost")
srv.BindToAddress("")
t_stop = threading.Event()
t = threading.Thread(target=run_pyrad_server, args=(srv, t_stop, eap_handler))
t.start()
return { 'srv': srv, 'stop': t_stop, 'thread': t }
while not t_events['stop'].is_set():
for (fd, event) in self._poll.poll(1000):
if event == select.POLLIN:
try:
fdo = self._fdmap[fd]
self._ProcessInput(fdo)
except pyrad.server.ServerPacketError as err:
logger.info("pyrad server dropping packet: " + str(err))
except pyrad.packet.PacketError as err:
logger.info("pyrad server received invalid packet: " + str(err))
else:
logger.error("Unexpected event in pyrad server main loop")
srv = TestServer(dict=pyrad.dictionary.Dictionary("dictionary.radius"),
authport=18138, acctport=18139)
srv.hosts["127.0.0.1"] = pyrad.server.RemoteHost("127.0.0.1",
b"radius",
"localhost")
srv.BindToAddress("")
t_events = {}
t_events['stop'] = threading.Event()
t_events['msg_auth'] = threading.Event()
t_events['wrong_secret'] = threading.Event()
t_events['double_msg_auth'] = threading.Event()
t = threading.Thread(target=run_pyrad_server, args=(srv, t_events))
t.start()
try:
params = hostapd.wpa2_eap_params(ssid="radius-test")
params['auth_server_port'] = "18138"
hapd = hostapd.add_ap(apdev[0], params)
connect(dev[0], "radius-test", wait_connect=False)
while not t_events['stop'].is_set():
for (fd, event) in self._poll.poll(1000):
if event == select.POLLIN:
try:
fdo = self._fdmap[fd]
self._ProcessInput(fdo)
except pyrad.server.ServerPacketError as err:
logger.info("pyrad server dropping packet: " + str(err))
except pyrad.packet.PacketError as err:
logger.info("pyrad server received invalid packet: " + str(err))
else:
logger.error("Unexpected event in pyrad server main loop")
srv = TestServer(dict=pyrad.dictionary.Dictionary("dictionary.radius"),
authport=18138, acctport=18139)
srv.hosts["127.0.0.1"] = pyrad.server.RemoteHost("127.0.0.1",
b"radius",
"localhost")
srv.BindToAddress("")
t_events = {}
t_events['stop'] = threading.Event()
t_events['psk'] = psk
t_events['invalid_code'] = invalid_code
t_events['acct_interim_interval'] = acct_interim_interval
t_events['session_timeout'] = session_timeout
t_events['reject'] = reject
t = threading.Thread(target=run_pyrad_server, args=(srv, t_events))
t.start()
return t, t_events
def RunWithStop(self, t_events):
self._poll = select.poll()
self._fdmap = {}
self._PrepareSockets()
self.t_events = t_events
while not t_events['stop'].is_set():
for (fd, event) in self._poll.poll(1000):
if event == select.POLLIN:
try:
fdo = self._fdmap[fd]
self._ProcessInput(fdo)
except pyrad.server.ServerPacketError as err:
logger.info("pyrad server dropping packet: " + str(err))
except pyrad.packet.PacketError as err:
logger.info("pyrad server received invalid packet: " + str(err))
else:
logger.error("Unexpected event in pyrad server main loop")
print("Attributes: ")
for attr in pkt.keys():
print("%s: %s" % (attr, pkt[attr]))
reply = self.CreateReplyPacket(pkt)
# COA NAK
reply.code = 45
self.SendReplyPacket(pkt.fd, reply)
if __name__ == '__main__':
# create server and read dictionary
srv = FakeServer(dict=dictionary.Dictionary("dictionary"), coa_enabled=True)
# add clients (address, secret, name)
srv.hosts["127.0.0.1"] = server.RemoteHost("127.0.0.1", b"Kah3choteereethiejeimaeziecumi", "localhost")
srv.BindToAddress("")
# start server
srv.Run()
passwd = []
for key in pkt.keys():
if key == "User-Password":
passwd = map(pkt.PwDecrypt, pkt[key])
reply = self.CreateReplyPacket(pkt)
if passwd == ['accept']:
reply.code = packet.AccessAccept
else:
reply.code = packet.AccessReject
self.SendReplyPacket(pkt.fd, reply)
srv = TestServer(addresses=["localhost"],
hosts={"127.0.0.1":
server.RemoteHost("127.0.0.1", "foo", "localhost")},
dict=dictionary.Dictionary(StringIO.StringIO(DICTIONARY)))
# Write a sentinel character to let the parent process know we're listening.
sys.stdout.write("~")
sys.stdout.flush()
srv.Run()