Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
_.forEach(_.keys(connectionMap), (keyset) => {
multisend = true;
// All the keys in the map share the same connection and bulk context.
const connection = getClient(context, { connection: connectionMap[keyset] }, 'elasticsearch');
const data = [];
const keys = keyset.split(',');
for (let i = 0; i < keys.length; i += 1) {
bulkContexts[keys[i].toLowerCase()] = {
connection,
data
};
}
});
}
function newProcessor(context, opConfig) {
const {
connection_map: connectionMap,
multisend_index_append: multisendIndexAppend,
size: limit
} = opConfig;
const bulkContexts = {};
let logger;
let client;
let { multisend } = opConfig;
if (multisend) {
_initializeContexts();
} else {
client = getClient(context, opConfig, 'elasticsearch');
}
function _initializeContexts() {
// We create a bulk context for each keyset then map individual keys
// to the bulk context for their keyset.
_.forEach(_.keys(connectionMap), (keyset) => {
multisend = true;
// All the keys in the map share the same connection and bulk context.
const connection = getClient(context, { connection: connectionMap[keyset] }, 'elasticsearch');
const data = [];
const keys = keyset.split(',');
for (let i = 0; i < keys.length; i += 1) {
bulkContexts[keys[i].toLowerCase()] = {
connection,
data
module.exports = function logsStorage(context) {
if (_.includes(context.sysconfig.terafoundation.logging, 'elasticsearch')) {
const client = getClient(context, context.sysconfig.teraslice.state, 'elasticsearch');
const template = require('./backends/mappings/logs.json');
const elasticsearch = require('@terascope/elasticsearch-api')(client, context.logger, null);
const clusterName = context.sysconfig.teraslice.name;
const name = `${clusterName}_logs_template`;
// setting template name to reflect current teraslice instance name to help prevent
// conflicts with differing versions of teraslice with same elastic db
template.template = `${clusterName}${template.template}`;
return elasticsearch.putTemplate(template, name);
}
return Promise.resolve(true);
};
function newReader(context, opConfig, executionConfig) {
const client = getClient(context, opConfig, 'elasticsearch');
return require('./elasticsearch_date_range/reader.js')(context, opConfig, executionConfig, client);
}
function newSlicer(context, executionContext, retryData, logger) {
const opConfig = getOpConfig(executionContext.config, 'elasticsearch_reader');
const client = getClient(context, opConfig, 'elasticsearch');
return require('./elasticsearch_date_range/slicer.js')(context, opConfig, executionContext, retryData, logger, client);
}
return new Promise((resolve, reject) => {
const clientName = JSON.stringify({
connection: config.state.connection,
index: indexName,
});
const connectionConfig = Object.assign({}, config.state);
if (connectionConfig.connection_cache == null) {
connectionConfig.connection_cache = true;
}
client = getClient(context, connectionConfig, 'elasticsearch');
if (!client) {
reject(new Error(`Unable to get client for connection: ${config.state.connection}`));
return;
}
let { connection } = config.state;
if (config.state.endpoint) {
connection += `:${config.state.endpoint}`;
}
const options = {
full_response: !!fullResponse,
connection,
};
elasticsearch = elasticsearchApi(client, logger, options);