MQTT Connector¶
The MqttConnector ingests IoT sensor data from an MQTT broker and normalises
incoming messages into Machina Alarm entities. It supports
JSON payloads, Sparkplug B (JSON representation), and raw numeric values.
Prerequisites¶
- An MQTT broker (e.g. Mosquitto, HiveMQ, EMQX).
- Install the MQTT extra:
This installs aiomqtt.
Quick start¶
from machina.connectors.iot import MqttConnector

mqtt = MqttConnector(
    broker="mqtt.example.com",
    port=1883,
    topics=[
        {
            "topic": "plant/sensors/pump-201/vibration",
            "asset_id": "P-201",
            "parameter": "vibration_velocity",
            "threshold": 6.0,
            "unit": "mm/s",
            "payload_format": "json",
            "value_path": "data.value",
        },
    ],
)

# The awaits below must run inside a coroutine or an async REPL.
await mqtt.connect()

async def on_alarm(alarm):
    # Called for each reading that exceeds the configured threshold.
    print(f"⚠ {alarm.asset_id}: {alarm.parameter}={alarm.value} {alarm.unit}")

sub = await mqtt.subscribe(on_alarm)
YAML configuration¶
connectors:
  iot:
    type: mqtt
    settings:
      broker: mqtt.example.com
      port: 1883
      username: ${MQTT_USER}
      password: ${MQTT_PASS}
      tls: false
      topics:
        - topic: "plant/sensors/pump-201/vibration"
          asset_id: P-201
          parameter: vibration_velocity
          threshold: 6.0
          unit: mm/s
          payload_format: json
          value_path: data.value
        - topic: "spBv1.0/Plant/DDATA/+/+"
          payload_format: sparkplug_b
          parameter: temperature
          threshold: 80.0
          unit: "°C"
          severity: critical
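The `${MQTT_USER}` and `${MQTT_PASS}` placeholders suggest that credentials are read from environment variables. How Machina's configuration loader resolves them is not shown on this page; the following is only a minimal sketch of that kind of expansion, using a hypothetical helper `expand_placeholders` built on the standard library's `string.Template`.

```python
import os
from string import Template
from typing import Any, Mapping


def expand_placeholders(node: Any, env: Mapping[str, str] = os.environ) -> Any:
    """Recursively replace ${NAME} placeholders in a parsed config tree.

    Hypothetical helper for illustration; not part of Machina's API.
    Raises KeyError if a referenced variable is missing from env.
    """
    if isinstance(node, str):
        return Template(node).substitute(env)
    if isinstance(node, dict):
        return {key: expand_placeholders(value, env) for key, value in node.items()}
    if isinstance(node, list):
        return [expand_placeholders(item, env) for item in node]
    return node  # numbers, booleans and None pass through unchanged


settings = {
    "broker": "mqtt.example.com",
    "port": 1883,
    "username": "${MQTT_USER}",
    "password": "${MQTT_PASS}",
    "tls": False,
}
# An explicit mapping is used here instead of the real environment.
resolved = expand_placeholders(settings, {"MQTT_USER": "device01", "MQTT_PASS": "secret"})
print(resolved["username"])
```

Failing loudly on a missing variable is a deliberate choice in this sketch: a silently empty password is harder to diagnose than a `KeyError` at start-up.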
Capabilities¶
| Capability | Description |
|---|---|
| subscribe_to_topics | Subscribe to MQTT topics with alarm generation. |
| publish_message | Publish a message to any MQTT topic. |
Publishing messages¶
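A minimal sketch of publishing through the connector, based only on the `publish(topic, payload, qos)` parameters listed in the API reference below. The helper `publish_reading`, the setpoint topic, and the `RecordingConnector` stand-in are hypothetical and exist only so the example runs without a broker; with a real, connected `MqttConnector` you would pass that object instead.

```python
import asyncio
import json


async def publish_reading(mqtt, topic: str, value: float) -> None:
    """Publish one JSON-encoded reading through a connected connector."""
    payload = json.dumps({"data": {"value": value}})
    # Keyword arguments follow the parameter names in the API reference.
    await mqtt.publish(topic=topic, payload=payload, qos=1)


class RecordingConnector:
    """Hypothetical stand-in for a connected MqttConnector (no broker needed)."""

    def __init__(self) -> None:
        self.sent = []

    async def publish(self, topic, payload, qos=0) -> None:
        # Record the call instead of sending it over the network.
        self.sent.append((topic, payload, qos))


demo = RecordingConnector()
asyncio.run(publish_reading(demo, "plant/sensors/pump-201/setpoint", 5.5))
print(demo.sent)
```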
Unsubscribing¶
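A minimal sketch of cancelling a subscription, assuming only what the API reference states: `subscribe()` returns a handle and `unsubscribe()` accepts it. The helper `listen_for` and the `FakeConnector` stand-in are hypothetical and replace a real broker connection for illustration.

```python
import asyncio


async def listen_for(mqtt, on_alarm, seconds: float) -> None:
    """Subscribe, receive alarms for a while, then cancel the subscription."""
    sub = await mqtt.subscribe(on_alarm)
    try:
        await asyncio.sleep(seconds)
    finally:
        # Always release the subscription, even if the wait is cancelled.
        await mqtt.unsubscribe(sub)


class FakeConnector:
    """Hypothetical stand-in for a connected MqttConnector (no broker needed)."""

    def __init__(self) -> None:
        self.active = set()
        self.events = []

    async def subscribe(self, callback):
        handle = object()
        self.active.add(handle)
        self.events.append("subscribe")
        return handle

    async def unsubscribe(self, subscription) -> None:
        self.active.discard(subscription)
        self.events.append("unsubscribe")


async def on_alarm(alarm) -> None:
    print(alarm)


demo = FakeConnector()
asyncio.run(listen_for(demo, on_alarm, seconds=0.01))
print(demo.events)
```

Putting the `unsubscribe()` call in a `finally` block keeps the handle from leaking if the surrounding task is cancelled.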
Payload formats¶
JSON (default)¶
The most common format. Use value_path to specify where the numeric
value lives in the JSON object (dot notation).
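To make the dot notation concrete, here is a small sketch of how a path such as `data.value` could be resolved against a payload. The function `extract_value` and the sample message are illustrative assumptions, not the connector's actual implementation.

```python
import json
from typing import Any


def extract_value(payload: bytes, value_path: str) -> float:
    """Walk a dot-separated path through a JSON object and return a float."""
    node: Any = json.loads(payload)
    for key in value_path.split("."):
        if not isinstance(node, dict) or key not in node:
            raise KeyError(f"{value_path!r}: no key {key!r} in payload")
        node = node[key]
    return float(node)


message = b'{"sensor": "pump-201", "data": {"value": 7.3, "unit": "mm/s"}}'
reading = extract_value(message, "data.value")
print(reading)
```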
Sparkplug B¶
Sparkplug B is a standard for industrial MQTT. This connector supports the JSON representation used by MQTT-to-JSON bridges (e.g. Ignition, HiveMQ Sparkplug extension):
{
  "metrics": [
    {"name": "temperature", "value": 72.5, "type": "Float"},
    {"name": "vibration", "value": 4.2, "type": "Float"}
  ]
}
The parameter field in the topic config is matched against
metric.name.
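The name matching described above can be sketched as follows. `find_metric` is a hypothetical helper written for this page; it assumes the JSON layout shown in the example payload and is not the connector's own code.

```python
import json
from typing import Optional


def find_metric(payload: bytes, parameter: str) -> Optional[float]:
    """Return the value of the metric whose name equals `parameter`, if any."""
    document = json.loads(payload)
    for metric in document.get("metrics", []):
        if metric.get("name") == parameter:
            return float(metric["value"])
    return None  # no metric with that name in this message


message = (
    b'{"metrics": ['
    b'{"name": "temperature", "value": 72.5, "type": "Float"},'
    b'{"name": "vibration", "value": 4.2, "type": "Float"}]}'
)
print(find_metric(message, "temperature"))
```

A message that carries no metric with the configured name yields `None` in this sketch, so the caller can skip it rather than raise.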
Protobuf payloads
Native Sparkplug B protobuf payloads are not yet supported. Use an MQTT-to-JSON bridge or configure your gateway to publish JSON-encoded Sparkplug messages.
Raw¶
Use this format for simple sensors that publish a plain numeric string as the entire payload.
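As an illustration, a payload such as `42.5` can be turned into a number with a few lines of Python. `parse_raw` is a hypothetical helper; the connector's own parsing rules (accepted encodings, whitespace handling) are not documented here and may differ.

```python
def parse_raw(payload: bytes) -> float:
    """Decode a payload such as b"42.5" into a float."""
    text = payload.decode("utf-8").strip()
    return float(text)  # raises ValueError for non-numeric text


print(parse_raw(b"42.5"))
```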
MQTT wildcards¶
Topic filters support standard MQTT wildcards:
| Wildcard | Description | Example |
|---|---|---|
| + | Matches exactly one topic level. | plant/+/vibration |
| # | Matches any number of levels (must be last). | plant/sensors/# |
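The matching rules in the table can be sketched in plain Python. In practice the broker performs this matching; `topic_matches` is a hypothetical helper that is handy for checking a filter locally, and it ignores the special handling of topics that start with `$`.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if `topic` matches an MQTT filter containing + or #."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")
    for index, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: valid only as the last level; it matches
            # the parent level and everything below it.
            return index == len(filter_levels) - 1
        if index >= len(topic_levels):
            return False  # filter is longer than the topic
        if level != "+" and level != topic_levels[index]:
            return False  # literal level differs
    return len(filter_levels) == len(topic_levels)


print(topic_matches("plant/+/vibration", "plant/pump-201/vibration"))
print(topic_matches("plant/sensors/#", "plant/sensors/pump-201/vibration"))
```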
TLS / SSL¶
mqtt = MqttConnector(
    broker="mqtt.example.com",
    port=8883,
    tls=True,
    ca_certs="/path/to/ca.crt",
    username="device01",
    password="secret",
)
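For orientation, this is roughly what `tls=True` with a `ca_certs` path corresponds to in terms of Python's standard `ssl` module. How `MqttConnector` builds its TLS context internally is not documented on this page, so treat `build_tls_context` as a hypothetical sketch rather than the connector's behaviour.

```python
import ssl


def build_tls_context(ca_certs: str = "") -> ssl.SSLContext:
    """Create a client-side TLS context that verifies the broker certificate."""
    # With no CA file given, fall back to the system trust store.
    return ssl.create_default_context(cafile=ca_certs or None)


context = build_tls_context()
# The default context requires a valid certificate and checks the hostname.
print(context.verify_mode == ssl.CERT_REQUIRED, context.check_hostname)
```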
Alarm normalisation¶
When a message's extracted value exceeds the configured threshold, an
Alarm entity is created:
| Alarm field | Source |
|---|---|
| id | Auto-generated (ALM-{uuid}) |
| asset_id | From topic config asset_id (falls back to topic string) |
| severity | From topic config severity (default WARNING) |
| parameter | From topic config parameter (falls back to topic string) |
| value | Parsed from payload |
| threshold | From topic config |
| unit | From topic config |
| source | mqtt://{broker}/{topic} |
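The mapping in the table can be sketched as a small function. The `Alarm` dataclass below is a simplified stand-in for `machina.domain.alarm.Alarm`, and `to_alarm` is hypothetical. The sketch assumes a strict greater-than comparison, a random UUID for the id, a plain string for severity, and an empty unit when none is configured; none of these details is confirmed by this page.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class Alarm:
    """Simplified stand-in for machina.domain.alarm.Alarm (illustration only)."""
    id: str
    asset_id: str
    severity: str
    parameter: str
    value: float
    threshold: float
    unit: str
    source: str


def to_alarm(broker: str, topic: str, value: float, config: dict) -> Optional[Alarm]:
    """Return an Alarm if `value` exceeds the configured threshold, else None."""
    threshold = config["threshold"]
    if value <= threshold:
        return None  # only readings above the threshold raise an alarm
    return Alarm(
        id=f"ALM-{uuid.uuid4()}",
        asset_id=config.get("asset_id", topic),      # falls back to topic string
        severity=config.get("severity", "WARNING"),  # default severity
        parameter=config.get("parameter", topic),    # falls back to topic string
        value=value,
        threshold=threshold,
        unit=config.get("unit", ""),
        source=f"mqtt://{broker}/{topic}",
    )


config = {"asset_id": "P-201", "parameter": "vibration_velocity",
          "threshold": 6.0, "unit": "mm/s"}
alarm = to_alarm("mqtt.example.com", "plant/sensors/pump-201/vibration", 7.3, config)
print(alarm)
```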
API reference¶
MqttConnector ¶
MqttConnector(*, broker: str = '', port: int = 1883, username: str = '', password: str = '', client_id: str = '', tls: bool = False, ca_certs: str = '', topics: list[TopicConfig | dict[str, Any]] | None = None, keepalive: int = 60)
Connector for MQTT brokers.
Ingests IoT sensor data from MQTT topics and maps messages to
Machina Alarm (machina.domain.alarm.Alarm) entities. Supports
JSON payloads, Sparkplug B format, and raw numeric values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| broker | str | MQTT broker hostname or IP. | '' |
| port | int | Broker port (default 1883, use 8883 for TLS). | 1883 |
| username | str | Username for broker authentication. | '' |
| password | str | Password for broker authentication. | '' |
| client_id | str | MQTT client identifier; auto-generated if empty. | '' |
| tls | bool | Whether to use TLS for the connection. | False |
| ca_certs | str | Path to CA certificate file for TLS verification. | '' |
| topics | list[TopicConfig \| dict[str, Any]] \| None | Pre-configured topic subscriptions. | None |
| keepalive | int | Keep-alive interval in seconds. | 60 |
Example
from machina.connectors.iot import MqttConnector

mqtt = MqttConnector(
    broker="mqtt.example.com",
    port=1883,
    topics=[
        {"topic": "plant/sensors/pump-201/vibration",
         "asset_id": "P-201",
         "parameter": "vibration_velocity",
         "threshold": 6.0, "unit": "mm/s",
         "payload_format": "json", "value_path": "data.value"},
    ],
)
await mqtt.connect()

async def on_alarm(alarm):
    print(f"Alarm: {alarm.parameter}={alarm.value} {alarm.unit}")

sub = await mqtt.subscribe(on_alarm)
connect (async)¶
Connect to the MQTT broker.
Raises:
| Type | Description |
|---|---|
| ConnectorError | If |
| ConnectorAuthError | If authentication fails. |
| ImportError | If |
disconnect (async)¶
Disconnect from the MQTT broker and cancel all subscriptions.
subscribe (async)¶
subscribe(callback: AlarmCallback, *, configs: list[TopicConfig | dict[str, Any]] | None = None) -> MqttSubscription
Subscribe to MQTT topics and process incoming messages.
For each message whose value exceeds a configured threshold, an
Alarm is passed to callback.
Multiple calls to subscribe() are supported — a single
background reader fans out each message to all registered
subscriptions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| callback | AlarmCallback | Async function called with each Alarm. | required |
| configs | list[TopicConfig \| dict[str, Any]] \| None | Optional topic configs; defaults to the configs passed at construction time. | None |
Returns:
| Type | Description |
|---|---|
| MqttSubscription | A subscription handle that can be passed to unsubscribe(). |
Raises:
| Type | Description |
|---|---|
| ConnectorError | If not connected or no topics configured. |
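The fan-out behaviour described for `subscribe()` (one background reader delivering each message to every registered subscription) can be illustrated with plain `asyncio`. This is a sketch of the pattern only: an in-memory queue stands in for the broker connection, `None` is used as a stop sentinel, and all names are hypothetical. The connector's real internals are not shown on this page.

```python
import asyncio


async def reader(messages: asyncio.Queue, callbacks: list) -> None:
    """Single background task: deliver every message to every callback."""
    while True:
        message = await messages.get()
        if message is None:  # sentinel: stop reading
            return
        # Snapshot the list so callbacks may be added or removed mid-delivery.
        for callback in list(callbacks):
            await callback(message)


async def demo() -> dict:
    received = {"a": [], "b": []}

    async def on_a(message) -> None:
        received["a"].append(message)

    async def on_b(message) -> None:
        received["b"].append(message)

    messages: asyncio.Queue = asyncio.Queue()
    task = asyncio.create_task(reader(messages, [on_a, on_b]))
    for message in (6.4, 7.1, None):
        await messages.put(message)
    await task  # returns once the sentinel has been read
    return received


result = asyncio.run(demo())
print(result)
```

With a single reader, each message is read once no matter how many subscriptions exist, at the cost that a slow callback delays delivery to the others in this simple version.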
unsubscribe (async)¶
Cancel an active MQTT subscription.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| subscription | MqttSubscription | The handle returned by subscribe(). | required |
publish (async)¶
Publish a message to an MQTT topic.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| topic | str | The MQTT topic. | required |
| payload | str \| bytes | Message payload (string or bytes). | required |
| qos | int | Quality of Service level (0, 1, or 2). | 0 |
Raises:
| Type | Description |
|---|---|
| ConnectorError | If not connected or publish fails. |