Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
onStream: async muxedStream => {
const mss = new Multistream.Listener(muxedStream)
try {
const { stream, protocol } = await mss.handle(Array.from(protocols.keys()))
connection.addStream(stream, protocol)
// Need to be able to notify a peer of this this._onStream({ connection, stream, protocol })
const handler = protocols.get(protocol)
handler({ connection, stream, protocol })
} catch (err) {
// Do nothing
}
},
// Run anytime a stream closes
constructor ({ connection, _switch, transportKey, peerInfo }) {
super({
_switch,
name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
})
this.conn = connection
this.theirPeerInfo = peerInfo || null
this.theirB58Id = this.theirPeerInfo ? this.theirPeerInfo.id.toB58String() : null
this.ourPeerInfo = this.switch._peerInfo
this.transportKey = transportKey
this.protocolMuxer = this.switch.protocolMuxer(this.transportKey)
this.msListener = new multistream.Listener()
this._state = FSM('DIALED', {
DISCONNECTED: {
disconnect: 'DISCONNECTED'
},
DIALED: { // Base connection to peer established
privatize: 'PRIVATIZING',
encrypt: 'ENCRYPTING'
},
PRIVATIZING: { // Protecting the base connection
done: 'PRIVATIZED',
disconnect: 'DISCONNECTING'
},
PRIVATIZED: { // Base connection is protected
encrypt: 'ENCRYPTING'
},
onStream: async muxedStream => {
const mss = new Multistream.Listener(muxedStream)
try {
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
log('%s: incoming stream opened on %s', direction, protocol)
connection.addStream(stream, protocol)
this._onStream({ connection, stream, protocol })
} catch (err) {
log.error(err)
}
},
// Run anytime a stream closes
return (transport) => (_parentConn, msListener) => {
const ms = msListener || new multistream.Listener()
let parentConn
// Only observe the transport if we have one, and there is not already a listener
if (transport && !msListener) {
parentConn = observeConn(transport, null, _parentConn, observer)
} else {
parentConn = _parentConn
}
Object.keys(protocols).forEach((protocol) => {
if (!protocol) {
return
}
const handler = (protocolName, _conn) => {
log('registering handler with protocol %s', protocolName)
async _multiplexInbound (connection, muxers) {
const listener = new Multistream.Listener(connection)
const protocols = Array.from(muxers.keys())
log('inbound handling muxers %s', protocols)
try {
const { stream, protocol } = await listener.handle(protocols)
const Muxer = muxers.get(protocol)
return { stream, Muxer }
} catch (err) {
throw errCode(err, codes.ERR_MUXER_UNAVAILABLE)
}
}
}
async _encryptInbound (localPeer, connection, cryptos) {
const mss = new Multistream.Listener(connection)
const protocols = Array.from(cryptos.keys())
log('handling inbound crypto protocol selection', protocols)
try {
const { stream, protocol } = await mss.handle(protocols)
const crypto = cryptos.get(protocol)
log('encrypting inbound connection...')
return {
...await crypto.secureInbound(localPeer, stream),
protocol
}
} catch (err) {
throw errCode(err, codes.ERR_ENCRYPTION_FAILED)
}
}
onStream: async muxedStream => {
const mss = new MSS.Listener(muxedStream)
const { stream, protocol } = await mss.handle(this._protocols)
log('outbound: new stream requested %s', protocol)
connection.addStream(stream, protocol)
this._onStream({ stream, protocol })
},
onStreamEnd: muxedStream => {
async function encryptInbound (localPeer, connection, cryptos) {
const mss = new MSS.Listener(connection)
const { stream, protocol } = await mss.handle(Array.from(cryptos.keys()))
const crypto = cryptos.get(protocol)
log('encrypting inbound connection...')
const cryptoResponse = await crypto.secureInbound(localPeer, stream)
if (cryptoResponse) return cryptoResponse
throw new Error('All encryption failed')
}
async function multiplexInbound (connection, muxers) {
const listener = new MSS.Listener(connection)
const protocols = Array.from(muxers.keys())
log('inbound handling muxers %s', protocols)
const { stream, protocol } = await listener.handle(protocols)
const Muxer = muxers.get(protocol)
if (stream) return { stream, Muxer }
throw new Error('All muxing failed')
}