Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_clone(self):
    """Check that ``Context.clone()`` copies the context fields but not the trace.

    A two-span trace (root plus one child) is added to a context whose
    sampling priority is set to 2. The clone is expected to compare equal
    on every copied attribute listed below while its ``_trace`` is an
    empty list.
    """
    original = Context()
    original.sampling_priority = 2

    # build the root -> child relationship by hand
    root_span = Span(tracer=None, name='root')
    child_span = Span(
        tracer=None,
        name='child_1',
        trace_id=root_span.trace_id,
        parent_id=root_span.span_id,
    )
    child_span._parent = root_span
    for span in (root_span, child_span):
        original.add_span(span)

    duplicate = original.clone()

    # attributes the clone must share with the source context
    copied_attrs = (
        '_parent_trace_id',
        '_parent_span_id',
        '_sampling_priority',
        '_dd_origin',
        '_current_span',
    )
    for attr in copied_attrs:
        assert getattr(duplicate, attr) == getattr(original, attr)

    # the spans themselves are not carried over
    assert duplicate._trace == []
def test_default_provider_set(self):
    """Check that a span started after activating a Context inherits its ids.

    The context provider can be handed the currently active ``Context``
    (as could happen with distributed tracing); the next span traced must
    then report that context's trace id and use its span id as parent id.
    """
    propagated_trace_id, propagated_span_id = 42, 100
    remote_ctx = Context(trace_id=propagated_trace_id, span_id=propagated_span_id)

    provider = self.tracer.context_provider
    provider.activate(remote_ctx)

    request_span = self.trace('web.request')
    request_span.assert_matches(
        name='web.request',
        trace_id=propagated_trace_id,
        parent_id=propagated_span_id,
    )
def test_default_provider_get(self):
    """Check that the tracer's context provider always yields a ``Context``.

    Even when nothing has been traced, ``active()`` must return a
    ``Context`` instance, and its ``_trace`` must be empty.
    """
    active_ctx = self.tracer.context_provider.active()

    is_context = isinstance(active_ctx, Context)
    self.assertTrue(is_context)

    span_count = len(active_ctx._trace)
    self.assertEqual(span_count, 0)
def test_set_tag_manual_keep(self):
    """Check the effect of the ``manual.keep`` tag on the sampling priority.

    Setting the tag on a span is expected to switch the sampling priority
    of the span's context to ``priority.USER_KEEP`` without storing
    anything in the span's ``meta``. Assigning a different priority on the
    context afterwards must still take effect.
    """
    context = Context()
    span = Span(
        tracer=None,
        name='root.span',
        service='s',
        resource='r',
        context=context,
    )
    user_keep = priority.USER_KEEP
    auto_reject = priority.AUTO_REJECT

    # before tagging: span is bound to the context, priority is not USER_KEEP
    assert span.context == context
    assert context.sampling_priority != user_keep
    assert span.context.sampling_priority != user_keep
    assert span.meta == {}

    # tagging flips the priority and leaves meta untouched
    span.set_tag('manual.keep')
    assert context.sampling_priority == user_keep
    assert span.context.sampling_priority == user_keep
    assert span.meta == {}

    # the priority can still be overridden on the context afterwards
    context.sampling_priority = auto_reject
    assert context.sampling_priority == auto_reject
    assert span.context.sampling_priority == auto_reject
    assert span.meta == {}
from ddtrace.propagation.http import HTTPPropagator
def my_controller(url, headers):
propagator = HTTPPropagator()
context = propagator.extract(headers)
tracer.context_provider.activate(context)
with tracer.trace('my_controller') as span:
span.set_meta('http.url', url)
:param dict headers: HTTP headers to extract tracing attributes.
:return: New `Context` with propagated attributes.
"""
if not headers:
return Context()
try:
trace_id = HTTPPropagator.extract_trace_id(headers)
parent_span_id = HTTPPropagator.extract_parent_span_id(headers)
sampling_priority = HTTPPropagator.extract_sampling_priority(headers)
origin = HTTPPropagator.extract_origin(headers)
if sampling_priority is not None:
sampling_priority = int(sampling_priority)
return Context(
trace_id=trace_id,
span_id=parent_span_id,
sampling_priority=sampling_priority,
_dd_origin=origin,
)
default = "416", type = str)
parser.add_argument("--frame-image", dest="image", default=False, help="Save image of detected frame", type=bool)
parser.add_argument("--frame-skip", dest="skip", default=1, help="Skip N number of detected frames", type=int)
parser.add_argument("--post-url", dest="post_url", help="URL to POST JSON back to")
parser.add_argument("--trace-id", dest="trace_id", help="Trace ID")
parser.add_argument("--parent-id", dest="parent_id", help="Parent Trace ID")
parser.add_argument("--sampling-priority", dest="sampling_priority", help="Trace Sampling Priority")
return parser.parse_args()
if __name__ == '__main__':
args = arg_parse()
has_clocks = False
clock_frames = 0
if args.trace_id:
context = Context(trace_id=int(args.trace_id), span_id=int(args.parent_id), sampling_priority=int(args.sampling_priority))
tracer.context_provider.activate(context)
with tracer.trace("gpu.yolovideoinference", service="yolo-inference-process") as span:
confidence = float(args.confidence)
nms_thesh = float(args.nms_thresh)
start = 0
CUDA = torch.cuda.is_available()
num_classes = 80
bbox_attrs = 5 + num_classes
print("Loading network.....")
model = Darknet(args.cfgfile)
"""
Returns the scoped Context for this execution flow. The ``Context`` uses
the current task as a carrier so if a single task is used for the entire application,
the context must be handled separately.
"""
loop = self._get_loop(loop=loop)
if not loop:
return self._local.get()
# the current unit of work (if tasks are used)
task = asyncio.Task.current_task(loop=loop)
if task is None:
# providing a detached Context from the current Task may lead to
# wrong traces. This defensive behavior ensures that a trace can
# still be built without raising exceptions
return Context()
ctx = getattr(task, CONTEXT_ATTR, None)
if ctx is not None:
# return the active Context for this task (if any)
return ctx
# create a new Context using the Task as a Context carrier
ctx = Context()
setattr(task, CONTEXT_ATTR, ctx)
return ctx
root_span = tracer.start_span('web.request')
span = tracer.start_span('web.decoder', child_of=root_span)
Or if you have a ``Context`` object::
context = tracer.get_call_context()
span = tracer.start_span('web.worker', child_of=context)
"""
if child_of is not None:
# check whether child_of is a Span or a Context
child_of_context = isinstance(child_of, Context)
context = child_of if child_of_context else child_of.context
parent = child_of.get_current_span() if child_of_context else child_of
else:
context = Context()
parent = None
if parent:
trace_id = parent.trace_id
parent_span_id = parent.span_id
else:
trace_id = context.trace_id
parent_span_id = context.span_id
if trace_id:
# child_of a non-empty context, so either a local child span or from a remote context
# when not provided, inherit from parent's service
if parent:
service = service or parent.service