Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
if module_name not in data.get_module_names():
_LOGGER.error('Module name: "%s" not found', module_name)
continue
# Only create sensors for monitored properties
for variable in monitored_conditions:
dev.append(NetAtmoSensor(data, module_name, variable))
else:
for module_name in data.get_module_names():
for variable in \
data.station_data.monitoredConditions(module_name):
if variable in SENSOR_TYPES.keys():
dev.append(NetAtmoSensor(data, module_name, variable))
else:
_LOGGER.warning("Ignoring unknown var %s for mod %s",
variable, module_name)
except pyatmo.NoDevice:
return None
add_entities(dev, True)
return entities
def _retry(_data):
    """Run device discovery again for ``_data`` and add whatever it finds.

    When ``find_devices`` raises ``requests.exceptions.Timeout`` another
    attempt is scheduled through ``call_later`` using
    ``NETATMO_UPDATE_INTERVAL``, and the value ``call_later`` returns is
    handed back to the caller. Otherwise a non-empty discovery result is
    passed to ``add_entities`` and the function returns ``None``.
    """
    try:
        found = find_devices(_data)
    except requests.exceptions.Timeout:

        def _try_again(_now):
            # The scheduler supplies one positional argument; it is unused.
            return _retry(_data)

        return call_later(hass, NETATMO_UPDATE_INTERVAL, _try_again)

    if found:
        add_entities(found, True)
# Try every supported station data class in turn and collect the sensor
# entities each one yields into ``dev``.
for data_class in [pyatmo.WeatherStationData, pyatmo.HomeCoachData]:
    try:
        data = NetatmoData(auth, data_class, config.get(CONF_STATION))
    except pyatmo.NoDevice:
        # Nothing of this type on the account: log it and try the next type.
        _LOGGER.info(
            "No %s devices found", NETATMO_DEVICE_TYPES[data_class.__name__]
        )
        continue

    try:
        dev.extend(find_devices(data))
    except requests.exceptions.Timeout:
        # Bind ``data`` as a default argument. A plain closure looks the name
        # up only when the timer fires, by which point a later loop iteration
        # may have rebound ``data`` and the retry would target the wrong
        # station data object.
        call_later(
            hass, NETATMO_UPDATE_INTERVAL, lambda _, data=data: _retry(data)
        )

# Only register entities when at least one was discovered.
if dev:
    add_entities(dev, True)
def setup_platform(hass, config, add_entities, discovery_info=None):
"""Set up the access to Netatmo binary sensor."""
home = config.get(CONF_HOME)
timeout = config.get(CONF_TIMEOUT)
if timeout is None:
timeout = DEFAULT_TIMEOUT
module_name = None
auth = hass.data[DATA_NETATMO_AUTH]
try:
data = CameraData(hass, auth, home)
if not data.get_camera_names():
return None
except NoDevice:
return None
welcome_sensors = config.get(CONF_WELCOME_SENSORS, WELCOME_SENSOR_TYPES)
presence_sensors = config.get(CONF_PRESENCE_SENSORS, PRESENCE_SENSOR_TYPES)
tag_sensors = config.get(CONF_TAG_SENSORS, TAG_SENSOR_TYPES)
for camera_name in data.get_camera_names():
camera_type = data.get_camera_type(camera=camera_name, home=home)
if camera_type == "NACamera":
if CONF_CAMERAS in config:
if (
config[CONF_CAMERAS] != []
and camera_name not in config[CONF_CAMERAS]
):
continue
for variable in welcome_sensors:
camera_type = data.get_camera_type(camera=camera_name, home=home)
if CONF_CAMERAS in config:
if (
config[CONF_CAMERAS] != []
and camera_name not in config[CONF_CAMERAS]
):
continue
add_entities(
[
NetatmoCamera(
data, camera_name, home, camera_type, verify_ssl, quality
)
]
)
data.get_persons()
except NoDevice:
return None
async def async_service_handler(call):
    """Forward a service call to the dispatcher for its first entity.

    Logs the call at debug level, takes the first element of
    ``call.data["entity_id"]`` and passes ``"<service>_<entity_id>"`` to
    ``async_dispatcher_send`` together with ``hass``.
    """
    service_name = call.service
    payload = call.data
    _LOGGER.debug(
        "Service handler invoked with service=%s and data=%s",
        service_name,
        payload,
    )
    # Only the first listed entity is addressed.
    first_entity = payload["entity_id"][0]
    async_dispatcher_send(hass, f"{service_name}_{first_entity}")
# Register async_service_handler as the "set_light_auto" service under DOMAIN.
# CAMERA_SERVICE_SCHEMA is passed as the fourth argument; presumably it is the
# schema used to validate the call data (confirm against its definition).
hass.services.async_register(
DOMAIN, "set_light_auto", async_service_handler, CAMERA_SERVICE_SCHEMA
)
# Decide which homes to set up. Without explicit home configuration every
# home id reported by ``home_data`` is used; otherwise each configured home
# is resolved by name and any non-empty room list is recorded in ``rooms``.
if homes_conf is None:
    home_ids = home_data.get_home_ids()
else:
    for home_conf in homes_conf:
        home = home_conf[CONF_NAME]
        home_id = home_data.homedata.gethomeId(home)
        if home_conf[CONF_ROOMS] != []:
            rooms[home_id] = home_conf[CONF_ROOMS]
        home_ids.append(home_id)

# Build one thermostat entity per room that is not filtered out.
devices = []
for home_id in home_ids:
    _LOGGER.debug("Setting up %s ...", home_id)
    try:
        room_data = ThermostatData(auth, home_id)
    except pyatmo.NoDevice:
        # No thermostat data for this home; move on to the next one.
        continue

    for room_id in room_data.get_room_ids():
        room_name = room_data.homedata.rooms[home_id][room_id]["name"]
        _LOGGER.debug("Setting up %s (%s) ...", room_name, room_id)
        # A home without an entry in ``rooms`` keeps all of its rooms; a home
        # with an entry keeps only the rooms named there.
        if home_id not in rooms or room_name in rooms[home_id]:
            _LOGGER.debug(
                "Adding devices for room %s (%s) ...", room_name, room_id
            )
            devices.append(NetatmoThermostat(room_data, room_id))
        else:
            _LOGGER.debug("Excluding %s ...", room_name)

add_entities(devices, True)