Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
const mss = new Multistream.Dialer(muxedStream)
try {
const { stream, protocol } = await mss.select(protocols)
return { stream: { ...muxedStream, ...stream }, protocol }
} catch (err) {
throw errCode(err, codes.ERR_UNSUPPORTED_PROTOCOL)
}
}
// Pipe all data through the muxer
pipe(maConn, muxer, maConn)
maConn.timeline.upgraded = Date.now()
// Create the connection
const connection = new Connection({
localAddr: maConn.localAddr,
remoteAddr: maConn.remoteAddr,
localPeer: localPeer,
remotePeer: remotePeer,
stat: {
direction,
timeline: maConn.timeline,
multiplexer: Muxer.multicodec,
encryption: 'N/A'
},
newStream,
getStreams: () => muxer.streams,
close: err => maConn.close(err)
})
return connection
], (err) => {
if (err) {
// we don't return the error here,
// since multistream select don't expect one
callback()
return log(err)
}
const peerInfo = new PeerInfo(this.utils.peerIdFromId(msg.srcPeer.id))
msg.srcPeer.addrs.forEach((addr) => peerInfo.multiaddrs.add(addr))
const newConn = new Connection(sh.rest())
newConn.setPeerInfo(peerInfo)
setImmediate(() => this.emit('connection', newConn))
callback(newConn)
})
}
// Pipe all data through the muxer
pipe(muxedConnection, muxer, muxedConnection)
maConn.timeline.upgraded = Date.now()
const timelineProxy = new Proxy(maConn.timeline, {
set: (...args) => {
if (args[1] === 'close' && args[2]) {
this.onConnectionEnd(connection)
}
return Reflect.set(...args)
}
})
// Create the connection
const connection = new Connection({
localAddr: maConn.localAddr,
remoteAddr: maConn.remoteAddr,
localPeer: this.localPeer,
remotePeer: remotePeer,
stat: {
direction,
timeline: timelineProxy,
multiplexer: Muxer.multicodec,
encryption: cryptoProtocol
},
newStream,
getStreams: () => muxer.streams,
close: err => maConn.close(err)
})
this.onConnection(connection)
log('work:cancel')
// clean up already done dials
pull(empty(), conn)
// If we can close the connection, do it
if (typeof conn.close === 'function') {
return conn.close((_) => callback(null))
}
return callback(null)
}
// one is enough
token.cancel = true
log('work:success')
const proxyConn = new Connection()
proxyConn.setInnerConn(conn)
callback(null, { multiaddr: addr, conn: conn })
})
}
}
}))
log('write pubkey exchange to peer %j', remoteId)
// Get the Exchange message
const response = (await lp.decode.fromReader(shake.reader).next()).value
const id = Exchange.decode(response.slice())
log('read pubkey exchange from peer %j', remoteId)
let peerId
try {
peerId = await PeerId.createFromPubKey(id.pubkey.Data)
} catch (err) {
log.error(err)
throw new InvalidCryptoExchangeError('Remote did not provide its public key')
}
if (remoteId && !peerId.isEqual(remoteId)) {
throw new UnexpectedPeerError()
}
log('plaintext key exchange completed successfully with peer %j', peerId)
shake.rest()
return {
conn: shake.stream,
remotePeer: peerId
}
}
// Get the Exchange message
const response = (await lp.decode.fromReader(shake.reader).next()).value
const id = Exchange.decode(response.slice())
log('read pubkey exchange from peer %j', remoteId)
let peerId
try {
peerId = await PeerId.createFromPubKey(id.pubkey.Data)
} catch (err) {
log.error(err)
throw new InvalidCryptoExchangeError('Remote did not provide its public key')
}
if (remoteId && !peerId.isEqual(remoteId)) {
throw new UnexpectedPeerError()
}
log('plaintext key exchange completed successfully with peer %j', peerId)
shake.rest()
return {
conn: shake.stream,
remotePeer: peerId
}
}
register (topology) {
assert(
Topology.isTopology(topology),
'topology must be an instance of interfaces/topology')
// Create topology
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
this.topologies.set(id, topology)
// Set registrar
topology.registrar = this
return id
}
const onAbort = () => {
deferred.reject(new AbortError())
signal.removeEventListener('abort', onAbort)
}
signal.addEventListener('abort', onAbort)
const onAbort = () => {
options.signal.removeEventListener('abort', onAbort)
deferredDial.reject(new AbortError())
}
options.signal.addEventListener('abort', onAbort)
setPeerInfo.call(connection, pi)
resolve(pi)
}
})
})
const stream = {
source: pull(
connection,
observer.incoming(transport, protocol, peerInfo)),
sink: pull(
observer.outgoing(transport, protocol, peerInfo),
connection)
}
return new Connection(stream, connection)
}