SiteWise Edge Gateway を介して OPC UA データを Kinesis にストリームする方法

SiteWise Edge Gateway を介して OPC UA データを Kinesis にストリームする方法

Takahiro Iwasa
(岩佐 孝浩)
Takahiro Iwasa (岩佐 孝浩)
6 min read
Greengrass IoT Kinesis OPC-UA SiteWise

この投稿では、 OPC UA データを SiteWise Edge Gateway を介して Kinesis Data Streams にストリームする方法について説明します。

概要

この投稿では、 EC2 インスタンスをダミーの OPC UA サーバーとして利用します。

AWS リソース作成

OPC UA Server セットアップ

opcua-asyncio は、ダミーの OPC UA サーバーに使用できます。 EC2 インスタンスにサンプルの Python スクリプト(examples/server-minimal.py)をダウンロードし、次のコマンドでスクリプトを実行してください。

pip install asyncua
curl -OL https://raw.githubusercontent.com/FreeOpcUa/opcua-asyncio/master/examples/server-minimal.py
python server-minimal.py

Greengrass V2 Core Device セットアップ

公式ドキュメントに従って、 Greengrass V2 コアデバイスをセットアップしてください。この投稿ではインストールの詳細な説明は行いません。

セットアップ後、以下の手順も行ってください。

  • aws.greengrass.StreamManager コンポーネントのデプロイ
  • Kinesis Data Streams にデータを送信するための Token Exchange Role に以下のIAMポリシー追加
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "kinesis:PutRecord",
                "kinesis:PutRecords"
            ],
            "Resource": "arn:aws:kinesis:*:$YOUR_AWS_ACCOUNT_ID:stream/$KINESIS_DATA_STREAM_NAME"
        }
    ]
}

SiteWise Edge Gateway セットアップ

以下に従って、 SiteWise Edge Gateway をセットアップしてください。

Data processing pack は不要です。

Publisher 設定はスキップしてください。

Add data source をクリックしてください.

local endpoint は OPC UA エンドポイントです。 Greengrass stream name はカスタムの Greengrass コンポーネントで使用されます。以下の例では、 SiteWise_Stream_Kinesis を使用しています。

レビュー後、 Create をクリックしてください。

Kinesis Data Streams

Kinesis Data Stream を作成してください。

Kinesis Data Firehose

以下の設定で Kinesis Data Firehose を作成してください。

  • Source: 上で作成した Kinesis Data Stream
  • Destination: データを永続化するための S3 バケット。 NDJSON を出力するために Dynamic Partitioning を使用できます。こちらの投稿もご参考ください。

Greengrass Component 作成

Greengrass Stream のデータを Kinesis Data Streams にストリーミングするには、 Greengrass コンポーネントを作成する必要があります。 Greengrass コンポーネントの開発に関する情報は、公式ドキュメントをご参照ください。

ディレクトリ構成

/
├── kinesis_data_stream.py
├── recipe.yaml
├── requirements.txt
└── stream_manager_sdk.zip

requirements.txt

requirements.txt を作成し、 pip install -r requirements.txt を実行してください。 stream-manager は AWS 公式ライブラリです。

cbor2~=5.4.2
stream-manager==1.1.1

Python スクリプト

以下のコードで kinesis_data_stream.py を作成してください。

"""
Script to use Greengrass Stream Manager to stream data to a Kinesis Data Stream
See also https://github.com/aws-greengrass/aws-greengrass-stream-manager-sdk-python/blob/main/samples/stream_manager_kinesis.py
"""

import argparse
import asyncio
import logging
import time

