Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def test_make_batch(self):
transport = mock.Mock(spec=metrics.NullTransport)
client = metrics.Client(transport, "namespace")
batch = client.batch()
self.assertIsInstance(batch, metrics.Batch)
self.assertEqual(batch.namespace, b"namespace")
def setUp(self, mock_Session):
self.session = mock_Session.return_value
self.metrics_client = mock.MagicMock(autospec=metrics.Client)
self.zipkin_api_url = "http://test.local/api/v2"
self.publisher = trace_publisher.ZipkinPublisher(self.zipkin_api_url, self.metrics_client)
def setUp(self):
transport = mock.Mock(spec=metrics.NullTransport)
self.client = metrics.BaseClient(transport, "namespace")
def test_make_batch(self):
transport = mock.Mock(spec=metrics.NullTransport)
client = metrics.Client(transport, "namespace")
batch = client.batch()
self.assertIsInstance(batch, metrics.Batch)
self.assertEqual(batch.namespace, b"namespace")
def test_make_counter(self):
counter = self.client.counter("some_counter")
self.assertIsInstance(counter, metrics.Counter)
self.assertEqual(counter.name, b"namespace.some_counter")
with self.assertRaises(UnicodeEncodeError):
self.client.counter("☃")
def test_buffered_exception_is_caught(self):
raw_transport = metrics.RawTransport(EXAMPLE_ENDPOINT)
transport = metrics.BufferedTransport(raw_transport)
transport.send(b"x" * 65536)
with self.assertRaises(metrics.MessageTooBigTransportError):
transport.flush()
def setUp(self, Session):
self.config = config.ConfigNamespace()
self.config.collector = config.ConfigNamespace()
self.config.collector.hostname = "test.local"
self.config.collector.version = 1
self.config.key = config.ConfigNamespace()
self.config.key.name = "TestKey"
self.config.key.secret = b"hunter2"
self.session = Session.return_value
self.metrics_client = mock.MagicMock(autospec=metrics.Client)
self.publisher = event_publisher.BatchPublisher(self.metrics_client, self.config)
def test_no_endpoint(self):
client = metrics.make_client("namespace", None)
self.assertIsInstance(client.transport, metrics.NullTransport)
def test_two_nodes(self):
joined = metrics._metric_join(b"first", b"second")
self.assertEqual(joined, b"first.second")
def __init__(self, app_config: Optional[config.RawConfig] = None) -> None:
self.observers: List[BaseplateObserver] = []
self._metrics_client: Optional[metrics.Client] = None
self._context_config: Dict[str, Any] = {}
self._app_config = app_config