Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
return function incomingRequest (event: string, ...args: any[]): boolean {
// Only traces request events
if (event !== 'connection') {
return original.apply(this, arguments)
}
const socket: netModule.Socket = args[0]
plugin.logger.debug('%s plugin incomingRequest', plugin.moduleName)
const traceOptions: TraceOptions = {
name: 'socket',
kind: SpanKind.SERVER,
spanContext: undefined
}
return plugin.tracer.startRootSpan(traceOptions, rootSpan => {
if (!rootSpan) return original.apply(this, arguments)
plugin.tracer.wrapEmitter(socket)
const address = socket.address()
if (typeof address === 'string') {
rootSpan.addAttribute('net.address', address)
} else {
rootSpan.addAttribute('net.host', address.address)
rootSpan.addAttribute('net.port', address.port)
rootSpan.addAttribute('net.family', address.family)
}
if (plugin.isIgnored(path, request, plugin.options.ignoreIncomingPaths)) {
return original.apply(this, arguments)
}
const propagation = plugin.tracer.propagation
const headers = request.headers
const getter: HeaderGetter = {
getHeader (name: string) {
return headers[name]
}
}
const context = propagation ? propagation.extract(getter) : null
const traceOptions: TraceOptions = {
name: path,
kind: SpanKind.SERVER,
spanContext: context !== null ? context : undefined
}
return plugin.createSpan(traceOptions, rootSpan => {
if (!rootSpan) return original.apply(this, arguments)
plugin.tracer.wrapEmitter(request)
plugin.tracer.wrapEmitter(response)
// Wraps end (inspired by:
// https://github.com/GoogleCloudPlatform/cloud-trace-nodejs/blob/master/src/plugins/plugin-connect.ts#L75)
const originalEnd = response.end
response.end = function (this: httpModule.ServerResponse) {
response.end = originalEnd
const returned = response.end.apply(this, arguments)
headers: http2.IncomingHttpHeaders
): http2.ClientHttp2Stream {
if (event !== 'stream') {
return original.apply(this, arguments);
}
const propagation = plugin.tracer.propagation;
const getter: HeaderGetter = {
getHeader(name: string) {
return headers[name];
},
};
const traceOptions: TraceOptions = {
name: headers[':path'] || '',
kind: SpanKind.SERVER,
};
if (propagation) {
const spanContext = propagation.extract(getter);
if (spanContext) {
traceOptions.spanContext = spanContext;
}
}
// Respond is called in a stream event. We wrap it to get the sent
// status code.
let statusCode: number;
const originalRespond = stream.respond;
stream.respond = function(this: http2.Http2Stream) {
// Unwrap it since respond is not allowed to be called more than once
// per stream.
stream.respond = originalRespond;
stream: http2.ServerHttp2Stream,
headers: http2.IncomingHttpHeaders): http2.ClientHttp2Stream {
if (event !== 'stream') {
return original.apply(this, arguments)
}
const propagation = plugin.tracer.propagation
const getter = {
getHeader (name: string) {
return headers[name]
}
} as HeaderGetter
const traceOptions = {
name: headers[':path'],
kind: SpanKind.SERVER,
spanContext: propagation ? propagation.extract(getter) : null
} as TraceOptions
// Respond is called in a stream event. We wrap it to get the sent
// status code.
let statusCode: number = 0
const originalRespond = stream.respond
stream.respond = function (this: http2.Http2Stream) {
// Unwrap it since respond is not allowed to be called more than once
// per stream.
stream.respond = originalRespond
statusCode = arguments[0][':status']
return stream.respond.apply(this, arguments)
}
return plugin.tracer.startRootSpan(traceOptions, rootSpan => {
plugin.isIgnored(path, request, plugin.options.ignoreIncomingPaths)
) {
return original.apply(this, arguments);
}
const propagation = plugin.tracer.propagation;
const headers = request.headers;
const getter: HeaderGetter = {
getHeader(name: string) {
return headers[name];
},
};
const traceOptions: TraceOptions = {
name: path,
kind: SpanKind.SERVER,
};
if (propagation) {
const spanContext = propagation.extract(getter);
if (spanContext) {
traceOptions.spanContext = spanContext;
}
}
return plugin.tracer.startRootSpan(traceOptions, rootSpan => {
if (!rootSpan) return original.apply(this, arguments);
plugin.tracer.wrapEmitter(request);
plugin.tracer.wrapEmitter(response);
// Wraps end (inspired by:
// https://github.com/GoogleCloudPlatform/cloud-trace-nodejs/blob/master/src/plugins/plugin-connect.ts#L75)
value: sizeof(reqOrRes),
});
measureList.push({
measure: clientStats.GRPC_CLIENT_RECEIVED_MESSAGES_PER_RPC,
value: 1,
});
measureList.push({
measure: clientStats.GRPC_CLIENT_SENT_MESSAGES_PER_RPC,
value: 1,
});
measureList.push({
measure: clientStats.GRPC_CLIENT_ROUNDTRIP_LATENCY,
value: ms,
});
break;
case SpanKind.SERVER:
measureList.push({
measure: serverStats.GRPC_SERVER_RECEIVED_BYTES_PER_RPC,
value: sizeof(reqOrRes),
});
measureList.push({
measure: serverStats.GRPC_SERVER_RECEIVED_MESSAGES_PER_RPC,
value: 1,
});
measureList.push({
measure: serverStats.GRPC_SERVER_SENT_BYTES_PER_RPC,
value: sizeof(argsOrValue),
});
measureList.push({
measure: serverStats.GRPC_SERVER_SENT_MESSAGES_PER_RPC,
value: 1,
});
/**
 * Translates a tracer-side `SpanKind` into the matching member of the
 * `opencensus.proto.trace.v1.Span.SpanKind` enum.
 *
 * @param kind The span kind to translate.
 * @returns The proto `SERVER` or `CLIENT` member for the corresponding input;
 *     `SPAN_KIND_UNSPECIFIED` for every other value.
 */
const spanKindToEnum = (
  kind: SpanKind
): opencensus.proto.trace.v1.Span.SpanKind => {
  if (kind === SpanKind.SERVER) {
    return opencensus.proto.trace.v1.Span.SpanKind.SERVER;
  }
  if (kind === SpanKind.CLIENT) {
    return opencensus.proto.trace.v1.Span.SpanKind.CLIENT;
  }
  // Anything that is neither SERVER nor CLIENT has no dedicated mapping.
  return opencensus.proto.trace.v1.Span.SpanKind.SPAN_KIND_UNSPECIFIED;
};
/**
 * Records one latency measurement through `plugin.stats`, choosing the
 * measure from the span kind.
 *
 * Does nothing when `plugin.stats` is not set. Any error thrown while
 * building or recording the measurement is swallowed, so recording never
 * propagates a failure to the caller.
 *
 * @param kind CLIENT records `HTTP_CLIENT_ROUNDTRIP_LATENCY`, SERVER records
 *     `HTTP_SERVER_LATENCY`; any other kind records an empty measurement list.
 * @param tags Tag map passed through to `plugin.stats.record`.
 * @param ms Latency value to record (presumably milliseconds - confirm with
 *     the measure definitions).
 */
static recordStats(kind: SpanKind, tags: TagMap, ms: number) {
  if (!plugin.stats) {
    return;
  }
  try {
    const measurements = [];
    if (kind === SpanKind.CLIENT) {
      measurements.push({
        measure: stats.HTTP_CLIENT_ROUNDTRIP_LATENCY,
        value: ms,
      });
    } else if (kind === SpanKind.SERVER) {
      measurements.push({ measure: stats.HTTP_SERVER_LATENCY, value: ms });
    }
    // record() is still invoked for other kinds, with an empty list.
    plugin.stats.record(measurements, tags);
  } catch (ignore) {}
}
/**
 * Converts a span kind into its string label.
 *
 * @param kind The span kind to label.
 * @returns 'CLIENT' or 'SERVER' for the matching kind, 'UNKNOWN' otherwise.
 */
private getSpanKind (kind: SpanKind) {
  if (kind === SpanKind.CLIENT) {
    return 'CLIENT'
  }
  if (kind === SpanKind.SERVER) {
    return 'SERVER'
  }
  // Every remaining kind shares the same fallback label.
  return 'UNKNOWN'
}
}
private _ocHandle = (message: IMessage, service: ICamundaService): Promise => {
log('handling message with tracing');
const { properties } = message;
const identifier = properties.activityId;
let spanContext: SpanContext | null = null;
const propagation = this._tracer.propagation as unknown;
const spanOptions: TraceOptions = {
name: identifier,
kind: SpanKind.SERVER
};
if (isWorkitPropagator(propagation)) {
spanContext = propagation.extractFromMessage(message);
spanOptions.spanContext = spanContext || undefined;
}
return this._tracer.startRootSpan(spanOptions, rootSpan => {
this._tracer.wrapEmitter(this);
rootSpan.addAttribute('wf.activityId', identifier);
rootSpan.addAttribute('wf.processInstanceId', properties.processInstanceId);
rootSpan.addAttribute('wf.workflowInstanceKey', properties.workflowInstanceKey);
if (properties.businessKey) {
rootSpan.addAttribute('wf.businessKey', properties.businessKey);
}