Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
(reticle_range * (1.0 - ratio))
return on_screen_reticle_scale
class HudDataCache(object):
# Shared state for the HUD; everything below is a class attribute, so there
# is a single copy for the whole process.
# Cache of text textures, indexed by texture key.
TEXT_TEXTURE_CACHE = {}
# Texture key -> datetime of that entry's last use; read when deciding
# which cached textures have aged out.
__CACHE_ENTRY_LAST_USED__ = {}
# Maximum idle age of a cache entry, in seconds (60 * 5 = five minutes).
__CACHE_INVALIDATION_TIME__ = 60 * 5
# Most recent result of the traffic manager's get_traffic_with_position().
RELIABLE_TRAFFIC = []
# Most recent result of the traffic manager's is_traffic_available().
IS_TRAFFIC_AVAILABLE = False
# Held by update_traffic_reports() while the two traffic fields above are replaced.
__LOCK__ = threading.Lock()
# Traffic client built from the configured traffic manager address. It is
# constructed as a side effect of this class body being executed.
__TRAFFIC_CLIENT__ = AdsbTrafficClient(
configuration.CONFIGURATION.get_traffic_manager_address())
@staticmethod
def update_traffic_reports():
    """
    Refreshes the cached traffic list and the traffic availability flag
    from the traffic manager.

    Both fields are replaced while the cache lock is held, so code that
    takes the same lock never observes one field updated without the other.
    """
    # The lock is used as a context manager so it is always released,
    # even when one of the traffic manager queries raises.
    with HudDataCache.__LOCK__:
        # Read both values from the same manager object.
        traffic_manager = traffic.AdsbTrafficClient.TRAFFIC_MANAGER
        HudDataCache.RELIABLE_TRAFFIC = traffic_manager.get_traffic_with_position()
        HudDataCache.IS_TRAFFIC_AVAILABLE = traffic_manager.is_traffic_available()
