Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
22: pb.CoverStateResponse,
23: pb.FanStateResponse,
24: pb.LightStateResponse,
25: pb.SensorStateResponse,
26: pb.SwitchStateResponse,
27: pb.TextSensorStateResponse,
28: pb.SubscribeLogsRequest,
29: pb.SubscribeLogsResponse,
30: pb.CoverCommandRequest,
31: pb.FanCommandRequest,
32: pb.LightCommandRequest,
33: pb.SwitchCommandRequest,
34: pb.SubscribeHomeassistantServicesRequest,
35: pb.HomeassistantServiceResponse,
36: pb.GetTimeRequest,
37: pb.GetTimeResponse,
38: pb.SubscribeHomeAssistantStatesRequest,
39: pb.SubscribeHomeAssistantStateResponse,
40: pb.HomeAssistantStateResponse,
41: pb.ListEntitiesServicesResponse,
42: pb.ExecuteServiceRequest,
43: pb.ListEntitiesCameraResponse,
44: pb.CameraImageResponse,
45: pb.CameraImageRequest,
46: pb.ListEntitiesClimateResponse,
47: pb.ClimateStateResponse,
48: pb.ClimateCommandRequest,
}
async def _handle_internal_messages(self, msg: Any) -> None:
    """Reply to the request types this method handles itself.

    * ``pb.DisconnectRequest``: sends a ``pb.DisconnectResponse`` and then
      calls ``self.stop(force=True)``.
    * ``pb.PingRequest``: sends a ``pb.PingResponse``.
    * ``pb.GetTimeRequest``: sends a ``pb.GetTimeResponse`` whose
      ``epoch_seconds`` is the current ``time.time()`` truncated to an int.

    A message of any other type falls through without any action.
    """
    if isinstance(msg, pb.DisconnectRequest):
        # Send the acknowledgement before stopping.
        await self.send_message(pb.DisconnectResponse())
        await self.stop(force=True)
        return

    if isinstance(msg, pb.PingRequest):
        await self.send_message(pb.PingResponse())
        return

    if isinstance(msg, pb.GetTimeRequest):
        time_reply = pb.GetTimeResponse()
        time_reply.epoch_seconds = int(time.time())
        await self.send_message(time_reply)