// Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
: (action === Enum.Events.Event.Action.ABORT ? Enum.Events.ActionLetter.abort
: (action === Enum.Events.Event.Action.TIMEOUT_RESERVED ? Enum.Events.ActionLetter.timeout
: (action === Enum.Events.Event.Action.BULK_PREPARE ? Enum.Events.ActionLetter.bulkPrepare
: (action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
: (action === Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED ? Enum.Events.ActionLetter.bulkTimeoutReserved
: Enum.Events.ActionLetter.unknown)))))))
const params = { message, kafkaTopic, decodedPayload: payload, span, consumer: Consumer, producer: Producer }
const eventDetail = { action }
if (![Enum.Events.Event.Action.BULK_PREPARE, Enum.Events.Event.Action.BULK_COMMIT, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) {
eventDetail.functionality = Enum.Events.Event.Type.NOTIFICATION
} else {
eventDetail.functionality = Enum.Events.Event.Type.BULK_PROCESSING
}
if (eventType === Enum.Events.Event.Type.POSITION && [Enum.Events.Event.Action.PREPARE, Enum.Events.Event.Action.BULK_PREPARE].includes(action)) {
Logger.info(Utility.breadcrumb(location, { path: 'prepare' }))
const { preparedMessagesList, limitAlarms } = await PositionService.calculatePreparePositionsBatch(decodeMessages(prepareBatch))
for (const limit of limitAlarms) {
// Publish alarm message to KafkaTopic for the Hub to consume as it is the Hub
// rather than the switch to manage this (the topic is an participantEndpoint)
Logger.info(`Limit alarm should be sent with ${limit}`)
}
for (const prepareMessage of preparedMessagesList) {
const { transferState } = prepareMessage
if (transferState.transferStateId === Enum.Transfers.TransferState.RESERVED) {
Logger.info(Utility.breadcrumb(location, `payer--${actionLetter}1`))
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else {
Logger.info(Utility.breadcrumb(location, `payerNotifyInsufficientLiquidity--${actionLetter}2`))
// Payer FSP does not have sufficient liquidity for this prepare:
// surface a PAYER_FSP_INSUFFICIENT_LIQUIDITY FSPIOP error to the caller.
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY)
// BUG FIX: the original lines logged and rethrew an undefined identifier `error`
// (a ReferenceError at runtime); log and rethrow the FSPIOPError created above.
Logger.error(fspiopError)
throw ErrorHandler.Factory.reformatFSPIOPError(fspiopError)
}
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const payload = message.value.content.payload
const metadata = message.value.metadata
const transferId = message.value.id
if (!payload) {
Logger.info('AdminTransferHandler::validationFailed')
// TODO: Cannot be saved because no payload has been provided. What action should be taken?
return false
}
payload.participantCurrencyId = metadata.request.params.id
const enums = metadata.request.enums
const transactionTimestamp = Time.getUTCString(new Date())
Logger.info(`AdminTransferHandler::${metadata.event.action}::${transferId}`)
const kafkaTopic = message.topic
if (!allowedActions.includes(payload.action)) {
Logger.info(`AdminTransferHandler::${payload.action}::invalidPayloadAction`)
}
if (httpPostRelatedActions.includes(payload.action)) {
const { hasDuplicateId, hasDuplicateHash } = await Comparators.duplicateCheckComparator(transferId, payload, TransferService.getTransferDuplicateCheck, TransferService.saveTransferDuplicateCheck)
if (!hasDuplicateId) {
throw error
}
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const messageId = message.value.id
const payload = message.value.content.payload
const headers = message.value.content.headers
const action = message.value.metadata.event.action
const bulkTransferId = payload.bulkTransferId
const kafkaTopic = message.topic
Logger.info(Util.breadcrumb(location, { method: 'bulkFulfil' }))
const actionLetter = action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit : Enum.Events.ActionLetter.unknown
let params = { message, kafkaTopic, decodedPayload: payload, consumer: Consumer, producer: Producer }
Logger.info(Util.breadcrumb(location, { path: 'dupCheck' }))
const { hasDuplicateId, hasDuplicateHash } = await Comparators.duplicateCheckComparator(bulkTransferId, payload.hash, BulkTransferService.getBulkTransferFulfilmentDuplicateCheck, BulkTransferService.saveBulkTransferFulfilmentDuplicateCheck)
if (hasDuplicateId && hasDuplicateHash) { // TODO: handle resend :: GET /bulkTransfer
Logger.info(Util.breadcrumb(location, `resend--${actionLetter}1`))
Logger.error(Util.breadcrumb(location, 'notImplemented'))
return true
}
if (hasDuplicateId && !hasDuplicateHash) {
Logger.error(Util.breadcrumb(location, `callbackErrorModified--${actionLetter}2`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.MODIFIED_REQUEST)
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
Logger.info(Util.breadcrumb(location, `resend--${actionLetter}1`))
Logger.error(Util.breadcrumb(location, 'notImplemented'))
return true
}
if (hasDuplicateId && !hasDuplicateHash) { // TODO: handle modified request
Logger.error(Util.breadcrumb(location, `callbackErrorModified--${actionLetter}2`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.MODIFIED_REQUEST)
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
params.message.value.content.uriParams = { id: bulkTransferId }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
throw fspiopError
}
const { isValid, reasons, payerParticipantId, payeeParticipantId } = await Validator.validateBulkTransfer(payload, headers)
if (isValid) {
Logger.info(Util.breadcrumb(location, { path: 'isValid' }))
try {
Logger.info(Util.breadcrumb(location, 'saveBulkTransfer'))
const participants = { payerParticipantId, payeeParticipantId }
await BulkTransferService.bulkPrepare(payload, participants)
} catch (err) { // TODO: handle insert error
Logger.info(Util.breadcrumb(location, `callbackErrorInternal1--${actionLetter}5`))
Logger.error(Util.breadcrumb(location, 'notImplemented'))
return true
}
try {
Logger.info(Util.breadcrumb(location, 'individualTransfers'))
// stream initialization
const IndividualTransferModel = BulkTransferModels.getIndividualTransferModel()
const indvidualTransfersStream = IndividualTransferModel.find({ messageId }).cursor()
// enable async/await operations for the stream
const streamReader = AwaitifyStream.createReader(indvidualTransfersStream)
/**
 * Discovers every `handler.js` module beneath the current directory and
 * invokes `registerAllHandlers()` on each handler object it exports.
 *
 * A discovered module either exposes a `handler` property directly, or is a
 * keyed collection whose entries each expose a `handler` property.
 *
 * @returns {Promise<true>} resolves true once every discovered handler has registered
 * @throws {Error} any registration failure, reformatted via ErrorHandler.Factory.reformatFSPIOPError
 */
const registerAllHandlers = async () => {
  try {
    const discovered = await requireGlob(['./**/handler.js'])
    Logger.info(JSON.stringify(discovered))
    for (const key in discovered) {
      Logger.info(`Registering handler module[${key}]: ${JSON.stringify(discovered[key])}`)
      const entry = discovered[key]
      if (Object.prototype.hasOwnProperty.call(entry, 'handler')) {
        // Module exports a single handler object.
        Logger.info(JSON.stringify(entry.handler))
        await entry.handler.registerAllHandlers()
      } else {
        // Module exports a collection of handler objects — register each in turn.
        for (const i in entry) {
          Logger.info(JSON.stringify(entry[i].handler))
          await entry[i].handler.registerAllHandlers()
        }
      }
    }
    return true
  } catch (err) {
    throw ErrorHandler.Factory.reformatFSPIOPError(err)
  }
}
}
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const payload = decodePayload(message.value.content.payload)
const headers = message.value.content.headers
const eventType = message.value.metadata.event.type
const action = message.value.metadata.event.action
const state = message.value.metadata.event.state
const transferId = payload.transferId || (message.value.content.uriParams && message.value.content.uriParams.id)
const kafkaTopic = message.topic
Logger.info(Util.breadcrumb(location, { method: 'bulkProcessing' }))
const actionLetter = action === Enum.Events.Event.Action.BULK_PREPARE ? Enum.Events.ActionLetter.bulkPrepare
: (action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
: (action === Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED ? Enum.Events.ActionLetter.bulkTimeoutReceived
: (action === Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED ? Enum.Events.ActionLetter.bulkTimeoutReserved
: (action === Enum.Events.Event.Action.PREPARE_DUPLICATE ? Enum.Events.ActionLetter.bulkPrepareDuplicate
: (action === Enum.Events.Event.Action.FULFIL_DUPLICATE ? Enum.Events.ActionLetter.bulkFulfilDuplicate
: Enum.Events.ActionLetter.unknown)))))
const params = { message, kafkaTopic, decodedPayload: payload, consumer: Consumer, producer: Producer }
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
/**
* Acquire bulk transfer info by transferId below needs to be improved. Currently, if
* an individual transfer fulfil is attempted as part of another bulk, bulkTransferInfo
* refers to the original bulkTransferId where that inidividual transfer has been added
* initially. This leads to an error which could be hard to trace back and determine
* HOWTO: Stop execution at the `TransferService.prepare`, stop mysql,
* continue execution to catch block, start mysql
*/
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
throw fspiopError
}
Logger.info(Util.breadcrumb(location, `positionTopic1--${actionLetter}7`))
functionality = TransferEventType.POSITION
const eventDetail = { functionality, action }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, toDestination })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else {
Logger.error(Util.breadcrumb(location, { path: 'validationFailed' }))
try {
Logger.info(Util.breadcrumb(location, 'saveInvalidRequest'))
await TransferService.prepare(payload, reasons.toString(), false)
} catch (err) {
Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}8`))
Logger.error(`${Util.breadcrumb(location)}::${err.message}`)
const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err, ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR)
const eventDetail = { functionality, action: TransferEventAction.PREPARE }
/**
* TODO: BULK-Handle at BulkProcessingHandler (not in scope of #967)
* HOWTO: For regular transfers this branch may be triggered by sending
* a transfer in a currency not supported by either dfsp and also stopping
* mysql at `TransferService.prepare` and starting it after entring catch.
* Not sure if it will work for bulk, because of the BulkPrepareHandler.
*/
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
throw fspiopError
}
}
})
server.ext('onRequest', (request, h) => {
  // Derive a transfer id from the request URI (if one is present) so traces
  // can be correlated with the transfer they belong to.
  // NOTE(review): `request.url.path` — newer hapi versions expose a WHATWG URL
  // with `pathname` instead of `path`; confirm against the hapi version in use.
  const uriTransferId = UrlParser.idFromTransferUri(`${Config.HOSTNAME}${request.url.path}`)
  // Prefer an inbound traceid header, then the transfer id, then a fresh UUID.
  request.headers.traceid = request.headers.traceid || uriTransferId || Uuid()
  RequestLogger.logRequest(request)
  return h.continue
})
server.ext('onPreResponse', (request, h) => {
  // Log every outgoing response just before it leaves the server.
  RequestLogger.logResponse(request)
  return h.continue
})
await Plugins.registerPlugins(server)
await server.register(modules)
await server.start()
Logger.info('Server running at: ', server.info.uri)
return server
})()
}