Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
container.options.password = credentials.password;
}
let lastMsgId: number | undefined = undefined;
const self = this;
// Forward every received AMQP message to the workflow, dropping a message
// whose id is the same as the one handled immediately before it.
container.on('message', (context: any) => { // tslint:disable-line:no-any
	const messageId = context.message.message_id;
	// Duplicate-message guard, carried over from the rhea-lib example code.
	// Messages without an id are never treated as duplicates.
	if (messageId && messageId === lastMsgId) {
		return;
	}
	// Remember the id only AFTER the guard. Assigning it inside the guard (as
	// before) left lastMsgId permanently undefined, so the check never fired.
	lastMsgId = messageId;
	self.emit([self.helpers.returnJsonArray([context.message])]);
});
const connection = container.connect(connectOptions);
let clientOptions = undefined;
if (durable) {
clientOptions = {
name: subscription,
source: {
address: sink,
durable: 2,
expiry_policy: 'never'
},
credit_window: 1 // prefetch 1
};
} else {
clientOptions = {
source: {
address: sink,
},
// Send `item` as soon as the sender is allowed to send, and expose the
// resulting delivery through a promise.
const allSent = new Promise<Delivery>((resolve) => {
	// `once` rather than `on`: a persistent listener would stay attached to the
	// container after the promise has settled and would send the same item
	// again on every later 'sendable' event.
	container.once('sendable', (context: any) => { // tslint:disable-line:no-any
		const message = {
			application_properties: headerProperties,
			body: JSON.stringify(item)
		};
		resolve(context.sender.send(message));
	});
});
// Opening the sender is what eventually triggers the 'sendable' event above.
container.connect(connectOptions).open_sender(sink);
// Only the id is returned: sendResult has a property that causes a circular
// reference if the whole object is returned.
const sendResult: Delivery = await allSent;
return { json: { id: sendResult.id } } as INodeExecutionData;
}
}
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Command-line options: destination topic plus broker host and port.
// Any remaining positional arguments (args._) are the message bodies to send.
var args = require('../options.js')
    .options({
        't': { alias: 'topic', default: 'topic://PRICE.STOCK.NYSE.RHT', describe: 'name of topic to which messages are sent'},
        'h': { alias: 'host', default: 'localhost', describe: 'dns or ip name of server where you want to connect'},
        'p': { alias: 'port', default: 5672, describe: 'port to connect to'}
    })
    .usage('Usage: $0 [options] ')
    .help('help')
    .argv;

// Connect to the broker and attach a sender to the requested topic.
var connection = require('rhea').connect({ host: args.host, port: args.port });
var sender = connection.open_sender(args.topic);

// When the sender becomes sendable, publish each positional argument as a
// message body, then close the connection.
sender.on('sendable', function () {
    args._.forEach(function (body) {
        console.log('sent ' + body);
        sender.send({ body: body });
    });
    connection.close();
});
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Command-line options: target node (queue or topic) and broker port.
var args = require('../options.js')
    .options({
        'n': { alias: 'node', default: 'examples', describe: 'name of node (e.g. queue or topic) to which messages are sent'},
        'p': { alias: 'port', default: 5672, describe: 'port to connect to'}
    })
    .usage('Usage: $0 [options] ')
    .help('help')
    .argv;

// Connect on the requested port and attach a sender to the node.
var connection = require('rhea').connect({ 'port': args.port });
var sender = connection.open_sender(args.node);

// Messages come from the positional arguments (each one parsed as JSON) when
// any are given; otherwise a fixed sample set tagged with a colour is used.
var messages = args._.length > 0
    ? args._.map(function (raw) { return JSON.parse(raw); })
    : [
        { application_properties: { colour: 'red' }, body: 'panda' },
        { application_properties: { colour: 'green' }, body: 'grasshopper' },
        { application_properties: { colour: 'red' }, body: 'squirrel' },
        { application_properties: { colour: 'blue' }, body: 'whale' }
    ];
