Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
const bulkProcessing = async (error, messages) => {
const histTimerEnd = Metrics.getHistogram(
'transfer_bulk_processing',
'Consume a bulkProcessing transfer message from the kafka topic and process it accordingly',
['success', 'fspId']
).startTimer()
if (error) {
// Logger.error(error)
throw error
}
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const payload = decodePayload(message.value.content.payload)
const getTransfer = async (error, messages) => {
const location = { module: 'GetTransferHandler', method: '', path: '' }
const histTimerEnd = Metrics.getHistogram(
'transfer_get',
'Consume a get transfer message from the kafka topic and process it accordingly',
['success', 'fspId']
).startTimer()
if (error) {
throw ErrorHandler.Factory.reformatFSPIOPError(error)
}
let message = {}
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const contextFromMessage = EventSdk.Tracer.extractContextFromMessage(message.value)
const span = EventSdk.Tracer.createChildSpanFromContext('cl_transfer_get', contextFromMessage)
try {
const positions = async (error, messages) => {
const histTimerEnd = Metrics.getHistogram(
'transfer_position',
'Consume a prepare transfer message from the kafka topic and process it accordingly',
['success', 'fspId']
).startTimer()
if (error) {
histTimerEnd({ success: false, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
throw ErrorHandler.Factory.reformatFSPIOPError(error)
}
let message = {}
let prepareBatch = []
let contextFromMessage
let span
try {
if (Array.isArray(messages)) {
prepareBatch = Array.from(messages)
const bulkFulfil = async (error, messages) => {
const histTimerEnd = Metrics.getHistogram(
'transfer_bulk_fulfil',
'Consume a bulkFulfil transfer message from the kafka topic and process it accordingly',
['success', 'fspId']
).startTimer()
if (error) {
throw error
}
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const messageId = message.value.id
const payload = message.value.content.payload
const prepare = async (error, messages) => {
const location = { module: 'PrepareHandler', method: '', path: '' }
const histTimerEnd = Metrics.getHistogram(
'transfer_prepare',
'Consume a prepare transfer message from the kafka topic and process it accordingly',
['success', 'fspId']
).startTimer()
if (error) {
histTimerEnd({ success: false, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
throw ErrorHandler.Factory.reformatFSPIOPError(error)
}
let message = {}
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const parentSpanService = 'cl_transfer_prepare'
const contextFromMessage = EventSdk.Tracer.extractContextFromMessage(message.value)
const fulfil = async (error, messages) => {
const location = { module: 'FulfilHandler', method: '', path: '' }
const histTimerEnd = Metrics.getHistogram(
'transfer_fulfil',
'Consume a fulfil transfer message from the kafka topic and process it accordingly',
['success', 'fspId']
).startTimer()
if (error) {
throw ErrorHandler.Factory.reformatFSPIOPError(error)
}
let message = {}
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const contextFromMessage = EventSdk.Tracer.extractContextFromMessage(message.value)
const span = EventSdk.Tracer.createChildSpanFromContext('cl_transfer_fulfil', contextFromMessage)
try {
const bulkPrepare = async (error, messages) => {
const histTimerEnd = Metrics.getHistogram(
'transfer_bulk_prepare',
'Consume a bulkPrepare transfer message from the kafka topic and process it accordingly',
['success', 'fspId']
).startTimer()
if (error) {
throw error
}
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const messageId = message.value.id
const payload = message.value.content.payload
const bulkPrepare = async (error, messages) => {
const histTimerEnd = Metrics.getHistogram(
'transfer_bulk_prepare',
'Consume a bulkPrepare transfer message from the kafka topic and process it accordingly',
['success', 'fspId']
).startTimer()
if (error) {
throw error
}
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const messageId = message.value.id
const payload = message.value.content.payload
const initializeInstrumentation = () => {
if (!Config.INSTRUMENTATION_METRICS_DISABLED) {
Metrics.setup(Config.INSTRUMENTATION_METRICS_CONFIG)
}
}