Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_load_function():
    """
    Exercise "load_function" against the functions file "funcfile".

    Looking up the existing function "foobar" must yield a non-None object,
    while looking up the unknown name "unknown" must raise an AttributeError
    whose message names the functions file.
    """

    # Load valid functions file
    py_mod = load_functions(filepath=funcfile)
    assert py_mod is not None

    # Load valid function
    func = load_function(name='foobar', py_mod=py_mod)
    assert func is not None

    # Load invalid function, function name does not exist in "funcfile"
    with pytest.raises(AttributeError) as excinfo:
        load_function(name='unknown', py_mod=py_mod)

    # Escape the path before embedding it into the pattern, so that regex
    # metacharacters it contains (e.g. the "." in front of the file
    # extension) are matched literally. The trailing "c?" permits one
    # optional "c" after the path (presumably for a ".pyc" file name).
    pattern = "Function 'unknown' does not exist in '{}c?'".format(re.escape(funcfile))
    assert re.match(pattern, str(excinfo.value))
def test_load_function():
    """
    Exercise "load_function" against the functions file "funcfile".

    Looking up the existing function "foobar" must yield a non-None object,
    while looking up the unknown name "unknown" must raise an AttributeError
    whose message names the functions file.
    """

    # Load valid functions file
    py_mod = load_functions(filepath=funcfile)
    assert py_mod is not None

    # Load valid function
    func = load_function(name='foobar', py_mod=py_mod)
    assert func is not None

    # Load invalid function, function name does not exist in "funcfile"
    with pytest.raises(AttributeError) as excinfo:
        load_function(name='unknown', py_mod=py_mod)

    # Escape the path before embedding it into the pattern, so that regex
    # metacharacters it contains (e.g. the "." in front of the file
    # extension) are matched literally. The trailing "c?" permits one
    # optional "c" after the path (presumably for a ".pyc" file name).
    pattern = "Function 'unknown' does not exist in '{}c?'".format(re.escape(funcfile))
    assert re.match(pattern, str(excinfo.value))
def datamap(self, name, topic):
    """
    Look up the function "name" in the "functions" Python module and call it.

    The function is first called with the topic and the service object.
    If that call raises a TypeError, it is called once more using the
    legacy form, which takes the topic only. Any other exception, including
    one raised while looking up the function, propagates to the caller.

    :param name: Name of the function to look up and call
    :param topic: Topic handed to the called function
    :return: Whatever the called function returns
    """
    handler = load_function(name=name, py_mod=self.config.functions)
    try:
        # New version: topic plus service object
        return handler(topic, self.srv)
    except TypeError:
        # Legacy version: topic only
        return handler(topic)
def filter(self, name, topic, payload, section=None):
    """
    Look up the function "name" in the "functions" Python module, call it
    and hand back its result (expected to be True/False).

    The function is first called with topic, payload, section and the
    service object. If that call raises a TypeError, it is called once more
    using the legacy signature, which takes topic and payload only. Any
    other exception propagates to the caller.

    :param name: Name of the function to look up and call
    :param topic: Topic handed to the called function
    :param payload: Payload handed to the called function
    :param section: Section handed to new-style functions, defaults to None
    :return: Whatever the called function returns
    """
    handler = load_function(name=name, py_mod=self.config.functions)
    try:
        # New version: topic, payload, section and service object
        return handler(topic, payload, section, self.srv)
    except TypeError:
        # Legacy signature: topic and payload only
        return handler(topic, payload)
def topic_target_list(self, name, topic, data):
    """
    Look up the function "name" in the "functions" Python module and call it
    in order to compute dynamic topic subscription targets.

    The function receives the MQTT topic, the transformation data and the
    service object, all passed as keyword arguments ("topic", "data", "srv").
    Exceptions raised by the lookup or by the call propagate to the caller.

    :param name: Name of the function to look up and call
    :param topic: Topic handed to the called function
    :param data: Data handed to the called function
    :return: Whatever the called function returns
    """
    handler = load_function(name=name, py_mod=self.config.functions)
    return handler(topic=topic, data=data, srv=self.srv)
# Spin up the worker threads which operate on the queue
logger.info('Starting %s worker threads' % cf.num_workers)
for i in range(cf.num_workers):
    t = threading.Thread(target=processor, kwargs=dict(worker_id=i))
    # Daemon threads do not keep the interpreter alive on shutdown
    t.daemon = True
    t.start()

# If the config file has a [cron] section, the key names therein are
# functions from 'myfuncs.py' which should be invoked periodically.
# The key's value is handed to "parse_cron_options", which yields the
# period in seconds ("interval") and an optional "now" flag.
cron_entries = cf.items('cron') if cf.has_section('cron') else []
for name, val in cron_entries:
    try:
        func = load_function(name=name, py_mod=cf.functions)
        cron_options = parse_cron_options(val)
        interval = cron_options['interval']
        message = 'Scheduling function "{name}" as periodic task ' \
                  'to run each {interval} seconds via [cron] section'
        logger.debug(message.format(name=name, interval=interval))
        service = make_service(mqttc=mqttc, name='mqttwarn.cron')
        run_now = asbool(cron_options.get('now'))
        ptlist[name] = PeriodicThread(callback=func, period=interval, name=name, srv=service, now=run_now)
        ptlist[name].start()
    except AttributeError:
        # Any AttributeError raised while setting up this entry is reported
        # as an undefined function; the loop then moves on to the next entry.
        logger.error("[cron] section has function [%s] specified, but that's not defined" % name)
def alldata(self, name, topic, data):
    """
    Look up the function "name" in the "functions" Python module and call it.

    The function receives the topic, the data and the service object as
    positional arguments. Exceptions raised by the lookup or by the call
    propagate to the caller.

    :param name: Name of the function to look up and call
    :param topic: Topic handed to the called function
    :param data: Data handed to the called function
    :return: Whatever the called function returns
    """
    handler = load_function(name=name, py_mod=self.config.functions)
    return handler(topic, data, self.srv)