OPC-UA Connector¶
The OpcUaConnector reads real-time sensor data from OPC-UA-enabled PLCs and
SCADA systems. It subscribes to node value changes and normalises them into
Machina Alarm entities when configured thresholds are
exceeded.
Prerequisites¶
- An OPC-UA server accessible from the network (e.g. Siemens S7, Beckhoff, Prosys Simulation Server).
- Install the OPC-UA extra:
This installs asyncua.
Quick start¶
from machina.connectors.iot import OpcUaConnector
opcua = OpcUaConnector(
endpoint="opc.tcp://plc-line2:4840",
subscriptions=[
{
"node_id": "ns=2;s=Pump.P201.Vibration.DE",
"sampling_interval_ms": 1000,
"asset_id": "P-201",
"parameter": "vibration_velocity",
"threshold": 6.0,
"unit": "mm/s",
},
],
)
await opcua.connect()
async def on_alarm(alarm):
print(f"⚠ {alarm.asset_id}: {alarm.parameter}={alarm.value} {alarm.unit}")
sub = await opcua.subscribe(on_alarm)
YAML configuration¶
connectors:
sensors:
type: opcua
settings:
endpoint: opc.tcp://plc-line2:4840
security_mode: SignAndEncrypt
security_policy: Basic256Sha256
certificate: /certs/client.der
private_key: /certs/client_key.pem
subscriptions:
- node_id: "ns=2;s=Pump.P201.Vibration.DE"
sampling_interval_ms: 1000
asset_id: P-201
parameter: vibration_velocity
threshold: 6.0
unit: mm/s
- node_id: "ns=2;s=Pump.P201.Temperature"
sampling_interval_ms: 5000
asset_id: P-201
parameter: bearing_temperature
threshold: 80.0
unit: "°C"
severity: critical
Capabilities¶
| Capability | Description |
|---|---|
subscribe_to_nodes |
Subscribe to OPC-UA node value changes with alarm generation. |
read_node_value |
Read the current value of a single node on demand. |
read_node_values |
Read multiple node values in one call. |
browse_nodes |
Browse the OPC-UA address space from a given root. |
Reading values on demand¶
value = await opcua.read_value("ns=2;s=Pump.P201.Vibration.DE")
values = await opcua.read_values([
"ns=2;s=Pump.P201.Vibration.DE",
"ns=2;s=Pump.P201.Temperature",
])
Browsing the address space¶
nodes = await opcua.browse_nodes()
for node in nodes:
print(f"{node['browse_name']} ({node['node_class']}): {node['node_id']}")
Unsubscribing¶
Security modes¶
| Mode | Description |
|---|---|
None |
No security (suitable for isolated networks). |
Sign |
Messages are signed but not encrypted. |
SignAndEncrypt |
Messages are signed and encrypted (recommended for production). |
Certificate-based security
When using Sign or SignAndEncrypt, provide both certificate (DER or
PEM) and private_key (PEM) paths. The OPC-UA server must trust the
client certificate.
Alarm normalisation¶
When a subscribed node's value exceeds its configured threshold, an Alarm
entity is created:
| Alarm field | Source |
|---|---|
id |
Auto-generated (ALM-{uuid}) |
asset_id |
From subscription asset_id (falls back to node ID) |
severity |
From subscription severity (default WARNING) |
parameter |
From subscription parameter (falls back to node ID) |
value |
The new node value |
threshold |
From subscription config |
unit |
From subscription config |
source |
opcua://{endpoint}/{node_id} |
API reference¶
OpcUaConnector ¶
OpcUaConnector(*, endpoint: str = '', security_mode: str = 'None', security_policy: str = '', certificate: str = '', private_key: str = '', username: str = '', password: str = '', subscriptions: list[SubscriptionConfig | dict[str, Any]] | None = None, session_timeout: int = 30000)
Connector for OPC-UA servers.
Reads real-time sensor data from OPC-UA-enabled PLCs and SCADA
systems and maps value changes to Machina :class:~machina.domain.alarm.Alarm
entities.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
endpoint
|
str
|
OPC-UA server URL (e.g. |
''
|
security_mode
|
str
|
OPC-UA security mode ( |
'None'
|
security_policy
|
str
|
Security policy URI (e.g. |
''
|
certificate
|
str
|
Path to client X.509 certificate file. |
''
|
private_key
|
str
|
Path to client private key file. |
''
|
username
|
str
|
Username for user-token authentication. |
''
|
password
|
str
|
Password for user-token authentication. |
''
|
subscriptions
|
list[SubscriptionConfig | dict[str, Any]] | None
|
Pre-configured node subscriptions. |
None
|
session_timeout
|
int
|
Session timeout in milliseconds. |
30000
|
Example
from machina.connectors.iot import OpcUaConnector
opcua = OpcUaConnector(
endpoint="opc.tcp://plc-line2:4840",
security_mode="SignAndEncrypt",
subscriptions=[
{"node_id": "ns=2;s=Pump.P201.Vibration.DE",
"sampling_interval_ms": 1000,
"asset_id": "P-201",
"parameter": "vibration_velocity",
"threshold": 6.0, "unit": "mm/s"},
],
)
await opcua.connect()
async def on_alarm(alarm):
print(f"Alarm: {alarm.parameter}={alarm.value} {alarm.unit}")
sub = await opcua.subscribe(on_alarm)
connect
async
¶
Connect to the OPC-UA server.
Raises:
| Type | Description |
|---|---|
ConnectorError
|
If |
ConnectorAuthError
|
If authentication/security negotiation fails. |
ImportError
|
If |
disconnect
async
¶
Disconnect from the OPC-UA server and cancel all subscriptions.
health_check
async
¶
Check OPC-UA server connectivity.
Reads the server status node to verify the connection is alive.
subscribe
async
¶
subscribe(callback: AlarmCallback, *, configs: list[SubscriptionConfig | dict[str, Any]] | None = None) -> Subscription
Subscribe to OPC-UA node value changes.
Creates a subscription for each configured node. When a value change
exceeds the configured threshold, an :class:Alarm is created and
passed to the callback.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
callback
|
AlarmCallback
|
Async function called with each :class: |
required |
configs
|
list[SubscriptionConfig | dict[str, Any]] | None
|
Optional subscription configs; defaults to the configs passed at construction time. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
A |
Subscription
|
class: |
Raises:
| Type | Description |
|---|---|
ConnectorError
|
If not connected. |
unsubscribe
async
¶
Cancel an active subscription.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
subscription
|
Subscription
|
The subscription handle returned by :meth: |
required |
read_value
async
¶
Read the current value of a single OPC-UA node.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_id
|
str
|
OPC-UA node identifier. |
required |
Returns:
| Type | Description |
|---|---|
Any
|
The current node value. |
Raises:
| Type | Description |
|---|---|
ConnectorError
|
If not connected or read fails. |
read_values
async
¶
Read current values of multiple OPC-UA nodes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
node_ids
|
list[str]
|
List of OPC-UA node identifiers. |
required |
Returns:
| Type | Description |
|---|---|
dict[str, Any]
|
Mapping of node ID to its current value. |
Raises:
| Type | Description |
|---|---|
ConnectorError
|
If not connected or any read fails. |
browse_nodes
async
¶
Browse child nodes of the given root.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
root_node_id
|
str
|
Starting node; defaults to the server's Objects folder. |
''
|
Returns:
| Type | Description |
|---|---|
list[dict[str, str]]
|
List of dicts with |
Raises:
| Type | Description |
|---|---|
ConnectorError
|
If not connected or browse fails. |