Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_from_upstream_fails_on_invalid_flags(self):
    """A flags value of -1 must be rejected by ``TraceInfo.from_upstream``."""
    with self.assertRaises(ValueError) as raised:
        TraceInfo.from_upstream(1, 2, 3, True, -1)

    # Inspect the captured exception only after the ``with`` block has
    # exited; anything placed after the raising call inside it never runs.
    message = str(raised.exception)
    self.assertEqual(message, "invalid flags value")
def test_from_upstream_fails_on_invalid_sampled(self):
    """A string passed as ``sampled`` must be rejected by ``TraceInfo.from_upstream``."""
    with self.assertRaises(ValueError) as raised:
        TraceInfo.from_upstream(1, 2, 3, "True", None)

    # Inspect the captured exception only after the ``with`` block has
    # exited; anything placed after the raising call inside it never runs.
    message = str(raised.exception)
    self.assertEqual(message, "invalid sampled value")
# NOTE(review): this is the interior of a larger generator whose ``def`` line
# is not visible here, and the original indentation has been lost. The nesting
# described in the comments below is an assumption -- confirm against the
# real file.

# Optional wiring: when a client span observer is supplied, it is registered
# on every child span created under the server span.
if client_span_observer:
class TestServerSpanObserver(ServerSpanObserver):
def on_child_span_created(self, span):
span.register(client_span_observer)
observer = TestServerSpanObserver()
# Registers ``observer`` on each server span as it is created.
# NOTE(review): presumably still inside the ``if`` above, since ``observer``
# is only assigned there -- verify.
class TestBaseplateObserver(BaseplateObserver):
def on_server_span_created(self, context, span):
span.register(observer)
baseplate.register(TestBaseplateObserver())
# Build a fresh context object and a trace description with fixed IDs.
context = baseplate.make_context_object()
trace_info = TraceInfo.from_upstream(
trace_id=1234, parent_id=2345, span_id=3456, flags=4567, sampled=True
)
# Configure the "example_service" Thrift client, pointing it at ``endpoint``.
baseplate.configure_context(
{"example_service.endpoint": str(endpoint)},
{"example_service": ThriftClient(TestService.Client)},
)
# Start the server span on the context using the trace info built above.
baseplate.make_server_span(context, "example_service.example", trace_info)
# Decode the serialized edge context constant and attach it to the context.
edge_context_factory = make_edge_context_factory()
edge_context = edge_context_factory.from_upstream(SERIALIZED_EDGECONTEXT_WITH_VALID_AUTH)
edge_context.attach_context(context)
# Hand the fully prepared context to the caller.
yield context
# Builds a context object for an incoming call from the headers carried by
# ``iprot``: upstream trace information and the edge-request payload.
# NOTE(review): the function is cut off in this view (the ``else`` branch has
# no visible body and nothing after it is shown), and the original
# indentation has been lost.
def call_processor_with_span_context(
self: Any, seqid: int, iprot: TProtocolBase, oprot: TProtocolBase
) -> Any:
context = baseplate.make_context_object()
# Allow case-insensitivity for THeader headers
headers: Mapping[bytes, bytes] = CaseInsensitiveDict( # type: ignore
data=iprot.get_headers()
)
trace_info: Optional[TraceInfo]
# Trace, Parent and Span are required. A missing header (KeyError) or a
# value that fails integer parsing / validation (ValueError) leaves
# ``trace_info`` as None instead of failing the call.
try:
# Sampled is true only when the header is exactly b"1".
sampled = bool(headers.get(b"Sampled") == b"1")
# Flags is optional; None is passed through when the header is absent.
flags = headers.get(b"Flags", None)
trace_info = TraceInfo.from_upstream(
int(headers[b"Trace"]),
int(headers[b"Parent"]),
int(headers[b"Span"]),
sampled,
int(flags) if flags is not None else None,
)
except (KeyError, ValueError):
trace_info = None
# Raw edge-request payload; None when the header was not sent.
edge_payload = headers.get(b"Edge-Request", None)
# ``edge_context_factory`` is not defined in this block -- presumably taken
# from an enclosing scope; verify.
if edge_context_factory:
edge_context = edge_context_factory.from_upstream(edge_payload)
edge_context.attach_context(context)
else:
# just attach the raw context so it gets passed on
# downstream even if we don't know how to handle it.
def _get_trace_info(self, headers: Mapping[str, str]) -> TraceInfo:
    """Build a ``TraceInfo`` from the ``X-*`` tracing headers.

    ``X-Trace``, ``X-Parent`` and ``X-Span`` are required and parsed as
    integers. ``X-Sampled`` counts as set only when its value is exactly
    ``"1"``. ``X-Flags`` is optional and is forwarded as ``None`` when the
    header is absent.

    Raises:
        KeyError: a required header is missing.
        ValueError: a header value cannot be parsed as an integer.
    """
    # Required identifiers are parsed first so that a missing or malformed
    # one is reported before any problem with the optional flags header.
    trace_id = int(headers["X-Trace"])
    parent_id = int(headers["X-Parent"])
    span_id = int(headers["X-Span"])

    is_sampled = headers.get("X-Sampled") == "1"

    raw_flags = headers.get("X-Flags")
    if raw_flags is None:
        parsed_flags = None
    else:
        parsed_flags = int(raw_flags)

    return TraceInfo.from_upstream(
        trace_id, parent_id, span_id, is_sampled, parsed_flags
    )