Handling NDJSON using Kinesis Data Firehose Dynamic Partitioning

Handling NDJSON using Kinesis Data Firehose Dynamic Partitioning

Takahiro Iwasa
(岩佐 孝浩)
Takahiro Iwasa (岩佐 孝浩)
3 min read
Firehose Kinesis

Kinesis Data Firehose recently added support for dynamic partitioning. Developers no longer need Lambda functions to just append new lines for NDJSON (Newline Delimited JSON).

Creating AWS Resources

Create a CloudFormation template with the following content. Key points are DynamicPartitioningConfiguration (line 13-14) and ProcessingConfiguration (line 17-29).

AWSTemplateFormatVersion: 2010-09-09
Description: Kinesis Data Firehose streaming NDJSON sample with dynamic partitioning
Resources:
  KinesisFirehoseDeliveryStream:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamName: ndjson-firehose
      DeliveryStreamType: DirectPut
      ExtendedS3DestinationConfiguration:
        BucketARN: !GetAtt S3Bucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
        DynamicPartitioningConfiguration:
          Enabled: true
        Prefix: "success/user_id=!{partitionKeyFromQuery:user_id}/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/"
        ErrorOutputPrefix: "error/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/type=!{firehose:error-output-type}/"
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: AppendDelimiterToRecord
              Parameters:
                - ParameterName: Delimiter
                  ParameterValue: '\\n'
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{user_id: .user_id}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
        RoleARN: !GetAtt IAMRoleKinesisFirehose.Arn

  S3Bucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub ${AWS::Region}-${AWS::AccountId}-ndjson-s3
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true

  IAMRoleKinesisFirehose:
    Type: AWS::IAM::Role
    Properties:
      RoleName: ndjson-firehose-role
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      MaxSessionDuration: 3600
      Policies:
        - PolicyName: policy1
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - glue:GetTable
                  - glue:GetTableVersion
                  - glue:GetTableVersions
                Resource:
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                  - !Sub arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt S3Bucket.Arn
                  - !Sub ${S3Bucket.Arn}/*
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:GetFunctionConfiguration
                Resource: !Sub arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - kms:GenerateDataKey
                  - kms:Decrypt
                Resource:
                  - !Sub arn:aws:kms:${AWS::Region}:${AWS::AccountId}:key/%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
                Condition:
                  StringEquals:
                    kms:ViaService: !Sub s3.${AWS::Region}.amazonaws.com
                  StringLike:
                    kms:EncryptionContext:aws:s3:arn:
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%/*
                      - arn:aws:s3:::%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:%FIREHOSE_POLICY_TEMPLATE_PLACEHOLDER%:log-stream:*

Replace <YOUR_S3_BUCKET> with the actual value and deploy the CloudFormation stack with the following command.

aws cloudformation deploy \
  --template-file stack.yaml \
  --stack-name firehose-ndjson-sample \
  --s3-bucket <YOUR_S3_BUCKET> \
  --s3-prefix firehose-ndjson-sample/$(date +%Y/%m/%d/%H) \
  --capabilities CAPABILITY_NAMED_IAM

Testing

Ingest two JSON records to Firehose with the following command.

The data blob, which is base64-encoded when the blob is serialized. The maximum size of the data blob, before base64-encoding, is 1,000 KiB.

$ echo -n '{"user_id": 1, "message": "Hello"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ==

$ echo -n '{"user_id": 1, "message": "World"}' | base64
eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ==

$ echo '{
    "DeliveryStreamName": "ndjson-firehose",
    "Records": [
        {"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiSGVsbG8ifQ=="},
        {"Data": "eyJ1c2VyX2lkIjogMSwgIm1lc3NhZ2UiOiAiV29ybGQifQ=="}
    ]
}' > input.json

$ aws firehose put-record-batch --cli-input-json file://~/input.json
{
    "FailedPutCount": 0,
    "Encrypted": false,
    ...
}

Check objects in the S3 bucket with the following command. You should see the two JSON records separated by a newline - NDJSON.

$ aws s3 ls --recursive s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/
2022-07-26 23:52:47         69 success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751

$ aws s3 cp s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 ./
download: s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/success/user_id=1/year=2022/month=07/day=26/hour=14/ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751 to ./ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751

$ cat -n ndjson-firehose-4-2022-07-26-14-50-22-b95618c0-e518-3b66-b06c-693b059cc751
     1  {"user_id": 1, "message": "Hello"}
     2  {"user_id": 1, "message": "World"}

Cleaning Up

Clean up the provisioned AWS resources with the following command.

aws s3 rm --recursive s3://ap-northeast-1-xxxxxxxxxxxx-ndjson-s3/
aws cloudformation delete-stack --stack-name firehose-ndjson-sample
Takahiro Iwasa
(岩佐 孝浩)

Takahiro Iwasa (岩佐 孝浩)

Software Developer at iret, Inc.
Architecting and developing cloud native applications mainly with AWS. Japan AWS Top Engineers 2020-2023