from stream_manager import (
    ExportDefinition,
    KinesisConfig,
    MessageStreamDefinition,
    ReadMessagesOptions,
    ResourceNotFoundException,
    StrategyOnFull,
    StreamManagerClient,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def main(
        stream_name: str,
        kinesis_stream_name: str,
        batch_size: int = None
):
    try:
        # Create a client for the StreamManager
        client = StreamManagerClient()

        # Try deleting the stream (if it exists) so that we have a fresh start
        try:
            client.delete_message_stream(stream_name=stream_name)
        except ResourceNotFoundException:
            pass

        exports = ExportDefinition(
            kinesis=[KinesisConfig(
                identifier="KinesisExport" + stream_name,
                kinesis_stream_name=kinesis_stream_name,
                batch_size=batch_size,
            )]
        )
        client.create_message_stream(
            MessageStreamDefinition(
                name=stream_name,
                strategy_on_full=StrategyOnFull.OverwriteOldestData,
                export_definition=exports
            )
        )

        while True:
            time.sleep(1)

    except asyncio.TimeoutError:
        logger.exception("Timed out while executing")
    except Exception:
        logger.exception("Exception while running")
    finally:
        # Always close the client to avoid resource leaks
        if client:
            client.close()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--greengrass-stream', required=True, default='SiteWise_Stream_Kinesis')
    parser.add_argument('--kinesis-stream', required=True)
    parser.add_argument('--batch-size', required=False, type=int, default=500)
    return parser.parse_args()


if __name__ == '__main__':
    args = parse_args()
    logger.info(f'args: {args.__dict__}')
    main(args.greengrass_stream, args.kinesis_stream, args.batch_size)

Recipe (recipe.yaml)

以下の内容で recipe.yaml を作成してください。この例では、コンポーネントの名前は jp.co.xyz.StreamManagerKinesis です。

# Replace $ArtifactsS3Bucket with your value to complete component registration.

RecipeFormatVersion: 2020-01-25

ComponentName: jp.co.xyz.StreamManagerKinesis
ComponentVersion: 1.0.0
ComponentDescription: Streams data in Greengrass stream to a Kinesis Data Stream.
ComponentPublisher: self
ComponentDependencies:
  aws.greengrass.StreamManager:
    VersionRequirement: '^2.0.0'
ComponentConfiguration:
  DefaultConfiguration:
    GreengrassStream: SiteWise_Stream_Kinesis
    KinesisStream: ''
    BatchSize: 100 # minimum 1, maximum 500

Manifests:
  - Platform:
      os: linux
    Lifecycle:
      Install: pip3 install --user -r {artifacts:decompressedPath}/component/requirements.txt
      Run: |
        export PYTHONPATH=$PYTHONPATH:{artifacts:decompressedPath}/stream_manager_sdk
        python3 {artifacts:decompressedPath}/component/kinesis_data_stream.py \
          --greengrass-stream {configuration:/GreengrassStream} \
          --kinesis-stream {configuration:/KinesisStream} \
          --batch-size {configuration:/BatchSize}
    Artifacts:
      - URI: s3://$ArtifactsS3Bucket/artifacts/jp.co.xyz.StreamManagerKinesis/1.0.0/component.zip
        Unarchive: ZIP

このレシピには ComponentConfiguration 内の以下の設定が含まれています。

名前説明
GreengrassStreamGreengrass ストリームの名前
KinesisStreamOPC UA データを受信する Kinesis Data Stream の名前
BatchSize送信されるバッチサイズ

Component Artifact アップロード

コンポーネントのファイルをアーカイブし、以下のコマンドを使用して S3 バケットにアップロードしてください。

S3_BUCKET=$YOUR_BUCKET_NAME
VERSION=1.0.0

zip component.zip kinesis_data_stream.py requirements.txt
aws s3 cp component.zip s3://$S3_BUCKET/artifacts/jp.co.xyz.StreamManagerKinesis/$VERSION/
rm component.zip

Component 登録

上記の recipe.yaml の内容をコピーし、 $ArtifactsS3Bucket を、コンポーネントのアーティファクトを含む実際の値に置き換えてください。

Component デプロイ

Deploy をクリックしてください。

Configure component をクリックしてください。

以下の例のように、 Greengrass コンポーネントの構成を更新してください。 KinesisStream は上で作成した Kinesis Data Stream の名前です。

確認後、コンポーネントをデプロイしてください。

テスト

以下のコマンドで S3 オブジェクトを確認してください。

aws s3 cp s3://$YOUR_S3_BUCKET/... ./

以下のようなデータが確認できるはずです。

{
  "propertyAlias": "/MyObject/MyVariable",
  "propertyValues": [
    {
      "value": {
        "doubleValue": 7.699999999999997
      },
      "timestamp": {
        "timeInSeconds": 1661581962,
        "offsetInNanos": 9000000
      },
      "quality": "GOOD"
    },
...
Takahiro Iwasa
(岩佐 孝浩)

Takahiro Iwasa (岩佐 孝浩)

Software Developer at iret, Inc.
主に AWS を利用したクラウドネイティブアプリケーションの設計および開発をしています。 Japan AWS Top Engineers 2020-2023