Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Args:
spans: The list of `opentelemetry.trace.Span` objects to be exported
Returns:
The result of the export
"""
def shutdown(self) -> None:
    """Shuts down the exporter.

    Called when the SDK is shut down. This implementation has no body
    beyond its docstring, so it performs no work and returns ``None``.
    """
class SimpleExportSpanProcessor(SpanProcessor):
    """Simple SpanProcessor implementation.

    SimpleExportSpanProcessor is an implementation of `SpanProcessor` that
    passes ended spans directly to the configured `SpanExporter`.

    Each ended span is handed to the exporter on its own, wrapped in a
    one-element tuple, at the moment `on_end` is called; this class keeps
    no queue or buffer of its own.
    """

    def __init__(self, span_exporter: SpanExporter):
        """Store the exporter that ended spans are forwarded to.

        Args:
            span_exporter: The `SpanExporter` that receives every ended span
                and is shut down together with this processor.
        """
        self.span_exporter = span_exporter

    def on_start(self, span: Span) -> None:
        """Do nothing; this processor only acts when a span ends.

        Args:
            span: The span that was just started (unused).
        """

    def on_end(self, span: Span) -> None:
        """Export a single ended span through the configured exporter.

        An exception raised by the exporter's `export` call is logged with
        its traceback and swallowed, so it does not propagate to the caller.

        Args:
            span: The span that has just ended.
        """
        # NOTE(review): presumably this keeps the exporter's own work from
        # being instrumented while it runs -- confirm against `Context.use`.
        with Context.use(suppress_instrumentation=True):
            try:
                # The exporter takes a sequence of spans, hence the 1-tuple.
                self.span_exporter.export((span,))
            # pylint: disable=broad-except
            except Exception:
                logger.exception("Exception while exporting Span.")

    def shutdown(self) -> None:
        """Shut down the processor by shutting down its exporter."""
        self.span_exporter.shutdown()
class BatchExportSpanProcessor(SpanProcessor):
"""Batch span processor implementation.
BatchExportSpanProcessor is an implementation of `SpanProcessor` that
batches ended spans and pushes them to the configured `SpanExporter`.
"""
def __init__(
self,
span_exporter: SpanExporter,
max_queue_size: int = 2048,
schedule_delay_millis: float = 5000,
max_export_batch_size: int = 512,
):
if max_queue_size <= 0:
raise ValueError("max_queue_size must be a positive integer.")