Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def init_xknx(self):
from xknx import XKNX
self.xknx = XKNX(
config=self.config_file(),
loop=self.hass.loop)
async def main():
"""Connect to KNX/IP device and broadcast time."""
xknx = XKNX()
datetime = DateTime(xknx, 'TimeTest', group_address='1/2/3', broadcast_type=DateTimeBroadcastType.TIME)
xknx.devices.add(datetime)
print("Sending time to KNX bus every hour")
await xknx.start(daemon_mode=True, state_updater=True)
await xknx.stop()
async def main():
"""Search for a Tunnelling device, walk through all possible channels and disconnect them."""
xknx = XKNX()
gatewayscanner = GatewayScanner(xknx)
gateways = await gatewayscanner.scan()
if not gateways:
print("No Gateways found")
return
gateway = gateways[0]
if not gateway.supports_tunnelling:
print("Gateway does not support tunneling")
return
udp_client = UDPClient(
xknx,
(gateway.local_ip, 0),
def __init__(self, hass, config):
xknx_config = XKNXConfig(hass, config)
self.xknx = xknx.XKNX(loop=hass.loop, start=False)
self.initialized = False
self.lock = threading.Lock()
self.hass = hass
self.config_file = xknx_config.config_file()
async def main():
"""Read xknx.yaml, walk through all devices and print them."""
xknx = XKNX(config='xknx.yaml')
for device in xknx.devices:
print(device)
#!/usr/bin/python3
from xknx import XKNX,Config,Multicast,Address,Telegram,TelegramDirection,DPTBinary
xknx = XKNX()
Config(xknx).read()
xknx.globals.own_address = Address("1.1.4")
telegram = Telegram(Address("1/2/7"), direction = TelegramDirection.OUTGOING, payload = DPTBinary(1) )
multicast = Multicast(xknx)
multicast.send(telegram)
if (device.name == "Livingroom.Switch_4" ):
if device.is_on():
xknx.devices.device_by_name("Livingroom.Outlet_2/1").set_on()
xknx.devices.device_by_name("Livingroom.Outlet_2/2").set_on()
xknx.devices.device_by_name("Livingroom.Outlet_2/3").set_on()
xknx.devices.device_by_name("Livingroom.Outlet_2/4").set_on()
elif device.is_off():
xknx.devices.device_by_name("Livingroom.Outlet_2/1").set_off()
xknx.devices.device_by_name("Livingroom.Outlet_2/2").set_off()
xknx.devices.device_by_name("Livingroom.Outlet_2/3").set_off()
xknx.devices.device_by_name("Livingroom.Outlet_2/4").set_off()
except CouldNotResolveAddress as c:
print(c)
xknx = XKNX()
Config(xknx).read()
#devices = xknx.devices.get_devices()
#for device in devices:
# print(device)
#print("down");
#xknx.devices.device_by_name("Livingroom.Shutter_1").set_down()
#time.sleep(2)
#print("up");
#xknx.devices.device_by_name("Livingroom.Shutter_1").set_up()
#time.sleep(5)
#print("short down")
#xknx.devices.device_by_name("Livingroom.Shutter_1").set_short_down()
#time.sleep(5)
async def main():
"""Connect to KNX/IP device and listen if a switch was updated via KNX bus."""
xknx = XKNX(device_updated_cb=device_updated_cb)
switch = Switch(xknx,
name='TestOutlet',
group_address='1/1/11')
xknx.devices.add(switch)
# Wait until Ctrl-C was pressed
await xknx.start(daemon_mode=True)
await xknx.stop()
async def main():
"""Connect to KNX/IP and read the state of a Climate device."""
xknx = XKNX()
await xknx.start()
climate = Climate(
xknx,
'TestClimate',
group_address_temperature='6/2/1')
await climate.sync()
# Will print out state of climate including current temperature:
print(climate)
await xknx.stop()