Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
/**
 * Builds the log frame for `refId` and pre-fills its backing buffers.
 *
 * Two circular vectors (timestamps and log lines) are created with a capacity
 * equal to the stream request's `maxDataPoints`, falling back to 1000 when that
 * value is falsy. They are stored on `this.values` and used as the values of the
 * `Time` and `Line` fields of the frame assigned to `this.data`.
 *
 * The buffers are then filled completely: one row per slot, produced by
 * `this.nextRow`, with timestamps spaced `this.query.speed` apart and the first
 * one placed `capacity * speed` before the current `Date.now()`.
 * NOTE(review): `speed` is presumably in milliseconds since it is combined with
 * `Date.now()` - confirm against the query definition.
 *
 * @param refId - Used as the frame's `refId` and appended to its name.
 */
initBuffer(refId: string) {
  const { speed } = this.query;
  const { request } = this.stream;
  const capacity = request.maxDataPoints || 1000;

  const timeBuffer = new CircularVector({ capacity: capacity });
  const lineBuffer = new CircularVector({ capacity: capacity });
  this.values = [timeBuffer, lineBuffer];

  const fields = [
    { name: 'Time', type: FieldType.time, values: timeBuffer },
    { name: 'Line', type: FieldType.string, values: lineBuffer },
  ];
  this.data = new MutableDataFrame({
    fields,
    refId,
    name: 'Logs ' + refId,
  });

  // Back-date the first row so the last generated row lands just before "now".
  let cursor = Date.now() - capacity * speed;
  let remaining = capacity;
  while (remaining > 0) {
    const entry = this.nextRow(cursor);
    timeBuffer.add(entry[0]);
    lineBuffer.add(entry[1]);
    cursor += speed;
    remaining--;
  }
}
/**
 * Returns the live observable for `target`, creating it on first request.
 *
 * Observables are cached in `this.streams` under `target.url`; a cached entry
 * is returned as-is. Otherwise a circular frame with capacity `target.size` is
 * created with the fields `ts`, `tsNs`, `line`, `labels` and `id`, and a
 * websocket to `target.url` is opened. Each incoming message is appended to
 * that single frame, which is then emitted wrapped in a one-element array.
 * The cache entry is deleted by the `finalize` callback.
 *
 * @param target - Supplies the websocket `url`, the buffer `size` and the
 *   `query` whose parsed labels are attached to the `line` field.
 * @returns The cached or newly created observable.
 */
getStream(target: LegacyTarget): Observable {
  const existing = this.streams[target.url];
  if (existing) {
    return existing;
  }

  const frame = new CircularDataFrame({ capacity: target.size });
  frame.addField({ name: 'ts', type: FieldType.time, config: { title: 'Time' } });
  frame.addField({ name: 'tsNs', type: FieldType.time, config: { title: 'Time ns' } });
  // The labels parsed from the query are attached to the line field itself.
  const lineField = frame.addField({ name: 'line', type: FieldType.string });
  lineField.labels = parseLabels(target.query);
  frame.addField({ name: 'labels', type: FieldType.other }); // The labels for each line
  frame.addField({ name: 'id', type: FieldType.string });

  // Drop the cache entry once the stream is finalized.
  const evictFromCache = () => {
    delete this.streams[target.url];
  };
  // Fold each tail response into the shared frame and emit that frame.
  const toFrames = (response: LokiTailResponse) => {
    appendResponseToBufferedData(response, frame);
    return [frame];
  };

  const created = webSocket(target.url).pipe(finalize(evictFromCache), map(toFrames));
  this.streams[target.url] = created;
  return created;
}
}
/**
 * Looks up, or lazily creates, the live observable for `target`.
 *
 * The lookup key in `this.streams` is `target.url`. On a miss, a circular
 * frame sized by `target.size` is set up with the fields `ts`, `line`,
 * `labels` and `id`, and a websocket to `target.url` is piped so that every
 * message is appended to that frame and the frame is emitted inside a
 * one-element array. The `finalize` callback removes the entry from
 * `this.streams` again.
 *
 * @param target - Provides the websocket `url`, the frame capacity `size` and
 *   the `query` whose parsed labels are set on the `line` field.
 * @returns The observable stored in `this.streams` for this URL.
 */
