Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
runtime: new lambda.Runtime('ruby2.5'),
handler: 'lambda.handler',
code: lambda.Code.asset('./rails_sample_app'),
timeout: 60,
environment: {
QUEUE_STANDARD: queueStandardWorkers.queueName,
QUEUE_ACTIVEJOB: queueActiveJob.queueName,
RAILS_ENV: 'production'
}
})
queueActiveJob.grantSendMessages(fn.role)
// batchSize defaults to 10, if you use > 1, your Lambda needs to be able to process in parallel,
// otherwise, if your messages take 1 minute to be processed, the last one will take up to 10 minutes to start being processed
fn.addEventSource(new SqsEventSource(queueStandardWorkers, { batchSize: 1 }))
fn.addEventSource(new SqsEventSource(queueActiveJob, { batchSize: 1 }))
}
}
handler: 'lambda.handler',
code: lambda.Code.asset('./rails_sample_app'),
timeout: 60,
environment: {
QUEUE_STANDARD: queueStandardWorkers.queueName,
QUEUE_ACTIVEJOB: queueActiveJob.queueName,
RAILS_ENV: 'production'
}
})
queueActiveJob.grantSendMessages(fn.role)
// batchSize defaults to 10, if you use > 1, your Lambda needs to be able to process in parallel,
// otherwise, if your messages take 1 minute to be processed, the last one will take up to 10 minutes to start being processed
fn.addEventSource(new SqsEventSource(queueStandardWorkers, { batchSize: 1 }))
fn.addEventSource(new SqsEventSource(queueActiveJob, { batchSize: 1 }))
}
}
runtime: lambda.Runtime.Python37,
code: lambda.Code.asset('lambda/syncprocessor'),
handler: 'lambda_function.lambda_handler',
reservedConcurrentExecutions: 1,
timeout: 25,
environment: {
OUTPUT_TABLE: outputTable.tableName,
DOCUMENTS_TABLE: documentsTable.tableName,
AWS_DATA_PATH : "models"
}
});
//Layer
syncProcessor.addLayer(helperLayer)
syncProcessor.addLayer(textractorLayer)
//Trigger
syncProcessor.addEventSource(new SqsEventSource(syncJobsQueue, {
batchSize: 1
}));
//Permissions
contentBucket.grantReadWrite(syncProcessor)
existingContentBucket.grantReadWrite(syncProcessor)
outputTable.grantReadWriteData(syncProcessor)
documentsTable.grantReadWriteData(syncProcessor)
syncProcessor.addToRolePolicy(new iam.PolicyStatement().addAllResources().addActions("textract:*"))
//------------------------------------------------------------
// Async Job Processor (Start jobs using Async APIs)
const asyncProcessor = new lambda.Function(this, 'ASyncProcessor', {
runtime: lambda.Runtime.Python37,
code: lambda.Code.asset('lambda/asyncprocessor'),
handler: 'lambda_function.lambda_handler',
// Lambda Functions
const baseEnvVars = {
TASK_TOPIC: this.taskTopic.topicArn,
REPORT_TOPIC: this.reportTopic.topicArn,
CACHE_TABLE: this.cacheTable.tableName,
};
const nodeModulesLayer = new lambda.LayerVersion(this, "NodeModulesLayer", {
code: lambda.AssetCode.fromAsset("????"),
compatibleRuntimes: [lambda.Runtime.NODEJS_10_X],
});
this.recvAlert = new NodejsFunction(this, "recvAlert", {
entry: path.join(__dirname, "lambda/recvAlert.js"),
handler: "main",
timeout: alertQueueTimeout,
role: lambdaRole,
events: [new SqsEventSource(this.alertQueue)],
environment: Object.assign(baseEnvVars, {
INSPECTOR_MACHINE: "",
REVIEW_MACHINE: "",
}),
});
this.submitContent = new NodejsFunction(this, "submitContent", {
entry: path.join(__dirname, "lambda/submitContent.js"),
handler: "main",
role: lambdaRole,
events: [new SqsEventSource(this.alertQueue)],
environment: baseEnvVars,
});
this.feedbackAttribute = new NodejsFunction(this, "feedbackAttribute", {
entry: path.join(__dirname, "lambda/feedbackAttribute.js"),
environment: {
GITHUB_CLIENT_ID: githubSecrets.clientId,
GITHUB_SECRET: githubSecrets.secret,
CHANGELOGS_TABLE_NAME: props.changelogsTable.tableName,
FEEDS_TABLE_NAME: props.feedsTable.tableName,
SEARCH_INDEX_TABLE_NAME: props.searchIndexTable.tableName,
API_BUCKET_NAME: props.apiBucket.bucketName,
WEB_BUCKET_NAME: props.webBucket.bucketName,
REDIS_HOST: props.redis.cluster.attrRedisEndpointAddress,
REDIS_PORT: props.redis.cluster.attrRedisEndpointPort
}
});
// Attach the lambda to the SNS topic so that when the follower
// publishes to the SNS topic the Lambda gets invoked.
const crawlEventSource = new lambdaEvents.SnsEventSource(props.toCrawlTopic);
crawlLambda.addEventSource(crawlEventSource);
// Grant the lambda permission to modify the tables
props.changelogsTable.grantReadWriteData(crawlLambda.role);
props.feedsTable.grantReadWriteData(crawlLambda.role);
props.searchIndexTable.grantReadWriteData(crawlLambda.role);
// Grant the lambda permission to write to the buckets
props.webBucket.grantReadWrite(crawlLambda.role);
props.apiBucket.grantReadWrite(crawlLambda.role);
// Grant the lambda networking access to Redis
crawlLambda.connections.allowToDefaultPort(props.redis);
}
}
SNS_ROLE_ARN : textractServiceRole.roleArn,
AWS_DATA_PATH : "models"
}
});
//asyncProcessor.addEnvironment("SNS_TOPIC_ARN", textractServiceRole.topicArn)
//Layer
asyncProcessor.addLayer(helperLayer)
//Triggers
// Run async job processor every 5 minutes
const rule = new events.EventRule(this, 'Rule', {
scheduleExpression: 'rate(2 minutes)',
});
rule.addTarget(asyncProcessor);
//Run when a job is successfully complete
asyncProcessor.addEventSource(new SnsEventSource(jobCompletionTopic))
//Permissions
contentBucket.grantRead(asyncProcessor)
existingContentBucket.grantReadWrite(asyncProcessor)
asyncJobsQueue.grantConsumeMessages(asyncProcessor)
asyncProcessor.addToRolePolicy(new iam.PolicyStatement().addResource(textractServiceRole.roleArn).addAction('iam:PassRole'))
asyncProcessor.addToRolePolicy(new iam.PolicyStatement().addAllResources().addAction("textract:*"))
//------------------------------------------------------------
// Async Jobs Results Processor
const jobResultProcessor = new lambda.Function(this, 'JobResultProcessor', {
runtime: lambda.Runtime.Python37,
code: lambda.Code.asset('lambda/jobresultprocessor'),
handler: 'lambda_function.lambda_handler',
memorySize: 2000,
reservedConcurrentExecutions: 50,
// S3 Event processor
const s3Processor = new lambda.Function(this, 'S3Processor', {
runtime: lambda.Runtime.Python37,
code: lambda.Code.asset('lambda/s3processor'),
handler: 'lambda_function.lambda_handler',
environment: {
SYNC_QUEUE_URL: syncJobsQueue.queueUrl,
ASYNC_QUEUE_URL: asyncJobsQueue.queueUrl,
DOCUMENTS_TABLE: documentsTable.tableName,
OUTPUT_TABLE: outputTable.tableName
}
});
//Layer
s3Processor.addLayer(helperLayer)
//Trigger
s3Processor.addEventSource(new S3EventSource(contentBucket, {
events: [ s3.EventType.ObjectCreated ]
}));
//Permissions
documentsTable.grantReadWriteData(s3Processor)
syncJobsQueue.grantSendMessages(s3Processor)
asyncJobsQueue.grantSendMessages(s3Processor)
//------------------------------------------------------------
// S3 Batch Operations Event processor
const s3BatchProcessor = new lambda.Function(this, 'S3BatchProcessor', {
runtime: lambda.Runtime.Python37,
code: lambda.Code.asset('lambda/s3batchprocessor'),
handler: 'lambda_function.lambda_handler',
environment: {
DOCUMENTS_TABLE: documentsTable.tableName,
//------------------------------------------------------------
// Document processor (Router to Sync/Async Pipeline)
const documentProcessor = new lambda.Function(this, 'TaskProcessor', {
runtime: lambda.Runtime.Python37,
code: lambda.Code.asset('lambda/documentprocessor'),
handler: 'lambda_function.lambda_handler',
environment: {
SYNC_QUEUE_URL: syncJobsQueue.queueUrl,
ASYNC_QUEUE_URL: asyncJobsQueue.queueUrl
}
});
//Layer
documentProcessor.addLayer(helperLayer)
//Trigger
documentProcessor.addEventSource(new DynamoEventSource(documentsTable, {
startingPosition: lambda.StartingPosition.TrimHorizon
}));
//Permissions
documentsTable.grantReadWriteData(documentProcessor)
syncJobsQueue.grantSendMessages(documentProcessor)
asyncJobsQueue.grantSendMessages(documentProcessor)
//------------------------------------------------------------
// Sync Jobs Processor (Process jobs using sync APIs)
const syncProcessor = new lambda.Function(this, 'SyncProcessor', {
runtime: lambda.Runtime.Python37,
code: lambda.Code.asset('lambda/syncprocessor'),
handler: 'lambda_function.lambda_handler',
reservedConcurrentExecutions: 1,
super(scope, id, props);
const table = new dynamodb.Table(this, "resultTable", {
billingMode: dynamodb.BillingMode.PAY_PER_REQUEST,
partitionKey: { name: "pk", type: dynamodb.AttributeType.STRING },
sortKey: { name: "sk", type: dynamodb.AttributeType.STRING },
});
const buildPath = lambda.Code.fromAsset("./build");
const testInspector = new lambda.Function(this, "testInspector", {
runtime: lambda.Runtime.GO_1_X,
handler: "inspector",
timeout: cdk.Duration.seconds(30),
code: buildPath,
events: [new SnsEventSource(props.deepalert.taskTopic)],
environment: {
RESULT_TABLE: table.tableName,
FINDING_QUEUE: props.deepalert.findingQueue.queueUrl,
ATTRIBUTE_QUEUE: props.deepalert.attributeQueue.queueUrl,
},
});
const testEmitter = new lambda.Function(this, "testEmitter", {
runtime: lambda.Runtime.GO_1_X,
handler: "emitter",
timeout: cdk.Duration.seconds(30),
code: buildPath,
events: [new SnsEventSource(props.deepalert.reportTopic)],
environment: {
RESULT_TABLE: table.tableName,
},
timeout: cdk.Duration.seconds(30),
code: buildPath,
events: [new SnsEventSource(props.deepalert.taskTopic)],
environment: {
RESULT_TABLE: table.tableName,
FINDING_QUEUE: props.deepalert.findingQueue.queueUrl,
ATTRIBUTE_QUEUE: props.deepalert.attributeQueue.queueUrl,
},
});
const testEmitter = new lambda.Function(this, "testEmitter", {
runtime: lambda.Runtime.GO_1_X,
handler: "emitter",
timeout: cdk.Duration.seconds(30),
code: buildPath,
events: [new SnsEventSource(props.deepalert.reportTopic)],
environment: {
RESULT_TABLE: table.tableName,
},
});
table.grantReadWriteData(testInspector);
table.grantReadWriteData(testEmitter);
props.deepalert.findingQueue.grantSendMessages(testInspector);
props.deepalert.attributeQueue.grantSendMessages(testInspector);
}
}