Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
const calculateConditionFromFulfil = (fulfilment) => {
// TODO: The following hashing code should be moved into a re-usable common-shared-service at a later point
const hashSha256 = Crypto.createHash('sha256')
const preimage = base64url.toBuffer(fulfilment)
if (preimage.length !== 32) {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, 'Interledger preimages must be exactly 32 bytes.')
}
const calculatedConditionDigest = hashSha256.update(preimage).digest('base64')
console.log(`calculatedConditionDigest=${calculatedConditionDigest}`)
return base64url.fromBase64(calculatedConditionDigest)
}
const { reconciliationAccountId } = await knex('participantCurrency')
.select('participantCurrencyId AS reconciliationAccountId')
.where('participantId', Config.HUB_ID)
.andWhere('currencyId', payload.amount.currency)
.first()
.transacting(trx)
let ledgerEntryTypeId, amount
if (payload.action === Enum.Transfers.AdminTransferAction.RECORD_FUNDS_IN) {
ledgerEntryTypeId = enums.ledgerEntryType.RECORD_FUNDS_IN
amount = payload.amount.amount
} else if (payload.action === Enum.Transfers.AdminTransferAction.RECORD_FUNDS_OUT_PREPARE_RESERVE) {
ledgerEntryTypeId = enums.ledgerEntryType.RECORD_FUNDS_OUT
amount = -payload.amount.amount
} else {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, 'Action not allowed for reconciliationTransferPrepare')
}
// Insert transferParticipant records
await knex('transferParticipant')
.insert({
transferId: payload.transferId,
participantCurrencyId: reconciliationAccountId,
transferParticipantRoleTypeId: enums.transferParticipantRoleType.HUB,
ledgerEntryTypeId: ledgerEntryTypeId,
amount: amount,
createdDate: transactionTimestamp
})
.transacting(trx)
await knex('transferParticipant')
.insert({
transferId: payload.transferId,
bulkTransferState: payeeBulkResponse.bulkTransferState,
completedTimestamp: payeeBulkResponse.completedTimestamp,
extensionList: payeeBulkResponse.extensionList
})
const payeeMetadata = Util.StreamingProtocol.createMetadataWithCorrelatedEvent(params.message.value.metadata.event.id, payeeParams.message.value.metadata.type, payeeParams.message.value.metadata.action, Enum.Events.EventStatus.SUCCESS)
payeeParams.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, participants.payeeFsp, Enum.Http.Headers.FSPIOP.SWITCH.value, payeeMetadata, payeeBulkResponse.headers, payeePayload)
if ([Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) {
eventDetail.action = Enum.Events.Event.Action.BULK_COMMIT
}
await Kafka.proceed(Config.KAFKA_CONFIG, payerParams, { consumerCommit, eventDetail })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
await Kafka.proceed(Config.KAFKA_CONFIG, payeeParams, { consumerCommit, eventDetail })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else if (eventType === Enum.Events.Event.Type.BULK_PROCESSING && [Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) {
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED, null, null, null, payload.extensionList)
eventDetail.action = Enum.Events.Event.Action.BULK_ABORT
params.message.value.content.uriParams.id = bulkTransferInfo.bulkTransferId
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail })
throw fspiopError
} else {
// TODO: For the following (Internal Server Error) scenario a notification is produced for each individual transfer.
// It also needs to be processed first in order to accumulate transfers and send the callback notification at bulk level.
Logger.info(Util.breadcrumb(location, `invalidEventTypeOrAction--${actionLetter}4`))
const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(`Invalid event action:(${action}) and/or type:(${eventType})`).toApiErrorObject(Config.ERROR_HANDLING)
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action: Enum.Events.Event.Action.BULK_PROCESSING }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError, eventDetail, fromSwitch })
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
}
}
} catch (err) {
// start - To Do move to domain
const participant = await ParticipantService.getByName(request.params.name)
if (participant) {
const ledgerAccountType = await ParticipantService.getLedgerAccountTypeName(request.payload.type)
if (!ledgerAccountType) {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.ADD_PARTY_INFO_ERROR, 'Ledger account type was not found.')
}
const accountParams = {
participantId: participant.participantId,
currencyId: request.payload.currency,
ledgerAccountTypeId: ledgerAccountType.ledgerAccountTypeId,
isActive: 1
}
const participantAccount = await ParticipantService.getParticipantAccount(accountParams)
if (participantAccount) {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.ADD_PARTY_INFO_ERROR, 'Hub account has already been registered.')
}
if (participant.participantId !== Config.HUB_ID) {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.ADD_PARTY_INFO_ERROR, 'Endpoint is reserved for creation of Hub account types only.')
}
const isPermittedHubAccountType = Config.HUB_ACCOUNTS.indexOf(request.payload.type) >= 0
if (!isPermittedHubAccountType) {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.ADD_PARTY_INFO_ERROR, 'The requested hub operator account type is not allowed.')
}
const newCurrencyAccount = await ParticipantService.createHubAccount(participant.participantId, request.payload.currency, ledgerAccountType.ledgerAccountTypeId)
if (!newCurrencyAccount) {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.ADD_PARTY_INFO_ERROR, 'Participant account and Position create have failed.')
}
participant.currencyList.push(newCurrencyAccount.participantCurrency)
} else {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.ADD_PARTY_INFO_ERROR, 'Participant was not found.')
/* Validate entire batch if availablePosition >= sumTransfersInBatch - the impact is that applying per transfer rules would require to be handled differently
since further rules are expected we do not do this at this point
As we enter this next step the order in which the transfer is processed against the Position is critical.
Both positive and failure cases need to recorded in processing order
This means that they should not be removed from the list, and the participantPosition
*/
let sumReserved = 0 // Record the sum of the transfers we allow to progress to RESERVED
for (const transferId in reservedTransfers) {
const { transfer, transferState, rawMessage, transferAmount } = reservedTransfers[transferId]
if (new MLNumber(availablePosition).toNumber() >= transferAmount.toNumber()) {
availablePosition = new MLNumber(availablePosition).subtract(transferAmount).toFixed(Config.AMOUNT.SCALE)
transferState.transferStateId = Enum.Transfers.TransferState.RESERVED
sumReserved = new MLNumber(sumReserved).add(transferAmount).toFixed(Config.AMOUNT.SCALE) /* actually used */
} else {
transferState.transferStateId = Enum.Transfers.TransferInternalState.ABORTED_REJECTED
transferState.reason = ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY.message
rawMessage.value.content.payload = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.PAYER_FSP_INSUFFICIENT_LIQUIDITY, null, null, null, rawMessage.value.content.payload.extensionList).toApiErrorObject(Config.ERROR_HANDLING)
}
const runningPosition = new MLNumber(currentPosition).add(sumReserved).toFixed(Config.AMOUNT.SCALE) /* effective position */
const runningReservedValue = new MLNumber(sumTransfersInBatch).subtract(sumReserved).toFixed(Config.AMOUNT.SCALE)
processedTransfers[transferId] = { transferState, transfer, rawMessage, transferAmount, runningPosition, runningReservedValue }
}
/*
Update the participantPosition with the eventual impact of the Batch
So the position moves forward by the sum of the transfers actually reserved (sumReserved)
and the reserved amount is cleared of the we reserved in the first instance (sumTransfersInBatch)
*/
const processedPositionValue = new MLNumber(initialParticipantPosition.value).add(sumReserved)
await knex('participantPosition').transacting(trx).where({ participantPositionId: initialParticipantPosition.participantPositionId }).update({
value: processedPositionValue.toFixed(Config.AMOUNT.SCALE),
reservedValue: new MLNumber(initialParticipantPosition.reservedValue).subtract(sumTransfersInBatch).toFixed(Config.AMOUNT.SCALE),
changedDate: transactionTimestamp
const timeout = async () => {
try {
const timeoutSegment = await TimeoutService.getTimeoutSegment()
const intervalMin = timeoutSegment ? timeoutSegment.value : 0
const segmentId = timeoutSegment ? timeoutSegment.segmentId : 0
const cleanup = await TimeoutService.cleanupTransferTimeout()
const latestTransferStateChange = await TimeoutService.getLatestTransferStateChange()
const intervalMax = (latestTransferStateChange && parseInt(latestTransferStateChange.transferStateChangeId)) || 0
const result = await TimeoutService.timeoutExpireReserved(segmentId, intervalMin, intervalMax)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED).toApiErrorObject(Config.ERROR_HANDLING)
if (!Array.isArray(result)) {
result[0] = result
}
for (let i = 0; i < result.length; i++) {
const span = EventSdk.Tracer.createSpan('cl_transfer_timeout')
try {
const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(result[i].transferId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state)
const headers = Utility.Http.SwitchDefaultHeaders(result[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Enum.Http.Headers.FSPIOP.SWITCH.value)
const message = Utility.StreamingProtocol.createMessage(result[i].transferId, result[i].payeeFsp, result[i].payerFsp, metadata, headers, fspiopError, { id: result[i].transferId }, 'application/vnd.interoperability.transfers+json;version=1.0')
span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED))
await span.audit({
state,
metadata,
headers,
message
sc.on('close', () => {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, 'Sidecar connection closed')
})
const fulfilmentToCondition = (fulfilment) => {
const hashSha256 = Crypto.createHash('sha256')
const preimage = base64url.toBuffer(fulfilment)
if (preimage.length !== 32) {
throw ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR, 'Interledger preimages must be exactly 32 bytes')
}
const calculatedConditionDigest = hashSha256.update(preimage).digest('base64')
Logger.debug(`calculatedConditionDigest=${calculatedConditionDigest}`)
const calculatedConditionUrlEncoded = base64url.fromBase64(calculatedConditionDigest)
Logger.debug(`calculatedConditionUrlEncoded=${calculatedConditionUrlEncoded}`)
return calculatedConditionUrlEncoded
}
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
return true
} else if (eventType === Enum.Events.Event.Type.POSITION && [Enum.Events.Event.Action.TIMEOUT_RESERVED, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) {
Logger.info(Utility.breadcrumb(location, { path: 'timeout' }))
span.setTags({ transactionId: transferId })
const transferInfo = await TransferService.getTransferInfoToChangePosition(transferId, Enum.Accounts.TransferParticipantRoleType.PAYER_DFSP, Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE)
if (transferInfo.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
Logger.info(Utility.breadcrumb(location, `validationFailed::notReceivedFulfilState2--${actionLetter}6`))
throw ErrorHandler.Factory.createInternalServerFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR.message)
} else {
Logger.info(Utility.breadcrumb(location, `validationPassed2--${actionLetter}7`))
const isReversal = true
const transferStateChange = {
transferId: transferInfo.transferId,
transferStateId: Enum.Transfers.TransferInternalState.EXPIRED_RESERVED,
reason: ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message
}
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.EXPIRED_ERROR, null, null, null, payload.extensionList)
if (action === Enum.Events.Event.Action.TIMEOUT_RESERVED) {
eventDetail.action = Enum.Events.Event.Action.ABORT
}
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail })
throw fspiopError
}
} else {
Logger.info(Utility.breadcrumb(location, `invalidEventTypeOrAction--${actionLetter}8`))
const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(`Invalid event action:(${action}) and/or type:(${eventType})`)
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action: Enum.Events.Event.Action.POSITION }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
throw fspiopError
}
Logger.info(Utility.breadcrumb(location, { path: 'timeout' }))
span.setTags({ transactionId: transferId })
const transferInfo = await TransferService.getTransferInfoToChangePosition(transferId, Enum.Accounts.TransferParticipantRoleType.PAYER_DFSP, Enum.Accounts.LedgerEntryType.PRINCIPLE_VALUE)
if (transferInfo.transferStateId !== Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
Logger.info(Utility.breadcrumb(location, `validationFailed::notReceivedFulfilState2--${actionLetter}6`))
throw ErrorHandler.Factory.createInternalServerFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR.message)
} else {
Logger.info(Utility.breadcrumb(location, `validationPassed2--${actionLetter}7`))
const isReversal = true
const transferStateChange = {
transferId: transferInfo.transferId,
transferStateId: Enum.Transfers.TransferInternalState.EXPIRED_RESERVED,
reason: ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED.message
}
await PositionService.changeParticipantPosition(transferInfo.participantCurrencyId, isReversal, transferInfo.amount, transferStateChange)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.EXPIRED_ERROR, null, null, null, payload.extensionList)
if (action === Enum.Events.Event.Action.TIMEOUT_RESERVED) {
eventDetail.action = Enum.Events.Event.Action.ABORT
}
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail })
throw fspiopError
}
} else {
Logger.info(Utility.breadcrumb(location, `invalidEventTypeOrAction--${actionLetter}8`))
const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(`Invalid event action:(${action}) and/or type:(${eventType})`)
const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action: Enum.Events.Event.Action.POSITION }
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
throw fspiopError
}
} catch (err) {
Logger.error(`${Utility.breadcrumb(location)}::${err.message}--0`)
histTimerEnd({ success: false, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })