Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
# Retrieve favorites and input sources from the controller, retrying when a
# HeosError is raised. Calls are wrapped by Throttle(MIN_UPDATE_SOURCES).
# NOTE(review): the indentation of this snippet was lost and it appears to
# end before any branch that handles exhausted retries -- confirm against
# the full source before relying on the loop's exit behavior.
# NOTE(review): `self` and `controller` are not parameters; presumably they
# are captured from an enclosing scope -- confirm.
@Throttle(MIN_UPDATE_SOURCES)
async def get_sources():
# Number of failed attempts so far.
retry_attempts = 0
while True:
try:
# Favorites stay an empty dict unless the controller is signed in.
favorites = {}
if controller.is_signed_in:
favorites = await controller.get_favorites()
inputs = await controller.get_input_sources()
# Success: hand back both collections as a (favorites, inputs) tuple.
return favorites, inputs
except HeosError as error:
# While under the retry limit: count the failure, log it at debug level
# and wait self.retry_delay before the next pass of the loop.
if retry_attempts < self.max_retry_attempts:
retry_attempts += 1
_LOGGER.debug(
"Error retrieving sources and will retry: %s", error
)
await asyncio.sleep(self.retry_delay)
@Throttle(MIN_TIME_BETWEEN_UPDATES)
async def update(self):
    """Update the ecobee data, refreshing tokens when they have expired.

    ``self.ecobee.update`` is awaited through
    ``self._hass.async_add_executor_job``. When that raises
    ``ExpiredTokenError`` a warning is logged and ``self.refresh()`` is
    awaited instead; otherwise a debug message is logged.
    """
    run_in_executor = self._hass.async_add_executor_job
    try:
        await run_in_executor(self.ecobee.update)
    except ExpiredTokenError:
        _LOGGER.warning(
            "Ecobee update failed; attempting to refresh expired tokens"
        )
        await self.refresh()
    else:
        _LOGGER.debug("Updating ecobee")
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
    """Initialize the data on the first call and refresh it afterwards.

    While ``self.data`` is ``None`` this calls
    ``self.initialize(utcnow())``; once data is present it calls
    ``self.refresh()`` instead.
    """
    if self.data is not None:
        self.refresh()
        return
    self.initialize(utcnow())
@Throttle(min_time=DEFAULT_SCAN_INTERVAL * 0.8)
def update_spaces(self):
    """Fetch the current N26 spaces via the API client and store them."""
    latest_spaces = self._api.get_spaces()
    self._spaces = latest_spaces
# NOTE(review): the indentation of this snippet was lost and it ends right
# after `state = None`; the rest of the method is not visible here.
@util.Throttle(MIN_TIME_BETWEEN_SCANS, MIN_TIME_BETWEEN_FORCED_SCANS)
@_catch_login_errors
async def async_update(self):
"""Update Guard state."""
# Return early when self.enabled is falsy; an instance that has no
# `enabled` attribute at all is tolerated (AttributeError is ignored).
try:
if not self.enabled:
return
except AttributeError:
pass
# Function-scope import; `json` is not used in the visible part of the body.
import json
# With the login session closed, mark the entity unavailable with an
# assumed state and stop.
if self._login.session.closed:
self._available = False
self._assumed_state = True
return
_LOGGER.debug("%s: Refreshing %s", self.account, self.name)
# Initial value only; what assigns `state` afterwards is outside this view.
state = None
def __init__(self, gitlab_id, priv_token, interval, url):
    """Set up the GitLab client and the initial, empty job state.

    A ``Gitlab`` client is created for ``url`` with ``priv_token`` and
    ``per_page=1`` and authenticated immediately. ``self.update`` becomes
    ``self._update`` wrapped by ``Throttle(interval)``. ``available``
    starts as ``False`` and every job detail attribute starts as ``None``.
    """
    self._gitlab_id = gitlab_id

    client = Gitlab(url, private_token=priv_token, per_page=1)
    self._gitlab = client
    client.auth()

    throttled = Throttle(interval)
    self.update = throttled(self._update)

    self.available = False
    # Job details stay unset until an update fills them in.
    for attribute in (
        "status",
        "started_at",
        "finished_at",
        "duration",
        "commit_id",
        "commit_date",
        "build_id",
        "branch",
    ):
        setattr(self, attribute, None)
