How to use streaming-iterables - 10 common examples

To help you get started, we’ve selected a few streaming-iterables examples based on popular ways the library is used in public projects.

Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.

github ipfs / interface-js-ipfs-core / src / pubsub / subscribe.js View on Github external
await Promise.all([
          ipfs1.pubsub.subscribe(topic, sub),
          ipfs2.pubsub.subscribe(topic, () => {})
        ])

        await waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000)

        const startTime = new Date().getTime()

        for (let i = 0; i < count; i++) {
          const msgData = Buffer.from(msgBase + i)
          await ipfs2.pubsub.publish(topic, msgData)
        }

        const msgs = await collect(msgStream)
        const duration = new Date().getTime() - startTime
        const opsPerSec = Math.floor(count / (duration / 1000))

        // eslint-disable-next-line
        console.log(`Send/Receive 100 messages took: ${duration} ms, ${opsPerSec} ops / s`)

        msgs.forEach(msg => {
          expect(msg.from).to.eql(ipfs2.peerId.id)
          expect(msg.data.toString().startsWith(msgBase)).to.be.true()
        })
      })
    })
github libp2p / js-libp2p / src / identify / index.js View on Github external
async _handlePush ({ connection, stream }) {
    const [data] = await pipe(
      stream,
      lp.decode(),
      take(1),
      toBuffer,
      collect
    )

    let message
    try {
      message = Message.decode(data)
    } catch (err) {
      return log.error('received invalid message', err)
    }

    // Update the listen addresses
    const peerInfo = new PeerInfo(connection.remotePeer)

    try {
      IdentifyService.updatePeerAddresses(peerInfo, message.listenAddrs)
github ipfs / interface-js-ipfs-core / src / pubsub / subscribe.js View on Github external
// Subscription callback: pushes the received message into msgStream2 and
// then ends that stream right away.
const handler2 = (message) => {
          msgStream2.push(message)
          msgStream2.end()
        }

        await Promise.all([
          ipfs1.pubsub.subscribe(topic, handler1),
          ipfs1.pubsub.subscribe(topic, handler2)
        ])

        await ipfs1.pubsub.publish(topic, Buffer.from('hello'))

        const [handler1Msg] = await collect(msgStream1)
        expect(handler1Msg.data.toString()).to.eql('hello')

        const [handler2Msg] = await collect(msgStream2)
        expect(handler2Msg.data.toString()).to.eql('hello')

        await ipfs1.pubsub.unsubscribe(topic, handler1)
        await delay(100)

        // Still subscribed as there is one listener left
        expect(await ipfs1.pubsub.ls()).to.eql([topic])

        await ipfs1.pubsub.unsubscribe(topic, handler2)
        await delay(100)

        // Now all listeners are gone no subscription anymore
        expect(await ipfs1.pubsub.ls()).to.eql([])
      })
