Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def new_start_response(status, headers, exc_info=None):
    """Modified start response with additional headers.

    Injects the current span's context into the response ``headers``,
    appends a ``Server-Timing`` header carrying the trace id, forwards the
    call to the wrapped ``start_response`` and records the HTTP status code
    on the span before closing the scope.

    Args:
        status: Status string; its first space-separated token is read as
            the numeric status code.
        headers: List of ``(name, value)`` response header tuples. It is
            modified in place.
        exc_info: Optional exception info, forwarded unchanged to the
            wrapped ``start_response``.

    Returns:
        Whatever the wrapped ``start_response`` returns.

    Raises:
        ValueError: If the first token of ``status`` is not an integer.
    """
    # Read the scope once so the span that is tagged is also the one that
    # is closed, even if ``self.scope`` is reassigned while this runs.
    scope = self.scope
    span = scope.span
    try:
        tracer.inject(span.context, ot.Format.HTTP_HEADERS, headers)
        headers.append(('Server-Timing', "intid;desc=%s" % span.context.trace_id))

        res = start_response(status, headers, exc_info)

        # The status code is the first space-separated token of the status.
        sc = status.split(' ')[0]
        if 500 <= int(sc) <= 511:
            span.mark_as_errored()

        span.set_tag(tags.HTTP_STATUS_CODE, sc)
        return res
    finally:
        # Close the scope on every exit path; previously an exception from
        # inject/start_response/int() left the scope open.
        scope.close()
