OPC UA サーバーの変数ノードにおける値変更をサブスクライブする方法

OPC UA サーバーの変数ノードにおける値変更をサブスクライブする方法

Takahiro Iwasa
(岩佐 孝浩)
Takahiro Iwasa (岩佐 孝浩)
3 min read
OPC-UA Python

OPC UA サーバーの変数ノードで値変更をサブスクライブするには、 opcua-asyncio を使用できます。この機能により、「OPC UA クライアントが新しい値をポーリングする」必要がなくなります。

この投稿のサンプルは、 GitHub リポジトリから取得できます。

前提条件

以下のコマンドを使用して、 opcua-asyncio をインストールしてください。

pip install asyncua

OPC UA サーバー起動

以下の例を使用して、 server.py を作成してください。毎秒ランダムな整数値が、変数ノードに書き込まれる OPC UA テストサーバーが起動します。サーバーのエンドポイントは opc.tcp://localhost:4840 で、変数ノードの名前は MyObject/MyVariable です。

import asyncio
import random

from asyncua import Server

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


async def main() -> None:
    # Start a server.
    server = Server()
    await server.init()
    server.set_endpoint(ENDPOINT)
    idx = await server.register_namespace(NAMESPACE)
    await server.start()
    print(f'Server started: {server}')

    # Create a node.
    myobj = await server.get_objects_node().add_object(idx, 'MyObject')
    myvar = await myobj.add_variable(idx, 'MyVariable', 1)
    await myvar.set_writable()

    # Write a new value every second.
    while True:
        await myvar.write_value(random.randint(1, 100))
        await asyncio.sleep(1)


if __name__ == '__main__':
    asyncio.run(main())

以下のコマンドを使用して、 OPC UA テストサーバーを起動してください。

$ python server.py
Server started: OPC UA Server(opc.tcp://localhost:4840)

OPC UA クライアント実行

以下の例を使用して、 client.py を作成してください。 OPC UA テストサーバーでデータの変更をサブスクライブし、 datachange_notification でデータをキューに入れ、 process で非同期に処理します。

import asyncio

from asyncua import Client, Node
from asyncua.common.subscription import DataChangeNotif, SubHandler

ENDPOINT = 'opc.tcp://localhost:4840'
NAMESPACE = 'http://examples.freeopcua.github.io'


class MyHandler(SubHandler):
    def __init__(self):
        self._queue = asyncio.Queue()

    def datachange_notification(self, node: Node, value, data: DataChangeNotif) -> None:
        self._queue.put_nowait([node, value, data])
        print(f'Data change notification was received and queued.')

    async def process(self) -> None:
        try:
            while True:
                # Get data in a queue.
                [node, value, data] = self._queue.get_nowait()
                path = await node.get_path(as_string=True)

                # *** Write your processing code ***

                print(f'New value {value} of "{path}" was processed.')

        except asyncio.QueueEmpty:
            pass


async def main() -> None:
    async with Client(url=ENDPOINT) as client:
        # Get a variable node.
        idx = await client.get_namespace_index(NAMESPACE)
        node = await client.get_objects_node().get_child([f'{idx}:MyObject', f'{idx}:MyVariable'])

        # Subscribe data change.
        handler = MyHandler()
        subscription = await client.create_subscription(period=0, handler=handler)
        await subscription.subscribe_data_change(node)

        # Process data change every 100ms
        while True:
            await handler.process()
            await asyncio.sleep(0.1)


if __name__ == '__main__':
    asyncio.run(main())

以下のコマンドを使用してクライアントを実行してください。次のようなデータ変更が表示されるはずです。

$ python client.py
Data change notification was received and queued.
New value 4 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 79 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
Data change notification was received and queued.
New value 75 of "['0:Root', '0:Objects', '2:MyObject', '2:MyVariable']" was processed.
...
Takahiro Iwasa
(岩佐 孝浩)

Takahiro Iwasa (岩佐 孝浩)

Software Developer at iret, Inc.
主に AWS を利用したクラウドネイティブアプリケーションの設計および開発をしています。 Japan AWS Top Engineers 2020-2023