Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
module.exports = function _clusterMaster(context) {
// Logger obtained from makeLogger with the 'cluster_master' label.
const logger = makeLogger(context, 'cluster_master');
// Teraslice section of the system configuration.
const clusterConfig = context.sysconfig.teraslice;
// Read from the `assets_port` environment variable (presumably the port of the assets service).
const assetsPort = process.env.assets_port;
// Loopback URL built from that port.
// NOTE(review): if `assets_port` is unset this evaluates to "http://127.0.0.1:undefined" — confirm the variable is always provided.
const assetsUrl = `http://127.0.0.1:${assetsPort}`;
// Mutable flag that starts out false.
let running = false;
// Initialize the HTTP service for handling incoming requests.
const app = express();
const clusterMasterServer = new ClusterMaster.Server({
port: clusterConfig.port,
nodeDisconnectTimeout: clusterConfig.node_disconnect_timeout,
// setting request timeout to 5 minutes
serverTimeout: 300000,
// we do this to override express final response handler
requestListener(req, res) {
app(req, res, (err) => {
if (err) logger.warn(err, 'unexpected server error');
res.setHeader('Content-Type', 'application/json');
res.statusCode = 500;
res.end(JSON.stringify({ error: 'api is not available' }));
});
},
networkLatencyBuffer: clusterConfig.network_latency_buffer,
actionTimeout: clusterConfig.action_timeout,
logger,
// System events object from the foundation API.
const events = context.apis.foundation.getSystemEvents();
// Pull the slicer address and the performance-metrics flag out of the
// execution config, renaming the snake_case keys to camelCase locals.
const {
slicer_port: slicerPort,
slicer_hostname: slicerHostname,
performance_metrics: performanceMetrics
} = executionContext.config;
// The remaining settings are looked up in the teraslice section of the system config.
const config = context.sysconfig.teraslice;
const networkLatencyBuffer = get(config, 'network_latency_buffer');
const actionTimeout = get(config, 'action_timeout');
const workerDisconnectTimeout = get(config, 'worker_disconnect_timeout');
const slicerTimeout = get(config, 'slicer_timeout');
const shutdownTimeout = get(config, 'shutdown_timeout');
// Execution controller client, pointed at the URL formed from the slicer's
// hostname and port.
this.client = new ExecutionController.Client({
executionControllerUrl: formatURL(slicerHostname, slicerPort),
workerId,
networkLatencyBuffer,
workerDisconnectTimeout,
// The connect timeout should be set to the same timeout that will
// cause the execution to fail if no workers connect.
connectTimeout: slicerTimeout,
actionTimeout,
logger
});
this.slice = new Slice(context, executionContext);
this.metrics = performanceMetrics
? new Metrics({
logger
// Pull the slicer address and the performance-metrics flag out of the
// execution config, renaming the snake_case keys to camelCase locals.
const {
slicer_port: slicerPort,
slicer_hostname: slicerHostname,
performance_metrics: performanceMetrics
} = executionContext.config;
// The remaining settings are looked up in the teraslice section of the system config.
const config = context.sysconfig.teraslice;
const networkLatencyBuffer = get(config, 'network_latency_buffer');
const actionTimeout = get(config, 'action_timeout');
const workerDisconnectTimeout = get(config, 'worker_disconnect_timeout');
const slicerTimeout = get(config, 'slicer_timeout');
const shutdownTimeout = get(config, 'shutdown_timeout');
// Execution controller client, pointed at the URL formed from the slicer's
// hostname and port.
this.client = new ExecutionController.Client({
executionControllerUrl: formatURL(slicerHostname, slicerPort),
workerId,
networkLatencyBuffer,
workerDisconnectTimeout,
// The connect timeout should be set to the same timeout that will
// cause the execution to fail if no workers connect.
connectTimeout: slicerTimeout,
actionTimeout,
logger
});
this.slice = new Slice(context, executionContext);
this.metrics = performanceMetrics
? new Metrics({
logger
})
const ms = require('ms');
const _ = require('lodash');
const Messaging = require('@terascope/teraslice-messaging');
const {
TSError, get, pDelay, getFullErrorStack, logError, pWhile
} = require('@terascope/utils');
const { waitForWorkerShutdown } = require('../helpers/worker-shutdown');
const { makeStateStore, makeExStore, SliceState } = require('../../storage');
const { makeLogger, generateWorkerId } = require('../helpers/terafoundation');
const ExecutionAnalytics = require('./execution-analytics');
const makeSliceAnalytics = require('./slice-analytics');
const Scheduler = require('./scheduler');
const Metrics = require('../metrics');
const ExecutionControllerServer = Messaging.ExecutionController.Server;
const ClusterMasterClient = Messaging.ClusterMaster.Client;
const { formatURL } = Messaging;
class ExecutionController {
constructor(context, executionContext) {
// Worker id generated from the context, and a logger created with the
// 'execution_controller' label.
const workerId = generateWorkerId(context);
const logger = makeLogger(context, 'execution_controller');
// System events object from the foundation API.
const events = context.apis.foundation.getSystemEvents();
// Values read from this execution's config.
const slicerPort = executionContext.config.slicer_port;
const performanceMetrics = executionContext.config.performance_metrics;
// The remaining settings are looked up in the teraslice section of the system config.
const config = context.sysconfig.teraslice;
const networkLatencyBuffer = get(config, 'network_latency_buffer');
const actionTimeout = get(config, 'action_timeout');
const workerDisconnectTimeout = get(config, 'worker_disconnect_timeout');
const nodeDisconnectTimeout = get(config, 'node_disconnect_timeout');
const shutdownTimeout = get(config, 'shutdown_timeout');
const ms = require('ms');
const _ = require('lodash');
const Messaging = require('@terascope/teraslice-messaging');
const {
TSError, get, pDelay, getFullErrorStack, logError, pWhile
} = require('@terascope/utils');
const { waitForWorkerShutdown } = require('../helpers/worker-shutdown');
const { makeStateStore, makeExStore, SliceState } = require('../../storage');
const { makeLogger, generateWorkerId } = require('../helpers/terafoundation');
const ExecutionAnalytics = require('./execution-analytics');
const makeSliceAnalytics = require('./slice-analytics');
const Scheduler = require('./scheduler');
const Metrics = require('../metrics');
const ExecutionControllerServer = Messaging.ExecutionController.Server;
const ClusterMasterClient = Messaging.ClusterMaster.Client;
const { formatURL } = Messaging;
class ExecutionController {
constructor(context, executionContext) {
// Worker id generated from the context, and a logger created with the
// 'execution_controller' label.
const workerId = generateWorkerId(context);
const logger = makeLogger(context, 'execution_controller');
// System events object from the foundation API.
const events = context.apis.foundation.getSystemEvents();
// Values read from this execution's config.
const slicerPort = executionContext.config.slicer_port;
const performanceMetrics = executionContext.config.performance_metrics;
// The remaining settings are looked up in the teraslice section of the system config.
const config = context.sysconfig.teraslice;
const networkLatencyBuffer = get(config, 'network_latency_buffer');
const actionTimeout = get(config, 'action_timeout');
const workerDisconnectTimeout = get(config, 'worker_disconnect_timeout');
const nodeDisconnectTimeout = get(config, 'node_disconnect_timeout');
const shutdownTimeout = get(config, 'shutdown_timeout');