# NOTE(review): these statements read `self` and `env`, so they belong to an
# enclosing method whose header is not visible in this excerpt.
# Extract any incoming trace context from the request environ and start the
# "wsgi" span as its child; the resulting scope is stored on the instance.
ctx = tracer.extract(ot.Format.HTTP_HEADERS, env)
self.scope = tracer.start_active_span("wsgi", child_of=ctx)
# Tag the span with every configured extra header that the request carries.
if hasattr(agent, 'extra_headers') and agent.extra_headers is not None:
for custom_header in agent.extra_headers:
# Headers are looked up in env upper-cased, with '-' replaced by '_' and an
# 'HTTP_' prefix, e.g. HTTP_X_CAPTURE_THIS
wsgi_header = ('HTTP_' + custom_header.upper()).replace('-', '_')
if wsgi_header in env:
# The tag name keeps the header name as configured: "http.<custom_header>".
self.scope.span.set_tag("http.%s" % custom_header, env[wsgi_header])
if 'PATH_INFO' in env:
self.scope.span.set_tag('http.path', env['PATH_INFO'])
# Only a non-empty query string is recorded, and only after it has been passed
# through strip_secrets with the agent's secrets matcher and secrets list.
if 'QUERY_STRING' in env and len(env['QUERY_STRING']):
scrubbed_params = strip_secrets(env['QUERY_STRING'], agent.secrets_matcher, agent.secrets_list)
self.scope.span.set_tag("http.params", scrubbed_params)
if 'REQUEST_METHOD' in env:
self.scope.span.set_tag(tags.HTTP_METHOD, env['REQUEST_METHOD'])
# NOTE(review): this fragment starts mid-function -- the enclosing `def`, the
# `if` that pairs with the `else:` below, and the origin of `scope` are not
# visible in this excerpt.
# Inject the span context into `headers`, which is handed to start_response below.
tracer.inject(scope.span.context, ot.Format.HTTP_HEADERS, headers)
# Add a Server-Timing response header whose description is the trace id.
headers.append(('Server-Timing', "intid;desc=%s" % scope.span.context.trace_id))
res = start_response(status, headers, exc_info)
# The status code is the first space-separated token of the status string.
sc = status.split(' ')[0]
# Status codes 500 through 511 (inclusive) mark the span as errored.
if 500 <= int(sc) <= 511:
scope.span.mark_as_errored()
scope.span.set_tag(tags.HTTP_STATUS_CODE, sc)
# The scope is closed once the status code has been recorded.
scope.close()
return res
else:
# Fallback: forward to the original start_response without touching any span.
return start_response(status, headers, exc_info)
# NOTE(review): these statements read `env`, which is defined outside this
# excerpt; they presumably belong to an enclosing function -- confirm.
# Extract any incoming trace context from the request environ and start the
# "wsgi" span as its child.
ctx = tracer.extract(ot.Format.HTTP_HEADERS, env)
# The scope is kept both in the local `scope` and in env under 'stan_scope'.
scope = env['stan_scope'] = tracer.start_active_span("wsgi", child_of=ctx)
# Tag the span with every configured extra header that the request carries.
if hasattr(agent, 'extra_headers') and agent.extra_headers is not None:
for custom_header in agent.extra_headers:
# Headers are looked up in env upper-cased, with '-' replaced by '_' and an
# 'HTTP_' prefix, e.g. HTTP_X_CAPTURE_THIS
wsgi_header = ('HTTP_' + custom_header.upper()).replace('-', '_')
if wsgi_header in env:
# The tag name keeps the header name as configured: "http.<custom_header>".
scope.span.set_tag("http.%s" % custom_header, env[wsgi_header])
if 'PATH_INFO' in env:
scope.span.set_tag('http.path', env['PATH_INFO'])
# Only a non-empty query string is recorded, and only after it has been passed
# through strip_secrets with the agent's secrets matcher and secrets list.
if 'QUERY_STRING' in env and len(env['QUERY_STRING']):
scrubbed_params = strip_secrets(env['QUERY_STRING'], agent.secrets_matcher, agent.secrets_list)
scope.span.set_tag("http.params", scrubbed_params)
if 'REQUEST_METHOD' in env:
scope.span.set_tag(tags.HTTP_METHOD, env['REQUEST_METHOD'])
# Starts a 'wsgi' span for the current Flask request and tags it with request
# details. The positional and keyword arguments are not used in the visible part.
# NOTE(review): the excerpt is truncated -- the handler for the `try` below and
# the rest of the body are not visible here.
def before_request_with_instana(*argv, **kwargs):
try:
env = flask.request.environ
ctx = None
# A parent context is only extracted when both Instana headers are present;
# otherwise the span is started with child_of=None.
if 'HTTP_X_INSTANA_T' in env and 'HTTP_X_INSTANA_S' in env:
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, env)
# The scope is stored on flask.g under the attribute `scope`.
flask.g.scope = tracer.start_active_span('wsgi', child_of=ctx)
span = flask.g.scope.span
# Tag the span with every configured extra header that the request carries.
if hasattr(agent, 'extra_headers') and agent.extra_headers is not None:
for custom_header in agent.extra_headers:
# Headers are looked up in env upper-cased, with '-' replaced by '_' and an
# 'HTTP_' prefix, e.g. HTTP_X_CAPTURE_THIS
header = ('HTTP_' + custom_header.upper()).replace('-', '_')
if header in env:
span.set_tag("http.%s" % custom_header, env[header])
span.set_tag(ext.HTTP_METHOD, flask.request.method)
# PATH_INFO is recorded under the ext.HTTP_URL tag.
if 'PATH_INFO' in env:
span.set_tag(ext.HTTP_URL, env['PATH_INFO'])
# A non-empty query string is passed through strip_secrets; what happens to
# `scrubbed_params` afterwards is not visible in this excerpt.
if 'QUERY_STRING' in env and len(env['QUERY_STRING']):
scrubbed_params = strip_secrets(env['QUERY_STRING'], agent.secrets_matcher, agent.secrets_list)
# Starts a 'django' span for the incoming request, stores its scope on the
# request as `iscope`, and tags the span with request details.
# NOTE(review): the excerpt is truncated -- the handler for the `try` below and
# the body of the final `if` are not visible here.
def process_request(self, request):
try:
env = request.environ
# Extract any incoming trace context from the request environ and start the
# span as its child.
ctx = tracer.extract(ot.Format.HTTP_HEADERS, env)
request.iscope = tracer.start_active_span('django', child_of=ctx)
# Tag the span with every configured extra header that the request carries.
if hasattr(agent, 'extra_headers') and agent.extra_headers is not None:
for custom_header in agent.extra_headers:
# Headers are looked up in env upper-cased, with '-' replaced by '_' and an
# 'HTTP_' prefix, e.g. HTTP_X_CAPTURE_THIS
django_header = ('HTTP_' + custom_header.upper()).replace('-', '_')
if django_header in env:
request.iscope.span.set_tag("http.%s" % custom_header, env[django_header])
request.iscope.span.set_tag(ext.HTTP_METHOD, request.method)
# PATH_INFO is recorded under the ext.HTTP_URL tag.
if 'PATH_INFO' in env:
request.iscope.span.set_tag(ext.HTTP_URL, env['PATH_INFO'])
# Only a non-empty query string is recorded, and only after it has been passed
# through strip_secrets with the agent's secrets matcher and secrets list.
if 'QUERY_STRING' in env and len(env['QUERY_STRING']):
scrubbed_params = strip_secrets(env['QUERY_STRING'], agent.secrets_matcher, agent.secrets_list)
request.iscope.span.set_tag("http.params", scrubbed_params)
# NOTE(review): the body of this branch is cut off in this excerpt.
if 'HTTP_HOST' in env:
def call_behavior_with_instana(wrapped, instance, argv, kwargs):
    """Invoke ``wrapped`` inside an active "rpc-server" span.

    The incoming trace context is extracted from the ``invocation_metadata``
    of the first positional argument. Any exception raised while collecting
    tags or running ``wrapped`` is logged on the span and re-raised.

    Args:
        wrapped: The callable being instrumented.
        instance: Passed through to ``collect_tags``.
        argv: Positional arguments for ``wrapped``; ``argv[0]`` must expose
            ``invocation_metadata`` as an iterable of items that have
            ``key`` and ``value`` attributes.
        kwargs: Keyword arguments for ``wrapped``.

    Returns:
        The return value of ``wrapped(*argv, **kwargs)``.
    """
    # Flatten the incoming metadata into a plain dict for the tracer.
    incoming = {entry.key: entry.value for entry in argv[0].invocation_metadata}
    parent_ctx = tracer.extract(opentracing.Format.BINARY, incoming)

    with tracer.start_active_span("rpc-server", child_of=parent_ctx) as scope:
        try:
            collect_tags(scope.span, instance, argv, kwargs)
            return wrapped(*argv, **kwargs)
        except Exception as exc:
            # Record the failure on the span, then let it propagate unchanged.
            scope.span.log_exception(exc)
            raise
