Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
const pdrStream = stringToStream(pdr.pdr);
await aws.uploadS3FileStream(pdrStream, bucket, s3PdrKey);
log.debug(`PDR stored at [${s3PdrKey} in S3 bucket [${bucket}]`);
return {
pdr_file_name: fileName,
s3_bucket: bucket,
s3_key: s3PdrKey
};
});
returnValue = await Promise.all(S3UploadPromises);
}
catch (e) {
log.error('Failed to download file');
log.error(e);
throw e;
}
finally {
// Close the connection to the SIPS server
client.end();
}
return returnValue;
}
client.connect({
host: host,
port: port,
user: username,
password: password
});
await clientReady('ready');
try {
const stream = sts(panStr);
await ftp.uploadFile(client, folder, panFileName, stream);
}
catch (e) {
log.error(e);
log.error(e.stack);
throw e;
}
finally {
// Close the connection to the SIPS server
client.end();
}
// pass the payload to the next task, if any
return payload;
}
async function iterateOverStreamRecursivelyToDispatchShards(stream, shardPromiseList, params) {
const listShardsResponse = (await Kinesis.listShards(params).promise().catch(log.error));
if (!listShardsResponse || !listShardsResponse.Shards || listShardsResponse.Shards.length === 0) {
log.error(`No shards found for params ${JSON.stringify(params)}.`);
return shardPromiseList;
}
log.info(`Processing records from ${listShardsResponse.Shards.length} shards..`);
const shardCalls = listShardsResponse.Shards.map(
(shard) => processShard(stream, shard.ShardId).catch(log.error)
);
shardPromiseList.push(...shardCalls);
if (!listShardsResponse.NextToken) {
return shardPromiseList;
}
const newParams = { NextToken: listShardsResponse.NextToken };
return iterateOverStreamRecursivelyToDispatchShards(stream, shardPromiseList, newParams);
}
.catch((e) => {
if (e.toString().includes('ECONNREFUSED')) {
const err = new errors.RemoteResourceError('Connection Refused');
log.error(err);
throw err;
} else if (e.details && e.details.status === 'timeout') {
const err = new errors.ConnectionTimeout('connection Timed out');
log.error(err);
throw err;
}
log.error(e);
throw e;
});
}
try {
const messageData = sf.constructStepFunctionInput(resources, provider, collection);
const stateMachine = collection.workflow;
const executionName = messageData.ingest_meta.execution_name;
const message = JSON.stringify(messageData);
log.info(`Starting ingest of ${collection.id}`);
await aws.sfn().startExecution({
stateMachineArn: stateMachine,
input: message,
name: executionName
}).promise();
}
catch (err) {
log.error(err);
log.error(err.stack);
}
};
_bail(reason, queueItem) {
log.error(`[ERROR] Could not process ${queueItem.url}: ${reason}`);
return [];
}
async function checkExecution(arn, url, timestamp, esClient) {
let error = {
Error: 'Unknown',
Cause: 'The error cause could not be determined'
};
const r = await StepFunction.getExecution(arn, true);
r.status = r.status.toLowerCase();
r.status = r.status === 'succeeded' ? 'completed' : r.status;
if (r.status === 'not_found') {
log.error(`Execution does not exist: ${arn}`);
error = {
Error: 'Not Found',
Cause: 'Execution was not found. If an execution is '
+ 'finished and the state machine is deleted, this error is thrown'
};
await partialRecordUpdate(esClient, arn, 'execution', { status: 'failed', error });
await updateGranulesAndPdrs(esClient, url, error);
return;
}
let input = get(r, 'input');
let output = get(r, 'output');
if (!input) {
return;
}
const ingestGranule = async (granule) => {
try {
const startTime = Date.now();
const r = await ingest.ingest(granule, bucket);
const endTime = Date.now();
return {
...r,
sync_granule_duration: endTime - startTime
};
} catch (e) {
log.error(e);
throw e;
}
};
async function iterateOverShardRecursively(recordPromiseList, shardIterator) {
try {
const response = await Kinesis.getRecords({
ShardIterator: shardIterator
}).promise();
recordPromiseList.push(processRecordBatch(response.Records));
if (response.MillisBehindLatest === 0 || !response.NextShardIterator) return recordPromiseList;
const nextShardIterator = response.NextShardIterator;
return iterateOverShardRecursively(recordPromiseList, nextShardIterator);
} catch (error) {
log.error(error);
return recordPromiseList;
}
}
.catch((e) => {
log.error(e);
if (discover.connected) {
discover.end();
log.debug(`Ending ${provider.protocol} connection`);
}
if (e.toString().includes('ECONNREFUSED')) {
const err = new errors.RemoteResourceError('Connection Refused');
log.error(err);
throw err;
} else if (e.message.includes('Please login with USER and PASS')) {
const err = new errors.FTPError('Login incorrect');
log.error(err);
throw err;
} else if (e.details && e.details.status === 'timeout') {
const err = new errors.ConnectionTimeout('connection Timed out');
log.error(err);
throw err;
} else if (e.details && e.details.status === 'notfound') {
const err = new errors.HostNotFound(`${e.details.url} not found`);
log.error(err);
throw err;
}
throw e;
});
} catch (e) {