Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_sanitize_function_name():
assert sanitize_function_name('foobar()') == 'foobar'
assert sanitize_function_name('hastme') is None
assert sanitize_function_name(42) is None
assert sanitize_function_name(None) is None
def test_sanitize_function_name():
assert sanitize_function_name('foobar()') == 'foobar'
assert sanitize_function_name('hastme') is None
assert sanitize_function_name(42) is None
assert sanitize_function_name(None) is None
def test_sanitize_function_name():
assert sanitize_function_name('foobar()') == 'foobar'
assert sanitize_function_name('hastme') is None
assert sanitize_function_name(42) is None
assert sanitize_function_name(None) is None
def test_sanitize_function_name():
assert sanitize_function_name('foobar()') == 'foobar'
assert sanitize_function_name('hastme') is None
assert sanitize_function_name(42) is None
assert sanitize_function_name(None) is None
def xform(function, orig_value, transform_data):
"""
Attempt transformation on orig_value.
- 1st. function()
- 2nd. inline {xxxx}
"""
if orig_value is None:
return None
res = orig_value
if function is not None:
function_name = sanitize_function_name(function)
if function_name is not None:
try:
res = context.invoker.datamap(function_name, transform_data)
return res
except Exception as e:
logger.warning("Cannot invoke %s(): %s" % (function_name, e))
try:
res = Formatter().format(function, **transform_data)
except Exception as e:
logger.exception("Formatting message failed")
if isinstance(res, str):
res = res.replace("\\n", "\n")
return res
def send_to_targets(section, topic, payload):
if cf.has_section(section) == False:
logger.warning("Section [%s] does not exist in your INI file, skipping message on %s" % (section, topic))
return
# decode raw payload into transformation data
data = decode_payload(section, topic, payload)
dispatcher_dict = cf.getdict(section, 'targets')
function_name = sanitize_function_name(context.get_config(section, 'targets'))
if function_name is not None:
targetlist = context.get_topic_targets(section, topic, data)
if not isinstance(targetlist, list):
targetlist_type = type(targetlist)
logger.error('Topic target definition by function "{function_name}" ' \
'in section "{section}" is empty or incorrect. Should be a list. ' \
'targetlist={targetlist}, type={targetlist_type}'.format(**locals()))
return
elif isinstance(dispatcher_dict, dict):
def get_key(item):
# precede a key with the number of topic levels and then use reverse alphabetic sort order
# '+' is after '#' in ascii table
# caveat: for instance space is allowed in topic name but will be less specific than '+', '#'
# so replace '#' with first ascii character and '+' with second ascii character
def get_topic_targets(self, section, topic, data):
"""
Topic targets function invoker.
"""
if self.config.has_option(section, 'targets'):
name = sanitize_function_name(self.config.get(section, 'targets'))
try:
return self.invoker.topic_target_list(name, topic, data)
except Exception as ex:
error = repr(ex)
logger.warn('Error invoking topic targets function "{name}" ' \
'defined in section "{section}": {error}'.format(**locals()))
return None