Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
onStream: async muxedStream => {
const mss = new Multistream.Listener(muxedStream)
try {
const { stream, protocol } = await mss.handle(Array.from(protocols.keys()))
connection.addStream(stream, protocol)
// Need to be able to notify a peer of this this._onStream({ connection, stream, protocol })
const handler = protocols.get(protocol)
handler({ connection, stream, protocol })
} catch (err) {
// Do nothing
}
},
// Run anytime a stream closes
conn.dialStream(function (err, stream) {
if (err) {
return console.log(err)
}
var msi = new Interactive()
msi.handle(stream, function () {
msi.select('/ipfs/identify/1.0.0', function (err, ds) {
if (err) { return console.log(err) }
var identifyMsg = {}
identifyMsg = {}
identifyMsg.sender = exportPeer(peerSelf)
// TODO (daviddias) populate with the way I see the other peer
stream.write(JSON.stringify(identifyMsg))
var answer = ''
stream.on('data', function (chunk) {
answer = answer + chunk.toString()
})
_attemptMuxerUpgrade (connection, b58Id, callback, abort) {
const muxers = Object.keys(this.switch.muxers)
if (muxers.length === 0) {
return callback(new Error('no muxers available'))
}
const msDialer = new multistream.Dialer()
handleSafe(msDialer, connection, (err) => {
if (err) {
return callback(new Error('multistream not supported'))
}
// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incoming new streams to connHandler
const nextMuxer = (key) => {
log('selecting %s', key)
selectSafe(msDialer, key, (err, _conn) => {
if (err) {
if (muxers.length === 0) {
return callback(new Error('could not upgrade to stream muxing'))
}
_performProtocolHandshake (connection, callback) {
// If there is no protocol set yet, don't perform the handshake
if (!this.protocol) {
callback()
}
const msDialer = new multistream.Dialer()
handleSafe(msDialer, connection, (err) => {
if (err) {
return callback(err)
}
selectSafe(msDialer, this.protocol, (err, _conn) => {
if (err) {
log(`could not perform protocol handshake: `, err)
return callback(err)
}
const conn = observeConnection(null, this.protocol, _conn, this.switch.observer)
callback(null, conn)
}, callback)
}, callback)
}
}
_onUpgrading () {
const muxers = Object.keys(this.switch.muxers)
this.log('upgrading connection to %s', this.theirB58Id)
if (muxers.length === 0) {
return this._state('stop')
}
const msDialer = new multistream.Dialer()
msDialer.handle(this.conn, (err) => {
if (err) {
return this._didUpgrade(err)
}
// 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incoming new streams to connHandler
const nextMuxer = (key) => {
this.log('selecting %s', key)
msDialer.select(key, (err, _conn) => {
if (err) {
if (muxers.length === 0) {
return this._didUpgrade(err)
}
constructor ({ connection, _switch, transportKey, peerInfo }) {
super({
_switch,
name: `inc:${_switch._peerInfo.id.toB58String().slice(0, 8)}`
})
this.conn = connection
this.theirPeerInfo = peerInfo || null
this.theirB58Id = this.theirPeerInfo ? this.theirPeerInfo.id.toB58String() : null
this.ourPeerInfo = this.switch._peerInfo
this.transportKey = transportKey
this.protocolMuxer = this.switch.protocolMuxer(this.transportKey)
this.msListener = new multistream.Listener()
this._state = FSM('DIALED', {
DISCONNECTED: {
disconnect: 'DISCONNECTED'
},
DIALED: { // Base connection to peer established
privatize: 'PRIVATIZING',
encrypt: 'ENCRYPTING'
},
PRIVATIZING: { // Protecting the base connection
done: 'PRIVATIZED',
disconnect: 'DISCONNECTING'
},
PRIVATIZED: { // Base connection is protected
encrypt: 'ENCRYPTING'
},
onStream: async muxedStream => {
const mss = new Multistream.Listener(muxedStream)
try {
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
log('%s: incoming stream opened on %s', direction, protocol)
connection.addStream(stream, protocol)
this._onStream({ connection, stream, protocol })
} catch (err) {
log.error(err)
}
},
// Run anytime a stream closes
return (transport) => (_parentConn, msListener) => {
const ms = msListener || new multistream.Listener()
let parentConn
// Only observe the transport if we have one, and there is not already a listener
if (transport && !msListener) {
parentConn = observeConn(transport, null, _parentConn, observer)
} else {
parentConn = _parentConn
}
Object.keys(protocols).forEach((protocol) => {
if (!protocol) {
return
}
const handler = (protocolName, _conn) => {
log('registering handler with protocol %s', protocolName)
_protocolHandshake (protocol, connection, callback) {
const msDialer = new multistream.Dialer()
msDialer.handle(connection, (err) => {
if (err) {
return callback(err, null)
}
msDialer.select(protocol, (err, _conn) => {
if (err) {
this.log('could not perform protocol handshake:', err)
return callback(err, null)
}
const conn = observeConnection(null, protocol, _conn, this.switch.observer)
this.log('successfully performed handshake of %s to %s', protocol, this.theirB58Id)
this.emit('connection', conn)
callback(null, conn)
})
async _multiplexOutbound (connection, muxers) {
const dialer = new Multistream.Dialer(connection)
const protocols = Array.from(muxers.keys())
log('outbound selecting muxer %s', protocols)
try {
const { stream, protocol } = await dialer.select(protocols)
log('%s selected as muxer protocol', protocol)
const Muxer = muxers.get(protocol)
return { stream, Muxer }
} catch (err) {
throw errCode(err, codes.ERR_MUXER_UNAVAILABLE)
}
}