Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
var email = {
to: decodeURI(urlParts[1]),
subject: decodeURI(urlParts[2]),
text: decodeURI(urlParts[3]),
};
queue.enqueue('emailQueue', "sendEmail", email, function(error){
console.log('email :' + JSON.stringify(email));
if(error){ console.log(error) }
var response = {email: email};
res.writeHead(200, {'Content-Type': 'application/json'});
res.end(JSON.stringify(response, null, 2));
});
};
var queue = new NR.queue({connection: connectionDetails}, jobs);
queue.connect(function(){
http.createServer(server).listen(httpPort, httpHost);
console.log('Server running at ' + httpHost + ':' + httpPort);
console.log('send an email and message to /TO_ADDRESS/SUBJECT/YOUR_MESSAGE');
});
///////////////////
// RESQUE WORKER //
///////////////////
var worker = new NR.worker({connection: connectionDetails, queues: ['emailQueue']}, jobs);
worker.connect(function(){
worker.workerCleanup();
worker.start();
});
i++;
}
i = 0;
while(i < 500){
queue.enqueue('slowQueue', "slowSleepJob", []);
i++;
}
});
//////////
// WORK //
//////////
var multiWorker = new NR.multiWorker({
connection: connectionDetails,
queues: ['slowQueue'],
minTaskProcessors: 1,
maxTaskProcessors: 20,
}, jobs);
// normal worker emitters
multiWorker.on('start', function(workerId){ console.log("worker["+workerId+"] started"); });
multiWorker.on('end', function(workerId){ console.log("worker["+workerId+"] ended"); });
multiWorker.on('cleaning_worker', function(workerId, worker, pid){ console.log("cleaning old worker " + worker); });
multiWorker.on('poll', function(workerId, queue){ console.log("worker["+workerId+"] polling " + queue); });
multiWorker.on('job', function(workerId, queue, job){ console.log("worker["+workerId+"] working job " + queue + " " + JSON.stringify(job)); });
multiWorker.on('reEnqueue', function(workerId, queue, job, plugin){ console.log("worker["+workerId+"] reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); });
multiWorker.on('success', function(workerId, queue, job, result){ console.log("worker["+workerId+"] job success " + queue + " " + JSON.stringify(job) + " >> " + result); });
multiWorker.on('failure', function(workerId, queue, job, failure){ console.log("worker["+workerId+"] job failure " + queue + " " + JSON.stringify(job) + " >> " + failure); });
multiWorker.on('error', function(workerId, queue, job, error){ console.log("worker["+workerId+"] error " + queue + " " + JSON.stringify(job) + " >> " + error); });
res.end(JSON.stringify(response, null, 2));
});
};
var queue = new NR.queue({connection: connectionDetails}, jobs);
queue.connect(function(){
http.createServer(server).listen(httpPort, httpHost);
console.log('Server running at ' + httpHost + ':' + httpPort);
console.log('send an email and message to /TO_ADDRESS/SUBJECT/YOUR_MESSAGE');
});
///////////////////
// RESQUE WORKER //
///////////////////
var worker = new NR.worker({connection: connectionDetails, queues: ['emailQueue']}, jobs);
worker.connect(function(){
worker.workerCleanup();
worker.start();
});
worker.on('start', function(){ console.log("worker started"); });
worker.on('end', function(){ console.log("worker ended"); });
worker.on('cleaning_worker', function(worker, pid){ console.log("cleaning old worker " + worker); });
worker.on('poll', function(queue){ console.log("worker polling " + queue); });
worker.on('job', function(queue, job){ console.log("working job " + queue + " " + JSON.stringify(job)); });
worker.on('reEnqueue', function(queue, job, plugin){ console.log("reEnqueue job (" + plugin + ") " + queue + " " + JSON.stringify(job)); });
worker.on('success', function(queue, job, result){ console.log("job success " + queue + " " + JSON.stringify(job) + " >> " + result); });
worker.on('failure', function(queue, job, failure){ console.log("job failure " + queue + " " + JSON.stringify(job) + " >> " + failure); });
worker.on('error', function(queue, job, error){ console.log("error " + queue + " " + JSON.stringify(job) + " >> " + error); });
worker.on('pause', function(){ console.log("worker paused"); });
"slowCPUJob": {
plugins: [],
pluginOptions: {},
perform: function(callback){
var start = new Date().getTime();
blockingSleep(1000);
callback(null, (new Date().getTime() - start) );
},
},
};
///////////////////
// ENQUEUE TASKS //
///////////////////
var queue = new NR.queue({connection: connectionDetails}, jobs);
queue.connect(function(){
var i;
i = 0;
while(i < 10){
queue.enqueue('slowQueue', "slowCPUJob", []);
i++;
}
i = 0;
while(i < 500){
queue.enqueue('slowQueue', "slowSleepJob", []);
i++;
}
});
//////////
export default () => {
const scheduler = new NodeResque.Scheduler({
connection: storage.resqueConfig,
});
scheduler.on('start', () => {
logger('info', 'scheduler: started');
});
scheduler.on('end', () => {
logger('info', 'scheduler: ended');
});
// scheduler.on('poll', () => {
// logger('info', 'scheduler: polling');
// });
scheduler.on('master', () => {
startMultiWorker (callback) {
let self = this
self.workerLogging = self.api.config.tasks.workerLogging
self.schedulerLogging = self.api.config.tasks.schedulerLogging
// create a new multiworker instance
let MultiWorker = NR.multiWorker
self.multiWorker = new MultiWorker({
connection: self.connectionDetails,
queues: self.api.config.tasks.queues,
timeout: self.api.config.tasks.timeout,
checkTimeout: self.api.config.tasks.checkTimeout,
minTaskProcessors: self.api.config.tasks.minTaskProcessors,
maxTaskProcessors: self.api.config.tasks.maxTaskProcessors,
maxEventLoopDelay: self.api.config.tasks.maxEventLoopDelay,
toDisconnectProcessors: self.api.config.tasks.toDisconnectProcessors
}, self.api.tasks.jobs)
// normal worker emitters
self.multiWorker.on('start', workerId => self.api.log('worker: started', self.workerLogging.start, { workerId: workerId }))
self.multiWorker.on('end', workerId => self.api.log('worker: ended', self.workerLogging.end, { workerId: workerId }))
self.multiWorker.on('cleaning_worker', (workerId, worker, pid) => self.api.log(`worker: cleaning old worker ${worker}, (${pid})`, self.workerLogging.cleaning_worker))
// for debug: self.multiWorker.on('poll', (queue) => self.api.log(`worker: polling ${queue}`, self.workerLogging.poll))
public async startMultiworker(): Promise {
const workerLogging = this.api.configs.tasks.workerLogging;
const schedulerLogging = this.api.configs.tasks.schedulerLogging;
// create a new multiworker instance
this.multiWorker = new NodeResque.MultiWorker(
{
connection: this.connectionDetails,
queues: this.api.configs.tasks.queues,
timeout: this.api.configs.tasks.timeout,
checkTimeout: this.api.configs.tasks.checkTimeout,
minTaskProcessors: this.api.configs.tasks.minTaskProcessors,
maxTaskProcessors: this.api.configs.tasks.maxTaskProcessors,
maxEventLoopDelay: this.api.configs.tasks.maxEventLoopDelay,
toDisconnectProcessors: this.api.configs.tasks.toDisconnectProcessors,
},
this.api.tasks.jobs,
);
// normal worker emitters
this.multiWorker.on("start", workerId =>
this.api.log("worker: started", workerLogging.start, {
startScheduler (callback) {
let self = this
// check if the scheduler are enabled
if (self.api.config.tasks.scheduler !== true) { return callback() }
// get the scheduler logger
self.schedulerLogging = self.api.config.tasks.schedulerLogging
// create a new scheduler instance
let Scheduler = NR.scheduler
self.scheduler = new Scheduler({ connection: self.connectionDetails, timeout: self.api.config.tasks.timeout })
// define the handler for the on error event
self.scheduler.on('error', error => self.api.log(error, 'error', '[api.resque.scheduler]'))
// start the scheduler
self.scheduler.connect(() => {
// define some handlers to the scheduler events
self.scheduler.on('start', () => self.api.log('resque scheduler started', self.schedulerLogging.start))
self.scheduler.on('end', () => self.api.log('resque scheduler ended', self.schedulerLogging.end))
self.scheduler.on('poll', () => self.api.log('resque scheduler polling', self.schedulerLogging.poll))
self.scheduler.on('working_timestamp', timestamp => self.api.log(`resque scheduler working timestamp ${timestamp}`, self.schedulerLogging.working_timestamp))
self.scheduler.on('transferred_job', (timestamp, job) => self.api.log(`resque scheduler enqueuing job ${timestamp}`, self.schedulerLogging.transferred_job, job))
// start the scheduler
self.scheduler.start()
var log = options.log
var work = new Work(options)
if (!options.connection) {
return {
enqueue: function (foo, bar, job) {
doLocal(work, job)
},
removeFailed: function () {},
failed: function () {},
length: function () {},
allWorkingOn: function () {}
}
}
var connection = work.connection
connection.pkg = 'redis'
var queue = new Resque({connection: connection}, work.jobs)
process.on('SIGINT', function () {
queue.end(function () {
process.exit()
})
})
process.on('SIGTERM', function () {
queue.end(function () {
process.exit()
})
})
queue.connect(function () {
setInterval(function () {
log.info('Clearing old workers from the queue')
const connect = async redis => {
if (!queue) {
queue = new NR.queue({ connection: { redis: redis } }, tasks)
await queue.connect()
}
return queue
}