Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
logger.log('debug', "Writing packet to dstClient", {
packet: packet,
validPacket: valid.packet
});
dstClient.write(generatedPacket);
}
} else {
// configurable disconnect in case of an unauthorized request
// always disconnect clients in case of an unauthorized connect request after forwarding the connack
if (
(packet.cmd === 'subscribe' && config.disconnect_on_unauthorized_subscribe) ||
(packet.cmd === 'publish' && config.disconnect_on_unauthorized_publish) ||
(packet.cmd === 'connect')
) {
if (valid.packet && valid.packet.cmd === "connack") {
let generatedPacket = mqtt.generate(valid.packet, opts);
logger.log('debug', "Writing packet to srcClient", {
packet: packet,
validPacket: valid.packet
});
valid.packet && srcClient.write(generatedPacket);
}
logger
.log('info', "Destroying clients because of unauthorized request", {packet: packet});
srcClient.destroy();
dstClient.destroy();
}
// if a disconnection is not configured, just forward the response to srcClient
else {
let generatedPacket = mqtt.generate(valid.packet, opts);
logger.log('debug', "Writing packet to srcClient", {
protocolId: 'MQTT',
protocolVersion: 4,
clean: true,
clientId: 'my-client-proxyV2'
}
// Build a PROXY protocol v2 message (PROXY command, STREAM transport, IPv6
// addressing); the serialized MQTT `packet` is passed as the final
// constructor argument, and `.build()` produces the value stored in `protocol`.
var protocol = new proxyProtocol.V2ProxyProtocol(
proxyProtocol.Command.PROXY,
proxyProtocol.TransportProtocol.STREAM,
new proxyProtocol.IPv6ProxyAddress(
// presumably the source (client) address and port — TODO confirm argument order against the proxy-protocol library
proxyProtocol.IPv6Address.createFrom(clientIpArray),
12345,
// presumably the destination address and port — TODO confirm
proxyProtocol.IPv6Address.createWithEmptyAddress(),
port
),
mqttPacket.generate(packet)
).build()
// Create the broker under test with the `trustProxy` option enabled.
var broker = aedes({
// preConnect hook: when the connection details carry an IP address, copy it
// onto the client and compare it with the expected `clientIp`; otherwise
// fail the test.
preConnect: function (client, done) {
if (client.connDetails && client.connDetails.ipAddress) {
client.ip = client.connDetails.ipAddress
t.equal(clientIp, client.ip)
} else {
t.fail('no ip address present')
}
// Call back with no error and `true` (presumably "accept the connection" —
// confirm against the aedes docs), then schedule `finish` via setImmediate.
done(null, true)
setImmediate(finish)
},
trustProxy: true
})
// Handle each parsed packet: for a 'connect' packet, push a CONNACK onto
// `duplex` and pre-build `max` serialized QoS 1 PUBLISH packets.
// NOTE(review): the rest of this handler (closing braces, what happens to
// `packets`) is not visible in this excerpt.
parser.on('packet', function (packet) {
var packets = []
if (packet.cmd === 'connect') {
// Reply with a CONNACK: return code 0, no session present.
duplex.push(mqttPacket.generate({
cmd: 'connack',
sessionPresent: false,
returnCode: 0
}))
// Collect `max` publishes on topic 'hello' with message ids 1..max.
for (var i = 0; i < max; i++) {
packets.push(mqttPacket.generate({
cmd: 'publish',
topic: Buffer.from('hello'),
payload: Buffer.from('world'),
retain: false,
dup: false,
messageId: i + 1,
qos: 1
}))
}
/**
 * Serialize an MQTT packet and write it to the client's stream.
 *
 * Emits `'packetsend'` on the client before writing. When the stream accepts
 * the data immediately, `cb` is invoked right away; when the stream signals
 * back-pressure (`write` returns a falsy value), `cb` is deferred until the
 * stream's next `'drain'` event.
 *
 * Errors thrown while generating, announcing or writing the packet are
 * reported through `cb(err)` when a callback is given, otherwise emitted as
 * an `'error'` event on the client.
 *
 * @param {Object} client - object exposing `emit` and a writable `stream`
 * @param {Object} packet - packet description passed to `mqttPacket.generate`
 * @param {Function} [cb] - completion callback, called at most once by this function
 */
function sendPacket (client, packet, cb) {
  let flushed = false;

  try {
    const buf = mqttPacket.generate(packet);
    client.emit('packetsend', packet);
    flushed = client.stream.write(buf);
    if (!flushed && cb) {
      client.stream.once('drain', cb);
    }
  } catch (err) {
    if (cb) {
      cb(err);
    } else {
      client.emit('error', err);
    }
    return;
  }

  // The success callback runs outside the try block on purpose: if it throws,
  // the exception must not be caught above and fed back into the same
  // callback a second time.
  if (flushed && cb) {
    cb();
  }
}
// Validate `subPacket` with the source client's credentials, then either
// forward `packet` to srcClient or destroy both clients.
// NOTE(review): the rejection handler at the end is cut off in this excerpt.
wrappedValidate(clientAddress, subPacket, srcClient.credentials, ctx).then(result => {
let valid = result;
logger.log('debug', "Dummy response authorization packet validated", {
packet: subPacket,
result: valid
});
if (valid.status) {
// Validation succeeded: serialize `packet` and write it to the source client.
logger.log('debug', "Writing packet to srcClient", {packet: packet});
srcClient.write(mqtt.generate(packet, opts));
} else {
// Validation failed: tear down both sides of the proxied connection.
logger.log('debug', "Disconnecting clients because of unauthorized response", {packet: packet});
srcClient.destroy();
dstClient.destroy();
}
}).catch(err => {
logger.log('error', "Error when validating", {
write(packet, done) {
if (!this._dead) {
this.stream.write(mqtt.generate(packet), 'binary', done)
}
}