# Starts a 'wsgi' span for the current Flask request and tags it with request
# details. `sender` and the extra keyword arguments are not used in the visible part.
# NOTE(review): the excerpt is truncated -- the handler for the `try` below and
# the rest of the body are not visible here.
def request_started_with_instana(sender, **extra):
try:
env = flask.request.environ
ctx = None
# A parent context is only extracted when both Instana headers are present;
# otherwise the span is started with child_of=None.
if 'HTTP_X_INSTANA_T' in env and 'HTTP_X_INSTANA_S' in env:
ctx = tracer.extract(opentracing.Format.HTTP_HEADERS, env)
# The scope is stored on flask.g under the attribute `scope`.
flask.g.scope = tracer.start_active_span('wsgi', child_of=ctx)
span = flask.g.scope.span
# Tag the span with every configured extra header that the request carries.
if hasattr(agent, 'extra_headers') and agent.extra_headers is not None:
for custom_header in agent.extra_headers:
# Headers are looked up in env upper-cased, with '-' replaced by '_' and an
# 'HTTP_' prefix, e.g. HTTP_X_CAPTURE_THIS
header = ('HTTP_' + custom_header.upper()).replace('-', '_')
if header in env:
span.set_tag("http.%s" % custom_header, env[header])
span.set_tag(ext.HTTP_METHOD, flask.request.method)
# PATH_INFO is recorded under the ext.HTTP_URL tag.
if 'PATH_INFO' in env:
span.set_tag(ext.HTTP_URL, env['PATH_INFO'])
# A non-empty query string is passed through strip_secrets; what happens to
# `scrubbed_params` afterwards is not visible in this excerpt.
if 'QUERY_STRING' in env and len(env['QUERY_STRING']):
scrubbed_params = strip_secrets(env['QUERY_STRING'], agent.secrets_matcher, agent.secrets_list)