Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def set_loop(self, loop: asyncio.AbstractEventLoop):
    """Store ``loop`` on the instance as ``self.loop``.

    :param loop: event loop object to remember; it is stored as-is.
    """
    self.loop = loop
def _set_params(self, **kwargs):
    """Copy every keyword argument onto the instance as an attribute.

    Each ``name=value`` pair becomes ``self.name = value``; attributes
    that already exist are overwritten.
    """
    for attr_name in kwargs:
        setattr(self, attr_name, kwargs[attr_name])
async def start(self):
    """Abstract start hook.

    :raises NotImplementedError: always; this implementation does nothing
        else, so concrete classes have to provide their own ``start``.
    """
    raise NotImplementedError()
async def stop(self, exception: Exception=None):
    """Default stop hook: does nothing.

    :param exception: accepted but ignored by this implementation.
    :return: ``None``.
    """
    return None
class SimpleServer(Service):
    """Service that keeps a single server object in ``self.server``.

    ``self.server`` starts out as ``None``; ``stop()`` closes whatever
    object has been stored there. ``start()`` is abstract here.
    """

    def __init__(self):
        # Assigned before delegating to the base initializer, as in the
        # original ordering.
        self.server = None      # type: asyncio.AbstractServer
        super().__init__()

    async def start(self):
        """Abstract start hook.

        :raises NotImplementedError: always.
        """
        raise NotImplementedError

    async def stop(self, exc: Exception=None):
        """Close the stored server, if there is one.

        :param exc: accepted for interface compatibility; not used here.
        """
        # ``self.server`` is still None when start() never stored a server
        # (for example because it failed); calling ``.close()`` on None
        # would raise AttributeError and mask the real problem.
        if self.server is None:
            return
        self.server.close()
class TCPServer(SimpleServer):
PROTO_NAME = 'tcp'
def __init__(self, address: str=None, port: int=None,
options: OptionsType = (), sock=None):
return ClientSession(
connector=self.connector, loop=self._loop,
connector_owner=False
)
async def _close(self):
    """Close the parent transport, then close ``self.connector`` once.

    :return: whatever the parent class's ``_close()`` coroutine returned.
    """
    transport = await super()._close()

    # The previous code passed the *result* of ``close()`` to
    # ``inspect.iscoroutinefunction``, which is never true for a call
    # result, so ``close()`` ran twice and an awaitable result was never
    # awaited. Call it exactly once and await the result only when it is
    # awaitable, which covers both synchronous and asynchronous ``close()``.
    closing = self.connector.close()
    if inspect.isawaitable(closing):
        await closing

    return transport
class RavenSender(Service):
    """Service that builds a ``Client`` for ``sentry_dsn`` when started."""

    # NOTE(review): presumably the option names that must be provided for
    # this service -- confirm against ``Service``.
    __required__ = ('sentry_dsn',)

    sentry_dsn = None  # type: yarl.URL
    min_level = logging.WARNING  # type: int
    # Read-only empty mapping as the default for extra ``Client`` options.
    client_options = MappingProxyType({})  # type: Mapping
    client = None  # type: Client
    filters = ()  # type: Iterable[logging.Filter]

    async def start(self):
        """Create the client and keep it in ``self.client``.

        The DSN is passed as a string, and every entry of
        ``client_options`` is forwarded as a keyword argument.
        """
        dsn = str(self.sentry_dsn)
        self.client = Client(
            dsn,
            transport=QueuedKeepaliveAioHttpTransport,
            **self.client_options
        )
# Module-level logger named after this module.
log = logging.getLogger(__name__)
def strip_carbon_ns(string):
    """Normalize ``string`` into a lowercase, underscore-separated token.

    Every run of characters that are neither word characters nor hyphens
    is replaced by a single underscore, leading and trailing underscores
    are trimmed, and the result is lowercased.
    """
    collapsed = re.sub(r'[^\w\d\-]+', '_', string)
    trimmed = collapsed.strip("_")
    return trimmed.lower()
# Read-only lookup table from protocol name to the client class for it.
PROTOCOLS = MappingProxyType({
"udp": UDPClient,
"tcp": TCPClient,
"pickle": PickleClient,
})
class CarbonSender(Service):
host = '127.0.0.1' # type: str
port = 2003 # type: int
send_interval = 5 # type: int
protocol = 'udp' # type: str
namespace = '' # type: List[str]
storage = TotalStorage
_handle = None # type: PeriodicCallback
async def start(self):
namespace = ".".join(
strip_carbon_ns(item) for item in self.namespace
)
client = PROTOCOLS[self.protocol](
self.host,
self.port,
from ..periodic import PeriodicCallback
from ..service import Service
from ..thread_pool import threaded
# Module-level logger named after this module.
log = logging.getLogger(__name__)
class GroupBy(Enum):
    """Grouping keys; each member's value is the same string as its name."""

    lineno = 'lineno'
    filename = 'filename'
    traceback = 'traceback'
class MemoryTracer(Service):
_tracer = None # type: PeriodicCallback
_log = None # type: logging.Logger
_snapshot_on_start = None
logger = log.info
interval = 5 # type: int
top_results = 20 # type: int
group_by = GroupBy.lineno # type: GroupBy
STAT_FORMAT = (
"%(count)8s | "
"%(count_diff)8s | "
"%(size)8s | "
"%(size_diff)8s | "