getStream(target: LegacyTarget): Observable {
  const cached = this.streams[target.url];
  if (cached) {
    return cached;
  }

  const buffer = new CircularDataFrame({ capacity: target.size });
  buffer.addField({ name: 'ts', type: FieldType.time, config: { title: 'Time' } });
  // Labels parsed from the query live on the line field.
  const lineField = buffer.addField({ name: 'line', type: FieldType.string });
  lineField.labels = parseLabels(target.query);
  buffer.addField({ name: 'labels', type: FieldType.other }); // The labels for each line
  buffer.addField({ name: 'id', type: FieldType.string });

  // Remove the cached observable when the stream is finalized.
  const forgetStream = () => {
    delete this.streams[target.url];
  };
  // Append the response to the shared buffer, then emit the buffer.
  const appendAndEmit = (response: LokiTailResponse) => {
    appendResponseToBufferedData(response, buffer);
    return [buffer];
  };

  const live = webSocket(target.url).pipe(finalize(forgetStream), map(appendAndEmit));
  this.streams[target.url] = live;
  return live;
}
}
series.addField({
name: 'level',
type: FieldType.string,
}).parse = (v: any) => {
return v || '';
};
}
for (const propName of propNames) {
if (propName === this.targets[0].timeField || propName === '_source') {
continue;
}
series.addField({
name: propName,
type: FieldType.string,
}).parse = (v: any) => {
return v || '';
};
}
// Add a row for each document
for (const doc of docs) {
series.add(doc);
}
dataFrame.push(series);
}
if (response.aggregations) {
const aggregations = response.aggregations;
const target = this.targets[n];
times.add(ts);
lines.add(entry.line);
uids.add(`${ts}_${stream.labels}`);
}
if (reverse) {
times.buffer = times.buffer.reverse();
lines.buffer = lines.buffer.reverse();
}
return {
refId,
fields: [
{ name: 'ts', type: FieldType.time, config: { title: 'Time' }, values: times }, // Time
{ name: 'line', type: FieldType.string, config: {}, values: lines, labels }, // Line
{ name: 'id', type: FieldType.string, config: {}, values: uids },
],
length: times.length,
};
}
if ('name' === calc) {
val = name;
} else {
if ('last_time' === calc) {
if (fieldInfo.frame.firstTimeField) {
calcField = fieldInfo.frame.firstTimeField;
calc = ReducerID.last;
}
}
// Normalize functions (avg -> mean, etc)
const r = fieldReducers.getIfExists(calc);
if (r) {
calc = r.id;
// With strings, don't accidentally use a math function
if (calcField.type === FieldType.string) {
const avoid = [ReducerID.mean, ReducerID.sum];
if (avoid.includes(calc)) {
calc = panel.valueName = ReducerID.first;
}
}
} else {
calc = ReducerID.lastNotNull;
}
// Calculate the value
val = reduceField({
field: calcField,
reducers: [calc],
})[calc];
}
export function appendResponseToBufferedData(response: LokiTailResponse, data: MutableDataFrame) {
// Should we do anything with: response.dropped_entries?
const streams: LokiStreamResult[] = response.streams;
if (!streams || !streams.length) {
return;
}
let baseLabels: Labels = {};
for (const f of data.fields) {
if (f.type === FieldType.string) {
if (f.labels) {
baseLabels = f.labels;
}
break;
}
}
for (const stream of streams) {
// Find unique labels
const unique = findUniqueLabels(stream.stream, baseLabels);
// Add each line
for (const [ts, line] of stream.values) {
data.values.ts.add(parseInt(ts, 10) / 1e6);
data.values.line.add(line);
data.values.labels.add(unique);
export function appendResponseToBufferedData(response: LokiTailResponse, data: MutableDataFrame) {
// Should we do anything with: response.dropped_entries?
const streams: LokiStreamResult[] = response.streams;
if (!streams || !streams.length) {
return;
}
let baseLabels: Labels = {};
for (const f of data.fields) {
if (f.type === FieldType.string) {
if (f.labels) {
baseLabels = f.labels;
}
break;
}
}
for (const stream of streams) {
// Find unique labels
const unique = findUniqueLabels(stream.stream, baseLabels);
const allLabelsString = Object.entries(stream.stream)
.map(([key, val]) => `${key}="${val}"`)
.sort()
.join('');
// Add each line
function buildData(theme: GrafanaTheme, overrides: ConfigOverrideRule[]): DataFrame {
const data = new MutableDataFrame({
fields: [
{ name: 'Time', type: FieldType.time, values: [] }, // The time field
{
name: 'Quantity',
type: FieldType.number,
values: [],
config: {
decimals: 0,
custom: {
align: 'center',
},
},
},
{ name: 'Status', type: FieldType.string, values: [] }, // The time field
{
name: 'Value',
type: FieldType.number,
values: [],
config: {
decimals: 2,
},
},
{
name: 'Progress',
type: FieldType.number,
values: [],
config: {
unit: 'percent',
custom: {
width: 50,
const field = frame.fields.find((f) => f.type === FieldType.string);
if (!field) {