Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets, e.g. [email@example.com].
* Gates Foundation
- Name Surname
* Valentin Genev
* Rajiv Mothilal
* Miguel de Barros
* Nikolay Anastasov
--------------
******/
'use strict'
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const Logger = require('@mojaloop/central-services-logger')
const Uuid = require('uuid4')
const Utility = require('@mojaloop/central-services-shared').Util.Kafka
const Enum = require('@mojaloop/central-services-shared').Enum
const Config = require('../../../src/lib/config')
const TransferState = Enum.Transfers.TransferState
const TransferInternalState = Enum.Transfers.TransferInternalState
const TransferEventType = Enum.Events.Event.Type
const TransferEventAction = Enum.Events.Event.Action
// Random transfer amount with two decimal places, from 100.00 up to and including 199.99
const randomCents = Math.floor(Math.random() * 100 * 100)
const amount = Number((randomCents / 100 + 100).toFixed(2))
// Expiration timestamp exactly 24 hours from the moment this module is loaded
const oneDayInMs = 24 * 60 * 60 * 1000
const expiration = new Date(Date.now() + oneDayInMs)
const Time = require('@mojaloop/central-services-shared').Util.Time
const transfer = {
transferId: Uuid(),
payerFsp: 'dfsp1',
--------------
******/
'use strict'
/**
* @module src/handlers/positions
*/
const Logger = require('@mojaloop/central-services-logger')
const EventSdk = require('@mojaloop/event-sdk')
const TransferService = require('../../domain/transfer')
const PositionService = require('../../domain/position')
const Utility = require('@mojaloop/central-services-shared').Util
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const Consumer = require('@mojaloop/central-services-stream').Util.Consumer
const Enum = require('@mojaloop/central-services-shared').Enum
const Metrics = require('@mojaloop/central-services-metrics')
const Config = require('../../lib/config')
const Uuid = require('uuid4')
const decodePayload = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodePayload
const decodeMessages = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodeMessages
const ErrorHandler = require('@mojaloop/central-services-error-handling')
// Context object naming this module; `method` and `path` start out empty.
// It is kept as a single object so it can be handed around by reference —
// presumably the handlers fill in `method`/`path` in place (confirm against usage).
const location = { module: 'PositionHandler', method: '', path: '' } // mutable object, shared by reference
// Module-level flags, both enabled; where they are consumed is not visible in this excerpt.
const consumerCommit = true
const fromSwitch = true
/**
* @function positions
*
--------------
******/
'use strict'
/**
* @module src/handlers/positions
*/
const Logger = require('@mojaloop/central-services-logger')
const EventSdk = require('@mojaloop/event-sdk')
const TransferService = require('../../domain/transfer')
const PositionService = require('../../domain/position')
const Utility = require('@mojaloop/central-services-shared').Util
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const Consumer = require('@mojaloop/central-services-stream').Util.Consumer
const Enum = require('@mojaloop/central-services-shared').Enum
const Metrics = require('@mojaloop/central-services-metrics')
const Config = require('../../lib/config')
const Uuid = require('uuid4')
const decodePayload = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodePayload
const decodeMessages = require('@mojaloop/central-services-shared').Util.StreamingProtocol.decodeMessages
const ErrorHandler = require('@mojaloop/central-services-error-handling')
// Context object naming this module; `method` and `path` start out empty.
// It is kept as a single object so it can be handed around by reference —
// presumably the handlers fill in `method`/`path` in place (confirm against usage).
const location = { module: 'PositionHandler', method: '', path: '' } // mutable object, shared by reference
// Module-level flags, both enabled; where they are consumed is not visible in this excerpt.
const consumerCommit = true
const fromSwitch = true
/**
* @function positions
/**
* @module src/domain/participant/
*/
const ParticipantModel = require('../../models/participant/participant')
const ParticipantCurrencyModel = require('../../models/participant/participantCurrency')
const ParticipantPositionModel = require('../../models/participant/participantPosition')
const ParticipantPositionChangeModel = require('../../models/participant/participantPositionChange')
const ParticipantLimitModel = require('../../models/participant/participantLimit')
const LedgerAccountTypeModel = require('../../models/ledgerAccountType/ledgerAccountType')
const ParticipantFacade = require('../../models/participant/facade')
const PositionFacade = require('../../models/position/facade')
const Config = require('../../lib/config')
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
const KafkaProducer = require('@mojaloop/central-services-stream').Util.Producer
const Uuid = require('uuid4')
const Enum = require('@mojaloop/central-services-shared').Enum
// Alphabetically ordered list of error texts used below
const AccountInactiveErrorText = 'Account is currently set inactive'
const AccountNotFoundErrorText = 'Account not found'
const AccountNotPositionTypeErrorText = 'Only position account update is permitted'
const AccountNotSettlementTypeErrorText = 'Account is not SETTLEMENT type'
const ActionNotSupportedText = 'The action is not supported'
const ParticipantAccountCurrencyMismatchText = 'The account does not match participant or currency specified'
const ParticipantAccountMismatchText = 'Participant/account mismatch'
const ParticipantInactiveText = 'Participant is currently set inactive'
const ParticipantInitialPositionExistsText = 'Participant Limit or Initial Position already set'
const ParticipantNotFoundText = 'Participant does not exist'
const ErrorHandler = require('@mojaloop/central-services-error-handling')
while ((doc = await streamReader.readAsync()) !== null) {
let individualTransfer = doc.payload
individualTransfer.payerFsp = payload.payerFsp
individualTransfer.payeeFsp = payload.payeeFsp
individualTransfer.amount = individualTransfer.transferAmount
delete individualTransfer.transferAmount
individualTransfer.expiration = payload.expiration
const bulkTransferAssociationRecord = {
transferId: individualTransfer.transferId,
bulkTransferId: payload.bulkTransferId,
bulkProcessingStateId: Enum.BulkProcessingState.RECEIVED
}
await BulkTransferService.bulkTransferAssociationCreate(bulkTransferAssociationRecord)
let dataUri = encodePayload(JSON.stringify(individualTransfer), headers['content-type'])
let msg = Object.assign({}, prepareHandlerMessageProtocol)
msg.value.id = messageId
msg.value.from = payload.payerFsp
msg.value.to = payload.payeeFsp
msg.value.content.headers = headers
msg.value.content.payload = dataUri
msg.value.metadata.event.id = message.value.metadata.event.id
msg.value.metadata.event.createdAt = new Date()
params = { message: msg, kafkaTopic, consumer }
const producer = { functionality: TransferEventType.PREPARE, action: TransferEventAction.BULK_PREPARE }
await Util.proceed(params, { consumerCommit, histTimerEnd, producer })
}
} catch (err) { // TODO: handle individual transfers streaming error
Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}6`))
Logger.info(Util.breadcrumb(location, `notImplemented`))
* ModusBox
- Georgi Georgiev
- Valentin Genev
- Rajiv Mothilal
--------------
******/
'use strict'
/**
* @module src/handlers/transfers
*/
const Logger = require('@mojaloop/central-services-logger')
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
const Consumer = require('@mojaloop/central-services-stream').Util.Consumer
const Enum = require('@mojaloop/central-services-shared').Enum
const Time = require('@mojaloop/central-services-shared').Util.Time
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const Comparators = require('@mojaloop/central-services-shared').Util.Comparators
const Config = require('../../lib/config')
const TransferService = require('../../domain/transfer')
const Db = require('../../lib/db')
// Event actions grouped under HTTP POST
const httpPostRelatedActions = [
  Enum.Events.Event.Action.RECORD_FUNDS_IN,
  Enum.Events.Event.Action.RECORD_FUNDS_OUT_PREPARE_RESERVE
]
// Event actions grouped under HTTP PUT
const httpPutRelatedActions = [
  Enum.Events.Event.Action.RECORD_FUNDS_OUT_COMMIT,
  Enum.Events.Event.Action.RECORD_FUNDS_OUT_ABORT
]
// Combined list: POST-related actions first, then PUT-related actions
const allowedActions = [...httpPostRelatedActions, ...httpPutRelatedActions]
const createRecordFundsInOut = async (payload, transactionTimestamp, enums) => {
/** @namespace Db.getKnex **/
const knex = Db.getKnex()
Logger.info(`AdminTransferHandler::${payload.action}::validationPassed::newEntry`)
- Rajiv Mothilal
--------------
******/
'use strict'
/**
* @module src/handlers/timeout
*/
const CronJob = require('cron').CronJob
const Config = require('../../lib/config')
const TimeoutService = require('../../domain/timeout')
const Enum = require('@mojaloop/central-services-shared').Enum
const Kafka = require('@mojaloop/central-services-shared').Util.Kafka
const Producer = require('@mojaloop/central-services-stream').Util.Producer
const Utility = require('@mojaloop/central-services-shared').Util
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const EventSdk = require('@mojaloop/event-sdk')
let timeoutJob
let isRegistered
/**
* @function TransferTimeoutHandler
*
* @async
* @description This is the consumer callback function that gets registered to a cron job.
*
* ... called to validate/insert ...
*
* @param {error} error - error thrown if something fails within Cron
contributed from an organization can be listed under the organization
that actually holds the copyright for their contributions (see the
Gates Foundation organization for an example). Those individuals should have
their names indented and be marked with a '-'. Email address can be added
optionally within square brackets, e.g. [email@example.com].
* Gates Foundation
- Name Surname
* Lewis Daly
--------------
******/
'use strict'
const { statusEnum, serviceName } = require('@mojaloop/central-services-shared').HealthCheck.HealthCheckEnums
const Logger = require('@mojaloop/central-services-logger')
const Consumer = require('@mojaloop/central-services-stream').Util.Consumer
const MigrationLockModel = require('../../models/misc/migrationLock')
/**
* @function getSubServiceHealthBroker
*
* @description
* Gets the health for the broker, by checking that the consumer is
* connected for each topic
*
* @returns {Promise} Resolves to the SubService health object for the broker
*/
const getSubServiceHealthBroker = async () => {
const consumerTopics = Consumer.getListOfTopics()
let status = statusEnum.OK
try {
* Gates Foundation
- Name Surname
* Lazola Lucas
* Rajiv Mothilal
* Miguel de Barros
--------------
******/
'use strict'
/**
* @module src/handlers/lib/kafka
*/
const Consumer = require('@mojaloop/central-services-stream').Kafka.Consumer
const Logger = require('@mojaloop/central-services-shared').Logger
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const listOfConsumers = {}
/**
* @function CreateHandler
*
* @param {string} topicName - the topic name to be registered for the required handler. Example: 'topic-dfsp1-transfer-prepare'
* @param {object} config - the config for the consumer for the specific functionality and action, retrieved from the default.json. Example: found in default.json 'KAFKA.CONSUMER.TRANSFER.PREPARE'
* @param {function} command - the callback handler for the topic. Will be called when the topic is produced against. Example: Command.prepareHandler()
*
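* @description Registers the command callback as the consumer handler for the given topic, using the supplied consumer config. NOTE(review): the previous text described parsing an accountUri, which matches none of the parameters above — confirm against the implementation.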
*
* @returns {object} - Returns a Promise
* @throws {Error} - if failure occurs
* Gates Foundation
- Name Surname
* Rajiv Mothilal
* Miguel de Barros
--------------
******/
'use strict'
/**
* @module src/handlers/lib/kafka
*/
const Producer = require('@mojaloop/central-services-stream').Kafka.Producer
const Logger = require('@mojaloop/central-services-shared').Logger
const ErrorHandler = require('@mojaloop/central-services-error-handling')
const listOfProducers = {}
/**
* @function ProduceMessage
*
* @param {string} messageProtocol - message being created against topic
* @param {object} topicConf - configuration for the topic to produce to
* @param {object} config - Producer configuration, eg: to produce batch or poll
*
* @description Creates a producer on Kafka for the specified topic and configuration
*
* @returns {boolean} - returns true if the producer is successfully created and the message is produced
* @throws {error} - if the producer cannot be created or the message cannot be produced