Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def connection_config_tunneling(self):
    """Build the ConnectionConfig for a tunneling connection.

    Host, port and local IP are read from the tunneling section of this
    integration's config. The host is mandatory (a missing key raises
    ``KeyError``); a port that is absent or ``None`` is replaced by
    ``DEFAULT_MCAST_PORT``. Automatic reconnect is always enabled.
    """
    tunneling = self.config[DOMAIN][CONF_XKNX_TUNNELING]
    host = tunneling[CONF_HOST]
    port = tunneling.get(CONF_PORT)
    bind_address = tunneling.get(CONF_XKNX_LOCAL_IP)
    # Only fall back when no port was supplied at all.
    port = DEFAULT_MCAST_PORT if port is None else port
    return ConnectionConfig(
        connection_type=ConnectionType.TUNNELING,
        gateway_ip=host,
        gateway_port=port,
        local_ip=bind_address,
        auto_reconnect=True,
    )
def connection_config_routing(self):
    """Build the ConnectionConfig used when routing is configured.

    The optional local IP is looked up in the routing section of this
    integration's config and forwarded as-is (``None`` when not set).
    """
    routing = self.config[DOMAIN][CONF_XKNX_ROUTING]
    bind_address = routing.get(CONF_XKNX_LOCAL_IP)
    return ConnectionConfig(
        connection_type=ConnectionType.ROUTING,
        local_ip=bind_address,
    )
def connection_config_routing(self):
    """Return a routing ConnectionConfig built from the routing config.

    ``xknx.io`` is imported inside the function, as in the original, so the
    library is only loaded when this method is actually called.
    """
    from xknx.io import ConnectionConfig, ConnectionType

    routing_conf = self.config[DOMAIN][CONF_XKNX_ROUTING]
    # The local IP is optional; .get() yields None when it is not configured.
    chosen_local_ip = routing_conf.get(CONF_XKNX_LOCAL_IP)
    return ConnectionConfig(
        connection_type=ConnectionType.ROUTING,
        local_ip=chosen_local_ip,
    )
def connection_config_routing(self):
    """Create the ConnectionConfig for a routing connection.

    Reads the optional local IP from the ``CONF_KNX_ROUTING`` section of
    this integration's config and passes it straight to ConnectionConfig.
    """
    section = self.config[DOMAIN][CONF_KNX_ROUTING]
    local_address = section.get(CONF_KNX_LOCAL_IP)
    routing_config = ConnectionConfig(
        connection_type=ConnectionType.ROUTING,
        local_ip=local_address,
    )
    return routing_config
def parse_connection(self, doc):
    """Parse the connection section of xknx.yaml.

    Each key of the ``connection`` mapping selects a connection type:
    ``tunneling`` (its preferences must contain ``gateway_ip``), ``routing``,
    or any other key, which is treated as ``ConnectionType.AUTOMATIC``. The
    resolved type and the preferences are handed to
    ``self._parse_connection_prefs``.

    A missing ``connection`` key, or a value that is not a mapping (for
    example ``None``, a bare string or a list), is ignored.

    Raises:
        XKNXException: if a tunneling entry lacks ``gateway_ip`` or the
            preferences cannot be parsed; the error is logged first and
            then re-raised.
    """
    if "connection" not in doc:
        return
    connection = doc["connection"]
    # Guard on the attribute that is actually used below. Checking for
    # ``__iter__`` also let strings and lists through, which then failed
    # with AttributeError on ``.items()``.
    if not hasattr(connection, "items"):
        return
    for conn, prefs in connection.items():
        try:
            if conn == "tunneling":
                if prefs is None or "gateway_ip" not in prefs:
                    raise XKNXException(
                        "`gateway_ip` is required for tunneling connection."
                    )
                conn_type = ConnectionType.TUNNELING
            elif conn == "routing":
                conn_type = ConnectionType.ROUTING
            else:
                conn_type = ConnectionType.AUTOMATIC
            self._parse_connection_prefs(conn_type, prefs)
        except XKNXException as ex:
            self.xknx.logger.error("Error while reading config file: Could not parse %s: %s", conn, ex)
            # Bare raise re-raises the active exception unchanged.
            raise
def connection_config_tunneling(self):
    """Return a tunneling ConnectionConfig built from the tunneling config.

    Host, port and local IP are all optional lookups in the tunneling
    section; a port that is absent or ``None`` falls back to
    ``DEFAULT_MCAST_PORT``. ``xknx.io`` is imported inside the function, as
    in the original, so it is only loaded when this method is called.
    """
    from xknx.io import (
        ConnectionConfig,
        ConnectionType,
        DEFAULT_MCAST_PORT,
    )

    tunneling_conf = self.config[DOMAIN][CONF_XKNX_TUNNELING]
    host = tunneling_conf.get(CONF_HOST)
    port = tunneling_conf.get(CONF_PORT)
    bind_address = tunneling_conf.get(CONF_XKNX_LOCAL_IP)
    return ConnectionConfig(
        connection_type=ConnectionType.TUNNELING,
        gateway_ip=host,
        gateway_port=DEFAULT_MCAST_PORT if port is None else port,
        local_ip=bind_address,
    )
def connection_config_tunneling(self):
    """Create the ConnectionConfig for a tunneling connection.

    Gateway host, gateway port and local IP are read with ``.get`` from the
    ``CONF_KNX_TUNNELING`` section of this integration's config. When no
    port is given (the lookup yields ``None``), ``DEFAULT_MCAST_PORT`` is
    used instead.
    """
    section = self.config[DOMAIN][CONF_KNX_TUNNELING]
    host = section.get(CONF_HOST)
    port = section.get(CONF_PORT)
    local_address = section.get(CONF_KNX_LOCAL_IP)
    if port is None:
        port = DEFAULT_MCAST_PORT
    tunneling_config = ConnectionConfig(
        connection_type=ConnectionType.TUNNELING,
        gateway_ip=host,
        gateway_port=port,
        local_ip=local_address,
    )
    return tunneling_config