Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
await asyncio.wait_for(coro, 30.0)
except OSError as err:
await self._on_error()
raise APIConnectionError(
"Error connecting to {}: {}".format(sockaddr, err))
except asyncio.TimeoutError:
await self._on_error()
raise APIConnectionError(
"Timeout while connecting to {}".format(sockaddr))
_LOGGER.debug("%s: Opened socket for", self._params.address)
self._socket_reader, self._socket_writer = await asyncio.open_connection(sock=self._socket)
self._socket_connected = True
self._params.eventloop.create_task(self.run_forever())
hello = pb.HelloRequest()
hello.client_info = self._params.client_info
try:
resp = await self.send_message_await_response(hello, pb.HelloResponse)
except APIConnectionError as err:
await self._on_error()
raise err
_LOGGER.debug("%s: Successfully connected ('%s' API=%s.%s)",
self._params.address, resp.server_info, resp.api_version_major,
resp.api_version_minor)
self._api_version = APIVersion(
resp.api_version_major, resp.api_version_minor)
if self._api_version.major > 2:
_LOGGER.error("%s: Incompatible version %s! Closing connection",
self._params.address, self._api_version.major)
await self._on_error()
raise APIConnectionError("Incompatible API version.")
import aioesphomeapi.api_pb2 as pb
class APIConnectionError(Exception):
    """Error raised when the API connection fails.

    Plain ``Exception`` subclass with no extra state: the human-readable
    reason is passed as the single positional argument and is available
    through ``str(err)`` / ``err.args``.
    """
MESSAGE_TYPE_TO_PROTO = {
1: pb.HelloRequest,
2: pb.HelloResponse,
3: pb.ConnectRequest,
4: pb.ConnectResponse,
5: pb.DisconnectRequest,
6: pb.DisconnectResponse,
7: pb.PingRequest,
8: pb.PingResponse,
9: pb.DeviceInfoRequest,
10: pb.DeviceInfoResponse,
11: pb.ListEntitiesRequest,
12: pb.ListEntitiesBinarySensorResponse,
13: pb.ListEntitiesCoverResponse,
14: pb.ListEntitiesFanResponse,
15: pb.ListEntitiesLightResponse,
16: pb.ListEntitiesSensorResponse,
17: pb.ListEntitiesSwitchResponse,