Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async def sync(self, wait_for_result=True):
"""Read state of device from KNX bus."""
try:
await self._sync_impl(wait_for_result)
except XKNXException as ex:
self.xknx.logger.error("Error while syncing device: %s", ex)
async def connect(self):
"""Connect/build tunnel."""
connect = Connect(
self.xknx,
self.udp_client)
await connect.start()
if not connect.success:
if self.auto_reconnect:
msg = "Cannot connect to KNX. Retry in {} seconds.".format(
self.auto_reconnect_wait
)
self.xknx.logger.warning(msg)
task = self.xknx.loop.create_task(self.schedule_reconnect())
self._reconnect_task = task
return
raise XKNXException("Could not establish connection")
self.xknx.logger.debug(
"Tunnel established communication_channel=%s, id=%s",
connect.communication_channel,
connect.identifier)
self._reconnect_task = None
self.communication_channel = connect.communication_channel
self.sequence_number = 0
await self.start_heartbeat()
def parse_connection(self, doc):
"""Parse the connection section of xknx.yaml."""
if "connection" in doc \
and hasattr(doc["connection"], '__iter__'):
for conn, prefs in doc["connection"].items():
try:
if conn == "tunneling":
if prefs is None or \
"gateway_ip" not in prefs:
raise XKNXException("`gateway_ip` is required for tunneling connection.")
conn_type = ConnectionType.TUNNELING
elif conn == "routing":
conn_type = ConnectionType.ROUTING
else:
conn_type = ConnectionType.AUTOMATIC
self._parse_connection_prefs(conn_type, prefs)
except XKNXException as ex:
self.xknx.logger.error("Error while reading config file: Could not parse %s: %s", conn, ex)
raise ex
async def start_automatic(self, scan_filter: GatewayScanFilter):
"""Start GatewayScanner and connect to the found device."""
gatewayscanner = GatewayScanner(self.xknx, scan_filter=scan_filter)
gateways = await gatewayscanner.scan()
if not gateways:
raise XKNXException("No Gateways found")
gateway = gateways[0]
if gateway.supports_tunnelling and \
scan_filter.routing is not True:
await self.start_tunnelling(gateway.local_ip,
gateway.ip_addr,
gateway.port,
self.connection_config.auto_reconnect,
self.connection_config.auto_reconnect_wait)
elif gateway.supports_routing:
bind_to_multicast_addr = get_os_name() != "Darwin" # = Mac OS
await self.start_routing(gateway.local_ip, bind_to_multicast_addr)
def validate_ip(address: str, address_name: str = "IP address") -> None:
"""Raise an exception if address cannot be parsed as IPv4 address."""
try:
ipaddress.IPv4Address(address)
except ipaddress.AddressValueError as ex:
raise XKNXException("%s is not a valid IPv4 address." % address_name) from ex
async def async_setup(hass, config):
"""Set up the KNX component."""
try:
hass.data[DATA_XKNX] = KNXModule(hass, config)
hass.data[DATA_XKNX].async_create_exposures()
await hass.data[DATA_XKNX].start()
except XKNXException as ex:
_LOGGER.warning("Can't connect to KNX interface: %s", ex)
hass.components.persistent_notification.async_create(
f"Can't connect to KNX interface: <br><b>{ex}</b>", title="KNX"
)
for component, discovery_type in (
("switch", "Switch"),
("climate", "Climate"),
("cover", "Cover"),
("light", "Light"),
("sensor", "Sensor"),
("binary_sensor", "BinarySensor"),
("scene", "Scene"),
("notify", "Notification"),
):
found_devices = _get_devices(hass, discovery_type)
async def async_setup(hass, config):
"""Set up the KNX component."""
try:
hass.data[DATA_KNX] = KNXModule(hass, config)
hass.data[DATA_KNX].async_create_exposures()
await hass.data[DATA_KNX].start()
except XKNXException as ex:
_LOGGER.warning("Can't connect to KNX interface: %s", ex)
hass.components.persistent_notification.async_create(
"Can't connect to KNX interface: <br>" "<b>{0}</b>".format(ex), title="KNX"
)
for component, discovery_type in (
("switch", "Switch"),
("climate", "Climate"),
("cover", "Cover"),
("light", "Light"),
("sensor", "Sensor"),
("binary_sensor", "BinarySensor"),
("scene", "Scene"),
("notify", "Notification"),
):
found_devices = _get_devices(hass, discovery_type)
"""Parse the connection section of xknx.yaml."""
if "connection" in doc \
and hasattr(doc["connection"], '__iter__'):
for conn, prefs in doc["connection"].items():
try:
if conn == "tunneling":
if prefs is None or \
"gateway_ip" not in prefs:
raise XKNXException("`gateway_ip` is required for tunneling connection.")
conn_type = ConnectionType.TUNNELING
elif conn == "routing":
conn_type = ConnectionType.ROUTING
else:
conn_type = ConnectionType.AUTOMATIC
self._parse_connection_prefs(conn_type, prefs)
except XKNXException as ex:
self.xknx.logger.error("Error while reading config file: Could not parse %s: %s", conn, ex)
raise ex
async def disconnect(self, ignore_error=False):
"""Disconnect from tunnel device."""
# only send disconnect request if we ever were connected
if self.communication_channel is None:
# close udp client to prevent open file descriptors
await self.udp_client.stop()
return
disconnect = Disconnect(
self.xknx,
self.udp_client,
communication_channel_id=self.communication_channel)
await disconnect.start()
if not disconnect.success and not ignore_error:
raise XKNXException("Could not disconnect channel")
self.xknx.logger.debug("Tunnel disconnected (communication_channel: %s)", self.communication_channel)
# close udp client to prevent open file descriptors
await self.udp_client.stop()