@staticmethod
def get_reliable_traffic():
"""
Returns a thread safe copy of the currently known reliable traffic.
# Simulated AHRS source: it supplies the orientation handed to the element
# and is stepped once per pass of the loop below.
__aircraft__ = AhrsSimulation()
# Vertical pixels per degree of pitch: __height__ divided by the configured
# degrees of pitch, multiplied by the configured pitch display scaler.
__pixels_per_degree_y__ = (__height__ / configuration.CONFIGURATION.get_degrees_of_pitch()) * \
configuration.CONFIGURATION.get_pitch_degrees_display_scaler()
# Build the element under test from the pitch range, the pixel scale, the
# font and the (__width__, __height__) size pair.
hud_element = element_type(
configuration.CONFIGURATION.get_degrees_of_pitch(),
__pixels_per_degree_y__,
font,
(__width__, __height__))
# No exit condition is visible here: the loop runs until the process is stopped.
while True:
# Step each simulated aircraft and hand its report to the traffic manager,
# keyed by the aircraft's ICAO address.
for test_data in simulated_traffic:
test_data.simulate()
AdsbTrafficClient.TRAFFIC_MANAGER.handle_traffic_report(
test_data.icao_address,
test_data.to_json())
# Going by its name, this drops text textures that have not been used recently.
HudDataCache.purge_old_textures()
# The orientation is read before the simulation is stepped, so the frame
# drawn below shows the state from before this pass's simulate() call.
orientation = __aircraft__.get_ahrs()
__aircraft__.simulate()
# Clear the back buffer, draw the element onto it, then present the frame.
__backpage_framebuffer__.fill(BLACK)
hud_element.render(__backpage_framebuffer__, orientation)
pygame.display.flip()
# NOTE(review): presumably `clock` is a pygame.time.Clock, in which case this
# caps the loop at 60 frames per second -- confirm where `clock` is created.
clock.tick(60)
"""
Prints our current traffic understanding.
"""
# Ask the traffic manager for the traffic it currently has a position for.
diag_traffic = AdsbTrafficClient.TRAFFIC_MANAGER.get_traffic_with_position()
# Nothing is printed when the manager hands back None.
if diag_traffic is not None:
# NOTE(review): the loop variable is named `traffic`; if a module of the same
# name is imported in this file, it is shadowed from here on -- confirm.
for traffic in diag_traffic:
# One line per aircraft: display name - bearing - distance.
print("{0} - {1} - {2}".format(traffic.get_display_name(),
traffic.bearing, traffic.distance))
if __name__ == '__main__':
    # Manual diagnostic entry point: every five seconds, print the traffic
    # that the traffic manager currently has a position for.
    import time

    # The instance is not used below, but it must still be constructed:
    # the constructor takes the configured traffic manager address and
    # sets up the client.
    trafficClient = AdsbTrafficClient(
        configuration.CONFIGURATION.get_traffic_manager_address())

    while True:
        time.sleep(5)
        print("position_valid:")
        reports = AdsbTrafficClient.TRAFFIC_MANAGER.get_traffic_with_position()

        # The manager can apparently return None instead of a list, so skip
        # this pass rather than let the diagnostic loop die on a TypeError.
        if reports is None:
            continue

        for traffic_report in reports:
            print(" {0} {1} {2}".format(
                traffic_report.get_display_name(),
                traffic_report.bearing,
                traffic_report.distance))
def get_traffic_manager_service_status(
    self
):
    """
    Polls the traffic manager's "Service/Status" REST endpoint and sends
    a heartbeat to the local traffic manager when the service reports
    that it has received a report recently.

    This is a best-effort check. Any ordinary failure (network error,
    timeout, malformed response, non-numeric value) is ignored so the
    caller keeps running. Interpreter-level exits such as
    KeyboardInterrupt and SystemExit are NOT swallowed.
    """
    try:
        status_json = self.__traffic_session__.get(
            "http://{}/Service/Status".format(self.rest_address),
            timeout=configuration.AHRS_TIMEOUT).json()

        # Nothing to do when the response carries no "time since last report" field.
        if status_json is None \
                or AdsbTrafficClient.TIME_SINCE_LAST_REPORT_KEY not in status_json:
            return

        time_since_last_report = float(
            status_json[AdsbTrafficClient.TIME_SINCE_LAST_REPORT_KEY])

        # Only a recent report counts as the service being alive.
        # NOTE(review): the threshold is presumably in seconds -- confirm
        # against the service that produces this value.
        if time_since_last_report < 60.0:
            AdsbTrafficClient.TRAFFIC_MANAGER.heartbeat()
    except Exception:
        # Deliberately silent: a failed poll simply means no heartbeat is
        # sent this time around.
        pass
def __init__(
    self,
    rest_address
):
    """
    Sets up the REST session and the two recurring tasks, then registers
    this object as AdsbTrafficClient.INSTANCE.

    Arguments:
        rest_address {str} -- Address of the traffic manager's REST service.
                              It is formatted into
                              "http://<rest_address>/Service/Status" when the
                              service status is polled. (Presumably a
                              host or host:port string.)
    """
    self.rest_address = rest_address
    self.__traffic_session__ = requests.Session()

    # NOTE(review): the numeric argument is presumably the repeat interval
    # in seconds -- confirm against RecurringTask.
    traffic_callback = self.update_reliable_traffic
    self.__update_traffic_task__ = recurring_task.RecurringTask(
        'UpdateTraffic', 0.1, traffic_callback)

    health_callback = self.get_traffic_manager_service_status
    self.__update_service_health_task__ = recurring_task.RecurringTask(
        'UpdateTrafficManagerHealth', 0.5, health_callback)

    # Publish the instance last, once it is fully populated.
    AdsbTrafficClient.INSTANCE = self
def update_traffic_reports():
    """
    Stores the traffic manager's current traffic-with-position list on
    HudDataCache.RELIABLE_TRAFFIC_REPORTS.
    """
    traffic_manager = AdsbTrafficClient.TRAFFIC_MANAGER
    current_reports = traffic_manager.get_traffic_with_position()
    HudDataCache.RELIABLE_TRAFFIC_REPORTS = current_reports
# The second hardest problem in comp-sci...
# Cache invalidation: remove every texture whose last use is more than
# __CACHE_INVALIDATION_TIME__ seconds in the past.
# Keys are collected first and deleted afterwards, presumably so that the
# last-used dictionary is not modified while it is being iterated.
textures_to_purge = []
for texture_key in HudDataCache.__CACHE_ENTRY_LAST_USED__:
# lsu is the datetime recorded for this texture's last use.
lsu = HudDataCache.__CACHE_ENTRY_LAST_USED__[texture_key]
time_since_last_use = (
datetime.datetime.now() - lsu).total_seconds()
if time_since_last_use > HudDataCache.__CACHE_INVALIDATION_TIME__:
textures_to_purge.append(texture_key)
# Remove each expired key from both the texture cache and the last-used map.
# NOTE(review): the first `del` raises KeyError if a key is present in the
# last-used map but missing from TEXT_TEXTURE_CACHE -- confirm that the two
# dictionaries are always written together.
for texture_to_purge in textures_to_purge:
del HudDataCache.TEXT_TEXTURE_CACHE[texture_to_purge]
del HudDataCache.__CACHE_ENTRY_LAST_USED__[texture_to_purge]
def received_message(
    self,
    icao_identifier,
    adsb_traffic
):
    """
    Handler for receiving a message. The report is forwarded to the
    traffic manager; an ordinary failure while handling it is reported
    on stdout and otherwise ignored, so one bad report does not stop
    the receiver.

    Arguments:
        icao_identifier -- The ICAO identifier the report belongs to;
                           passed through to the traffic manager unchanged.
        adsb_traffic {map} -- The json message containing the traffic update
    """
    try:
        AdsbTrafficClient.TRAFFIC_MANAGER.handle_traffic_report(
            icao_identifier,
            adsb_traffic)
    except Exception:
        # Only ordinary errors are swallowed; KeyboardInterrupt and
        # SystemExit still propagate so the process can be stopped.
        print("Issue decoding JSON")
def __reset_traffic_manager__(self):
    """
    Resets the traffic manager to essentially reset the receiver unit.

    The reset is best-effort: an ordinary failure (for example, no client
    instance having been registered yet) is ignored. KeyboardInterrupt
    and SystemExit are not swallowed.
    """
    try:
        AdsbTrafficClient.INSTANCE.reset_traffic_manager()
    except Exception:
        # Deliberately silent: a failed reset leaves things as they were.
        pass