github ipfs / js-ipfs / src / core / components / block / rm.js View on Github external
return async function * rm (cids, options) {
    options = options || {}

    if (!Array.isArray(cids)) {
      cids = [cids]
    }

    // We need to take a write lock here to ensure that adding and removing
    // blocks are exclusive operations
    const release = await gcLock.writeLock()

    try {
      yield * pipe(
        cids,
        parallelMap(BLOCK_RM_CONCURRENCY, async cid => {
          cid = cleanCid(cid)

          const result = { hash: cid.toString() }

          try {
            const pinResult = await pinManager.isPinnedWithType(cid, PinTypes.all)

            if (pinResult.pinned) {
              if (CID.isCID(pinResult.reason)) { // eslint-disable-line max-depth
                throw errCode(new Error(`pinned via ${pinResult.reason}`))
              }

              throw errCode(new Error(`pinned: ${pinResult.reason}`))
            }

            // remove has check when https://github.com/ipfs/js-ipfs-block-service/pull/88 is merged
github ipfs / js-ipfs / src / core / components / pin / rm.js View on Github external
options = options || {}

    const recursive = options.recursive !== false

    if (options.cidBase && !multibase.names.includes(options.cidBase)) {
      throw errCode(new Error('invalid multibase'), 'ERR_INVALID_MULTIBASE')
    }

    const cids = await resolvePath(object, paths)
    const release = await gcLock.readLock()

    try {
      // verify that each hash can be unpinned
      const results = await pipe(
        cids,
        parallelMap(PIN_RM_CONCURRENCY, async cid => {
          const res = await pinManager.isPinnedWithType(cid, PinTypes.all)

          const { pinned, reason } = res
          const key = cid.toBaseEncodedString()

          if (!pinned) {
            throw new Error(`${key} is not pinned`)
          }
          if (reason !== PinTypes.recursive && reason !== PinTypes.direct) {
            throw new Error(`${key} is pinned indirectly under ${reason}`)
          }
          if (reason === PinTypes.recursive && !recursive) {
            throw new Error(`${key} is pinned recursively`)
          }

          return key
github ipfs / js-ipfs / src / core / components / pin / ls.js View on Github external
if (typeof options.type === 'string') {
        type = options.type.toLowerCase()
      }
      const err = PinManager.checkPinType(type)
      if (err) {
        throw err
      }
    }

    if (paths) {
      paths = Array.isArray(paths) ? paths : [paths]

      // check the pinned state of specific hashes
      const cids = await resolvePath(object, paths)

      yield * parallelMap(PIN_LS_CONCURRENCY, async cid => {
        const { reason, pinned } = await pinManager.isPinnedWithType(cid, type)

        if (!pinned) {
          throw new Error(`path '${paths[cids.indexOf(cid)]}' is not pinned`)
        }

        if (reason === PinTypes.direct || reason === PinTypes.recursive) {
          return { cid, type: reason }
        }

        return { cid, type: `${PinTypes.indirect} through ${reason}` }
      }, cids)

      return
    }
github ipfs / js-ipfs / src / core / components / repo / gc.js View on Github external
try {
        await repo.blocks.delete(cid)
        removedBlocksCount++
      } catch (err) {
        res.err = new Error(`Could not delete block with CID ${cid}: ${err.message}`)
      }

      return res
    } catch (err) {
      const msg = `Could not convert block with key '${k}' to CID`
      log(msg, err)
      return { err: new Error(msg + `: ${err.message}`) }
    }
  }

  for await (const res of transform(BLOCK_RM_CONCURRENCY, removeBlock, blockKeys)) {
    // filter nulls (blocks that were retained)
    if (res) yield res
  }

  log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blocksCount} blocks. ` +
  `Deleted ${removedBlocksCount} blocks.`)
}
github libp2p / js-interfaces / src / stream-muxer / spawner.js View on Github external
// Muxer constructed with an async handler that receives a stream.
const listener = new Muxer(async (stream) => {
    expect(stream).to.exist // eslint-disable-line
    check()

    // Consume the stream to completion, calling check() once per chunk.
    const checkEachChunk = tap(() => check())
    await pipe(stream, checkEachChunk, consume)

    check()
    // Pipe an empty source into the stream; the result is not awaited.
    pipe([], stream)
  })
github libp2p / js-libp2p-websockets / test / browser.js View on Github external
it('many writes', async function () {
      // Regular function (not an arrow) because the body calls `this.timeout`.
      this.timeout(10000)

      // Never-ending synchronous iterator; each value is a Buffer holding a
      // random number rendered as a string.
      const randomBuffers = {
        [Symbol.iterator] () { return this },
        next: () => ({ done: false, value: Buffer.from(Math.random().toString()) })
      }

      // Cap the source at 20000 items and collect whatever reaches the sink.
      const s = goodbye({
        source: pipe(randomBuffers, take(20000)),
        sink: collect
      })

      const result = await pipe(s, conn, s)
      expect(result).to.have.length(20000)
    })
  })
github libp2p / js-libp2p-tcp / test / listen-dial.spec.js View on Github external
// Listener that pipes each connection back into itself, appending '!' to
// the string form of every chunk.
listener = tcp.createListener((conn) => {
      const appendBang = map((chunk) => Buffer.from(chunk.toString() + '!'))
      pipe(conn, appendBang, conn)
    })
    await listener.listen(ma)

streaming-iterables

A collection of utilities for async iterables. Designed to replace your streams.

MIT
Latest version published 1 year ago

Package Health Score

67 / 100
Full package analysis