Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
getProcessor(_opConfig, extraContext) {
let opConfig = _opConfig;
if (_opConfig == null) {
opConfig = {};
}
if (!opConfig._op) {
opConfig._op = 'test-op-name';
}
const operation = this.operationFn;
const { schema, context } = this;
// run the jobConfig and opConfig through the validator to get
// complete and convict validated configs
const jobConfig = validateJobConfig(schema, { operations: [{ _op: 'noop' }, opConfig] });
return operation.newProcessor(
_.assign({}, context, extraContext),
validateOpConfig(operation.schema(), opConfig),
jobConfig
);
}
constructor(op) {
this.context = new TestContext('teraslice-op-test-harness');
this.schema = jobSchema(this.context);
this.events = this.context.apis.foundation.getSystemEvents();
this.logger = this.context.logger;
this.operationFn = op;
// This is for backwards compatiblity
this._jobSpec = executionSpec;
bindThis(this, TestHarness);
}
opConfig: newOpConfig = null,
executionConfig: newExConfig,
retryData = [],
clients = null,
type = 'slicer'
}) {
const {
context,
logger,
operationFn: op,
} = this;
const exConfig = executionSpec(newExConfig);
const isProcessor = op.Processor || (op.newProcessor != null);
const Schema = op.schema ? schemaShim(op).Schema : op.Schema;
const schema = new Schema(context);
let opConfig;
// This is kind of a pain to deal with:
// this can only work with an executionConfig
// with two operations
if (exConfig.operations.length < 2) {
opConfig = schema.validate(newOpConfig || exConfig.operations[0]);
if (isProcessor) {
exConfig.operations = [{ _op: 'test-reader' }, opConfig];
} else {
exConfig.operations = [opConfig, { _op: 'noop' }];
}
} else {
const opPosition = isProcessor ? 1 : 0;
opConfig = schema.validate(newOpConfig || exConfig.operations[opPosition]);
const checkStatus = async (): Promise => {
let result;
try {
const ex = await this.get(`/jobs/${this._jobId}/ex`, options);
if (exId && ex.ex_id !== exId) {
console.warn(`[WARNING] the execution ${ex.ex_id} has changed from ${exId}`);
}
exId = ex.ex_id;
result = ex._status;
} catch (err) {
if (/(timeout|timedout)/i.test(toString(err))) {
await pDelay(intervalMs);
return checkStatus();
}
throw err;
}
if (result === target) {
return result;
}
// These are terminal states for a job so if we're not explicitly
// watching for these then we need to stop waiting as the job
// status won't change further.
if (terminal[result]) {
throw new TSError(
`Job cannot reach the target status, "${target}", because it is in the terminal state, "${result}"`,
{ context: { lastStatus: result } }
const checkStatus = async (): Promise => {
let result;
try {
const ex = await this.get(`/jobs/${this._jobId}/ex`, options);
if (exId && ex.ex_id !== exId) {
console.warn(`[WARNING] the execution ${ex.ex_id} has changed from ${exId}`);
}
exId = ex.ex_id;
result = ex._status;
} catch (err) {
if (/(timeout|timedout)/i.test(toString(err))) {
await pDelay(intervalMs);
return checkStatus();
}
throw err;
}
if (result === target) {
return result;
}
// These are terminal states for a job so if we're not explicitly
// watching for these then we need to stop waiting as the job
// status won't change further.
if (terminal[result]) {
throw new TSError(
`Job cannot reach the target status, "${target}", because it is in the terminal state, "${result}"`,
/**
 * Submit a job to the `/jobs` endpoint and return a wrapped handle for it.
 *
 * @param jobSpec - the job configuration to post; required
 * @param shouldNotStart - when truthy, the job is posted with `start: false`
 * @returns the result of `this.wrap` for the resulting execution id
 * @throws TSError when `jobSpec` is missing
 */
async submit(jobSpec: JobConfig, shouldNotStart?: boolean): Promise {
    if (!jobSpec) {
        throw new TSError('submit requires a jobSpec');
    }

    const shouldStart = !shouldNotStart;
    const created: JobIDResponse = await this.post('/jobs', jobSpec, { query: { start: shouldStart } });

    let executionId = created.ex_id;
    if (!executionId) {
        // support older version of teraslice: the submit response carries no
        // ex_id, so look the execution up from the job
        const execution = await this.get(`/jobs/${created.job_id}/ex`);
        executionId = execution.ex_id;
    }

    return this.wrap(executionId);
}
_.forEach(_.keys(connectionMap), (keyset) => {
multisend = true;
// All the keys in the map share the same connection and bulk context.
const connection = getClient(context, { connection: connectionMap[keyset] }, 'elasticsearch');
const data = [];
const keys = keyset.split(',');
for (let i = 0; i < keys.length; i += 1) {
bulkContexts[keys[i].toLowerCase()] = {
connection,
data
};
}
});
}
function newProcessor(context, opConfig) {
const {
connection_map: connectionMap,
multisend_index_append: multisendIndexAppend,
size: limit
} = opConfig;
const bulkContexts = {};
let logger;
let client;
let { multisend } = opConfig;
if (multisend) {
_initializeContexts();
} else {
client = getClient(context, opConfig, 'elasticsearch');
}
function _initializeContexts() {
// We create a bulk context for each keyset then map individual keys
// to the bulk context for their keyset.
_.forEach(_.keys(connectionMap), (keyset) => {
multisend = true;
// All the keys in the map share the same connection and bulk context.
const connection = getClient(context, { connection: connectionMap[keyset] }, 'elasticsearch');
const data = [];
const keys = keyset.split(',');
for (let i = 0; i < keys.length; i += 1) {
bulkContexts[keys[i].toLowerCase()] = {
connection,
data
const checkStatus = async (): Promise => {
let result;
try {
result = await this.status(options);
} catch (err) {
if (/(timeout|timedout)/i.test(toString(err))) {
await pDelay(intervalMs);
return checkStatus();
}
throw err;
}
if (result === target) {
return result;
}
// These are terminal states for a job so if we're not explicitly
// watching for these then we need to stop waiting as the job
// status won't change further.
if (terminal[result]) {
throw new TSError(
`Execution cannot reach the target status, "${target}", because it is in the terminal state, "${result}"`,
{ context: { lastStatus: result } }
const checkStatus = async (): Promise => {
let result;
try {
result = await this.status(options);
} catch (err) {
if (/(timeout|timedout)/i.test(toString(err))) {
await pDelay(intervalMs);
return checkStatus();
}
throw err;
}
if (result === target) {
return result;
}
// These are terminal states for a job so if we're not explicitly
// watching for these then we need to stop waiting as the job
// status won't change further.
if (terminal[result]) {
throw new TSError(
`Execution cannot reach the target status, "${target}", because it is in the terminal state, "${result}"`,