Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_various_header_to_id_conversion():
# Get a hex string to test against & convert
header_id = instana.util.generate_id()
converted_id = instana.util.header_to_id(header_id)
assert(header_id == converted_id)
# Hex value - result should be left padded
result = instana.util.header_to_id('abcdef')
assert('0000000000abcdef' == result)
# Hex value
result = instana.util.header_to_id('0123456789abcdef')
assert('0123456789abcdef' == result)
# Very long incoming header should just return the rightmost 16 bytes
result = instana.util.header_to_id('0x0123456789abcdef0123456789abcdef')
assert('0123456789abcdef' == result)
def test_various_header_to_id_conversion():
# Get a hex string to test against & convert
header_id = instana.util.generate_id()
converted_id = instana.util.header_to_id(header_id)
assert(header_id == converted_id)
# Hex value - result should be left padded
result = instana.util.header_to_id('abcdef')
assert('0000000000abcdef' == result)
# Hex value
result = instana.util.header_to_id('0123456789abcdef')
assert('0123456789abcdef' == result)
# Very long incoming header should just return the rightmost 16 bytes
result = instana.util.header_to_id('0x0123456789abcdef0123456789abcdef')
assert('0123456789abcdef' == result)
def test_header_to_id_conversion_with_bogus_header():
# Bogus nil arg
bogus_result = instana.util.header_to_id(None)
assert(instana.util.BAD_ID == bogus_result)
# Bogus Integer arg
bogus_result = instana.util.header_to_id(1234)
assert(instana.util.BAD_ID == bogus_result)
# Bogus Array arg
bogus_result = instana.util.header_to_id([1234])
assert(instana.util.BAD_ID == bogus_result)
# Bogus Hex Values in String
bogus_result = instana.util.header_to_id('0xZZZZZZ')
assert(instana.util.BAD_ID == bogus_result)
bogus_result = instana.util.header_to_id('ZZZZZZ')
assert(instana.util.BAD_ID == bogus_result)
def test_header_to_id_conversion_with_bogus_header():
# Bogus nil arg
bogus_result = instana.util.header_to_id(None)
assert(instana.util.BAD_ID == bogus_result)
# Bogus Integer arg
bogus_result = instana.util.header_to_id(1234)
assert(instana.util.BAD_ID == bogus_result)
# Bogus Array arg
bogus_result = instana.util.header_to_id([1234])
assert(instana.util.BAD_ID == bogus_result)
# Bogus Hex Values in String
bogus_result = instana.util.header_to_id('0xZZZZZZ')
assert(instana.util.BAD_ID == bogus_result)
bogus_result = instana.util.header_to_id('ZZZZZZ')
assert(instana.util.BAD_ID == bogus_result)
bogus_result = instana.util.header_to_id(None)
assert(instana.util.BAD_ID == bogus_result)
# Bogus Integer arg
bogus_result = instana.util.header_to_id(1234)
assert(instana.util.BAD_ID == bogus_result)
# Bogus Array arg
bogus_result = instana.util.header_to_id([1234])
assert(instana.util.BAD_ID == bogus_result)
# Bogus Hex Values in String
bogus_result = instana.util.header_to_id('0xZZZZZZ')
assert(instana.util.BAD_ID == bogus_result)
bogus_result = instana.util.header_to_id('ZZZZZZ')
assert(instana.util.BAD_ID == bogus_result)
def test_various_header_to_id_conversion():
# Get a hex string to test against & convert
header_id = instana.util.generate_id()
converted_id = instana.util.header_to_id(header_id)
assert(header_id == converted_id)
# Hex value - result should be left padded
result = instana.util.header_to_id('abcdef')
assert('0000000000abcdef' == result)
# Hex value
result = instana.util.header_to_id('0123456789abcdef')
assert('0123456789abcdef' == result)
# Very long incoming header should just return the rightmost 16 bytes
result = instana.util.header_to_id('0x0123456789abcdef0123456789abcdef')
assert('0123456789abcdef' == result)
try:
if type(carrier) is dict or hasattr(carrier, "__getitem__"):
dc = carrier
elif hasattr(carrier, "__dict__"):
dc = carrier.__dict__
elif type(carrier) is list:
dc = dict(carrier)
else:
raise ot.SpanContextCorruptedException()
for key in dc.keys():
if self.HEADER_KEY_T == key:
trace_id = header_to_id(dc[key])
elif self.HEADER_KEY_S == key:
span_id = header_to_id(dc[key])
elif self.HEADER_KEY_L == key:
level = dc[key]
ctx = None
if trace_id is not None and span_id is not None:
ctx = SpanContext(span_id=span_id,
trace_id=trace_id,
level=level,
baggage={},
sampled=True)
return ctx
except Exception:
logger.debug("extract error:", exc_info=True)
# Headers can exist in the standard X-Instana-T/S format or the alternate HTTP_X_INSTANA_T/S style
# We do a case insensitive search to cover all possible variations of incoming headers.
for key in dc.keys():
lc_key = key.lower()
if self.LC_HEADER_KEY_T == lc_key:
trace_id = header_to_id(dc[key])
elif self.LC_HEADER_KEY_S == lc_key:
span_id = header_to_id(dc[key])
elif self.LC_HEADER_KEY_L == lc_key:
level = dc[key]
elif self.LC_HEADER_KEY_SYNTHETIC == lc_key:
synthetic = dc[key] == "1"
elif self.ALT_LC_HEADER_KEY_T == lc_key:
trace_id = header_to_id(dc[key])
elif self.ALT_LC_HEADER_KEY_S == lc_key:
span_id = header_to_id(dc[key])
elif self.ALT_LC_HEADER_KEY_L == lc_key:
level = dc[key]
elif self.ALT_LC_HEADER_KEY_SYNTHETIC == lc_key:
synthetic = dc[key] == "1"
ctx = None
if trace_id is not None and span_id is not None:
ctx = SpanContext(span_id=span_id,
trace_id=trace_id,
level=level,
baggage={},
sampled=True,
synthetic=synthetic)
elif synthetic:
span_id = None
level = 1
try:
if type(carrier) is dict or hasattr(carrier, "__getitem__"):
dc = carrier
elif hasattr(carrier, "__dict__"):
dc = carrier.__dict__
elif type(carrier) is list:
dc = dict(carrier)
else:
raise ot.SpanContextCorruptedException()
for key in dc.keys():
if self.HEADER_KEY_T == key:
trace_id = header_to_id(dc[key])
elif self.HEADER_KEY_S == key:
span_id = header_to_id(dc[key])
elif self.HEADER_KEY_L == key:
level = dc[key]
ctx = None
if trace_id is not None and span_id is not None:
ctx = SpanContext(span_id=span_id,
trace_id=trace_id,
level=level,
baggage={},
sampled=True)
return ctx
except Exception:
logger.debug("extract error:", exc_info=True)