Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def test_saving_with_delay(hass, store, hass_storage):
    """Verify a delayed save is written only after the timer has fired."""
    store.async_delay_save(lambda: MOCK_DATA, 1)
    # Immediately after scheduling, nothing has been stored under the key.
    assert store.key not in hass_storage

    # Advance the clock by one second and let pending work run.
    fire_at = dt.utcnow() + timedelta(seconds=1)
    async_fire_time_changed(hass, fire_at)
    await hass.async_block_till_done()

    expected_payload = {
        "version": MOCK_VERSION,
        "key": MOCK_KEY,
        "data": MOCK_DATA,
    }
    assert hass_storage[store.key] == expected_payload
# allow traffic to flow through the gateway and device
await async_enable_traffic(hass, zha_gateway, [zha_device])
# test that the state has changed from unavailable to not home
assert hass.states.get(entity_id).state == STATE_NOT_HOME
# turn state flip
attr = make_attribute(0x0020, 23)
cluster.handle_message(1, 0x0A, [[attr]])
attr = make_attribute(0x0021, 200)
cluster.handle_message(1, 0x0A, [[attr]])
zigpy_device.last_seen = time.time() + 10
next_update = dt_util.utcnow() + timedelta(seconds=30)
async_fire_time_changed(hass, next_update)
await hass.async_block_till_done()
assert hass.states.get(entity_id).state == STATE_HOME
entity = hass.data[DOMAIN].get_entity(entity_id)
assert entity.is_connected is True
assert entity.source_type == SOURCE_TYPE_ROUTER
assert entity.battery_level == 100
# test adding device tracker to the network and HA
await async_test_device_join(
hass,
zha_gateway,
PowerConfiguration.cluster_id,
def save_migration(migration_id):
    """Insert a row for ``migration_id`` into ``schema_version`` and commit.

    Relies on ``cur`` and ``self`` from the enclosing scope.
    """
    insert_sql = 'INSERT INTO schema_version VALUES (?, ?)'
    # The second column receives the current UTC time from dt_util.
    row = (migration_id, dt_util.utcnow())
    cur.execute(insert_sql, row)
    self.conn.commit()
    _LOGGER.info("Database migrated to version %d", migration_id)
def __init__(self, event_type: str, data: Optional[Dict] = None,
             origin: EventOrigin = EventOrigin.local,
             time_fired: Optional[int] = None,
             context: Optional[Context] = None) -> None:
    """Set up the event, substituting defaults for omitted arguments.

    Any falsy ``data``, ``time_fired`` or ``context`` is replaced by an
    empty dict, ``dt_util.utcnow()`` and a new ``Context`` respectively.
    """
    self.event_type = event_type
    self.origin = origin
    self.data = data if data else {}
    self.time_fired = time_fired if time_fired else dt_util.utcnow()
    self.context = context if context else Context()
def is_connected(self):
    """Return True while the client counts as connected to the network.

    The client is connected when the time since it was last seen is below
    ``controller.option_detection_time``.

    If ``is_wired`` and ``client.is_wired`` disagree, the device is offline
    and a UniFi bug reports it as wired. In that case the elapsed time is
    measured from the moment the mismatch was first noticed (``wired_bug``)
    instead of from ``client.last_seen``.
    """
    wired_mismatch = self.is_wired != self.client.is_wired

    if not wired_mismatch:
        # Normal case: forget any earlier mismatch and use last_seen.
        self.wired_bug = None
        now = dt_util.utcnow()
        last_seen = dt_util.utc_from_timestamp(float(self.client.last_seen))
        elapsed = now - last_seen
    else:
        # Remember when the mismatch was first observed.
        if not self.wired_bug:
            self.wired_bug = dt_util.utcnow()
        elapsed = dt_util.utcnow() - self.wired_bug

    return elapsed < self.controller.option_detection_time
def update(self):
"""Get the latest data and updates the states."""
# Get previous values of start and end
p_start, p_end = self._period
# Parse templates
self.update_period()
start, end = self._period
# Convert times to UTC
start = dt_util.as_utc(start)
end = dt_util.as_utc(end)
p_start = dt_util.as_utc(p_start)
p_end = dt_util.as_utc(p_end)
now = datetime.datetime.now()
# Compute integer timestamps
start_timestamp = math.floor(dt_util.as_timestamp(start))
end_timestamp = math.floor(dt_util.as_timestamp(end))
p_start_timestamp = math.floor(dt_util.as_timestamp(p_start))
p_end_timestamp = math.floor(dt_util.as_timestamp(p_end))
now_timestamp = math.floor(dt_util.as_timestamp(now))
# If period has not changed and current time after the period end...
if (
start_timestamp == p_start_timestamp
and end_timestamp == p_end_timestamp
and end_timestamp <= now_timestamp
):
and
# only enable polling if websocket not connected
(
not websocket_enabled
or not seen_commands
or not (
"PUSH_AUDIO_PLAYER_STATE" in seen_commands
or "PUSH_MEDIA_CHANGE" in seen_commands
or "PUSH_MEDIA_PROGRESS_CHANGE" in seen_commands
)
)
):
self._should_poll = False # disable polling since manual update
if (
self._last_update == 0
or util.dt.as_timestamp(util.utcnow())
- util.dt.as_timestamp(self._last_update)
> PLAY_SCAN_INTERVAL
):
_LOGGER.debug(
"%s playing; scheduling update in %s seconds",
self.name,
PLAY_SCAN_INTERVAL,
)
async_call_later(
self.hass,
PLAY_SCAN_INTERVAL,
lambda _: self.async_schedule_update_ha_state(force_refresh=True),
)
elif self._should_poll: # Not playing, one last poll
self._should_poll = False
if not websocket_enabled:
self._title = feed_entry.title
# Convert distance if not metric system.
if self._unit_system == CONF_UNIT_SYSTEM_IMPERIAL:
self._distance = round(
IMPERIAL_SYSTEM.length(feed_entry.distance_to_home, LENGTH_KILOMETERS),
1,
)
else:
self._distance = round(feed_entry.distance_to_home, 1)
self._latitude = round(feed_entry.coordinates[0], 5)
self._longitude = round(feed_entry.coordinates[1], 5)
self._attribution = feed_entry.attribution
self._alert_level = feed_entry.alert_level
self._activity = feed_entry.activity
self._hazards = feed_entry.hazards
self._feed_last_update = dt.as_utc(last_update) if last_update else None
self._feed_last_update_successful = (
dt.as_utc(last_update_successful) if last_update_successful else None
)
def is_night(self):
    """Return whether night mode currently applies.

    Returns False when ``night_mode`` is None. Otherwise the configured
    ``start_time`` and ``end_time`` are parsed and passed to
    ``self.now_is_between``, whose result is returned.
    """
    night_cfg = self.night_mode
    if night_cfg is None:
        # If night mode is undefined, it is never night.
        return False

    self.log.debug("NIGHT MODE ENABLED: " + str(night_cfg))
    window_start = dt.parse_time(night_cfg['start_time'])
    window_end = dt.parse_time(night_cfg['end_time'])
    return self.now_is_between(window_start, window_end)
def next_sunset(self, offset=0):
    """Return the first ``self.sunset(True) + N days`` later than ``dt.now()``.

    ``N`` starts at ``offset`` and grows by one day per attempt until the
    candidate lies after the current time.
    """
    days_ahead = offset
    while True:
        candidate = self.sunset(True) + timedelta(days=days_ahead)
        if candidate > dt.now():
            return candidate
        days_ahead += 1