Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_1_pyhomematic_init(self):
LOG.info("TestPyhomematicBase.test_1_pyhomematic_init")
client = HMConnection(
interface_id=DEFAULT_INTERFACE_CLIENT,
autostart=False,
remotes={
DEFAULT_REMOTE: {
"ip": DEFAULT_IP,
"port": self.localport,
"connect": True
}
}
)
client.start()
time.sleep(STARTUP_DELAY)
servicemessages = client.getServiceMessages(DEFAULT_REMOTE)
self.assertEqual(len(servicemessages), 1)
self.assertEqual(servicemessages[0][0], 'VCU0000001:1')
self.assertIsInstance(client.devices, dict)
def test_0_pyhomematic_noinit(self):
LOG.info("TestPyhomematicBase.test_0_pyhomematic_noinit")
client = HMConnection(
interface_id=DEFAULT_INTERFACE_CLIENT,
autostart=False,
remotes={
DEFAULT_REMOTE: {
"ip": DEFAULT_IP,
"port": self.localport,
"connect": False
}
}
)
client.start()
time.sleep(STARTUP_DELAY)
servicemessages = client.getServiceMessages(DEFAULT_REMOTE)
self.assertEqual(len(servicemessages), 1)
self.assertEqual(servicemessages[0][0], 'VCU0000001:1')
client.stop()
def test_rssi_helper(self):
for klass in self.device_classes:
both_rssi_helpers_used = issubclass(HelperRssiDevice, klass) and issubclass(HelperRssiPeer, klass)
self.assertFalse(
both_rssi_helpers_used,
"The class %s inherits from both HelperRssiDevice and HelperRssiPeer, which is not supported." % klass
)
def setUp(self):
LOG.debug("TestPyhomematicDevices.setUp")
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(("", 0))
self.localport = s.getsockname()[1]
s.close()
self.vccu = vccu.ServerThread(local=DEFAULT_IP,
localport=self.localport)
self.vccu.start()
time.sleep(0.5)
self.client = HMConnection(
interface_id=DEFAULT_INTERFACE_CLIENT,
autostart=False,
remotes={
DEFAULT_REMOTE: {
"ip": DEFAULT_IP,
"port": self.localport,
"connect": True
}
}
)
self.client.start()
time.sleep(STARTUP_DELAY)
DEVICE1 = 'address_of_rollershutter_device' # e.g. KEQ7654321
DEVICE2 = 'address_of_doorcontact' # e.g. LEQ1234567
DEVICE3 = 'address_of_thermostat'
def systemcallback(src, *args):
print(src)
for arg in args:
print(arg)
try:
# Create a server that listens on 127.0.0.1:7080 and identifies itself as myserver.
# Connect to Homegear at 127.0.0.1:2001
# Automatically start everything. Without autostart, pyhomematic.start() can be called.
# We add a systemcallback so we can see what else happens besides the regular events.
pyhomematic = HMConnection(interface_id="myserver",
autostart=True,
systemcallback=systemcallback,
remotes={"rf":{
"ip":"127.0.0.1",
"port": 2001}})
except Exception:
sys.exit(1)
sleepcounter = 0
def eventcallback(address, interface_id, key, value):
print("CALLBACK: %s, %s, %s, %s" % (address, interface_id, key, value))
while not pyhomematic.devices and sleepcounter < 20:
print("Waiting for devices")
sleepcounter += 1
def get_rssi(self, channel=0):
return self.getAttributeData("RSSI_DEVICE", channel)
class HelperRssiPeer(HMDevice):
"""Used for devices which report their RSSI value through RSSI_PEER"""
def __init__(self, device_description, proxy, resolveparamsets=False):
super().__init__(device_description, proxy, resolveparamsets)
self.ATTRIBUTENODE["RSSI_PEER"] = [0]
def get_rssi(self, channel=0):
return self.getAttributeData("RSSI_PEER", channel)
class HelperDeviceTemperature(HMDevice):
"""Used for devices that report their actual device temperature values (such as the HmIP Wired devices)"""
def __init__(self, device_description, proxy, resolveparamsets=False):
super().__init__(device_description, proxy, resolveparamsets)
self.ATTRIBUTENODE["ACTUAL_TEMPERATURE"] = [0]
def get_device_temperature(self, channel=0):
return self.getAttributeData("ACTUAL_TEMPERATURE", channel)
def __init__(self, device_description, proxy, resolveparamsets=False):
super().__init__(device_description, proxy, resolveparamsets)
self.ACTIONNODE.update({"PRESS_SHORT": self.ELEMENT,
"PRESS_LONG": self.ELEMENT})
def press_long(self, channel=None):
"""Simulat a button press long."""
self.actionNodeData("PRESS_LONG", 1, channel)
def press_short(self, channel=None):
"""Simulat a button press short."""
self.actionNodeData("PRESS_SHORT", 1, channel)
class HelperEventPress(HMDevice):
"""Remote handle buttons."""
def __init__(self, device_description, proxy, resolveparamsets=False):
super().__init__(device_description, proxy, resolveparamsets)
self.EVENTNODE.update({"PRESS": self.ELEMENT})
class HelperEventRemote(HMDevice):
"""Remote handle buttons."""
def __init__(self, device_description, proxy, resolveparamsets=False):
super().__init__(device_description, proxy, resolveparamsets)
self.EVENTNODE.update({"PRESS_SHORT": self.ELEMENT,
"PRESS_LONG": self.ELEMENT,
"PRESS_CONT": self.ELEMENT,
"PRESS_LONG_RELEASE": self.ELEMENT})
# init metadata
self.ACTIONNODE.update({"ON_TIME": self.ELEMENT})
def set_ontime(self, ontime):
"""Set duration th switch stays on when toggled. """
try:
ontime = float(ontime)
except Exception as err:
LOG.debug("SwitchPowermeter.set_ontime: Exception %s" % (err,))
return False
self.actionNodeData("ON_TIME", ontime)
class HelperActionPress(HMDevice):
"""Helper for simulate press button."""
def __init__(self, device_description, proxy, resolveparamsets=False):
super().__init__(device_description, proxy, resolveparamsets)
self.ACTIONNODE.update({"PRESS_SHORT": self.ELEMENT,
"PRESS_LONG": self.ELEMENT})
def press_long(self, channel=None):
"""Simulat a button press long."""
self.actionNodeData("PRESS_LONG", 1, channel)
def press_short(self, channel=None):
"""Simulat a button press short."""
self.actionNodeData("PRESS_SHORT", 1, channel)
import logging
from pyhomematic.devicetypes import generic, misc, sensors, actors, thermostats
LOG = logging.getLogger(__name__)
try:
UNSUPPORTED = generic.HMDevice
SUPPORTED = {}
SUPPORTED.update(actors.DEVICETYPES)
SUPPORTED.update(sensors.DEVICETYPES)
SUPPORTED.update(thermostats.DEVICETYPES)
SUPPORTED.update(misc.DEVICETYPES)
except Exception as err:
LOG.critical("devicetypes Exception: %s" % (err,))
UNSUPPORTED = False
SUPPORTED = {}
"""Returns True if the devicecase has been opened."""
return bool(self.getAttributeData("SABOTAGE", 0))
class HelperLowBat(HMDevice):
"""This Helper adds easy access to read the LOWBAT state"""
def __init__(self, device_description, proxy, resolveparamsets=False):
super().__init__(device_description, proxy, resolveparamsets)
# init metadata
self.ATTRIBUTENODE.update({"LOWBAT": [0]})
def low_batt(self, channel=None):
""" Returns if the battery is low. """
return self.getAttributeData("LOWBAT", channel)
class HelperLowBatIP(HMDevice):
"""This Helper adds easy access to read the LOWBAT state"""
def __init__(self, device_description, proxy, resolveparamsets=False):
super().__init__(device_description, proxy, resolveparamsets)
# init metadata
self.ATTRIBUTENODE.update({"LOW_BAT": [0]})
# pylint: disable=unused-argument
def low_batt(self, channel=None):
""" Returns if the battery is low. """
return self.getAttributeData("LOW_BAT", 0)
class HelperOperatingVoltageIP(HMDevice):
"""This Helper adds easy access to read the OPERATING_VOLTAGE state"""
def __init__(self, device_description, proxy, resolveparamsets=False):