Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
'use strict'
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const Logger = require('@mojaloop/central-services-logger')
const Uuid = require('uuid4')
const Utility = require('@mojaloop/central-services-shared').Util.Kafka
const Enum = require('@mojaloop/central-services-shared').Enum
const Config = require('../../../src/lib/config')
const TransferState = Enum.Transfers.TransferState
const TransferInternalState = Enum.Transfers.TransferInternalState
const TransferEventType = Enum.Events.Event.Type
const TransferEventAction = Enum.Events.Event.Action
const amount = parseFloat(Number(Math.floor(Math.random() * 100 * 100) / 100 + 100).toFixed(2)) // decimal amount between 100.01 and 200.00
const expiration = new Date((new Date()).getTime() + (24 * 60 * 60 * 1000)) // tomorrow
const Time = require('@mojaloop/central-services-shared').Util.Time
const transfer = {
transferId: Uuid(),
payerFsp: 'dfsp1',
payeeFsp: 'dfsp2',
amount: {
currency: 'USD',
amount
},
ilpPacket: 'AYIBgQAAAAAAAASwNGxldmVsb25lLmRmc3AxLm1lci45T2RTOF81MDdqUUZERmZlakgyOVc4bXFmNEpLMHlGTFGCAUBQU0svMS4wCk5vbmNlOiB1SXlweUYzY3pYSXBFdzVVc05TYWh3CkVuY3J5cHRpb246IG5vbmUKUGF5bWVudC1JZDogMTMyMzZhM2ItOGZhOC00MTYzLTg0NDctNGMzZWQzZGE5OGE3CgpDb250ZW50LUxlbmd0aDogMTM1CkNvbnRlbnQtVHlwZTogYXBwbGljYXRpb24vanNvbgpTZW5kZXItSWRlbnRpZmllcjogOTI4MDYzOTEKCiJ7XCJmZWVcIjowLFwidHJhbnNmZXJDb2RlXCI6XCJpbnZvaWNlXCIsXCJkZWJpdE5hbWVcIjpcImFsaWNlIGNvb3BlclwiLFwiY3JlZGl0TmFtZVwiOlwibWVyIGNoYW50XCIsXCJkZWJpdElkZW50aWZpZXJcIjpcIjkyODA2MzkxXCJ9IgA',
condition: '47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU',
expiration: Time.getUTCString(expiration),
extensionList: {
extension: [
{
key: 'key1',
* Valentin Genev
* Rajiv Mothilal
* Miguel de Barros
* Nikolay Anastasov
--------------
******/
'use strict'
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const Logger = require('@mojaloop/central-services-logger')
const Uuid = require('uuid4')
const Utility = require('@mojaloop/central-services-shared').Util.Kafka
const Enum = require('@mojaloop/central-services-shared').Enum
const Config = require('../../../src/lib/config')
const TransferState = Enum.Transfers.TransferState
const TransferInternalState = Enum.Transfers.TransferInternalState
const TransferEventType = Enum.Events.Event.Type
const TransferEventAction = Enum.Events.Event.Action
const amount = parseFloat(Number(Math.floor(Math.random() * 100 * 100) / 100 + 100).toFixed(2)) // decimal amount between 100.01 and 200.00
const expiration = new Date((new Date()).getTime() + (24 * 60 * 60 * 1000)) // tomorrow
const Time = require('@mojaloop/central-services-shared').Util.Time
const transfer = {
transferId: Uuid(),
payerFsp: 'dfsp1',
payeeFsp: 'dfsp2',
amount: {
currency: 'USD',
amount
},
ilpPacket: 'AYIBgQAAAAAAAASwNGxldmVsb25lLmRmc3AxLm1lci45T2RTOF81MDdqUUZERmZlakgyOVc4bXFmNEpLMHlGTFGCAUBQU0svMS4wCk5vbmNlOiB1SXlweUYzY3pYSXBFdzVVc05TYWh3CkVuY3J5cHRpb246IG5vbmUKUGF5bWVudC1JZDogMTMyMzZhM2ItOGZhOC00MTYzLTg0NDctNGMzZWQzZGE5OGE3CgpDb250ZW50LUxlbmd0aDogMTM1CkNvbnRlbnQtVHlwZTogYXBwbGljYXRpb24vanNvbgpTZW5kZXItSWRlbnRpZmllcjogOTI4MDYzOTEKCiJ7XCJmZWVcIjowLFwidHJhbnNmZXJDb2RlXCI6XCJpbnZvaWNlXCIsXCJkZWJpdE5hbWVcIjpcImFsaWNlIGNvb3BlclwiLFwiY3JlZGl0TmFtZVwiOlwibWVyIGNoYW50XCIsXCJkZWJpdElkZW50aWZpZXJcIjpcIjkyODA2MzkxXCJ9IgA',
* Nikolay Anastasov
--------------
******/
'use strict'
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const Logger = require('@mojaloop/central-services-logger')
const Uuid = require('uuid4')
const Utility = require('@mojaloop/central-services-shared').Util.Kafka
const Enum = require('@mojaloop/central-services-shared').Enum
const Config = require('../../../src/lib/config')
const TransferState = Enum.Transfers.TransferState
const TransferInternalState = Enum.Transfers.TransferInternalState
const TransferEventType = Enum.Events.Event.Type
const TransferEventAction = Enum.Events.Event.Action
const amount = parseFloat(Number(Math.floor(Math.random() * 100 * 100) / 100 + 100).toFixed(2)) // decimal amount between 100.01 and 200.00
const expiration = new Date((new Date()).getTime() + (24 * 60 * 60 * 1000)) // tomorrow
const Time = require('@mojaloop/central-services-shared').Util.Time
const transfer = {
transferId: Uuid(),
payerFsp: 'dfsp1',
payeeFsp: 'dfsp2',
amount: {
currency: 'USD',
amount
},
ilpPacket: 'AYIBgQAAAAAAAASwNGxldmVsb25lLmRmc3AxLm1lci45T2RTOF81MDdqUUZERmZlakgyOVc4bXFmNEpLMHlGTFGCAUBQU0svMS4wCk5vbmNlOiB1SXlweUYzY3pYSXBFdzVVc05TYWh3CkVuY3J5cHRpb246IG5vbmUKUGF5bWVudC1JZDogMTMyMzZhM2ItOGZhOC00MTYzLTg0NDctNGMzZWQzZGE5OGE3CgpDb250ZW50LUxlbmd0aDogMTM1CkNvbnRlbnQtVHlwZTogYXBwbGljYXRpb24vanNvbgpTZW5kZXItSWRlbnRpZmllcjogOTI4MDYzOTEKCiJ7XCJmZWVcIjowLFwidHJhbnNmZXJDb2RlXCI6XCJpbnZvaWNlXCIsXCJkZWJpdE5hbWVcIjpcImFsaWNlIGNvb3BlclwiLFwiY3JlZGl0TmFtZVwiOlwibWVyIGNoYW50XCIsXCJkZWJpdElkZW50aWZpZXJcIjpcIjkyODA2MzkxXCJ9IgA',
condition: '47DEQpj8HBSa-_TImW-5JCeuQeRkm5NMpJWZG3hSuFU',
expiration: Time.getUTCString(expiration),
extensionList: {
exports.transferPrepare = async () => {
const config = Utility.getKafkaConfig(Config.KAFKA_CONFIG, Enum.Kafka.Config.PRODUCER, TransferEventType.TRANSFER.toUpperCase(), TransferEventType.PREPARE.toUpperCase())
config.logger = Logger
// extend the message with topic information
const transferObj = requestBodys().messageProtocol()
await Producer.produceMessage(transferObj, topicConfTransferPrepare, config)
return transferObj.id
// return true
}
try {
const timeoutSegment = await TimeoutService.getTimeoutSegment()
const intervalMin = timeoutSegment ? timeoutSegment.value : 0
const segmentId = timeoutSegment ? timeoutSegment.segmentId : 0
const cleanup = await TimeoutService.cleanupTransferTimeout()
const latestTransferStateChange = await TimeoutService.getLatestTransferStateChange()
const intervalMax = (latestTransferStateChange && parseInt(latestTransferStateChange.transferStateChangeId)) || 0
const result = await TimeoutService.timeoutExpireReserved(segmentId, intervalMin, intervalMax)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED).toApiErrorObject(Config.ERROR_HANDLING)
if (!Array.isArray(result)) {
result[0] = result
}
for (let i = 0; i < result.length; i++) {
const span = EventSdk.Tracer.createSpan('cl_transfer_timeout')
try {
const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(result[i].transferId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state)
const headers = Utility.Http.SwitchDefaultHeaders(result[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Enum.Http.Headers.FSPIOP.SWITCH.value)
const message = Utility.StreamingProtocol.createMessage(result[i].transferId, result[i].payeeFsp, result[i].payerFsp, metadata, headers, fspiopError, { id: result[i].transferId }, 'application/vnd.interoperability.transfers+json;version=1.0')
span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED))
await span.audit({
state,
metadata,
headers,
message
}, EventSdk.AuditEventAction.start)
if (result[i].bulkTransferId === null) { // regular transfer
if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) {
message.to = message.from
message.from = Enum.Http.Headers.FSPIOP.SWITCH.value
// event & type set above when `const metadata` is initialized to NOTIFICATION / TIMEOUT_RECEIVED
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, message, state, null, span)
try {
const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(result[i].transferId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state)
const headers = Utility.Http.SwitchDefaultHeaders(result[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Enum.Http.Headers.FSPIOP.SWITCH.value)
const message = Utility.StreamingProtocol.createMessage(result[i].transferId, result[i].payeeFsp, result[i].payerFsp, metadata, headers, fspiopError, { id: result[i].transferId }, 'application/vnd.interoperability.transfers+json;version=1.0')
span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED))
await span.audit({
state,
metadata,
headers,
message
}, EventSdk.AuditEventAction.start)
if (result[i].bulkTransferId === null) { // regular transfer
if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) {
message.to = message.from
message.from = Enum.Http.Headers.FSPIOP.SWITCH.value
// event & type set above when `const metadata` is initialized to NOTIFICATION / TIMEOUT_RECEIVED
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, message, state, null, span)
} else if (result[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
message.metadata.event.type = Enum.Events.Event.Type.POSITION
message.metadata.event.action = Enum.Events.Event.Action.TIMEOUT_RESERVED
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.POSITION, Enum.Events.Event.Action.TIMEOUT_RESERVED, message, state, result[i].payerFsp, span)
}
} else { // individual transfer from a bulk
if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) {
message.to = message.from
message.from = Enum.Http.Headers.FSPIOP.SWITCH.value
message.metadata.event.type = Enum.Events.Event.Type.BULK_PROCESSING
message.metadata.event.action = Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.BULK_PROCESSING, Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, message, state, null, span)
} else if (result[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
message.metadata.event.type = Enum.Events.Event.Type.POSITION
const intervalMin = timeoutSegment ? timeoutSegment.value : 0
const segmentId = timeoutSegment ? timeoutSegment.segmentId : 0
const cleanup = await TimeoutService.cleanupTransferTimeout()
const latestTransferStateChange = await TimeoutService.getLatestTransferStateChange()
const intervalMax = (latestTransferStateChange && parseInt(latestTransferStateChange.transferStateChangeId)) || 0
const result = await TimeoutService.timeoutExpireReserved(segmentId, intervalMin, intervalMax)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED).toApiErrorObject(Config.ERROR_HANDLING)
if (!Array.isArray(result)) {
result[0] = result
}
for (let i = 0; i < result.length; i++) {
const span = EventSdk.Tracer.createSpan('cl_transfer_timeout')
try {
const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(result[i].transferId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state)
const headers = Utility.Http.SwitchDefaultHeaders(result[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Enum.Http.Headers.FSPIOP.SWITCH.value)
const message = Utility.StreamingProtocol.createMessage(result[i].transferId, result[i].payeeFsp, result[i].payerFsp, metadata, headers, fspiopError, { id: result[i].transferId }, 'application/vnd.interoperability.transfers+json;version=1.0')
span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED))
await span.audit({
state,
metadata,
headers,
message
}, EventSdk.AuditEventAction.start)
if (result[i].bulkTransferId === null) { // regular transfer
if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) {
message.to = message.from
message.from = Enum.Http.Headers.FSPIOP.SWITCH.value
// event & type set above when `const metadata` is initialized to NOTIFICATION / TIMEOUT_RECEIVED
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, message, state, null, span)
} else if (result[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
message.metadata.event.type = Enum.Events.Event.Type.POSITION
const timeoutSegment = await TimeoutService.getTimeoutSegment()
const intervalMin = timeoutSegment ? timeoutSegment.value : 0
const segmentId = timeoutSegment ? timeoutSegment.segmentId : 0
const cleanup = await TimeoutService.cleanupTransferTimeout()
const latestTransferStateChange = await TimeoutService.getLatestTransferStateChange()
const intervalMax = (latestTransferStateChange && parseInt(latestTransferStateChange.transferStateChangeId)) || 0
const result = await TimeoutService.timeoutExpireReserved(segmentId, intervalMin, intervalMax)
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED).toApiErrorObject(Config.ERROR_HANDLING)
if (!Array.isArray(result)) {
result[0] = result
}
for (let i = 0; i < result.length; i++) {
const span = EventSdk.Tracer.createSpan('cl_transfer_timeout')
try {
const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription)
const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(result[i].transferId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state)
const headers = Utility.Http.SwitchDefaultHeaders(result[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Enum.Http.Headers.FSPIOP.SWITCH.value)
const message = Utility.StreamingProtocol.createMessage(result[i].transferId, result[i].payeeFsp, result[i].payerFsp, metadata, headers, fspiopError, { id: result[i].transferId }, 'application/vnd.interoperability.transfers+json;version=1.0')
span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED))
await span.audit({
state,
metadata,
headers,
message
}, EventSdk.AuditEventAction.start)
if (result[i].bulkTransferId === null) { // regular transfer
if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) {
message.to = message.from
message.from = Enum.Http.Headers.FSPIOP.SWITCH.value
// event & type set above when `const metadata` is initialized to NOTIFICATION / TIMEOUT_RECEIVED
await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, message, state, null, span)
} else if (result[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
- Georgi Georgiev
- Rajiv Mothilal
--------------
******/
'use strict'
const Logger = require('@mojaloop/central-services-logger')
const BulkTransferService = require('../../../domain/bulkTransfer')
const Util = require('@mojaloop/central-services-shared').Util
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const Consumer = require('@mojaloop/central-services-stream').Util.Consumer
const Enum = require('@mojaloop/central-services-shared').Enum
const Metrics = require('@mojaloop/central-services-metrics')
const Config = require('../../../lib/config')
const decodePayload = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodePayload
const BulkTransferModels = require('@mojaloop/central-object-store').Models.BulkTransfer
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const location = { module: 'BulkProcessingHandler', method: '', path: '' } // var object used as pointer
const consumerCommit = true
const fromSwitch = true
/**
* @function BulkProcessingHandler
*
* @async
* @description This is the consumer callback function that gets registered to a topic. This then gets a list of messages,
* we will only ever use the first message in non batch processing. We then break down the message into its payload and
* begin validating the payload. Once the payload is validated successfully it will be written to the database to
* the relevant tables. If the validation fails it is still written to the database for auditing purposes but with an
* Gates Foundation
- Name Surname
* ModusBox
- Georgi Georgiev
- Rajiv Mothilal
--------------
******/
'use strict'
const AwaitifyStream = require('awaitify-stream')
const Logger = require('@mojaloop/central-services-logger')
const BulkTransferService = require('../../../domain/bulkTransfer')
const Util = require('@mojaloop/central-services-shared').Util
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const Consumer = require('@mojaloop/central-services-stream').Util.Consumer
const Validator = require('../shared/validator')
const Enum = require('@mojaloop/central-services-shared').Enum
const Metrics = require('@mojaloop/central-services-metrics')
const Config = require('../../../lib/config')
const BulkTransferModels = require('@mojaloop/central-object-store').Models.BulkTransfer
const encodePayload = require('@mojaloop/central-services-shared').Util.StreamingProtocol.encodePayload
const Comparators = require('@mojaloop/central-services-shared').Util.Comparators
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const location = { module: 'BulkFulfilHandler', method: '', path: '' } // var object used as pointer
const consumerCommit = true
const fromSwitch = true
/**