Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def simulate_time(hass, mock_lj, delta):
    """Move the simulated clock to ``mock_lj.start_time + delta``.

    Stores ``delta`` on ``mock_lj.last_delta``. Then, while
    ``homeassistant.helpers.condition.dt_util.utcnow`` is patched to return
    the simulated time, fires a time-changed event for that time and waits
    for ``hass`` to finish its pending work.
    """
    simulated_now = mock_lj.start_time + delta
    _LOGGER.info("*** simulate time change by %s: %s", delta, simulated_now)
    mock_lj.last_delta = delta

    utcnow_patcher = mock.patch(
        "homeassistant.helpers.condition.dt_util.utcnow",
        return_value=simulated_now,
    )
    with utcnow_patcher:
        _LOGGER.info("now=%s", dt_util.utcnow())
        async_fire_time_changed(hass, simulated_now)
        await hass.async_block_till_done()
        _LOGGER.info("done with now=%s", dt_util.utcnow())
async def mock_discovery(hass, discoveries, config=BASE_CONFIG):
    """Set up the discovery component and trigger one patched discovery run.

    ``discovery._discover`` is replaced by ``discoveries`` while a
    time-changed event is fired, and ``async_discover`` /
    ``async_load_platform`` in ``homeassistant.components.discovery`` are
    replaced by mocks.

    Returns the ``(mock_discover, mock_platform)`` mocks so the caller can
    inspect how they were called.
    """
    setup_ok = await async_setup_component(hass, "discovery", config)
    assert setup_ok

    await hass.async_start()

    with patch.object(discovery, "_discover", discoveries):
        with patch(
            "homeassistant.components.discovery.async_discover",
            return_value=mock_coro(),
        ) as mock_discover:
            with patch(
                "homeassistant.components.discovery.async_load_platform",
                return_value=mock_coro(),
            ) as mock_platform:
                async_fire_time_changed(hass, utcnow())
                # Block twice: work scheduled through loop.call_soon is not
                # always caught by a single async_block_till_done().
                await hass.async_block_till_done()
                await hass.async_block_till_done()

    return mock_discover, mock_platform
# NOTE(review): this is a fragment. The enclosing definition starts above the
# visible text and the original indentation has been lost, so the nesting
# described in the comments below is inferred from the logic -- confirm it
# against the real file.
#
# Tail of the handling for a value that is not usable as a number: log a
# warning and stop.
_LOGGER.warning("Value cannot be processed as a number: %s",
observed_entity_state)
return
"""If we are already in the correct state, no need to do anything"""
# While on, an observed value above _value_off clears any pending change and
# returns; presumably the `else` below pairs with this `if self._is_on`, so
# that while off a value below _value_on does the same.
if self._is_on:
if obs_value > self._value_off:
self._pending = False
return
else:
if obs_value < self._value_on:
self._pending = False
return
# A change is already pending: measure how long it has been pending and
# compare that with _time_off (when currently on) or _time_on (when off).
if self._pending:
time_pending = dt_util.utcnow() - self._pending_since
if self._is_on:
time_remaining = self._time_off - time_pending
else:
time_remaining = self._time_on - time_pending
# No time left: flip the state, leave pending mode and schedule a state update.
if time_remaining.total_seconds() <= 0:
self._is_on = not self._is_on
self._pending = False
self.schedule_update_ha_state()
else:
# enter pending mode (start counting time to change state)
self._pending_since = dt_util.utcnow()
self._pending = True
# Point in time at which the delay for the current direction would elapse;
# how it is used is not visible in this fragment.
time_to_expire = dt_util.utcnow() + (
self._time_off if self._is_on else self._time_on)
@callback
def fire_time_event(target: float) -> None:
    """Fire the time-changed event, then schedule the following tick.

    ``target`` is compared with ``monotonic()``: when this callback runs more
    than one second after it, ``EVENT_TIMER_OUT_OF_SYNC`` is fired as well,
    carrying the delay in seconds.
    """
    tick_time = dt_util.utcnow()
    hass.bus.async_fire(EVENT_TIME_CHANGED, {ATTR_NOW: tick_time})

    # Being more than a second late means a tick was missed.
    seconds_late = monotonic() - target
    if seconds_late > 1:
        hass.bus.async_fire(EVENT_TIMER_OUT_OF_SYNC, {ATTR_SECONDS: seconds_late})

    schedule_tick(tick_time)
# NOTE(review): this is a fragment. The `try:` matching the `finally:` below
# and the enclosing definitions are outside the visible text, and the original
# indentation has been lost -- confirm the structure against the real file.
#
# Refresh from the server; on failure log a warning and report False.
if not await connection.update(journal=True):
_LOGGER.warning("Could not query server")
return False
# Call discover_vehicle() for every vehicle whose VIN is not yet in data.vehicles.
for vehicle in connection.vehicles:
if vehicle.vin not in data.vehicles:
discover_vehicle(vehicle)
# Send SIGNAL_STATE_UPDATED through the dispatcher, then report success.
async_dispatcher_send(hass, SIGNAL_STATE_UPDATED)
return True
finally:
# Whatever the outcome above, schedule `update` again `interval` from now.
async_track_point_in_utc_time(hass, update, utcnow() + interval)
# Presumably the tail of the enclosing setup routine: run the first update
# immediately and return its result -- TODO confirm.
_LOGGER.info("Logging in to service")
return await update(utcnow())
def device_state_attributes(self):
    """Return the state attributes.

    Returns ``None`` when ``self._opendata`` is ``None``. Otherwise builds
    the attribute dict from the first three entries of
    ``self._opendata.connections`` and, as a side effect, stores the time
    left until the first entry's departure in ``self._remaining_time``.
    """
    opendata = self._opendata
    if opendata is None:
        return None

    connections = opendata.connections
    next_connection = connections[0]

    # Time left until the first listed departure, measured from local "now".
    departure = dt_util.parse_datetime(next_connection['departure'])
    self._remaining_time = departure - dt_util.as_local(dt_util.utcnow())

    return {
        ATTR_TRAIN_NUMBER: next_connection['number'],
        ATTR_PLATFORM: next_connection['platform'],
        ATTR_TRANSFERS: next_connection['transfers'],
        ATTR_DURATION: next_connection['duration'],
        ATTR_DEPARTURE_TIME1: connections[1]['departure'],
        ATTR_DEPARTURE_TIME2: connections[2]['departure'],
        ATTR_START: opendata.from_name,
        ATTR_TARGET: opendata.to_name,
        ATTR_REMAINING_TIME: '{}'.format(self._remaining_time),
        ATTR_ATTRIBUTION: ATTRIBUTION,
    }
def camera_image(self):
    """Return a still image response from the camera.

    When ``self._ready_for_snapshot(now)`` is true, a fresh snapshot is
    downloaded from ``self.device.snapshot_url``, cached in
    ``self._last_image``, and the next allowed snapshot time is moved
    forward by ``self._time_between_snapshots``. Otherwise the cached image
    is returned unchanged.

    Returns ``None`` if the download fails; the error is logged and the
    cached image and snapshot schedule are left untouched.
    """
    now = utcnow()

    if self._ready_for_snapshot(now):
        url = self.device.snapshot_url

        try:
            # Always bound the request: without a timeout a stalled camera
            # endpoint would block this call indefinitely. A timeout raises a
            # RequestException subclass and is handled below.
            response = requests.get(url, timeout=10)
        except requests.exceptions.RequestException as error:
            _LOGGER.error("Error getting camera image: %s", error)
            return None

        self._next_snapshot_at = now + self._time_between_snapshots
        self._last_image = response.content

    return self._last_image
def _async_track_unavailable(self):
    """Restart the unavailability timer and flag the entity as available.

    Calls the previously stored tracker-removal callable, if there is one,
    then schedules ``self._async_set_unavailable`` for
    ``utcnow() + TIME_TILL_UNAVAILABLE`` and stores the new removal callable.

    Returns ``True`` when the entity was flagged unavailable and has just
    been switched back to available, ``False`` when it already was available.
    """
    remove_previous = self._remove_unavailability_tracker
    if remove_previous:
        remove_previous()

    deadline = utcnow() + TIME_TILL_UNAVAILABLE
    self._remove_unavailability_tracker = async_track_point_in_utc_time(
        self.hass, self._async_set_unavailable, deadline
    )

    if self._is_available:
        return False
    self._is_available = True
    return True
def icon(self) -> str:
    """Return the icon of the sensor.

    The result is ``mdi:battery-unknown`` when the device data has a
    ``_state`` entry and the last communication is older than three wake-up
    intervals (default interval 30 minutes), or when the reported level is
    255. Levels below 40 give ``mdi:battery-alert``, levels up to 95 give
    ``mdi:battery-<tens>`` and anything higher gives ``mdi:battery``.
    """
    device_data = self._device.data

    if "_state" in device_data:  # only for v3 API
        wakeup_seconds = device_data["_state"].get("wakeupInterval", 30 * 60)
        wakeup_interval = timedelta(seconds=wakeup_seconds)
        if self._last_comms < dt_util.utcnow() - wakeup_interval * 3:
            return "mdi:battery-unknown"

    level = device_data["state"][self._state_attr]
    if level == 255:
        return "mdi:battery-unknown"
    if level < 40:
        return "mdi:battery-alert"

    if level <= 95:
        # The -0.01 nudges exact halves (45, 55, ...) down to the lower ten.
        nearest_ten = int(round(level / 10 - 0.01)) * 10
        return f"mdi:battery-{nearest_ten}"
    return "mdi:battery"