# NOTE(review): the indentation of this snippet was lost and the `try:`
# below has no visible `except`/`finally`; the block is truncated, so its
# error handling cannot be described from here.
@Throttle(SCAN_INTERVAL)
def update(self):
"""Update probe data."""
# Log in with the stored credentials on every call before querying.
self.api.login(self.username, self.password)
_LOGGER.debug("Updating data for %s", self.inverter_id)
try:
if self.is_total:
# Total mode: fetch the plant info and drop its "deviceList" entry.
total_info = self.api.plant_info(self.inverter_id)
del total_info["deviceList"]
# PlantMoneyText comes in as "3.1/€" remove anything that isn't part of the number
total_info["plantMoneyText"] = re.sub(
r"[^\d.,]", "", total_info["plantMoneyText"]
)
self.data = total_info
else:
# Otherwise: fetch the inverter detail and keep only its "data" entry.
inverter_info = self.api.inverter_detail(self.inverter_id)
self.data = inverter_info["data"]
@Throttle(MIN_TIME_BETWEEN_UPDATES)
def update(self):
    """Read the SHT sensor and store the sample unless it contains NaN.

    When either the temperature or the humidity reading is NaN, a warning
    is logged and the previously stored values are left untouched.
    """
    sample = self.adafruit_sht.read_temperature_humidity()
    temperature, humidity = sample
    if not (math.isnan(temperature) or math.isnan(humidity)):
        self.temperature = temperature
        self.humidity = humidity
        return
    _LOGGER.warning("Bad sample from sensor SHT31")
def setup(hass, config):
    """Set up the Securitas component.

    Creates the module-level ``HUB`` from ``config[DOMAIN]``, wraps its
    ``update_overview`` with a ``Throttle`` built from the configured scan
    interval, and logs in. Returns ``False`` when the login fails.
    Otherwise registers a logout for the Home Assistant stop event, runs
    one overview update, loads the platforms and returns ``True``.
    """
    global HUB
    domain_config = config[DOMAIN]
    HUB = SecuritasHub(domain_config)

    throttle = Throttle(domain_config[CONF_SCAN_INTERVAL])
    HUB.update_overview = throttle(HUB.update_overview)

    if not HUB.login():
        return False

    def _logout_on_stop(event):
        # The event itself is ignored; only the logout matters.
        return HUB.logout()

    hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, _logout_on_stop)
    HUB.update_overview()

    platforms = ("alarm_control_panel",)
    for platform in platforms:
        discovery.load_platform(hass, platform, DOMAIN, {}, config)

    return True
# NOTE(review): this is the interior of a function whose `def` line is not
# part of this snippet (see the trailing `return True`), and the original
# indentation was lost. `hass`, `config`, `add_entities` and `dispose` come
# from outside the visible lines.
from pynetio import Netio
# Connection settings read from the configuration.
host = config.get(CONF_HOST)
username = config.get(CONF_USERNAME)
password = config.get(CONF_PASSWORD)
port = config.get(CONF_PORT)
# NOTE(review): with the indentation gone it is unclear which of the
# following statements belong to this `if not DEVICES:` branch -- confirm
# against the full source.
if not DEVICES:
hass.http.register_view(NetioApiView)
dev = Netio(host, port, username, password)
# Register the device under its host with an initially empty entity list.
DEVICES[host] = Device(dev, [])
# Throttle the update for all Netio switches of one Netio
dev.update = util.Throttle(MIN_TIME_BETWEEN_SCANS)(dev.update)
# Build one NetioSwitch per key in config[CONF_OUTLETS] and collect it on
# the device entry.
for key in config[CONF_OUTLETS]:
switch = NetioSwitch(
DEVICES[host].netio, key, config[CONF_OUTLETS][key])
DEVICES[host].entities.append(switch)
add_entities(DEVICES[host].entities)
# Run `dispose` once when Home Assistant stops.
hass.bus.listen_once(EVENT_HOMEASSISTANT_STOP, dispose)
return True