sender.on('sendable', function(context) {
for (var i = 0; i < messages.length; i++) {
var m = messages[i];
console.log('sent ' + m.application_properties.colour + '-' + m.body);
sender.send(m);
}
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
// Command-line options: target node (queue or topic), broker host and port.
var args = require('../options.js')
    .options({
        'n': { alias: 'node', default: 'examples', describe: 'name of node (e.g. queue or topic) to which messages are sent'},
        'h': { alias: 'host', default: 'localhost', describe: 'dns or ip name of server where you want to connect'},
        'p': { alias: 'port', default: 5672, describe: 'port to connect to'}
    })
    .usage('Usage: $0 [options] ')
    .help('help')
    .argv;

// Connect to the broker and attach a sender to the node.
var connection = require('rhea').connect({ host: args.host, port: args.port });
var sender = connection.open_sender(args.node);

// Use the positional arguments (each parsed as JSON) when present, otherwise
// fall back to a fixed sample set tagged with a colour.
var messages = args._.length > 0
    ? args._.map(function (raw) { return JSON.parse(raw); })
    : [
        { application_properties: { colour: 'red' }, body: 'panda' },
        { application_properties: { colour: 'green' }, body: 'grasshopper' },
        { application_properties: { colour: 'red' }, body: 'squirrel' },
        { application_properties: { colour: 'blue' }, body: 'whale' }
    ];

// On 'sendable', log and send every message in order, then log 'disconnected'.
sender.on('sendable', function () {
    messages.forEach(function (msg) {
        console.log('sent ' + msg.application_properties.colour + '-' + msg.body);
        sender.send(msg);
    });
    console.log('disconnected');
});
// Handle each received message: log it, then either schedule the next request
// or, once the request budget is used up, shut everything down.
container.on('message', function (context) {
    console.log('received ' + context.message.body);

    // The counter is compared first and incremented afterwards (post-increment).
    var moreRequestsPending = current++ < requests;
    if (moreRequestsPending) {
        timer_task = setTimeout(next_request, args.request_interval);
        return;
    }

    // Finished: drop the sender, cancel any pending timer, close the connection.
    sender = undefined;
    if (timer_task) {
        clearTimeout(timer_task);
    }
    context.connection.close();
    console.log('connection closed');
});

// Open one connection and attach both a sender and a receiver to 'examples'.
var connection = container.connect(connect_options);
sender = connection.open_sender('examples');
connection.open_receiver('examples');
server.once('listening', function () {
container.connect({'port':8888}).open_sender('examples');
});
server.once('connection', function () {
server.once('listening', function () {
container.connect({'port':8888}).open_sender('examples');
});
server.once('connection', function () {
// Connects with this.connection, opens a sender once the connection is open,
// sends this.payload on the first 'sendable' event, then detaches the sender
// and resolves. The promise rejects with the event's `error` field on the
// first of the failure events registered below.
return new Promise((resolve, reject) => {
// Failure paths: each one rejects with the `error` carried by the event.
// NOTE(review): these one-shot listeners are not removed on success — confirm
// whether `container` is shared between calls.
container.once('connection_error', (err: any) => reject(err.error));
container.once('error', (err: any) => reject(err.error));
container.once('sender_error', (err: any) => reject(err.error));
container.once('disconnected', (err: any) => reject(err.error));
container.once('rejected', (err: any) => reject(err.error));
// As soon as the connection is open, attach a sender configured by this.options.
container.once('connection_open', (context: any) => {
context.connection.open_sender(this.options);
});
// First 'sendable' event: send the payload exactly once.
container.once('sendable', (context: any) => {
const delivery = context.sender.send(this.payload);
// Store the decycled delivery on this.messageReceived; presumably
// ObjectDecycler strips circular references — TODO confirm.
this.messageReceived = new ObjectDecycler().decycle(delivery);
context.sender.detach();
resolve();
});
// Listeners are registered before connecting so no event can be missed.
container.connect(this.connection);
});
}
// Sets up the receiving side. In server mode it listens on this.connection and
// resolves when the server reports 'listening'; in client mode it connects to
// this.connection. In both modes, the first 'connection_open' event attaches a
// receiver configured by this.options and resolves.
return new Promise((resolve, reject) => {
// Passes `reject` to registerFailures; presumably this wires failure events to
// the rejection — TODO confirm against its definition.
this.registerFailures(reject);
if (this.server === true) {
const server = container.listen(this.connection);
server.once('listening', resolve);
// On the first incoming connection, server.close is invoked as the listener.
server.once('connection', server.close);
server.once('error', (err: any) => reject(err.error));
} else {
container.connect(this.connection);
// NOTE(review): removeFailure runs immediately after connect(), before any
// 'connection_open' event — confirm this is intended.
this.removeFailure();
}
container.once('connection_open', (context: any) => {
Logger.info(`${this.type} connection opened`);
this.removeFailure();
context.connection.open_receiver(this.options);
// In server mode the promise may already have been resolved by 'listening';
// resolving a settled promise again has no effect.
resolve();
});
});
}