Separator to Kinesis Firehose Action in IoT Core Topic Rules using CloudFormation

Takahiro Iwasa (岩佐 孝浩)
CloudFormation Firehose IoT Kinesis

When you specify Kinesis Firehose as the destination of an AWS IoT Core topic rule action, you can configure a newline as the record separator.
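To illustrate why the separator matters, here is a minimal Python sketch that simulates how records end up in one delivered S3 object. It assumes, for illustration only, that each record is written as-is and followed by the separator; the helpers build_object and parse_lines are hypothetical, not part of any AWS API.

```python
import json


# Hypothetical helper that simulates one delivered S3 object.
# Assumption: each record is written as-is, followed by the separator.
def build_object(records: list[dict], separator: str = "") -> str:
    return "".join(json.dumps(record) + separator for record in records)


def parse_lines(body: str) -> list[dict]:
    # Newline-delimited JSON: every non-empty line is one record.
    return [json.loads(line) for line in body.splitlines() if line]


records = [
    {"id": 1, "message": "Hello from AWS IoT"},
    {"id": 2, "message": "Hello from AWS IoT"},
]

# With a "\n" separator, each record lands on its own line.
print(parse_lines(build_object(records, "\n")) == records)  # True

# Without a separator, the records run together as {...}{...}.
try:
    parse_lines(build_object(records))
except json.JSONDecodeError as error:
    print(f"Cannot parse: {error.msg}")  # Cannot parse: Extra data
```

Without a separator, line-oriented tools (and most JSON Lines readers) cannot tell where one record ends and the next begins.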

Creating AWS Resources

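Create a CloudFormation template with the following content. The key point is the Separator: |+ entry in the Firehose action, followed by an empty line. |+ starts a YAML literal block scalar with the "keep" chomping indicator, which preserves trailing line breaks, so the empty line makes the value a single newline character (\n). IamFirehose is based on the default role that the Kinesis Firehose management console generates automatically, so its policy still contains console placeholders such as %FIREHOSE_DEFAULT_FUNCTION% that this example does not use.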

AWSTemplateFormatVersion: "2010-09-09"

Resources:
  TopicRule:
    Type: AWS::IoT::TopicRule
    Properties:
      RuleName: topic_rule_firehose_separator_test
      TopicRulePayload:
        Actions:
          - Firehose:
              DeliveryStreamName: !Ref Firehose
              RoleArn: !GetAtt IamTopicRule.Arn
              Separator: |+

        AwsIotSqlVersion: 2016-03-23
        RuleDisabled: false
        Sql: !Sub
          SELECT * FROM 'topic_rule_firehose_separator_test'

  Firehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: topic-rule-firehose-separator-test
      DeliveryStreamType: DirectPut
      S3DestinationConfiguration:
        BucketARN: !GetAtt S3.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5
        CompressionFormat: GZIP
        # Apache Hive Prefix
        ErrorOutputPrefix: "error/!{firehose:error-output-type}/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        Prefix: "success/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/"
        RoleARN: !GetAtt IamFirehose.Arn

  S3:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: topic-rule-firehose-separator-test
      PublicAccessBlockConfiguration:
        BlockPublicAcls: TRUE
        BlockPublicPolicy: TRUE
        IgnorePublicAcls: TRUE
        RestrictPublicBuckets: TRUE

  IamTopicRule:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: iot.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: firehose:PutRecord
                Resource:
                  - !GetAtt Firehose.Arn
          PolicyName: policy
      RoleName: iam-topic-rule

  IamFirehose:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref AWS::AccountId
      Policies:
        - PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource: "*"
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3.Arn
                  - Fn::Sub:
                      - ${arn}/*
                      - {arn: !GetAtt S3.Arn}
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_DEFAULT_FUNCTION%:%FIREHOSE_DEFAULT_VERSION%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/topic-rule-firehose-separator-test
              - Effect: Allow
                Action:
                  - kinesis:DescribeStream
                  - kinesis:GetShardIterator
                  - kinesis:GetRecords
                Resource: !Sub arn:aws:kinesis:${AWS::Region}:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
              - Effect: Allow
                Action:
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%SSE_KEY_ID%
                Condition:
                  StringEquals:
                    kms:ViaService: kinesis.%REGION_NAME%.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:kinesis:arn: !Sub arn:aws:kinesis:%REGION_NAME%:${AWS::AccountId}:stream/%FIREHOSE_STREAM_NAME%
          PolicyName: policy
      RoleName: iam-firehose
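The Prefix and ErrorOutputPrefix values use Firehose timestamp expressions to build Apache Hive-style key=value folders. As a rough sketch, the following Python function formats an arrival time the same way as the success prefix. It assumes the timestamp is evaluated in UTC, which is consistent with the listing in the Testing section: the object listed at 11:30 local time (the CLI output uses a +09:00 offset) is stored under hour=02. The function name success_prefix is hypothetical.

```python
from datetime import datetime, timedelta, timezone


# Hypothetical helper mirroring the template's Prefix expression:
#   success/!{timestamp:'year='yyyy'/month='MM'/day='dd'/hour='HH}/
# Assumption: Firehose evaluates the timestamp in UTC.
def success_prefix(arrival: datetime) -> str:
    utc = arrival.astimezone(timezone.utc)
    return utc.strftime("success/year=%Y/month=%m/day=%d/hour=%H/")


jst = timezone(timedelta(hours=9))
# 11:29:57 at UTC+9 is 02:29:57 in UTC.
print(success_prefix(datetime(2020, 5, 14, 11, 29, 57, tzinfo=jst)))
# → success/year=2020/month=05/day=14/hour=02/
```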

Deploy the CloudFormation stack with the following command.

aws cloudformation deploy --template-file template.yaml --stack-name topic-rule-firehose-separator-test --capabilities CAPABILITY_NAMED_IAM

Testing

Check the Kinesis Firehose action on the topic rule.

aws iot get-topic-rule --rule-name topic_rule_firehose_separator_test

The separator field of the firehose action should be "\n".

{
    "ruleArn": "arn:aws:iot:<YOUR_REGION>:<YOUR_ACCOUNT_ID>:rule/topic_rule_firehose_separator_test",
    "rule": {
        "ruleName": "topic_rule_firehose_separator_test",
        "sql": "SELECT * FROM 'topic_rule_firehose_separator_test'",
        "createdAt": "2020-05-13T10:29:18+09:00",
        "actions": [
            {
                "firehose": {
                    "roleArn": "arn:aws:iam::<YOUR_ACCOUNT_ID>:role/iam-topic-rule",
                    "deliveryStreamName": "topic-rule-firehose-separator-test",
                    "separator": "\n"
                }
            }
        ],
        "ruleDisabled": false,
        "awsIotSqlVersion": "2016-03-23"
    }
}
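If you want to check this in a script rather than by eye, a minimal Python sketch could parse the get-topic-rule output and extract the separator of each Firehose action. The helper firehose_separators is hypothetical, and the sample below is a trimmed copy of the output above; note that in JSON text "\n" decodes to a real newline character.

```python
import json


# Hypothetical helper: extract the separator of every Firehose action from
# the JSON printed by `aws iot get-topic-rule`.
def firehose_separators(output: str) -> list[str]:
    rule = json.loads(output)["rule"]
    return [
        action["firehose"].get("separator", "")
        for action in rule.get("actions", [])
        if "firehose" in action
    ]


# Raw string, so json.loads (not Python) decodes the "\n" escape.
sample = r'''
{
  "rule": {
    "ruleName": "topic_rule_firehose_separator_test",
    "actions": [
      {
        "firehose": {
          "deliveryStreamName": "topic-rule-firehose-separator-test",
          "separator": "\n"
        }
      }
    ]
  }
}
'''

for separator in firehose_separators(sample):
    print(repr(separator))  # prints '\n'
```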

Publish the following two messages in a row to the topic topic_rule_firehose_separator_test. The commands below use the AWS CLI; the AWS IoT MQTT test client in the console works as well.

aws iot-data publish \
  --topic topic_rule_firehose_separator_test \
  --payload '{"id": 1, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out

aws iot-data publish \
  --topic topic_rule_firehose_separator_test \
  --payload '{"id": 2, "message": "Hello from AWS IoT"}' \
  --cli-binary-format raw-in-base64-out

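Kinesis Firehose buffers records for up to 60 seconds (IntervalInSeconds in the template) before writing them to S3. After that, check the object that it delivered to the bucket. It should contain both records, one per line.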

# List the delivered objects.
$ aws s3 ls topic-rule-firehose-separator-test --recursive
2020-05-14 11:30:59         68 success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz

# Download the object.
$ aws s3 cp s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz ./result.gz

# Check the JSON.
$ gunzip -c result.gz > result.json
$ cat result.json
{"id": 1, "message": "Hello from AWS IoT"}
{"id": 2, "message": "Hello from AWS IoT"}

# Delete the results.
$ rm result.gz result.json
$ aws s3 rm s3://topic-rule-firehose-separator-test/success/year=2020/month=05/day=14/hour=02/topic-rule-firehose-separator-test-1-2020-05-14-02-29-57-593d65e5-beb6-47b1-8266-83e869b0cccb.gz
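The gunzip and cat steps above can also be done in Python. The following is a minimal sketch using only the standard library; it assumes the object has been downloaded as in the session above (for example to result.gz), and an in-memory gzip blob stands in for that file so the snippet runs on its own. The helper name read_records is hypothetical.

```python
import gzip
import json


# Parse a gzip-compressed object whose records are separated by "\n"
# (CompressionFormat: GZIP plus the topic rule's newline separator).
def read_records(compressed: bytes) -> list[dict]:
    text = gzip.decompress(compressed).decode("utf-8")
    return [json.loads(line) for line in text.splitlines() if line.strip()]


# Stand-in for the downloaded object, e.g. open("result.gz", "rb").read().
blob = gzip.compress(
    b'{"id": 1, "message": "Hello from AWS IoT"}\n'
    b'{"id": 2, "message": "Hello from AWS IoT"}\n'
)

for record in read_records(blob):
    print(record["id"], record["message"])
```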

Incorrect CloudFormation Template

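If you write Separator: '\n' instead, as in the diff below, deploying the stack fails with the following error. In a single-quoted YAML string the backslash is not an escape character, so the value is the two characters \ and n rather than a newline, and it does not match the allowed pattern.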

1 validation error detected: Value '\n' at 'topicRulePayload.actions.1.member.firehose.separator' failed to satisfy constraint: Member must satisfy regular expression pattern: ([\n\t])|(\r\n)|(,)
--- 	Sun Oct 10 17:31:16 2021 UTC
+++ 	Sun Oct 10 17:31:16 2021 UTC
@@ -7,8 +7,7 @@
           - Firehose:
               DeliveryStreamName: !Ref Firehose
               RoleArn: !GetAtt IamTopicRule.Arn
-              Separator: |+
-
+              Separator: '\n'
         AwsIotSqlVersion: 2016-03-23
         RuleDisabled: false
         Sql: !Sub
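To make the difference between the two templates concrete, this Python sketch checks candidate values against the pattern quoted in the error message. It assumes the whole separator value must match that pattern; the function is_valid_separator is hypothetical.

```python
import re

# Pattern copied from the validation error above.
# Assumption: the whole separator value must match it.
SEPARATOR_PATTERN = re.compile(r"([\n\t])|(\r\n)|(,)")


def is_valid_separator(value: str) -> bool:
    return SEPARATOR_PATTERN.fullmatch(value) is not None


# `Separator: |+` followed by an empty line yields a real newline character.
print(is_valid_separator("\n"))   # True
# `Separator: '\n'` (single-quoted YAML) yields a backslash followed by "n".
print(is_valid_separator("\\n"))  # False
```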

Cleaning Up

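Clean up the provisioned AWS resources with the following command. CloudFormation cannot delete a non-empty S3 bucket, so make sure the bucket is empty first (for example, with aws s3 rm s3://topic-rule-firehose-separator-test --recursive).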

aws cloudformation delete-stack --stack-name topic-rule-firehose-separator-test

Takahiro Iwasa (岩佐 孝浩)

Software Developer at iret, Inc.
Architecting and developing cloud native applications mainly with AWS. Japan AWS Top Engineers 2020-2023