Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
"""
This file contains a config object that will hold configuration options for the package.
Defaults are set and can be overridden after package load.
"""
from __future__ import absolute_import
from .util import DictionaryOfStan
# The protagonist: the package-wide configuration object described in the module docstring.
config = DictionaryOfStan()
# This option determines if tasks created via asyncio (with ensure_future or create_task) will
# automatically carry existing context into the created task.
# Disabled by default; per the module docstring, defaults can be overridden after package load.
config['asyncio_task_context_propagation']['enabled'] = False
"""
# NOTE(review): this is an excerpt. Indentation was lost, the enclosing `def` is not visible,
# and the `except`/`finally` clauses of both `try` statements are cut off.
# Accumulates one plugin dict per container listed in the task metadata.
plugins = []
try:
containers = self.task_metadata.get("Containers", [])
for container in containers:
plugin_data = dict()
try:
labels = container.get("Labels", {})
name = container.get("Name", "")
# NOTE(review): the variable is called taskArn but reads the "container-name" label, while the
# "taskArn" data field below reads "com.amazonaws.ecs.task-arn" -- likely the wrong key; confirm.
taskArn = labels.get("com.amazonaws.ecs.container-name", "")
plugin_data["name"] = "com.instana.plugin.aws.ecs.container"
# "entityId": $taskARN + "::" + $containerName
plugin_data["entityId"] = "%s::%s" % (taskArn, name)
plugin_data["data"] = DictionaryOfStan()
# Compares this container's name with the name recorded in self.root_metadata.
# NOTE(review): with indentation lost, the extent of this `if` body cannot be determined from here.
if self.root_metadata["Name"] == name:
plugin_data["data"]["instrumented"] = True
plugin_data["data"]["runtime"] = "python"
# Fields copied straight from the container metadata entry (None when absent).
plugin_data["data"]["dockerId"] = container.get("DockerId", None)
plugin_data["data"]["dockerName"] = container.get("DockerName", None)
plugin_data["data"]["containerName"] = container.get("Name", None)
plugin_data["data"]["image"] = container.get("Image", None)
plugin_data["data"]["imageId"] = container.get("ImageID", None)
# Fields read from the container's "Labels" mapping (None when absent).
plugin_data["data"]["taskArn"] = labels.get("com.amazonaws.ecs.task-arn", None)
plugin_data["data"]["taskDefinition"] = labels.get("com.amazonaws.ecs.task-definition-family", None)
plugin_data["data"]["taskDefinitionVersion"] = labels.get("com.amazonaws.ecs.task-definition-version", None)
plugin_data["data"]["clusterArn"] = labels.get("com.amazonaws.ecs.cluster", None)
plugin_data["data"]["desiredStatus"] = container.get("DesiredStatus", None)
plugin_data["data"]["knownStatus"] = container.get("KnownStatus", None)
plugin_data["data"]["ports"] = container.get("Ports", None)
plugin_data["data"]["createdAt"] = container.get("CreatedAt", None)
def prepare_payload(self):
    """
    Assemble the payload to report: queued spans plus, once, the snapshot data.

    The returned DictionaryOfStan always carries the keys "spans" and "metrics";
    each is None when there is nothing to report for it. Snapshot data is only
    included while ``self.snapshot_data_sent`` is False, and the flag is set to
    True as soon as the snapshot has been placed into a payload.

    :return: DictionaryOfStan with "spans" and "metrics" entries
    """
    result = DictionaryOfStan()

    # Only drain the queue when it actually holds spans.
    result["spans"] = None if self.span_queue.empty() else self.__queued_spans()

    snapshot_pending = bool(self.snapshot_data) and self.snapshot_data_sent is False
    if snapshot_pending:
        result["metrics"] = self.snapshot_data
        # Remember that the snapshot went out so later payloads skip it.
        self.snapshot_data_sent = True
    else:
        result["metrics"] = None

    return result
# Builds the "com.instana.plugin.process" plugin dict describing the current process.
# NOTE(review): this is an excerpt. Indentation was lost and the function is cut off right
# after the inner `except:`, so its error handling and return value are not visible here.
def _collect_process_snapshot(self):
plugin_data = dict()
try:
plugin_data["name"] = "com.instana.plugin.process"
# The entity id is the current process id rendered as a string.
plugin_data["entityId"] = str(os.getpid())
plugin_data["data"] = DictionaryOfStan()
plugin_data["data"]["pid"] = int(os.getpid())
# Copy the current environment variables into a plain dict.
env = dict()
for key in os.environ:
env[key] = os.environ[key]
plugin_data["data"]["env"] = env
# NOTE(review): reading /proc/self/exe presumably assumes a Linux-style /proc -- confirm.
plugin_data["data"]["exec"] = os.readlink("/proc/self/exe")
cmdline = get_proc_cmdline()
if len(cmdline) > 1:
# drop the exe
cmdline.pop(0)
plugin_data["data"]["args"] = cmdline
plugin_data["data"]["user"] = getpass.getuser()
try:
# NOTE(review): getpass.getuser() is documented as taking no arguments, so this call should
# raise TypeError every time; grp.getgrgid(os.getegid()).gr_name looks like the intended
# group lookup -- confirm and fix.
plugin_data["data"]["group"] = getpass.getuser(os.getegid()).gr_name
except:
def _collect_docker_snapshot(self):
    """
    Build one "com.instana.plugin.docker" plugin entry per container in the task metadata.

    Containers are read from ``self.task_metadata["Containers"]``. Collection is
    best-effort: a failure while processing a container is logged at debug level
    and whatever was gathered for that container up to that point is still
    appended to the result.

    :return: list of plugin dicts (empty when no containers are available)
    """
    plugins = []
    try:
        containers = self.task_metadata.get("Containers", [])
        for container in containers:
            plugin_data = dict()
            try:
                labels = container.get("Labels", {})
                name = container.get("Name", "")
                # The entity id is built from the task ARN, so read the task-arn
                # label rather than the container-name label.
                task_arn = labels.get("com.amazonaws.ecs.task-arn", "")

                plugin_data["name"] = "com.instana.plugin.docker"
                # "entityId": $taskARN + "::" + $containerName
                plugin_data["entityId"] = "%s::%s" % (task_arn, name)
                plugin_data["data"] = DictionaryOfStan()
                plugin_data["data"]["Id"] = container.get("DockerId", None)
                plugin_data["data"]["Created"] = container.get("CreatedAt", None)
                plugin_data["data"]["Started"] = container.get("StartedAt", None)
                plugin_data["data"]["Image"] = container.get("Image", None)
                plugin_data["data"]["Labels"] = container.get("Labels", None)
                plugin_data["data"]["Ports"] = container.get("Ports", None)

                # Only the first network entry is reported; a missing, empty or
                # None "Networks" value is simply skipped.
                networks = container.get("Networks", [])
                if networks:
                    plugin_data["data"]["NetworkMode"] = networks[0].get("NetworkMode", None)
            except Exception:
                logger.debug("_collect_docker_snapshot: ", exc_info=True)
            finally:
                # Keep partial data even when collection for this container failed.
                plugins.append(plugin_data)
    except Exception:
        logger.debug("_collect_docker_snapshot: ", exc_info=True)
    # Hand the collected entries back to the caller instead of discarding them.
    return plugins
def _collect_runtime_snapshot(self):
    """
    Build the "com.instana.plugin.python" plugin dict for this process.

    The work is guarded by ``self.process_metadata_mutex``, acquired without
    blocking. When the lock cannot be taken, an empty dict is returned.

    :return: plugin dict (empty if the lock was unavailable)
    """
    snapshot = {}

    # Non-blocking attempt: bail out with the empty dict if someone else holds the lock.
    if not self.process_metadata_mutex.acquire(False):
        return snapshot

    try:
        snapshot["name"] = "com.instana.plugin.python"
        snapshot["entityId"] = str(os.getpid())
        snapshot["data"] = DictionaryOfStan()
    except:
        logger.debug("_collect_runtime_snapshot: ", exc_info=True)
    finally:
        self.process_metadata_mutex.release()

    return snapshot
def collect_snapshot(self, event, context):
    """
    Record the invocation's event/context and build the snapshot data.

    Resets ``self.snapshot_data`` to a fresh DictionaryOfStan, stores ``event``
    and ``context`` on the instance, and fills in a single
    "com.instana.plugin.aws.lambda" plugin entry whose entity id comes from
    ``self.get_fq_arn()``. Collection is best-effort: any Exception is logged
    at debug level and the (possibly empty) snapshot is still returned.

    :param event: the invocation event, stored as ``self.event``
    :param context: the invocation context, stored as ``self.context``
    :return: ``self.snapshot_data``
    """
    self.snapshot_data = DictionaryOfStan()
    self.context = context
    self.event = event

    try:
        plugin_data = dict()
        plugin_data["name"] = "com.instana.plugin.aws.lambda"
        plugin_data["entityId"] = self.get_fq_arn()
        self.snapshot_data["plugins"] = [plugin_data]
    except Exception:
        logger.debug("collect_snapshot error", exc_info=True)

    # Returning after the try (not from a `finally`) avoids silently discarding
    # in-flight exceptions such as KeyboardInterrupt or SystemExit.
    return self.snapshot_data