Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
// Subscribe on both nodes before publishing: ipfs1 receives through `sub`,
// ipfs2 subscribes with a no-op handler.
await Promise.all([
  ipfs1.pubsub.subscribe(topic, sub),
  ipfs2.pubsub.subscribe(topic, () => {})
])
// NOTE(review): waitForPeers is defined elsewhere; presumably it waits until
// ipfs1 sees ipfs2 on the topic and 30000 is a timeout in ms — confirm.
await waitForPeers(ipfs1, topic, [ipfs2.peerId.id], 30000)

const startTime = Date.now()

// Publish sequentially: every publish is awaited before the next one starts.
for (let i = 0; i < count; i++) {
  const msgData = Buffer.from(msgBase + i)
  await ipfs2.pubsub.publish(topic, msgData)
}

const msgs = await collect(msgStream)
const duration = Date.now() - startTime
const opsPerSec = Math.floor(count / (duration / 1000))

// Report the number of messages actually sent (`count`) instead of a
// hard-coded figure, so the line stays correct if `count` changes.
// eslint-disable-next-line
console.log(`Send/Receive ${count} messages took: ${duration} ms, ${opsPerSec} ops / s`)

// Every collected message must come from ipfs2 and carry the common prefix.
msgs.forEach(msg => {
  expect(msg.from).to.eql(ipfs2.peerId.id)
  expect(msg.data.toString().startsWith(msgBase)).to.be.true()
})
})
})
async _handlePush ({ connection, stream }) {
const [data] = await pipe(
stream,
lp.decode(),
take(1),
toBuffer,
collect
)
let message
try {
message = Message.decode(data)
} catch (err) {
return log.error('received invalid message', err)
}
// Update the listen addresses
const peerInfo = new PeerInfo(connection.remotePeer)
try {
IdentifyService.updatePeerAddresses(peerInfo, message.listenAddrs)
// Second subscription handler: pushes the received message into msgStream2
// and then ends that stream.
const handler2 = (received) => {
  msgStream2.push(received)
  msgStream2.end()
}
// Register both handlers on the same topic of the same node, concurrently.
const pendingSubscriptions = [handler1, handler2].map(
  handler => ipfs1.pubsub.subscribe(topic, handler)
)
await Promise.all(pendingSubscriptions)

const greeting = 'hello'
await ipfs1.pubsub.publish(topic, Buffer.from(greeting))

// Each handler feeds its own stream; both must have seen the published message.
const [firstHandlerMsg] = await collect(msgStream1)
expect(firstHandlerMsg.data.toString()).to.eql(greeting)

const [secondHandlerMsg] = await collect(msgStream2)
expect(secondHandlerMsg.data.toString()).to.eql(greeting)

await ipfs1.pubsub.unsubscribe(topic, handler1)
await delay(100)

// Still subscribed as there is one listener left
const topicsAfterFirstUnsubscribe = await ipfs1.pubsub.ls()
expect(topicsAfterFirstUnsubscribe).to.eql([topic])

await ipfs1.pubsub.unsubscribe(topic, handler2)
await delay(100)

// Now all listeners are gone no subscription anymore
const topicsAfterSecondUnsubscribe = await ipfs1.pubsub.ls()
expect(topicsAfterSecondUnsubscribe).to.eql([])
})
return async function * rm (cids, options) {
options = options || {}
if (!Array.isArray(cids)) {
cids = [cids]
}
// We need to take a write lock here to ensure that adding and removing
// blocks are exclusive operations
const release = await gcLock.writeLock()
try {
yield * pipe(
cids,
parallelMap(BLOCK_RM_CONCURRENCY, async cid => {
cid = cleanCid(cid)
const result = { hash: cid.toString() }
try {
const pinResult = await pinManager.isPinnedWithType(cid, PinTypes.all)
if (pinResult.pinned) {
if (CID.isCID(pinResult.reason)) { // eslint-disable-line max-depth
throw errCode(new Error(`pinned via ${pinResult.reason}`))
}
throw errCode(new Error(`pinned: ${pinResult.reason}`))
}
// remove has check when https://github.com/ipfs/js-ipfs-block-service/pull/88 is merged
options = options || {}
const recursive = options.recursive !== false
if (options.cidBase && !multibase.names.includes(options.cidBase)) {
throw errCode(new Error('invalid multibase'), 'ERR_INVALID_MULTIBASE')
}
const cids = await resolvePath(object, paths)
const release = await gcLock.readLock()
try {
// verify that each hash can be unpinned
const results = await pipe(
cids,
parallelMap(PIN_RM_CONCURRENCY, async cid => {
const res = await pinManager.isPinnedWithType(cid, PinTypes.all)
const { pinned, reason } = res
const key = cid.toBaseEncodedString()
if (!pinned) {
throw new Error(`${key} is not pinned`)
}
if (reason !== PinTypes.recursive && reason !== PinTypes.direct) {
throw new Error(`${key} is pinned indirectly under ${reason}`)
}
if (reason === PinTypes.recursive && !recursive) {
throw new Error(`${key} is pinned recursively`)
}
return key
if (typeof options.type === 'string') {
type = options.type.toLowerCase()
}
const err = PinManager.checkPinType(type)
if (err) {
throw err
}
}
if (paths) {
paths = Array.isArray(paths) ? paths : [paths]
// check the pinned state of specific hashes
const cids = await resolvePath(object, paths)
yield * parallelMap(PIN_LS_CONCURRENCY, async cid => {
const { reason, pinned } = await pinManager.isPinnedWithType(cid, type)
if (!pinned) {
throw new Error(`path '${paths[cids.indexOf(cid)]}' is not pinned`)
}
if (reason === PinTypes.direct || reason === PinTypes.recursive) {
return { cid, type: reason }
}
return { cid, type: `${PinTypes.indirect} through ${reason}` }
}, cids)
return
}
try {
await repo.blocks.delete(cid)
removedBlocksCount++
} catch (err) {
res.err = new Error(`Could not delete block with CID ${cid}: ${err.message}`)
}
return res
} catch (err) {
const msg = `Could not convert block with key '${k}' to CID`
log(msg, err)
return { err: new Error(msg + `: ${err.message}`) }
}
}
for await (const res of transform(BLOCK_RM_CONCURRENCY, removeBlock, blockKeys)) {
// filter nulls (blocks that were retained)
if (res) yield res
}
log(`Marked set has ${markedSet.size} unique blocks. Blockstore has ${blocksCount} blocks. ` +
`Deleted ${removedBlocksCount} blocks.`)
}
// Muxer built with an async stream callback.
// NOTE(review): presumably Muxer invokes this callback for each incoming
// stream, and `check` is a test checkpoint counter — confirm both.
const listener = new Muxer(async (stream) => {
  expect(stream).to.exist // eslint-disable-line
  check()

  // Drain the stream, calling check() once per chunk that passes through.
  const checkEachChunk = tap((_chunk) => check())
  await pipe(stream, checkEachChunk, consume)
  check()

  // Pipe an empty source into the stream; the returned value is not awaited.
  pipe([], stream)
})
it('many writes', async function () {
  this.timeout(10000)

  const WRITE_COUNT = 20000

  // Endless synchronous iterator: it is its own iterable and never reports
  // `done`, so the take() below is what bounds the number of chunks.
  const randomChunks = {
    [Symbol.iterator] () { return this },
    next: () => ({ done: false, value: Buffer.from(Math.random().toString()) })
  }

  const duplex = goodbye({
    source: pipe(randomChunks, take(WRITE_COUNT)),
    sink: collect
  })

  // Send the chunks through `conn` and collect whatever comes back.
  const received = await pipe(duplex, conn, duplex)
  expect(received).to.have.length(WRITE_COUNT)
})
})
// Listener whose connection handler pipes each connection back into itself,
// re-emitting every chunk with '!' appended to its string form.
listener = tcp.createListener((conn) => {
  const appendBang = map((chunk) => Buffer.from(`${chunk.toString()}!`))
  pipe(conn, appendBang, conn)
})

await listener.listen(ma)