// Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// Kafka message handler for admin transfer messages.
// NOTE(review): this block appears spliced together from two different source
// files. From `if (!payload)` onward (the "validationFailed" branch) the code
// abruptly becomes batch participant-position persistence logic referencing
// names never defined in this scope: `processedTransfersKeysList`,
// `processedTransfers`, `initialParticipantPosition`,
// `processedTransferStateChangeIdList`, `knex` and `trx`.
// FIXME: braces do not balance — the `try` opened below is never closed before
// the `catch`, and the final `})` carries a stray parenthesis. Restore the
// original body before this can run.
const transfer = async (error, messages) => {
// Upstream consumer error: log it and rethrow as an FSPIOP error.
if (error) {
Logger.error(error)
throw ErrorHandler.Factory.reformatFSPIOPError(error)
}
let message = {}
try {
// Kafka may deliver an array; only the first message is processed here.
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const payload = message.value.content.payload
const metadata = message.value.metadata
const transferId = message.value.id
if (!payload) {
Logger.info('AdminTransferHandler::validationFailed')
// TODO: Cannot be saved because no payload has been provided. What action should be taken?
// NOTE(review): everything from here to the `catch` appears to belong to a
// different (position-batch) handler — see the header note above.
const batchParticipantPositionChange = []
for (const keyIndex in processedTransfersKeysList) {
const { runningPosition, runningReservedValue } = processedTransfers[processedTransfersKeysList[keyIndex]]
const participantPositionChange = {
participantPositionId: initialParticipantPosition.participantPositionId,
transferStateChangeId: processedTransferStateChangeIdList[keyIndex],
value: runningPosition,
// processBatch: - a single value uuid for this entire batch to make sure the set of transfers in this batch can be clearly grouped
reservedValue: runningReservedValue
}
batchParticipantPositionChange.push(participantPositionChange)
}
batchParticipantPositionChange.length && await knex.batchInsert('participantPositionChange', batchParticipantPositionChange).transacting(trx)
// FIXME(review): `trx.commit` / `trx.rollback` are knex functions — these
// awaits are missing the call parentheses, so the transaction is never
// actually committed or rolled back here.
await trx.commit
} catch (err) {
Logger.error(err)
await trx.rollback
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
})
// NOTE(review): fragment — the `Array.from(transferIdList.map(transferId =>`
// expression is cut off mid-arrow-function and is immediately followed by what
// looks like the tail of an `if (args.bulkprepare)` CLI branch from a
// different file. FIXME: recover the code missing between the two fragments.
const preparedMessagesList = Array.from(transferIdList.map(transferId =>
const handler = {
type: 'bulkprepare',
enabled: true
}
handlerList.push(handler)
}
// Register the bulk handlers selected via CLI flags: each enabled flag adds
// one { type, enabled } descriptor to the shared handlerList, in fixed order.
for (const handlerType of ['bulkfulfil', 'bulkprocessing']) {
  if (args[handlerType]) {
    Logger.debug(`CLI: Executing --${handlerType}`)
    handlerList.push({ type: handlerType, enabled: true })
  }
}
// Boot the handler service through the shared Setup module. Migrations are
// skipped here (runMigrations: false) and the handlers collected above are
// started immediately (runHandlers: true).
const serviceOptions = {
  service: 'handler',
  port: Config.PORT,
  modules: [Plugin, MetricPlugin],
  runMigrations: false,
  handlers: handlerList,
  runHandlers: true
}
module.exports = Setup.initialize(serviceOptions)
// } else {
/**
 * Transforms a persistence-layer transfer record into the aggregate transfer
 * shape, dispatching on whichever marker flag the record carries.
 *
 * @param {object} t - record from the read model, saveTransferPrepared, or
 *   savePayeeTransferResponseExecuted (identified by its marker flag)
 * @returns {object} transformed transfer with nil fields omitted
 * @throws FSPIOPError (INTERNAL_SERVER_ERROR) when no marker flag matches
 */
const toTransfer = (t) => {
  // TODO: Validate 't' to confirm if its from the DB transferReadModel or from the saveTransferPrepare
  if (t.isTransferReadModel) {
    Logger.debug('In aggregate transfer transform for isTransferReadModel')
    return Util.omitNil(fromTransferReadModel(t)) // TODO: Remove this once the DB validation is done for 't'
  }
  if (t.isSaveTransferPrepared) {
    Logger.debug('In aggregate transfer transform for isSaveTransferPrepared')
    return Util.omitNil(fromSaveTransferPrepared(t)) // TODO: Remove this once the DB validation is done for 't'
  }
  if (t.savePayeeTransferResponseExecuted) {
    Logger.debug('In aggregate transfer transform for isSavePayeeTransferResponseExecuted')
    return Util.omitNil(fromSavePayeeTransferResponseExecuted(t)) // TODO: Remove this once the DB validation is done for 't'
  }
  throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, `Unable to transform to transfer: ${t}`)
}
// NOTE(review): fragment — the tail of a nested ternary that maps an event
// `action` to its ActionLetter. The head of the expression (the assignment and
// the first conditions) is missing, and the trailing parentheses are
// unbalanced relative to what is visible here. FIXME: restore the full
// expression.
: (action === Enum.Events.Event.Action.ABORT ? Enum.Events.ActionLetter.abort
: (action === Enum.Events.Event.Action.TIMEOUT_RESERVED ? Enum.Events.ActionLetter.timeout
: (action === Enum.Events.Event.Action.BULK_PREPARE ? Enum.Events.ActionLetter.bulkPrepare
: (action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit
: (action === Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED ? Enum.Events.ActionLetter.bulkTimeoutReserved
: Enum.Events.ActionLetter.unknown)))))))
// NOTE(review): fragment of a position-handler prepare branch (free names
// `message`, `kafkaTopic`, `payload`, `span`, `action`, `eventType`,
// `prepareBatch`, `actionLetter`, `histTimerEnd`, `location` are all defined
// outside this excerpt). The `if` and `for` opened below are never closed in
// the visible span.
const params = { message, kafkaTopic, decodedPayload: payload, span, consumer: Consumer, producer: Producer }
const eventDetail = { action }
// Bulk actions route to BULK_PROCESSING; everything else to NOTIFICATION.
if (![Enum.Events.Event.Action.BULK_PREPARE, Enum.Events.Event.Action.BULK_COMMIT, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) {
eventDetail.functionality = Enum.Events.Event.Type.NOTIFICATION
} else {
eventDetail.functionality = Enum.Events.Event.Type.BULK_PROCESSING
}
if (eventType === Enum.Events.Event.Type.POSITION && [Enum.Events.Event.Action.PREPARE, Enum.Events.Event.Action.BULK_PREPARE].includes(action)) {
Logger.info(Utility.breadcrumb(location, { path: 'prepare' }))
const { preparedMessagesList, limitAlarms } = await PositionService.calculatePreparePositionsBatch(decodeMessages(prepareBatch))
for (const limit of limitAlarms) {
// Publish alarm message to KafkaTopic for the Hub to consume as it is the Hub
// rather than the switch to manage this (the topic is an participantEndpoint)
Logger.info(`Limit alarm should be sent with ${limit}`)
}
for (const prepareMessage of preparedMessagesList) {
const { transferState } = prepareMessage
if (transferState.transferStateId === Enum.Transfers.TransferState.RESERVED) {
Logger.info(Utility.breadcrumb(location, `payer--${actionLetter}1`))
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else {
Logger.info(Utility.breadcrumb(location, `payerNotifyInsufficientLiquidity--${actionLetter}2`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY)
// FIXME(review): splice point — `fspiopError` is built but never used, and
// the next two lines log/throw an undefined `error`; they likely belong to a
// different handler (or should reference `fspiopError` instead).
Logger.error(error)
throw ErrorHandler.Factory.reformatFSPIOPError(error)
}
// NOTE(review): fragment of the AdminTransferHandler — it duplicates the
// message-parsing preamble seen earlier in this file and is cut off after the
// duplicate check. `messages`, `allowedActions` and `httpPostRelatedActions`
// are defined outside this excerpt, and the `try` opened below is never closed
// in the visible span.
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const payload = message.value.content.payload
const metadata = message.value.metadata
const transferId = message.value.id
if (!payload) {
Logger.info('AdminTransferHandler::validationFailed')
// TODO: Cannot be saved because no payload has been provided. What action should be taken?
return false
}
payload.participantCurrencyId = metadata.request.params.id
const enums = metadata.request.enums
const transactionTimestamp = Time.getUTCString(new Date())
Logger.info(`AdminTransferHandler::${metadata.event.action}::${transferId}`)
const kafkaTopic = message.topic
// Unknown actions are only logged; processing continues regardless.
if (!allowedActions.includes(payload.action)) {
Logger.info(`AdminTransferHandler::${payload.action}::invalidPayloadAction`)
}
if (httpPostRelatedActions.includes(payload.action)) {
const { hasDuplicateId, hasDuplicateHash } = await Comparators.duplicateCheckComparator(transferId, payload, TransferService.getTransferDuplicateCheck, TransferService.saveTransferDuplicateCheck)
if (!hasDuplicateId) {
// FIXME(review): splice point — `error` is not defined in this scope; the
// original code here presumably processed the new (non-duplicate) transfer.
throw error
}
// NOTE(review): fragment of a BulkFulfilHandler spliced into this file; the
// `try` blocks opened below and the surrounding handler are cut off at the end
// of the visible span (the stream-reading loop after `streamReader` is
// missing).
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const messageId = message.value.id
const payload = message.value.content.payload
const headers = message.value.content.headers
const action = message.value.metadata.event.action
const bulkTransferId = payload.bulkTransferId
const kafkaTopic = message.topic
Logger.info(Util.breadcrumb(location, { method: 'bulkFulfil' }))
const actionLetter = action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit : Enum.Events.ActionLetter.unknown
let params = { message, kafkaTopic, decodedPayload: payload, consumer: Consumer, producer: Producer }
Logger.info(Util.breadcrumb(location, { path: 'dupCheck' }))
const { hasDuplicateId, hasDuplicateHash } = await Comparators.duplicateCheckComparator(bulkTransferId, payload.hash, BulkTransferService.getBulkTransferFulfilmentDuplicateCheck, BulkTransferService.saveBulkTransferFulfilmentDuplicateCheck)
if (hasDuplicateId && hasDuplicateHash) { // TODO: handle resend :: GET /bulkTransfer
Logger.info(Util.breadcrumb(location, `resend--${actionLetter}1`))
Logger.error(Util.breadcrumb(location, 'notImplemented'))
return true
}
// FIXME(review): this condition is tested twice — this first branch mixes
// MODIFIED_REQUEST handling with "resend/notImplemented" logging, and the
// identical condition a few lines below is unreachable dead code. One of the
// two blocks should be removed or merged.
if (hasDuplicateId && !hasDuplicateHash) {
Logger.error(Util.breadcrumb(location, `callbackErrorModified--${actionLetter}2`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.MODIFIED_REQUEST)
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
Logger.info(Util.breadcrumb(location, `resend--${actionLetter}1`))
Logger.error(Util.breadcrumb(location, 'notImplemented'))
return true
}
if (hasDuplicateId && !hasDuplicateHash) { // TODO: handle modified request
Logger.error(Util.breadcrumb(location, `callbackErrorModified--${actionLetter}2`))
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.MODIFIED_REQUEST)
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
params.message.value.content.uriParams = { id: bulkTransferId }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
throw fspiopError
}
const { isValid, reasons, payerParticipantId, payeeParticipantId } = await Validator.validateBulkTransfer(payload, headers)
if (isValid) {
Logger.info(Util.breadcrumb(location, { path: 'isValid' }))
try {
Logger.info(Util.breadcrumb(location, 'saveBulkTransfer'))
const participants = { payerParticipantId, payeeParticipantId }
await BulkTransferService.bulkPrepare(payload, participants)
} catch (err) { // TODO: handle insert error
Logger.info(Util.breadcrumb(location, `callbackErrorInternal1--${actionLetter}5`))
Logger.error(Util.breadcrumb(location, 'notImplemented'))
return true
}
try {
Logger.info(Util.breadcrumb(location, 'individualTransfers'))
// stream initialization
const IndividualTransferModel = BulkTransferModels.getIndividualTransferModel()
const indvidualTransfersStream = IndividualTransferModel.find({ messageId }).cursor()
// enable async/await operations for the stream
const streamReader = AwaitifyStream.createReader(indvidualTransfersStream)
/**
 * Registers every bulk Kafka handler (prepare, fulfil, processing) in
 * sequence.
 *
 * @returns {Promise<true>} resolves true once all handlers are registered
 * @throws rethrows (after logging) any registration failure
 */
const registerAllHandlers = async () => {
  try {
    // Register one module at a time, preserving the prepare → fulfil →
    // processing order of the original implementation.
    const handlerModules = [BulkPrepareHandlers, BulkFulfilHandlers, BulkProcessingHandlers]
    for (const handlerModule of handlerModules) {
      await handlerModule.registerAllHandlers()
    }
    return true
  } catch (registrationError) {
    Logger.error(registrationError)
    throw registrationError
  }
}
/**
 * Persists a single transfer record to the transfer table.
 *
 * @param {object} record - row to insert via `Db.transfer.insert`
 * @returns {Promise<*>} result of the insert
 * @throws rethrows (after logging) any insert failure
 */
const saveTransfer = async (record) => {
  Logger.debug('save transfer' + record.toString())
  try {
    // `await` is required here: with a bare `return` of the promise, a
    // rejected insert would bypass this try/catch entirely and the failure
    // would never be logged.
    return await Db.transfer.insert(record)
  } catch (err) {
    Logger.error(err)
    throw err
  }
}