How to use the @mojaloop/central-services-shared.Util.breadcrumb function in @mojaloop/central-services-shared

To help you get started, we’ve selected a few @mojaloop/central-services-shared examples, based on popular ways it is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github mojaloop / central-ledger / src / handlers / bulk / processing / handler.js View on Github external
payerFsp: getBulkTransferByIdResult.payerFsp,
            payeeFsp: getBulkTransferByIdResult.payeeFsp,
            expiration: getBulkTransferByIdResult.expiration,
            extensionList: payeeBulkResponse.extensionList
          })
          const metadata = Util.StreamingProtocol.createMetadataWithCorrelatedEvent(params.message.value.metadata.event.id, params.message.value.metadata.type, params.message.value.metadata.action, Enum.Events.EventStatus.SUCCESS)
          params.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, payeeBulkResponse.destination, payeeBulkResponse.headers[Enum.Http.Headers.FSPIOP.SOURCE], metadata, payeeBulkResponse.headers, payload)
          await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail })
          histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
          return true
        } else {
          // TODO: handle use case when no individual transfer has been accepted:
          // Switch to finalize bulk state and notify payer with PUT /bulkTransfers/{id}
          // const payerBulkResponse = Object.assign({}, { messageId: message.value.id, headers }, getBulkTransferByIdResult.payerBulkTransfer)
          Logger.info(Util.breadcrumb(location, `noTransfers--${actionLetter}1`))
          Logger.error(Util.breadcrumb(location, 'notImplemented'))
          return true
        }
      } else if (eventType === Enum.Events.Event.Type.BULK_PROCESSING && [Enum.Events.Event.Action.BULK_COMMIT, Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) {
        Logger.info(Util.breadcrumb(location, `bulkFulfil--${actionLetter}3`))
        const participants = await BulkTransferService.getParticipantsById(bulkTransferInfo.bulkTransferId)
        const normalizedKeys = Object.keys(headers).reduce((keys, k) => { keys[k.toLowerCase()] = k; return keys }, {})
        const payeeBulkResponseHeaders = Util.Headers.transformHeaders(headers, { httpMethod: headers[normalizedKeys[Enum.Http.Headers.FSPIOP.HTTP_METHOD]], sourceFsp: Enum.Http.Headers.FSPIOP.SWITCH.value, destinationFsp: participants.payeeFsp })
        delete payeeBulkResponseHeaders[normalizedKeys[Enum.Http.Headers.FSPIOP.SIGNATURE]]
        const payerBulkResponse = Object.assign({}, { messageId: message.value.id, headers: Util.clone(headers) }, getBulkTransferByIdResult.payerBulkTransfer)
        const payeeBulkResponse = Object.assign({}, { messageId: message.value.id, headers: payeeBulkResponseHeaders }, getBulkTransferByIdResult.payeeBulkTransfer)
        const BulkTransferResultModel = BulkTransferModels.getBulkTransferResultModel()
        await (new BulkTransferResultModel(payerBulkResponse)).save()
        await (new BulkTransferResultModel(payeeBulkResponse)).save()
        const payerParams = Util.clone(params)
        const payeeParams = Util.clone(params)
github mojaloop / central-ledger / src / handlers / transfers / handler.js View on Github external
await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError, eventDetail, fromSwitch })
        histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
        return true
      }
    } else if (hasDuplicateId && !hasDuplicateHash) {
      let eventDetail
      const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.MODIFIED_REQUEST)
      if (!isTransferError) {
        Logger.info(Util.breadcrumb(location, `callbackErrorModified2--${actionLetter}7`))
        eventDetail = { functionality, action: TransferEventAction.FULFIL_DUPLICATE }
        /**
         * HOWTO: During bulk fulfil use an individualTransfer from a previous bulk fulfil,
         * but use different fulfilment value.
         */
      } else {
        Logger.info(Util.breadcrumb(location, `callbackErrorModified3--${actionLetter}8`))
        eventDetail = { functionality, action: TransferEventAction.ABORT_DUPLICATE }
      }
      await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
      throw fspiopError
    } else { // !hasDuplicateId
      if (type === TransferEventType.FULFIL && [TransferEventAction.COMMIT, TransferEventAction.REJECT, TransferEventAction.ABORT, TransferEventAction.BULK_COMMIT].includes(action)) {
        Util.breadcrumb(location, { path: 'validationFailed' })
        if (payload.fulfilment && !Validator.validateFulfilCondition(payload.fulfilment, transfer.condition)) {
          Logger.info(Util.breadcrumb(location, `callbackErrorInvalidFulfilment--${actionLetter}9`))
          const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.VALIDATION_ERROR, 'invalid fulfilment')
          const apiFspiopError = fspiopError.toApiErrorObject(Config.ERROR_HANDLING)
          await TransferService.handlePayeeResponse(transferId, payload, action, apiFspiopError)
          const eventDetail = { functionality: TransferEventType.POSITION, action: TransferEventAction.ABORT }
          /**
           * TODO: BulkProcessingHandler (not in scope of #967) The individual transfer is ABORTED but the notification is never sent.
           */
github mojaloop / central-ledger / src / handlers / bulk / fulfil / handler.js View on Github external
* entire bulk. CAUTION: As of 20191111 this code would also execute when failure
         * reason is "FSPIOP-Source header should match Payee". In this case we should not
         * abort the bulk as we would have accepted non-legitimate source.
         */
        await BulkTransferService.bulkFulfil(payload, reasons.toString(), false)
      } catch (err) { // TODO: handle insert error
        Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}7`))
        Logger.error(Util.breadcrumb(location, 'notImplemented'))
        return true
      }
      Logger.info(Util.breadcrumb(location, `callbackErrorGeneric--${actionLetter}8`))
      Logger.error(Util.breadcrumb(location, 'notImplemented'))
      return true // TODO: store invalid bulk transfer to database and produce callback notification to payer
    }
  } catch (err) {
    Logger.error(`${Util.breadcrumb(location)}::${err.message}--BP0`)
    histTimerEnd({ success: false, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
    throw err
  }
}
github mojaloop / central-ledger / src / handlers / bulk / processing / handler.js View on Github external
params.message.value.content.uriParams.id = bulkTransferInfo.bulkTransferId
        await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail })
        throw fspiopError
      } else {
        // TODO: For the following (Internal Server Error) scenario a notification is produced for each individual transfer.
        // It also needs to be processed first in order to accumulate transfers and send the callback notification at bulk level.
        Logger.info(Util.breadcrumb(location, `invalidEventTypeOrAction--${actionLetter}4`))
        const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(`Invalid event action:(${action}) and/or type:(${eventType})`).toApiErrorObject(Config.ERROR_HANDLING)
        const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action: Enum.Events.Event.Action.BULK_PROCESSING }
        await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError, eventDetail, fromSwitch })
        histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
        return true
      }
    }
  } catch (err) {
    Logger.error(`${Util.breadcrumb(location)}::${err.message}--BP0`)
    histTimerEnd({ success: false, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
    throw ErrorHandler.Factory.reformatFSPIOPError(err)
  }
}
github mojaloop / central-ledger / src / handlers / bulk / fulfil / handler.js View on Github external
individualTransferFulfil.transferState = Enum.Transfers.TransferState.ABORTED
          } else {
            individualTransferFulfil.transferState = Enum.Transfers.TransferState.COMMITTED
          }
          const dataUri = encodePayload(JSON.stringify(individualTransferFulfil), headers[Enum.Http.Headers.GENERAL.CONTENT_TYPE.value])
          const metadata = Util.StreamingProtocol.createMetadataWithCorrelatedEventState(message.value.metadata.event.id, Enum.Events.Event.Type.FULFIL, Enum.Events.Event.Action.COMMIT, Enum.Events.EventStatus.SUCCESS.status, Enum.Events.EventStatus.SUCCESS.code, Enum.Events.EventStatus.SUCCESS.description) // TODO: switch action to 'bulk-fulfil' flow
          const msg = {
            value: Util.StreamingProtocol.createMessage(messageId, headers[Enum.Http.Headers.FSPIOP.DESTINATION], headers[Enum.Http.Headers.FSPIOP.SOURCE], metadata, headers, dataUri, { id: transferId })
          }
          params = { message: msg, kafkaTopic, consumer: Consumer, producer: Producer }
          const eventDetail = { functionality: Enum.Events.Event.Type.FULFIL, action: Enum.Events.Event.Action.BULK_COMMIT }
          await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd, eventDetail })
          histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
        }
      } catch (err) { // TODO: handle individual transfers streaming error
        Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}6`))
        Logger.error(Util.breadcrumb(location, 'notImplemented'))
        return true
      }
    } else { // TODO: handle validation failure
      Logger.error(Util.breadcrumb(location, { path: 'validationFailed' }))
      try {
        Logger.info(Util.breadcrumb(location, 'saveInvalidRequest'))
        /**
         * TODO: Following the example for regular transfers, the following should ABORT the
         * entire bulk. CAUTION: As of 20191111 this code would also execute when failure
         * reason is "FSPIOP-Source header should match Payee". In this case we should not
         * abort the bulk as we would have accepted non-legitimate source.
         */
        await BulkTransferService.bulkFulfil(payload, reasons.toString(), false)
      } catch (err) { // TODO: handle insert error
        Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}7`))
github mojaloop / central-ledger / src / handlers / transfers / handler.js View on Github external
histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
          return true
        }
      }
    } else if (hasDuplicateId && !hasDuplicateHash) {
      Logger.error(Util.breadcrumb(location, `callbackErrorModified1--${actionLetter}5`))
      const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.MODIFIED_REQUEST)
      const eventDetail = { functionality, action }
      await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
      throw fspiopError
    } else { // !hasDuplicateId
      const { validationPassed, reasons } = await Validator.validateByName(payload, headers)
      if (validationPassed) {
        Logger.info(Util.breadcrumb(location, { path: 'validationPassed' }))
        try {
          Logger.info(Util.breadcrumb(location, 'saveTransfer'))
          await TransferService.prepare(payload)
        } catch (err) {
          Logger.info(Util.breadcrumb(location, `callbackErrorInternal1--${actionLetter}6`))
          Logger.error(`${Util.breadcrumb(location)}::${err.message}`)
          const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err, ErrorHandler.Enums.FSPIOPErrorCodes.INTERNAL_SERVER_ERROR)
          const eventDetail = { functionality, action: TransferEventAction.PREPARE }
          /**
           * TODO: BULK-Handle at BulkProcessingHandler (not in scope of #967)
           * HOWTO: Stop execution at the `TransferService.prepare`, stop mysql,
           * continue execution to catch block, start mysql
           */
          await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
          throw fspiopError
        }
        Logger.info(Util.breadcrumb(location, `positionTopic1--${actionLetter}7`))
        functionality = TransferEventType.POSITION
github mojaloop / central-ledger / src / handlers / bulk / fulfil / handler.js View on Github external
const bulkTransferId = payload.bulkTransferId
    const kafkaTopic = message.topic
    Logger.info(Util.breadcrumb(location, { method: 'bulkFulfil' }))
    const actionLetter = action === Enum.Events.Event.Action.BULK_COMMIT ? Enum.Events.ActionLetter.bulkCommit : Enum.Events.ActionLetter.unknown
    let params = { message, kafkaTopic, decodedPayload: payload, consumer: Consumer, producer: Producer }

    Logger.info(Util.breadcrumb(location, { path: 'dupCheck' }))

    const { hasDuplicateId, hasDuplicateHash } = await Comparators.duplicateCheckComparator(bulkTransferId, payload.hash, BulkTransferService.getBulkTransferFulfilmentDuplicateCheck, BulkTransferService.saveBulkTransferFulfilmentDuplicateCheck)
    if (hasDuplicateId && hasDuplicateHash) { // TODO: handle resend :: GET /bulkTransfer
      Logger.info(Util.breadcrumb(location, `resend--${actionLetter}1`))
      Logger.error(Util.breadcrumb(location, 'notImplemented'))
      return true
    }
    if (hasDuplicateId && !hasDuplicateHash) {
      Logger.error(Util.breadcrumb(location, `callbackErrorModified--${actionLetter}2`))
      const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.MODIFIED_REQUEST)
      const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action }
      await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
      throw fspiopError
    }

    // TODO: move FSPIOP-Source validation before Transfer Duplicate Check to accept only Payee's first request
    const { isValid, reasons } = await Validator.validateBulkTransferFulfilment(payload, headers)
    if (isValid) {
      let state
      Logger.info(Util.breadcrumb(location, { path: 'isValid' }))
      try {
        Logger.info(Util.breadcrumb(location, 'saveBulkTransfer'))
        state = await BulkTransferService.bulkFulfil(payload)
      } catch (err) { // TODO: handle insert errors
        Logger.info(Util.breadcrumb(location, `callbackErrorInternal1--${actionLetter}5`))
github mojaloop / central-ledger / src / handlers / bulk / processing / handler.js View on Github external
})
          const metadata = Util.StreamingProtocol.createMetadataWithCorrelatedEvent(params.message.value.metadata.event.id, params.message.value.metadata.type, params.message.value.metadata.action, Enum.Events.EventStatus.SUCCESS)
          params.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, payeeBulkResponse.destination, payeeBulkResponse.headers[Enum.Http.Headers.FSPIOP.SOURCE], metadata, payeeBulkResponse.headers, payload)
          await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail })
          histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
          return true
        } else {
          // TODO: handle use case when no individual transfer has been accepted:
          // Switch to finalize bulk state and notify payer with PUT /bulkTransfers/{id}
          // const payerBulkResponse = Object.assign({}, { messageId: message.value.id, headers }, getBulkTransferByIdResult.payerBulkTransfer)
          Logger.info(Util.breadcrumb(location, `noTransfers--${actionLetter}1`))
          Logger.error(Util.breadcrumb(location, 'notImplemented'))
          return true
        }
      } else if (eventType === Enum.Events.Event.Type.BULK_PROCESSING && [Enum.Events.Event.Action.BULK_COMMIT, Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) {
        Logger.info(Util.breadcrumb(location, `bulkFulfil--${actionLetter}3`))
        const participants = await BulkTransferService.getParticipantsById(bulkTransferInfo.bulkTransferId)
        const normalizedKeys = Object.keys(headers).reduce((keys, k) => { keys[k.toLowerCase()] = k; return keys }, {})
        const payeeBulkResponseHeaders = Util.Headers.transformHeaders(headers, { httpMethod: headers[normalizedKeys[Enum.Http.Headers.FSPIOP.HTTP_METHOD]], sourceFsp: Enum.Http.Headers.FSPIOP.SWITCH.value, destinationFsp: participants.payeeFsp })
        delete payeeBulkResponseHeaders[normalizedKeys[Enum.Http.Headers.FSPIOP.SIGNATURE]]
        const payerBulkResponse = Object.assign({}, { messageId: message.value.id, headers: Util.clone(headers) }, getBulkTransferByIdResult.payerBulkTransfer)
        const payeeBulkResponse = Object.assign({}, { messageId: message.value.id, headers: payeeBulkResponseHeaders }, getBulkTransferByIdResult.payeeBulkTransfer)
        const BulkTransferResultModel = BulkTransferModels.getBulkTransferResultModel()
        await (new BulkTransferResultModel(payerBulkResponse)).save()
        await (new BulkTransferResultModel(payeeBulkResponse)).save()
        const payerParams = Util.clone(params)
        const payeeParams = Util.clone(params)

        const payerPayload = Util.omitNil({
          bulkTransferId: payerBulkResponse.bulkTransferId,
          bulkTransferState: payerBulkResponse.bulkTransferState,
          completedTimestamp: payerBulkResponse.completedTimestamp,
github mojaloop / central-ledger / src / handlers / transfers / handler.js View on Github external
: Enum.Events.ActionLetter.unknown)
    let functionality = action === TransferEventAction.PREPARE ? TransferEventType.NOTIFICATION
      : (action === TransferEventAction.BULK_PREPARE ? TransferEventType.BULK_PROCESSING
        : Enum.Events.ActionLetter.unknown)
    const params = { message, kafkaTopic, decodedPayload: payload, span, consumer: Consumer, producer: Producer }

    Logger.info(Util.breadcrumb(location, { path: 'dupCheck' }))

    const { hasDuplicateId, hasDuplicateHash } = await Comparators.duplicateCheckComparator(transferId, payload, TransferService.getTransferDuplicateCheck, TransferService.saveTransferDuplicateCheck)
    if (hasDuplicateId && hasDuplicateHash) {
      Logger.info(Util.breadcrumb(location, 'handleResend'))
      const transfer = await TransferService.getByIdLight(transferId)
      const transferStateEnum = transfer && transfer.transferStateEnumeration
      const eventDetail = { functionality, action: TransferEventAction.PREPARE_DUPLICATE }
      if ([TransferState.COMMITTED, TransferState.ABORTED].includes(transferStateEnum)) {
        Logger.info(Util.breadcrumb(location, 'finalized'))
        if (action === TransferEventAction.PREPARE) {
          Logger.info(Util.breadcrumb(location, `callback--${actionLetter}1`))
          message.value.content.payload = TransferObjectTransform.toFulfil(transfer)
          message.value.content.uriParams = { id: transferId }
          await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail, fromSwitch })
          histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
          return true
        } else if (action === TransferEventAction.BULK_PREPARE) {
          Logger.info(Util.breadcrumb(location, `validationError1--${actionLetter}2`))
          const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.MODIFIED_REQUEST, 'Individual transfer prepare duplicate')
          await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
          throw fspiopError
        }
      } else {
        Logger.info(Util.breadcrumb(location, 'inProgress'))
        if (action === TransferEventAction.BULK_PREPARE) {
github mojaloop / central-ledger / src / handlers / positions / handler.js View on Github external
const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.EXPIRED_ERROR, null, null, null, payload.extensionList)
        if (action === Enum.Events.Event.Action.TIMEOUT_RESERVED) {
          eventDetail.action = Enum.Events.Event.Action.ABORT
        }
        await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail })
        throw fspiopError
      }
    } else {
      Logger.info(Utility.breadcrumb(location, `invalidEventTypeOrAction--${actionLetter}8`))
      const fspiopError = ErrorHandler.Factory.createInternalServerFSPIOPError(`Invalid event action:(${action}) and/or type:(${eventType})`)
      const eventDetail = { functionality: Enum.Events.Event.Type.NOTIFICATION, action: Enum.Events.Event.Action.POSITION }
      await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, fspiopError: fspiopError.toApiErrorObject(Config.ERROR_HANDLING), eventDetail, fromSwitch })
      throw fspiopError
    }
  } catch (err) {
    Logger.error(`${Utility.breadcrumb(location)}::${err.message}--0`)
    histTimerEnd({ success: false, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
    const fspiopError = ErrorHandler.Factory.reformatFSPIOPError(err)
    const state = new EventSdk.EventStateMetadata(EventSdk.EventStatusType.failed, fspiopError.apiErrorCode.code, fspiopError.apiErrorCode.message)
    await span.error(fspiopError, state)
    await span.finish(fspiopError.message, state)
    return true
  } finally {
    if (!span.isFinished) {
      await span.finish()
    }
  }
}