Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
async _handlePush ({ connection, stream }) {
const [data] = await pipe(
stream,
lp.decode(),
take(1),
toBuffer,
collect
)
let message
try {
message = Message.decode(data)
} catch (err) {
return log.error('received invalid message', err)
}
// Update the listen addresses
const peerInfo = new PeerInfo(connection.remotePeer)
try {
async function encrypt (localId, conn, remoteId) {
const { reader, writer, rest } = handshake(conn)
writer.push(lpEncodeExchange({
id: localId.toBytes(),
pubkey: {
Type: KeyType.RSA, // TODO: dont hard code
Data: localId.marshalPubKey()
}
}))
log('write pubkey exchange to peer %j', remoteId)
// Get the Exchange message
const response = (await lp.decodeFromReader(reader).next()).value
const id = Exchange.decode(response.slice())
log('read pubkey exchange from peer %j', remoteId)
if (!id || !id.pubkey) {
throw new Error('Remote did not provide their public key')
}
const peerId = await PeerId.createFromPubKey(id.pubkey.Data)
if (remoteId && !peerId.isEqual(remoteId)) {
throw new Error('Remote peer id does not match known target id')
}
log('crypto exchange completed successfully: %j', peerId)
writer.end()
async _processMessages (idB58Str, conn, peer) {
const onRpcFunc = this._onRpc
try {
await pipe(
conn,
lp.decode(),
async function (source) {
for await (const data of source) {
const rpc = Buffer.isBuffer(data) ? data : data.slice()
onRpcFunc(idB58Str, message.rpc.RPC.decode(rpc))
}
}
)
} catch (err) {
this._onPeerDisconnected(peer, err)
}
}
async identify (connection) {
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY)
const [data] = await pipe(
stream,
lp.decode(),
take(1),
toBuffer,
collect
)
if (!data) {
throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED)
}
let message
try {
message = Message.decode(data)
} catch (err) {
throw errCode(err, codes.ERR_INVALID_MESSAGE)
}
if (this.peerInfo.id.pubKey) {
publicKey = this.peerInfo.id.pubKey.bytes
}
const message = Message.encode({
protocolVersion: PROTOCOL_VERSION,
agentVersion: AGENT_VERSION,
publicKey,
listenAddrs: this.peerInfo.multiaddrs.toArray().map((ma) => ma.buffer),
observedAddr: connection.remoteAddr.buffer,
protocols: Array.from(this._protocols.keys())
})
pipe(
[message],
lp.encode(),
stream
)
}