How to use the Util.StreamingProtocol helpers in @mojaloop/central-services-shared

To help you get started, we've selected a few examples based on how Util.StreamingProtocol is used in public projects. All of them come from the mojaloop/central-ledger handlers, where the helpers (createEventState, createMetadataWithCorrelatedEvent, createMetadataWithCorrelatedEventState and createMessage) build the event state, metadata and message envelopes that are produced to Kafka.

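Before the real-world examples, here is a minimal sketch of the typical call sequence, assuming the same argument shapes the central-ledger handlers use below; the transfer id, FSP names, headers, payload and uriParams values are placeholders, and the exact signatures should be checked against the library's own source.

const { Util, Enum } = require('@mojaloop/central-services-shared')

const transferId = 'b51ec534-ee48-4575-b6a9-ead2955b8069' // placeholder id
const headers = { 'content-type': 'application/vnd.interoperability.transfers+json;version=1.0' }

// 1. Describe the outcome of the event (status plus optional error code/description)
const state = Util.StreamingProtocol.createEventState(
  Enum.Events.EventStatus.SUCCESS.status,
  Enum.Events.EventStatus.SUCCESS.code,
  Enum.Events.EventStatus.SUCCESS.description
)

// 2. Build event metadata correlated to an id (here the transfer id) for a given type/action
const metadata = Util.StreamingProtocol.createMetadataWithCorrelatedEvent(
  transferId,
  Enum.Events.Event.Type.TRANSFER,
  Enum.Events.Event.Action.COMMIT,
  state
)

// 3. Assemble the message envelope; the payload slot carries whatever the handler
//    wants to ship (an encoded data URI or an error object in the examples below)
const message = Util.StreamingProtocol.createMessage(
  transferId,          // message id
  'payeefsp',          // to (placeholder FSP id)
  'payerfsp',          // from (placeholder FSP id)
  metadata,
  headers,
  { transferId },      // payload (placeholder)
  { id: transferId }   // uriParams (placeholder)
)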

Example from mojaloop/central-ledger, src/handlers/timeouts/handler.js (view on GitHub). For each transfer the timeout job has expired, the handler builds a TRANSFER_EXPIRED error notification with createEventState, createMetadataWithCorrelatedEvent and createMessage, then produces it to the notification topic.
    const timeoutSegment = await TimeoutService.getTimeoutSegment()
    const intervalMin = timeoutSegment ? timeoutSegment.value : 0
    const segmentId = timeoutSegment ? timeoutSegment.segmentId : 0
    const cleanup = await TimeoutService.cleanupTransferTimeout()
    const latestTransferStateChange = await TimeoutService.getLatestTransferStateChange()
    const intervalMax = (latestTransferStateChange && parseInt(latestTransferStateChange.transferStateChangeId)) || 0
    const result = await TimeoutService.timeoutExpireReserved(segmentId, intervalMin, intervalMax)
    const fspiopError = ErrorHandler.Factory.createFSPIOPError(ErrorHandler.Enums.FSPIOPErrorCodes.TRANSFER_EXPIRED).toApiErrorObject(Config.ERROR_HANDLING)
    if (!Array.isArray(result)) {
      result[0] = result
    }
    for (let i = 0; i < result.length; i++) {
      const span = EventSdk.Tracer.createSpan('cl_transfer_timeout')
      try {
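        // Build the TRANSFER_EXPIRED notification for this timed-out transfer: a FAILURE
        // event state carrying the FSPIOP error code, metadata correlated to the transferId
        // (NOTIFICATION / TIMEOUT_RECEIVED), and the message envelope to produce to Kafka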
        const state = Utility.StreamingProtocol.createEventState(Enum.Events.EventStatus.FAILURE.status, fspiopError.errorInformation.errorCode, fspiopError.errorInformation.errorDescription)
        const metadata = Utility.StreamingProtocol.createMetadataWithCorrelatedEvent(result[i].transferId, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, state)
        const headers = Utility.Http.SwitchDefaultHeaders(result[i].payerFsp, Enum.Http.HeaderResources.TRANSFERS, Enum.Http.Headers.FSPIOP.SWITCH.value)
        const message = Utility.StreamingProtocol.createMessage(result[i].transferId, result[i].payeeFsp, result[i].payerFsp, metadata, headers, fspiopError, { id: result[i].transferId }, 'application/vnd.interoperability.transfers+json;version=1.0')
        span.setTags(Utility.EventFramework.getTransferSpanTags({ payload: message.content.payload, headers }, Enum.Events.Event.Type.TRANSFER, Enum.Events.Event.Action.TIMEOUT_RECEIVED))
        await span.audit({
          state,
          metadata,
          headers,
          message
        }, EventSdk.AuditEventAction.start)
        if (result[i].bulkTransferId === null) { // regular transfer
          if (result[i].transferStateId === Enum.Transfers.TransferInternalState.EXPIRED_PREPARED) {
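            // expired while in PREPARED state: notify the payer FSP on behalf of the switch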
            message.to = message.from
            message.from = Enum.Http.Headers.FSPIOP.SWITCH.value
            // event & type set above when `const metadata` is initialized to NOTIFICATION / TIMEOUT_RECEIVED
            await Kafka.produceGeneralMessage(Config.KAFKA_CONFIG, Producer, Enum.Kafka.Topics.NOTIFICATION, Enum.Events.Event.Action.TIMEOUT_RECEIVED, message, state, null, span)
          } else if (result[i].transferStateId === Enum.Transfers.TransferInternalState.RESERVED_TIMEOUT) {
Example from mojaloop/central-ledger, src/handlers/bulk/prepare/handler.js (view on GitHub). While streaming the individual transfers of a bulk prepare, the handler records each transfer's association with the bulk, then uses createMetadataWithCorrelatedEventState and createMessage to forward a per-transfer prepare message via Kafka.proceed.
        while ((doc = await streamReader.readAsync()) !== null) {
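          // each streamed document is one individual transfer from the bulk payload;
          // copy the bulk-level fields onto it and rename transferAmount to amount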
          const individualTransfer = doc.payload
          individualTransfer.payerFsp = payload.payerFsp
          individualTransfer.payeeFsp = payload.payeeFsp
          individualTransfer.amount = individualTransfer.transferAmount
          delete individualTransfer.transferAmount
          individualTransfer.expiration = payload.expiration
          const bulkTransferAssociationRecord = {
            transferId: individualTransfer.transferId,
            bulkTransferId: payload.bulkTransferId,
            bulkProcessingStateId: Enum.Transfers.BulkProcessingState.RECEIVED
          }
          await BulkTransferService.bulkTransferAssociationCreate(bulkTransferAssociationRecord)
          const dataUri = encodePayload(JSON.stringify(individualTransfer), headers[Enum.Http.Headers.GENERAL.CONTENT_TYPE.value])
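          // correlate this individual transfer's event to the incoming bulk message's
          // event id, then wrap the encoded payload in a per-transfer prepare message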
          const metadata = Util.StreamingProtocol.createMetadataWithCorrelatedEventState(message.value.metadata.event.id, Enum.Events.Event.Type.TRANSFER, Enum.Events.Event.Action.BULK_PREPARE, Enum.Events.EventStatus.SUCCESS.status, Enum.Events.EventStatus.SUCCESS.code, Enum.Events.EventStatus.SUCCESS.description)
          const msg = {
            value: Util.StreamingProtocol.createMessage(messageId, headers[Enum.Http.Headers.FSPIOP.DESTINATION], headers[Enum.Http.Headers.FSPIOP.SOURCE], metadata, headers, dataUri)
          }
          params = { message: msg, kafkaTopic, consumer: Consumer, producer: Producer }
          const eventDetail = { functionality: Enum.Events.Event.Type.PREPARE, action: Enum.Events.Event.Action.BULK_PREPARE }
          await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, eventDetail })
          histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
        }
      } catch (err) { // TODO: handle individual transfers streaming error
        Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}6`))
        Logger.error(Util.breadcrumb(location, 'notImplemented'))
        return true
      }
    } else { // TODO: handle validation failure
      Logger.error(Util.breadcrumb(location, { path: 'validationFailed' }))
      try {
Example from mojaloop/central-ledger, src/handlers/bulk/fulfil/handler.js (view on GitHub). For each individual fulfilment in a bulk fulfil, the handler sets the transfer state, then uses createMetadataWithCorrelatedEventState and createMessage to forward a per-transfer fulfil message.
          delete individualTransferFulfil.transferId
          const bulkTransferAssociationRecord = {
            transferId,
            bulkTransferId: payload.bulkTransferId,
            bulkProcessingStateId: Enum.Transfers.BulkProcessingState.PROCESSING
          }
          await BulkTransferService.bulkTransferAssociationUpdate(transferId, bulkTransferId, bulkTransferAssociationRecord)
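          // mark the individual transfer ABORTED if the bulk is invalid, the fulfilment
          // carries errorInformation, or no fulfilment is present; otherwise COMMITTED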
          if (state === Enum.Transfers.BulkTransferState.INVALID ||
            individualTransferFulfil.errorInformation ||
            !individualTransferFulfil.fulfilment) {
            individualTransferFulfil.transferState = Enum.Transfers.TransferState.ABORTED
          } else {
            individualTransferFulfil.transferState = Enum.Transfers.TransferState.COMMITTED
          }
          const dataUri = encodePayload(JSON.stringify(individualTransferFulfil), headers[Enum.Http.Headers.GENERAL.CONTENT_TYPE.value])
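          // correlate this fulfilment's event to the incoming bulk message's event id,
          // then wrap the encoded payload in a per-transfer fulfil message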
          const metadata = Util.StreamingProtocol.createMetadataWithCorrelatedEventState(message.value.metadata.event.id, Enum.Events.Event.Type.FULFIL, Enum.Events.Event.Action.COMMIT, Enum.Events.EventStatus.SUCCESS.status, Enum.Events.EventStatus.SUCCESS.code, Enum.Events.EventStatus.SUCCESS.description) // TODO: switch action to 'bulk-fulfil' flow
          const msg = {
            value: Util.StreamingProtocol.createMessage(messageId, headers[Enum.Http.Headers.FSPIOP.DESTINATION], headers[Enum.Http.Headers.FSPIOP.SOURCE], metadata, headers, dataUri, { id: transferId })
          }
          params = { message: msg, kafkaTopic, consumer: Consumer, producer: Producer }
          const eventDetail = { functionality: Enum.Events.Event.Type.FULFIL, action: Enum.Events.Event.Action.BULK_COMMIT }
          await Kafka.proceed(Config.KAFKA_CONFIG, params, { consumerCommit, histTimerEnd, eventDetail })
          histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
        }
      } catch (err) { // TODO: handle individual transfers streaming error
        Logger.info(Util.breadcrumb(location, `callbackErrorInternal2--${actionLetter}6`))
        Logger.error(Util.breadcrumb(location, 'notImplemented'))
        return true
      }
    } else { // TODO: handle validation failure
      Logger.error(Util.breadcrumb(location, { path: 'validationFailed' }))
      try {
Example from mojaloop/central-ledger, src/handlers/bulk/processing/handler.js (view on GitHub). When finalising a bulk transfer, the handler builds separate payer and payee responses, using createMetadataWithCorrelatedEvent and createMessage for each before producing both notifications.
        delete payeeBulkResponseHeaders[normalizedKeys[Enum.Http.Headers.FSPIOP.SIGNATURE]]
        const payerBulkResponse = Object.assign({}, { messageId: message.value.id, headers: Util.clone(headers) }, getBulkTransferByIdResult.payerBulkTransfer)
        const payeeBulkResponse = Object.assign({}, { messageId: message.value.id, headers: payeeBulkResponseHeaders }, getBulkTransferByIdResult.payeeBulkTransfer)
        const BulkTransferResultModel = BulkTransferModels.getBulkTransferResultModel()
        await (new BulkTransferResultModel(payerBulkResponse)).save()
        await (new BulkTransferResultModel(payeeBulkResponse)).save()
        const payerParams = Util.clone(params)
        const payeeParams = Util.clone(params)

        const payerPayload = Util.omitNil({
          bulkTransferId: payerBulkResponse.bulkTransferId,
          bulkTransferState: payerBulkResponse.bulkTransferState,
          completedTimestamp: payerBulkResponse.completedTimestamp,
          extensionList: payerBulkResponse.extensionList
        })
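        // rebuild the payer- and payee-facing notifications: metadata correlated to the
        // original event id, plus a message carrying each participant's bulk response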
        const payerMetadata = Util.StreamingProtocol.createMetadataWithCorrelatedEvent(params.message.value.metadata.event.id, payerParams.message.value.metadata.type, payerParams.message.value.metadata.action, Enum.Events.EventStatus.SUCCESS)
        payerParams.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, participants.payerFsp, payerBulkResponse.headers[normalizedKeys[Enum.Http.Headers.FSPIOP.SOURCE]], payerMetadata, payerBulkResponse.headers, payerPayload)
        const payeePayload = Util.omitNil({
          bulkTransferId: payeeBulkResponse.bulkTransferId,
          bulkTransferState: payeeBulkResponse.bulkTransferState,
          completedTimestamp: payeeBulkResponse.completedTimestamp,
          extensionList: payeeBulkResponse.extensionList
        })
        const payeeMetadata = Util.StreamingProtocol.createMetadataWithCorrelatedEvent(params.message.value.metadata.event.id, payeeParams.message.value.metadata.type, payeeParams.message.value.metadata.action, Enum.Events.EventStatus.SUCCESS)
        payeeParams.message.value = Util.StreamingProtocol.createMessage(params.message.value.id, participants.payeeFsp, Enum.Http.Headers.FSPIOP.SWITCH.value, payeeMetadata, payeeBulkResponse.headers, payeePayload)
        if ([Enum.Events.Event.Action.BULK_TIMEOUT_RECEIVED, Enum.Events.Event.Action.BULK_TIMEOUT_RESERVED].includes(action)) {
          eventDetail.action = Enum.Events.Event.Action.BULK_COMMIT
        }
        await Kafka.proceed(Config.KAFKA_CONFIG, payerParams, { consumerCommit, eventDetail })
        histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })
        await Kafka.proceed(Config.KAFKA_CONFIG, payeeParams, { consumerCommit, eventDetail })
        histTimerEnd({ success: true, fspId: Config.INSTRUMENTATION_METRICS_LABELS.fspId })