OPC-UA Connector

The OpcUaConnector reads real-time sensor data from OPC-UA-enabled PLCs and SCADA systems. It subscribes to node value changes and normalises them into Machina Alarm entities when configured thresholds are exceeded.

Prerequisites

  • An OPC-UA server accessible from the network (e.g. Siemens S7, Beckhoff, Prosys Simulation Server).
  • Install the OPC-UA extra:
pip install machina-ai[opcua]

This installs asyncua, the asyncio-based OPC-UA client library that the connector requires at connect time.

Quick start

from machina.connectors.iot import OpcUaConnector

opcua = OpcUaConnector(
    endpoint="opc.tcp://plc-line2:4840",
    subscriptions=[
        {
            "node_id": "ns=2;s=Pump.P201.Vibration.DE",
            "sampling_interval_ms": 1000,
            "asset_id": "P-201",
            "parameter": "vibration_velocity",
            "threshold": 6.0,
            "unit": "mm/s",
        },
    ],
)
# The await calls below must run inside a coroutine (for example, one started with asyncio.run).
await opcua.connect()

async def on_alarm(alarm):
    print(f"⚠ {alarm.asset_id}: {alarm.parameter}={alarm.value} {alarm.unit}")

sub = await opcua.subscribe(on_alarm)

YAML configuration

connectors:
  sensors:
    type: opcua
    settings:
      endpoint: opc.tcp://plc-line2:4840
      security_mode: SignAndEncrypt
      security_policy: Basic256Sha256
      certificate: /certs/client.der
      private_key: /certs/client_key.pem
      subscriptions:
        - node_id: "ns=2;s=Pump.P201.Vibration.DE"
          sampling_interval_ms: 1000
          asset_id: P-201
          parameter: vibration_velocity
          threshold: 6.0
          unit: mm/s
        - node_id: "ns=2;s=Pump.P201.Temperature"
          sampling_interval_ms: 5000
          asset_id: P-201
          parameter: bearing_temperature
          threshold: 80.0
          unit: "°C"
          severity: critical

Capabilities

Capability Description
subscribe_to_nodes Subscribe to OPC-UA node value changes with alarm generation.
read_node_value Read the current value of a single node on demand.
read_node_values Read multiple node values in one call.
browse_nodes Browse the OPC-UA address space from a given root.

Reading values on demand

value = await opcua.read_value("ns=2;s=Pump.P201.Vibration.DE")

values = await opcua.read_values([
    "ns=2;s=Pump.P201.Vibration.DE",
    "ns=2;s=Pump.P201.Temperature",
])
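The node identifiers used above follow the standard OPC-UA string notation: an optional namespace index (ns=2), then an identifier type (i numeric, s string, g GUID, b opaque) and its value. The following is a minimal sketch of checking such strings before passing them to the connector. The helper parse_node_id and the ParsedNodeId type are hypothetical names introduced for illustration; they are not part of Machina.

```python
from typing import NamedTuple


class ParsedNodeId(NamedTuple):
    """Hypothetical container for the parts of a node ID string."""
    namespace: int
    id_type: str
    identifier: str


# Identifier type prefixes defined by the OPC-UA string notation.
_ID_TYPES = {"i": "numeric", "s": "string", "g": "guid", "b": "opaque"}


def parse_node_id(text: str) -> ParsedNodeId:
    """Split 'ns=2;s=Name' style strings; raise ValueError if malformed."""
    namespace = 0  # the namespace defaults to 0 when 'ns=' is omitted
    rest = text
    if text.startswith("ns="):
        ns_part, sep, rest = text.partition(";")
        if not sep:
            raise ValueError(f"missing identifier in node ID: {text!r}")
        namespace = int(ns_part[3:])  # int() raises ValueError on bad input
    kind, sep, identifier = rest.partition("=")
    if not sep or kind not in _ID_TYPES or not identifier:
        raise ValueError(f"not a valid node ID: {text!r}")
    return ParsedNodeId(namespace, _ID_TYPES[kind], identifier)
```

Validating identifiers up front turns a typo in a configuration file into an immediate ValueError instead of a failed read against the server.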

Browsing the address space

nodes = await opcua.browse_nodes()
for node in nodes:
    print(f"{node['browse_name']} ({node['node_class']}): {node['node_id']}")
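Browse results are often used to find candidate nodes for subscriptions. The sketch below filters a browse result down to variable nodes. It assumes that node_class is reported as "Variable" (or with a dotted prefix such as "NodeClass.Variable"); the exact string format is not stated here, so check it against real output. The sample data and the helper name variable_node_ids are illustrative.

```python
def variable_node_ids(nodes):
    """Return the node_id of every entry whose node_class is a Variable."""
    return [
        node["node_id"]
        for node in nodes
        # Assumption: the class name is the last dotted component.
        if node["node_class"].split(".")[-1] == "Variable"
    ]


# Hand-written sample in the shape described for browse_nodes().
SAMPLE_NODES = [
    {"node_id": "ns=2;s=Pump.P201", "browse_name": "P201", "node_class": "Object"},
    {
        "node_id": "ns=2;s=Pump.P201.Temperature",
        "browse_name": "Temperature",
        "node_class": "Variable",
    },
]

candidates = variable_node_ids(SAMPLE_NODES)
```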

Unsubscribing

await opcua.unsubscribe(sub)
await opcua.disconnect()
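To make sure the connection is closed even when an error occurs mid-session, the connect and disconnect calls can be wrapped in an async context manager. This is a sketch only: the connected helper and the FakeConnector stand-in are hypothetical and exist so the example runs without a server. Any object with async connect() and disconnect() methods would work in place of the fake.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def connected(connector):
    """Connect on entry and always disconnect on exit."""
    await connector.connect()
    try:
        yield connector
    finally:
        await connector.disconnect()


class FakeConnector:
    """Stand-in that records calls; replace with a real connector in practice."""

    def __init__(self):
        self.calls = []

    async def connect(self):
        self.calls.append("connect")

    async def disconnect(self):
        self.calls.append("disconnect")


async def demo():
    fake = FakeConnector()
    try:
        async with connected(fake):
            raise RuntimeError("simulated failure while reading")
    except RuntimeError:
        pass
    return fake.calls
```

Because disconnect() is documented to cancel all subscriptions, a wrapper like this should not need a separate unsubscribe call on the error path.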

Security modes

Mode Description
None No signing or encryption (use only on isolated, trusted networks).
Sign Messages are signed but not encrypted.
SignAndEncrypt Messages are signed and encrypted (recommended for production).

Certificate-based security

When using Sign or SignAndEncrypt, provide both certificate (DER or PEM) and private_key (PEM) paths. The OPC-UA server must trust the client certificate.
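The rule above can be checked before a connection is attempted. The following sketch validates a settings mapping shaped like the YAML settings block. The function name check_security_settings is hypothetical, and whether the connector performs an equivalent check itself is not stated here.

```python
def check_security_settings(settings: dict) -> list[str]:
    """Return a list of problems found in a connector settings mapping."""
    problems = []
    mode = settings.get("security_mode", "None")
    if mode not in {"None", "Sign", "SignAndEncrypt"}:
        problems.append(f"unknown security_mode: {mode!r}")
    elif mode != "None":
        # Sign and SignAndEncrypt both need a client certificate and key.
        for key in ("certificate", "private_key"):
            if not settings.get(key):
                problems.append(f"{key} is required when security_mode is {mode}")
    return problems
```

An empty list means the combination is consistent; it does not prove that the files exist or that the server trusts the certificate.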

Alarm normalisation

When a subscribed node's value exceeds its configured threshold, an Alarm entity is created:

Alarm field Source
id Auto-generated (ALM-{uuid})
asset_id From subscription asset_id (falls back to node ID)
severity From subscription severity (default WARNING)
parameter From subscription parameter (falls back to node ID)
value The new node value
threshold From subscription config
unit From subscription config
source opcua://{endpoint}/{node_id}
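The mapping in the table can be sketched as a small function. This is an illustration of the documented field sources, not the connector's actual implementation: the Alarm dataclass is a local stand-in for the Machina entity, normalise is a hypothetical name, and "exceeds" is assumed to mean strictly greater than the threshold.

```python
from __future__ import annotations

import uuid
from dataclasses import dataclass
from typing import Any


@dataclass
class Alarm:
    """Local stand-in for the Machina Alarm entity (illustrative only)."""
    id: str
    asset_id: str
    severity: str
    parameter: str
    value: float
    threshold: float
    unit: str
    source: str


def normalise(endpoint: str, sub: dict[str, Any], value: float) -> Alarm | None:
    """Return an Alarm if value exceeds the threshold, otherwise None."""
    if value <= sub["threshold"]:  # assumption: strict comparison
        return None
    node_id = sub["node_id"]
    return Alarm(
        id=f"ALM-{uuid.uuid4()}",
        asset_id=sub.get("asset_id") or node_id,    # falls back to node ID
        severity=sub.get("severity", "WARNING"),    # documented default
        parameter=sub.get("parameter") or node_id,  # falls back to node ID
        value=value,
        threshold=sub["threshold"],
        unit=sub.get("unit", ""),
        source=f"opcua://{endpoint}/{node_id}",
    )
```

With a subscription that sets only node_id, threshold, and unit, a reading of 7.2 against a threshold of 6.0 yields an alarm whose asset_id and parameter are both the node ID and whose severity is WARNING.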

API reference

OpcUaConnector

OpcUaConnector(*, endpoint: str = '', security_mode: str = 'None', security_policy: str = '', certificate: str = '', private_key: str = '', username: str = '', password: str = '', subscriptions: list[SubscriptionConfig | dict[str, Any]] | None = None, session_timeout: int = 30000)

Connector for OPC-UA servers.

Reads real-time sensor data from OPC-UA-enabled PLCs and SCADA systems and maps value changes that exceed a configured threshold to Machina Alarm entities (machina.domain.alarm.Alarm).

Parameters:

  • endpoint (str, default ''): OPC-UA server URL (e.g. "opc.tcp://plc-line2:4840").
  • security_mode (str, default 'None'): OPC-UA security mode ("None", "Sign", "SignAndEncrypt").
  • security_policy (str, default ''): Security policy URI (e.g. "Basic256Sha256").
  • certificate (str, default ''): Path to client X.509 certificate file.
  • private_key (str, default ''): Path to client private key file.
  • username (str, default ''): Username for user-token authentication.
  • password (str, default ''): Password for user-token authentication.
  • subscriptions (list[SubscriptionConfig | dict[str, Any]] | None, default None): Pre-configured node subscriptions.
  • session_timeout (int, default 30000): Session timeout in milliseconds.

Example
from machina.connectors.iot import OpcUaConnector

opcua = OpcUaConnector(
    endpoint="opc.tcp://plc-line2:4840",
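    security_mode="SignAndEncrypt",
    security_policy="Basic256Sha256",
    # Sign and SignAndEncrypt require a client certificate and private key.
    certificate="/certs/client.der",
    private_key="/certs/client_key.pem",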
    subscriptions=[
        {"node_id": "ns=2;s=Pump.P201.Vibration.DE",
         "sampling_interval_ms": 1000,
         "asset_id": "P-201",
         "parameter": "vibration_velocity",
         "threshold": 6.0, "unit": "mm/s"},
    ],
)
await opcua.connect()

async def on_alarm(alarm):
    print(f"Alarm: {alarm.parameter}={alarm.value} {alarm.unit}")

sub = await opcua.subscribe(on_alarm)

connect async

connect() -> None

Connect to the OPC-UA server.

Raises:

  • ConnectorError: If endpoint is empty or connection fails.
  • ConnectorAuthError: If authentication/security negotiation fails.
  • ImportError: If asyncua is not installed.
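Since connect() raises ConnectorError when the connection fails, callers on unreliable plant networks may want to retry with a growing delay. The sketch below shows one way to do that. The connect_with_retry helper is hypothetical, and ConnectorError is redefined locally only so the example runs on its own; in real code it would be imported from Machina (the import path is not given here).

```python
import asyncio


class ConnectorError(Exception):
    """Local stand-in for Machina's ConnectorError (illustrative only)."""


async def connect_with_retry(connect, *, attempts=3, base_delay=1.0):
    """Await connect() up to `attempts` times; return the attempt that succeeded."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    for attempt in range(1, attempts + 1):
        try:
            await connect()
            return attempt
        except ConnectorError:
            if attempt == attempts:
                raise  # out of retries: surface the last error
            # Exponential backoff: base_delay, 2x, 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


async def demo():
    calls = 0

    async def flaky_connect():
        # Fails twice, then succeeds, to exercise the retry path.
        nonlocal calls
        calls += 1
        if calls < 3:
            raise ConnectorError("connection refused")

    return await connect_with_retry(flaky_connect, attempts=3, base_delay=0)
```

In use this would be called as await connect_with_retry(opcua.connect). Authentication failures are unlikely to succeed on retry; if ConnectorAuthError is a subclass of ConnectorError (not confirmed here), it may be worth catching it first and re-raising immediately.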

disconnect async

disconnect() -> None

Disconnect from the OPC-UA server and cancel all subscriptions.

health_check async

health_check() -> ConnectorHealth

Check OPC-UA server connectivity.

Reads the server status node to verify the connection is alive.

subscribe async

subscribe(callback: AlarmCallback, *, configs: list[SubscriptionConfig | dict[str, Any]] | None = None) -> Subscription

Subscribe to OPC-UA node value changes.

Creates a subscription for each configured node. When a new value exceeds the configured threshold, an Alarm is created and passed to the callback.

Parameters:

  • callback (AlarmCallback, required): Async function called with each Alarm.
  • configs (list[SubscriptionConfig | dict[str, Any]] | None, default None): Optional subscription configs; defaults to the configs passed at construction time.

Returns:

  • Subscription: A Subscription handle for later cancellation.

Raises:

  • ConnectorError: If not connected.
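If alarm handling is slow (for example, writing to a database), one option is to keep the callback small and hand alarms to an asyncio.Queue that a separate task drains. The sketch below illustrates the pattern. The make_queue_callback helper is hypothetical, and the alarm is a SimpleNamespace carrying only the fields used in this page's examples, not a real Alarm.

```python
import asyncio
from types import SimpleNamespace


def make_queue_callback(queue):
    """Build an async callback that only enqueues the alarm."""

    async def on_alarm(alarm):
        await queue.put(alarm)

    return on_alarm


async def demo():
    queue = asyncio.Queue()
    on_alarm = make_queue_callback(queue)
    # With a live connector this would be: sub = await opcua.subscribe(on_alarm)
    # Here the callback is invoked by hand with a fake alarm object.
    await on_alarm(
        SimpleNamespace(
            asset_id="P-201", parameter="vibration_velocity", value=7.2, unit="mm/s"
        )
    )
    alarm = await queue.get()
    return f"{alarm.asset_id}: {alarm.parameter}={alarm.value} {alarm.unit}"
```

Whether the connector awaits callbacks one at a time or concurrently is not stated here, so the queue is a defensive choice rather than a requirement.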

unsubscribe async

unsubscribe(subscription: Subscription) -> None

Cancel an active subscription.

Parameters:

  • subscription (Subscription, required): The subscription handle returned by subscribe.

read_value async

read_value(node_id: str) -> Any

Read the current value of a single OPC-UA node.

Parameters:

  • node_id (str, required): OPC-UA node identifier.

Returns:

  • Any: The current node value.

Raises:

  • ConnectorError: If not connected or read fails.

read_values async

read_values(node_ids: list[str]) -> dict[str, Any]

Read current values of multiple OPC-UA nodes.

Parameters:

  • node_ids (list[str], required): List of OPC-UA node identifiers.

Returns:

  • dict[str, Any]: Mapping of node ID to its current value.

Raises:

  • ConnectorError: If not connected or any read fails.

browse_nodes async

browse_nodes(root_node_id: str = '') -> list[dict[str, str]]

Browse child nodes of the given root.

Parameters:

  • root_node_id (str, default ''): Starting node; defaults to the server's Objects folder.

Returns:

  • list[dict[str, str]]: List of dicts with node_id, browse_name, and node_class.

Raises:

  • ConnectorError: If not connected or browse fails.