# Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
def setUp(self):
    """Build the value-module fixtures shared by the tests in this case."""
    self.double_value = value_module.ValueDouble(55.5)
    self.long_value = value_module.ValueLong(9876543210)
    self.timestamp = '2018-10-06T17:57:57.936475Z'

    # Summary fixture: a single 99.5th-percentile entry inside a snapshot.
    percentiles = [summary_module.ValueAtPercentile(99.5, 10.2)]
    self.summary = summary_module.Summary(
        10, 6.6, summary_module.Snapshot(10, 87.07, percentiles))
    self.summary_value = value_module.ValueSummary(self.summary)

    # Distribution fixture: explicit bounds 1..9 with ten uniform buckets.
    histogram_bounds = value_module.Explicit(list(range(1, 10)))
    uniform_buckets = [value_module.Bucket(10, None) for _ in range(10)]
    self.distribution_value = value_module.ValueDistribution(
        100,
        1000.0,
        10.0,
        value_module.BucketOptions(histogram_bounds),
        uniform_buckets,
    )
def test_init(self):
    """Explicit should keep a reference to the bounds it is given."""
    boundary_values = [1, 2]
    explicit_options = value_module.Explicit(boundary_values)
    self.assertEqual(explicit_options.bounds, boundary_values)
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import unittest
from opencensus.metrics import label_value
from opencensus.metrics.export import point, time_series, value
# Fixture start time shared by the TimeSeries tests below.
START_TIMESTAMP = '2018-10-09T22:33:44.012345Z'
# One ASCII and one non-ASCII label value, to exercise unicode handling.
LABEL_VALUE1 = label_value.LabelValue('value one')
LABEL_VALUE2 = label_value.LabelValue('价值二')
LABEL_VALUES = (LABEL_VALUE1, LABEL_VALUE2)
# Five integer-valued points at successive hourly timestamps.
POINTS = (point.Point(
value.ValueLong(1), "2018-10-09T23:33:44.012345Z"),
point.Point(
value.ValueLong(2), "2018-10-10T00:33:44.012345Z"),
point.Point(
value.ValueLong(3), "2018-10-10T01:33:44.012345Z"),
point.Point(
value.ValueLong(4), "2018-10-10T02:33:44.012345Z"),
point.Point(
value.ValueLong(5), "2018-10-10T03:33:44.012345Z"))
class TestTimeSeries(unittest.TestCase):

    def test_init(self):
        """The constructor should store labels, points and start time."""
        series = time_series.TimeSeries(LABEL_VALUES, POINTS, START_TIMESTAMP)
        self.assertEqual(series.start_timestamp, START_TIMESTAMP)
        self.assertEqual(series.label_values, LABEL_VALUES)
def test_new_aggregation_data_explicit(self):
    """LastValueAggregation seeds new aggregation data with its value."""
    int_measure = mock.Mock(spec=measure_module.MeasureInt)
    last_value = aggregation_module.LastValueAggregation(value=6)
    agg_data = last_value.new_aggregation_data(int_measure)
    self.assertEqual(agg_data.value, 6)
    self.assertEqual(agg_data.value_type, value.ValueLong)
def test_new_aggregation_data_defaults(self):
    """A default SumAggregation starts its running sum at zero."""
    int_measure = mock.Mock(spec=measure_module.MeasureInt)
    default_sum = aggregation_module.SumAggregation()
    agg_data = default_sum.new_aggregation_data(int_measure)
    self.assertEqual(agg_data.sum_data, 0)
    self.assertEqual(agg_data.value_type, value.ValueLong)
