Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
this._mqtt.publish(topic, body.toString(), { qos: 0, retain: false }, (err, puback) => {
if (err) {
/* Codes_SRS_NODE_DEVICE_MQTT_18_016: [** If an error occurs in the `sendTwinRequest` method, the `done` callback shall be called with the error as the first parameter. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_024: [** If an error occurs, the `sendTwinRequest` shall use the MQTT `translateError` module to convert the mqtt-specific error to a transport agnostic error before passing it into the `done` callback. **]** */
callback(translateError(err));
} else {
/* Codes_SRS_NODE_DEVICE_MQTT_18_004: [** If a `done` callback is passed as an argument, The `sendTwinRequest` method shall call `done` after the body has been published. **]** */
/* Codes_SRS_NODE_DEVICE_MQTT_18_017: [** If the `sendTwinRequest` method is successful, the first parameter to the `done` callback shall be null and the second parameter shall be a MessageEnqueued object. **]** */
callback(null, new results.MessageEnqueued(puback));
}
});
},
SimulatedAmqp.prototype.send = function send(deviceId, message, done) {
if (done) {
if (deviceId.search(/^no-device/) !== -1) {
done(new errors.DeviceNotFoundError());
}
else {
done(null, new results.MessageEnqueued());
if (message.ack === 'full') {
this._receiver.emit('message', {
body: [{
originalMessageId: message.messageId,
deviceId: deviceId
}]
});
}
}
}
};
return function onResponse(err, body, response) {
if (!err) {
done(null, new results.MessageEnqueued(response));
} else {
var error = response ? translateError('Could not send message: ' + err.message, body, response) : err;
done(error);
}
};
}
this.client.publish(topic, message.data.toString(), { qos: 1, retain: false }, function (err, puback) {
if (done) {
if (err) {
done(err);
} else {
debug('PUBACK: ' + JSON.stringify(puback));
done(null, new results.MessageEnqueued(puback));
}
}
}.bind(this));
};
this._mqtt.publish(topic, payload, options, (err, result) => {
if (err) {
/*Codes_SRS_NODE_DEVICE_MQTT_16_027: [The `sendEvent` method shall call its callback with an `Error` that has been translated using the `translateError` method if the `MqttBase` object fails to publish the message.]*/
sendEventCallback(translateError(err));
} else {
sendEventCallback(null, new results.MessageEnqueued(result));
}
});
},
.then(function (state) {
safeCallback(done, null, new results.MessageEnqueued(state));
return null;
})
.catch(function (err) {
this._upstreamAmqpLink.send(amqpMessage, (err, state) => {
if (err) {
debug(' amqp-twin-receiver: Bad disposition on the amqp message send: ' + err);
this._safeCallback(done, translateError('Unable to send Twin message', err));
} else {
debug(' amqp-twin-receiver: Good disposition on the amqp message send: ' + JSON.stringify(state));
this._safeCallback(done, null, new results.MessageEnqueued(state));
return null;
}
});
}
this._fsm.handle('sendEvent', topic, message.data, { qos: 1, retain: false }, (err, puback) => {
if (err) {
debug('send error: ' + err.toString());
done(err);
} else {
debug('PUBACK: ' + JSON.stringify(puback));
done(null, new results.MessageEnqueued(puback));
}
});
}
.then(function (state) {
if (done) {
var result = new results.MessageEnqueued(state);
done(null, result);
}
})
.catch(function (err) {
senderAcceptedEvent: (context: EventContext) => {
debug('in sender detaching state - accepted event for ' + context.sender.name);
const op = this._pendingMessageDictionary[context.delivery.id];
if (op) {
delete this._pendingMessageDictionary[context.delivery.id];
/*Codes_SRS_NODE_AMQP_SENDER_LINK_16_013: [If the message is successfully sent, the `callback` shall be called with a first parameter (error) set to `null` and a second parameter of type `MessageEnqueued`.]*/
if (op.callback) {
op.callback(null, new results.MessageEnqueued());
}
}
},
detach: (callback, err) => {