Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
const transfer = async (error, messages) => {
if (error) {
Logger.error(error)
throw ErrorHandler.Factory.reformatFSPIOPError(error)
}
let message = {}
try {
if (Array.isArray(messages)) {
message = messages[0]
} else {
message = messages
}
const payload = message.value.content.payload
const metadata = message.value.metadata
const transferId = message.value.id
if (!payload) {
Logger.info('AdminTransferHandler::validationFailed')
// TODO: Cannot be saved because no payload has been provided. What action should be taken?
const batchParticipantPositionChange = []
for (const keyIndex in processedTransfersKeysList) {
const { runningPosition, runningReservedValue } = processedTransfers[processedTransfersKeysList[keyIndex]]
const participantPositionChange = {
participantPositionId: initialParticipantPosition.participantPositionId,
transferStateChangeId: processedTransferStateChangeIdList[keyIndex],
value: runningPosition,
// processBatch: - a single value uuid for this entire batch to make sure the set of transfers in this batch can be clearly grouped
reservedValue: runningReservedValue
}
batchParticipantPositionChange.push(participantPositionChange)
}
batchParticipantPositionChange.length && await knex.batchInsert('participantPositionChange', batchParticipantPositionChange).transacting(trx)
await trx.commit
} catch (err) {
Logger.error(err)
await trx.rollback
throw ErrorHandler.Factory.reformatFSPIOPError(err)
}
})
const preparedMessagesList = Array.from(transferIdList.map(transferId =>
/**
 * Registers the bulk prepare, bulk fulfil and bulk processing handlers,
 * one after another and in that order.
 *
 * @returns {Promise<boolean>} resolves to `true` once all three registrations have completed
 * @throws rethrows, after logging it, whatever error a registration raised
 */
const registerAllHandlers = async () => {
  // Thunks so that each handler module is only touched when its turn comes.
  const registrations = [
    () => BulkPrepareHandlers.registerAllHandlers(),
    () => BulkFulfilHandlers.registerAllHandlers(),
    () => BulkProcessingHandlers.registerAllHandlers()
  ]
  try {
    for (const register of registrations) {
      await register()
    }
  } catch (failure) {
    Logger.error(failure)
    throw failure
  }
  return true
}
/**
 * Inserts a transfer record through `Db.transfer.insert`.
 *
 * The insert is awaited inside the `try` so that an asynchronous failure is
 * caught here, logged and then rethrown. Returning the pending promise
 * directly would let a rejection bypass the `catch` block entirely.
 *
 * @param {object} record transfer record handed to `Db.transfer.insert`
 * @returns {Promise<*>} whatever `Db.transfer.insert` resolves to
 * @throws rethrows, after logging it, any error raised by the insert
 */
const saveTransfer = async (record) => {
  Logger.debug('save transfer' + record.toString())
  try {
    // `return await` (not a bare `return`) keeps rejections inside this try/catch.
    return await Db.transfer.insert(record)
  } catch (err) {
    Logger.error(err)
    throw err
  }
}
/**
 * Creates the Kafka consumer handler for bulk prepare messages.
 *
 * The topic name is derived from the general topic template for the
 * BULK / PREPARE event, the consumer configuration is looked up for the same
 * (upper-cased) type and action, and the topic name is written into the
 * config as the rdkafka `client.id` before the handler is created.
 *
 * @returns {Promise<boolean>} resolves to `true` once `Consumer.createHandler` has completed
 * @throws rethrows, after logging it, any error raised while building or creating the handler
 */
const registerBulkPrepareHandler = async () => {
  try {
    const command = bulkPrepare
    const topicName = Kafka.transformGeneralTopicName(
      Config.KAFKA_CONFIG.TOPIC_TEMPLATES.GENERAL_TOPIC_TEMPLATE.TEMPLATE,
      Enum.Events.Event.Type.BULK,
      Enum.Events.Event.Action.PREPARE
    )
    const config = Kafka.getKafkaConfig(
      Config.KAFKA_CONFIG,
      Enum.Kafka.Config.CONSUMER,
      Enum.Events.Event.Type.BULK.toUpperCase(),
      Enum.Events.Event.Action.PREPARE.toUpperCase()
    )
    // The consumer identifies itself to the broker by its topic name.
    config.rdkafkaConf['client.id'] = topicName
    await Consumer.createHandler(topicName, config, command)
    return true
  } catch (failure) {
    Logger.error(failure)
    throw failure
  }
}
/**
 * Looks up the first `transferError` row whose `transferId` equals `id` and
 * returns it with `errorCode` converted to a string.
 *
 * When the query resolves to nothing (for example no matching row), that
 * empty result is returned untouched instead of being dereferenced, so a
 * missing record no longer surfaces as a logged TypeError.
 *
 * @param {*} id value matched against the `transferId` column
 * @returns {Promise<object|undefined|null>} the row with a string `errorCode`, or the empty query result
 * @throws rethrows, after logging it, any error raised by the query or the conversion
 */
const getByTransferId = async (id) => {
  try {
    const transferError = await Db.transferError.query(async (builder) => {
      const result = builder
        .where({ transferId: id })
        .select('*')
        .first()
      return result
    })
    // Nothing found: hand the empty result back rather than reading `.errorCode` off it.
    if (transferError == null) {
      return transferError
    }
    transferError.errorCode = transferError.errorCode.toString()
    return transferError
  } catch (err) {
    Logger.error(err)
    throw err
  }
}
/**
* TODO: BULK-Handle at BulkProcessingHandler (not in scope of #967)
* HOWTO: Stop execution at the `TransferService.prepare`, stop mysql,
* continue execution to catch block, start mysql
*/
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
throw fspiopError
}
Logger.info(Util.breadcrumb(location, `positionTopic1--${actionLetter}7`))
functionality = TransferEventType.POSITION
const eventDetail = { functionality, action }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, toDestination })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else {
Logger.error(Util.breadcrumb(location, { path: 'validationFailed' }))
try {
Logger.info(Util.breadcrumb(location, 'saveInvalidRequest'))
await TransferService.prepare(payload, reasons.toString(), false)
} catch (err) {
Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}8`))
Logger.error(`${Util.breadcrumb(location)}::${err.message}`)
const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err, ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR)
const eventDetail = { functionality, action: TransferEventAction.PREPARE }
/**
* TODO: BULK-Handle at BulkProcessingHandler (not in scope of #967)
* HOWTO: For regular transfers this branch may be triggered by sending
* a transfer in a currency not supported by either dfsp and also stopping
* mysql at `TransferService.prepare` and starting it after entering catch.
* Not sure if it will work for bulk, because of the BulkPrepareHandler.
*/
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
/**
 * Queries `bulkTransferExtension` for the rows whose `bulkTransferId` equals
 * `id`, selecting only the `key`, `value` and `isFulfilment` columns.
 *
 * @param {*} id value matched against the `bulkTransferId` column
 * @returns {Promise<*>} whatever `Db.bulkTransferExtension.query` resolves to
 * @throws rethrows, after logging it, any error raised by the query
 */
const getByBulkTransferId = async (id) => {
  const selectExtensions = async (builder) => builder
    .where({ bulkTransferId: id })
    .select('key', 'value', 'isFulfilment')

  try {
    const extensions = await Db.bulkTransferExtension.query(selectExtensions)
    return extensions
  } catch (failure) {
    Logger.error(failure)
    throw failure
  }
}
/**
 * Fetches a single `participantLimit` record matching the given participant
 * currency id via `Db.participantLimit.findOne`.
 *
 * @param {*} participantCurrencyId value used as the `participantCurrencyId` search criterion
 * @returns {Promise<*>} whatever `Db.participantLimit.findOne` resolves to
 * @throws rethrows, after logging it, any error raised by the lookup
 */
const getLimitByCurrencyId = async (participantCurrencyId) => {
  const criteria = { participantCurrencyId }
  try {
    const limit = await Db.participantLimit.findOne(criteria)
    return limit
  } catch (failure) {
    Logger.error(failure)
    throw failure
  }
}
/**
 * Inserts a participant position through `Db.participantPosition.insert`.
 *
 * @param {*} participantPosition value handed unchanged to `Db.participantPosition.insert`
 * @returns {Promise<*>} whatever `Db.participantPosition.insert` resolves to
 * @throws rethrows, after logging it, any error raised by the insert
 */
const insert = async (participantPosition) => {
  try {
    const inserted = await Db.participantPosition.insert(participantPosition)
    return inserted
  } catch (failure) {
    Logger.error(failure)
    throw failure
  }
}