# NOTE(review): fragment — the enclosing test method's "def" line and the
# setup of mock_view / mm / stats are outside this chunk.
# Give the mocked view a real DistributionAggregation so that recording
# produces genuine distribution aggregation data.
mock_view.aggregation = aggregation.DistributionAggregation()
mock_view.new_aggregation_data.return_value = \
mock_view.aggregation.new_aggregation_data()
# Record one measurement tagged k1=v1.
tm = tag_map.TagMap()
tm.insert('k1', 'v1')
mm.record(tm)
# Exactly one metric with one time series containing one point should come
# back, and that point's value should be a distribution.
metrics = list(stats.get_metrics())
self.assertEqual(len(metrics), 1)
[metric] = metrics
self.assertEqual(len(metric.time_series), 1)
[ts] = metric.time_series
self.assertEqual(len(ts.points), 1)
[point] = ts.points
self.assertTrue(isinstance(point.value, value.ValueDistribution))
def test_get_metric(self):
    """get_metric should report the derived function's latest value."""
    derived = cumulative.DerivedDoubleCumulative(Mock(), Mock(), Mock(), [])
    value_fn = Mock()
    value_fn.return_value = 1.23
    derived.create_default_time_series(value_fn)

    query_time = Mock()
    [series] = derived.get_metric(query_time).time_series
    [single_point] = series.points
    self.assertEqual(single_point.timestamp, query_time)
    self.assertEqual(single_point.value.value, 1.23)
    self.assertIsInstance(single_point.value, value_module.ValueDouble)
`buckets` attribute will be null.
:type timestamp: :class: `datetime.datetime`
:param timestamp: The time to report the point as having been recorded.
:rtype: :class: `opencensus.metrics.export.point.Point`
:return: a :class: `opencensus.metrics.export.value.ValueDistribution`
-valued Point.
"""
# NOTE(review): fragment — the method signature and docstring opener are
# outside this chunk, and the return expression below is cut off mid-call.
# With explicit histogram bounds, convert each per-bucket count (and the
# exemplar recorded for that bucket index, if any) into a metrics Bucket.
if self.bounds:
bucket_options = value.BucketOptions(value.Explicit(self.bounds))
buckets = [None] * len(self.counts_per_bucket)
for ii, count in enumerate(self.counts_per_bucket):
# self.exemplars maps bucket index -> stats exemplar (may be absent).
stat_ex = self.exemplars.get(ii) if self.exemplars else None
if stat_ex is not None:
# Copy the attachments so the metrics exemplar does not share
# mutable state with the stats-side exemplar.
metric_ex = value.Exemplar(stat_ex.value,
stat_ex.timestamp,
copy.copy(stat_ex.attachments))
buckets[ii] = value.Bucket(count, metric_ex)
else:
buckets[ii] = value.Bucket(count)
else:
# No histogram: empty bucket options and a null bucket list.
bucket_options = value.BucketOptions()
buckets = None
return point.Point(
value.ValueDistribution(
count=self.count_data,
sum_=self.sum,
sum_of_squared_deviation=self.sum_of_sqd_deviations,
bucket_options=bucket_options,
buckets=buckets
This method creates a :class: `opencensus.metrics.export.point.Point`
with a :class: `opencensus.metrics.export.value.ValueDistribution`
value, and creates buckets and exemplars for that distribution from the
appropriate classes in the `metrics` package. If the distribution
doesn't have a histogram (i.e. `bounds` is empty) the converted point's
`buckets` attribute will be null.
:type timestamp: :class: `datetime.datetime`
:param timestamp: The time to report the point as having been recorded.
:rtype: :class: `opencensus.metrics.export.point.Point`
:return: a :class: `opencensus.metrics.export.value.ValueDistribution`
-valued Point.
"""
# NOTE(review): fragment — duplicate of the conversion logic above; the
# docstring opener precedes this chunk and the return is cut off mid-call.
# With explicit histogram bounds, build one metrics Bucket per count,
# attaching a converted exemplar where one exists for that bucket index.
if self.bounds:
bucket_options = value.BucketOptions(value.Explicit(self.bounds))
buckets = [None] * len(self.counts_per_bucket)
for ii, count in enumerate(self.counts_per_bucket):
stat_ex = self.exemplars.get(ii) if self.exemplars else None
if stat_ex is not None:
# Copy attachments to avoid sharing mutable state between the
# stats exemplar and the metrics exemplar.
metric_ex = value.Exemplar(stat_ex.value,
stat_ex.timestamp,
copy.copy(stat_ex.attachments))
buckets[ii] = value.Bucket(count, metric_ex)
else:
buckets[ii] = value.Bucket(count)
else:
# No histogram: empty bucket options and a null bucket list.
bucket_options = value.BucketOptions()
buckets = None
return point.Point(
value.ValueDistribution(