Secure your code as it's written. Use Snyk Code to scan source code in minutes - no build needed - and fix issues immediately.
handler: "streamProcessor.processRecord",
runtime: lambda.Runtime.PYTHON_3_6,
description: "An Amazon Kinesis Firehose stream processor that enriches click records" +
" to not just include a mysfitId, but also other attributes that can be analyzed later.",
memorySize: 128,
code: lambda.Code.asset("../../../lambda-streaming-processor"),
timeout: cdk.Duration.seconds(30),
initialPolicy: [
lambdaFunctionPolicy
],
environment: {
MYSFITS_API_URL: "MysfitsApiUrl"
}
});
const mysfitsFireHoseToS3 = new CfnDeliveryStream(this, "DeliveryStream", {
extendedS3DestinationConfiguration: {
bucketArn: clicksDestinationBucket.bucketArn,
bufferingHints: {
intervalInSeconds: 60,
sizeInMBs: 50
},
compressionFormat: "UNCOMPRESSED",
prefix: "firehose/",
roleArn: firehoseDeliveryRole.roleArn,
processingConfiguration: {
enabled: true,
processors: [
{
parameters: [
{
parameterName: "LambdaArn",
initialPolicy: [
lambdaFunctionPolicy
],
environment: {
mysfits_api_url: `https://${props.apiId}.execute-api.${cdk.Aws.REGION}.amazonaws.com/prod/`
}
});
// Build an IAM policy statement that permits "lambda:InvokeFunction" on the
// ARN of mysfitsClicksProcessor, then attach it to firehoseDeliveryRole.
const firehoseDeliveryPolicyLambdaStm = new iam.PolicyStatement();
firehoseDeliveryPolicyLambdaStm.addActions("lambda:InvokeFunction");
// Scope the statement to this one function's ARN only.
firehoseDeliveryPolicyLambdaStm.addResources(mysfitsClicksProcessor.functionArn);
// Attach both statements to the role. firehoseDeliveryPolicyS3Stm is defined
// outside this excerpt (NOTE(review): presumably the S3 access statement --
// confirm against the full file).
firehoseDeliveryRole.addToPolicy(firehoseDeliveryPolicyS3Stm);
firehoseDeliveryRole.addToPolicy(firehoseDeliveryPolicyLambdaStm);
const mysfitsFireHoseToS3 = new CfnDeliveryStream(this, "DeliveryStream", {
extendedS3DestinationConfiguration: {
bucketArn: clicksDestinationBucket.bucketArn,
bufferingHints: {
intervalInSeconds: 60,
sizeInMBs: 50
},
compressionFormat: "UNCOMPRESSED",
prefix: "firehose/",
roleArn: firehoseDeliveryRole.roleArn,
processingConfiguration: {
enabled: true,
processors: [
{
parameters: [
{
parameterName: "LambdaArn",
"s3:GetBucketLocation",
"s3:GetObject",
"s3:ListBucket",
"s3:ListBucketMultipartUploads",
"s3:PutObject");
// Finish the S3 policy statement by adding its resources: the destination
// bucket's own ARN plus the object-ARN pattern returned by arnForObjects('*').
firehoseDeliveryPolicyS3Stm.addResources(clicksDestinationBucket.bucketArn);
firehoseDeliveryPolicyS3Stm.addResources(clicksDestinationBucket.arnForObjects('*'));
// Separate statement permitting "lambda:InvokeFunction" on the ARN of
// mysfitsClicksProcessor only.
const firehoseDeliveryPolicyLambdaStm = new iam.PolicyStatement();
firehoseDeliveryPolicyLambdaStm.addActions("lambda:InvokeFunction");
firehoseDeliveryPolicyLambdaStm.addResources(mysfitsClicksProcessor.functionArn);
// Attach both statements to firehoseDeliveryRole (S3 statement first, then
// the Lambda-invoke statement).
firehoseDeliveryRole.addToPolicy(firehoseDeliveryPolicyS3Stm);
firehoseDeliveryRole.addToPolicy(firehoseDeliveryPolicyLambdaStm);
const mysfitsFireHoseToS3 = new CfnDeliveryStream(this, "DeliveryStream", {
extendedS3DestinationConfiguration: {
bucketArn: clicksDestinationBucket.bucketArn,
bufferingHints: {
intervalInSeconds: 60,
sizeInMBs: 50
},
compressionFormat: "UNCOMPRESSED",
prefix: "firehose/",
roleArn: firehoseDeliveryRole.roleArn,
processingConfiguration: {
enabled: true,
processors: [
{
parameters: [
{
parameterName: "LambdaArn",