Skip to content

MQTT Connector

The MqttConnector ingests IoT sensor data from an MQTT broker and normalises incoming messages into Machina Alarm entities. It supports JSON payloads, Sparkplug B (JSON representation), and raw numeric values.

Prerequisites

  • An MQTT broker (e.g. Mosquitto, HiveMQ, EMQX).
  • Install the MQTT extra:
pip install machina-ai[mqtt]

This installs aiomqtt.

Quick start

from machina.connectors.iot import MqttConnector

mqtt = MqttConnector(
    broker="mqtt.example.com",
    port=1883,
    topics=[
        {
            "topic": "plant/sensors/pump-201/vibration",
            "asset_id": "P-201",
            "parameter": "vibration_velocity",
            "threshold": 6.0,
            "unit": "mm/s",
            "payload_format": "json",
            "value_path": "data.value",
        },
    ],
)
await mqtt.connect()

async def on_alarm(alarm):
    print(f"⚠ {alarm.asset_id}: {alarm.parameter}={alarm.value} {alarm.unit}")

sub = await mqtt.subscribe(on_alarm)

YAML configuration

connectors:
  iot:
    type: mqtt
    settings:
      broker: mqtt.example.com
      port: 1883
      username: ${MQTT_USER}
      password: ${MQTT_PASS}
      tls: false
      topics:
        - topic: "plant/sensors/pump-201/vibration"
          asset_id: P-201
          parameter: vibration_velocity
          threshold: 6.0
          unit: mm/s
          payload_format: json
          value_path: data.value
        - topic: "spBv1.0/Plant/DDATA/+/+"
          payload_format: sparkplug_b
          parameter: temperature
          threshold: 80.0
          unit: "°C"
          severity: critical

Capabilities

Capability Description
subscribe_to_topics Subscribe to MQTT topics with alarm generation.
publish_message Publish a message to any MQTT topic.

Publishing messages

await mqtt.publish("plant/commands/pump-201", '{"action": "stop"}', qos=1)

Unsubscribing

await mqtt.unsubscribe(sub)
await mqtt.disconnect()

Payload formats

JSON (default)

The most common format. Use value_path to specify where the numeric value lives in the JSON object (dot notation).

{"data": {"value": 7.8, "unit": "mm/s", "ts": "2026-04-11T10:00:00Z"}}
payload_format: json
value_path: data.value

Sparkplug B

Sparkplug B is a standard for industrial MQTT. This connector supports the JSON representation used by MQTT-to-JSON bridges (e.g. Ignition, HiveMQ Sparkplug extension):

{
  "metrics": [
    {"name": "temperature", "value": 72.5, "type": "Float"},
    {"name": "vibration", "value": 4.2, "type": "Float"}
  ]
}

The parameter field in the topic config is matched against metric.name.

payload_format: sparkplug_b
parameter: temperature

Protobuf payloads

Native Sparkplug B protobuf payloads are not yet supported. Use an MQTT-to-JSON bridge or configure your gateway to publish JSON-encoded Sparkplug messages.

Raw

For simple sensors that publish a plain numeric string:

7.8
payload_format: raw

MQTT wildcards

Topic filters support standard MQTT wildcards:

Wildcard Description Example
+ Matches exactly one topic level. plant/+/vibration
# Matches any number of levels (must be last). plant/sensors/#

TLS / SSL

mqtt = MqttConnector(
    broker="mqtt.example.com",
    port=8883,
    tls=True,
    ca_certs="/path/to/ca.crt",
    username="device01",
    password="secret",
)

Alarm normalisation

When a message's extracted value exceeds the configured threshold, an Alarm entity is created:

Alarm field Source
id Auto-generated (ALM-{uuid})
asset_id From topic config asset_id (falls back to topic string)
severity From topic config severity (default WARNING)
parameter From topic config parameter (falls back to topic string)
value Parsed from payload
threshold From topic config
unit From topic config
source mqtt://{broker}/{topic}

API reference

MqttConnector

MqttConnector(*, broker: str = '', port: int = 1883, username: str = '', password: str = '', client_id: str = '', tls: bool = False, ca_certs: str = '', topics: list[TopicConfig | dict[str, Any]] | None = None, keepalive: int = 60)

Connector for MQTT brokers.

Ingests IoT sensor data from MQTT topics and maps messages to Machina :class:~machina.domain.alarm.Alarm entities. Supports JSON payloads, Sparkplug B format, and raw numeric values.

Parameters:

Name Type Description Default
broker str

MQTT broker hostname or IP.

''
port int

Broker port (default 1883, use 8883 for TLS).

1883
username str

Username for broker authentication.

''
password str

Password for broker authentication.

''
client_id str

MQTT client identifier; auto-generated if empty.

''
tls bool

Whether to use TLS for the connection.

False
ca_certs str

Path to CA certificate file for TLS verification.

''
topics list[TopicConfig | dict[str, Any]] | None

Pre-configured topic subscriptions.

None
keepalive int

Keep-alive interval in seconds.

60
Example
from machina.connectors.iot import MqttConnector

mqtt = MqttConnector(
    broker="mqtt.example.com",
    port=1883,
    topics=[
        {"topic": "plant/sensors/pump-201/vibration",
         "asset_id": "P-201",
         "parameter": "vibration_velocity",
         "threshold": 6.0, "unit": "mm/s",
         "payload_format": "json", "value_path": "data.value"},
    ],
)
await mqtt.connect()

async def on_alarm(alarm):
    print(f"Alarm: {alarm.parameter}={alarm.value} {alarm.unit}")

sub = await mqtt.subscribe(on_alarm)

connect async

connect() -> None

Connect to the MQTT broker.

Raises:

Type Description
ConnectorError

If broker is empty or connection fails.

ConnectorAuthError

If authentication fails.

ImportError

If aiomqtt is not installed.

disconnect async

disconnect() -> None

Disconnect from the MQTT broker and cancel all subscriptions.

health_check async

health_check() -> ConnectorHealth

Check MQTT broker connectivity.

subscribe async

subscribe(callback: AlarmCallback, *, configs: list[TopicConfig | dict[str, Any]] | None = None) -> MqttSubscription

Subscribe to MQTT topics and process incoming messages.

For each message that exceeds a configured threshold an :class:Alarm is passed to callback.

Multiple calls to subscribe() are supported — a single background reader fans out each message to all registered subscriptions.

Parameters:

Name Type Description Default
callback AlarmCallback

Async function called with each :class:Alarm.

required
configs list[TopicConfig | dict[str, Any]] | None

Optional topic configs; defaults to the configs passed at construction time.

None

Returns:

Name Type Description
An MqttSubscription

class:MqttSubscription handle for later cancellation.

Raises:

Type Description
ConnectorError

If not connected or no topics configured.

unsubscribe async

unsubscribe(subscription: MqttSubscription) -> None

Cancel an active MQTT subscription.

Parameters:

Name Type Description Default
subscription MqttSubscription

The handle returned by :meth:subscribe.

required

publish async

publish(topic: str, payload: str | bytes, *, qos: int = 0) -> None

Publish a message to an MQTT topic.

Parameters:

Name Type Description Default
topic str

The MQTT topic.

required
payload str | bytes

Message payload (string or bytes).

required
qos int

Quality of Service level (0, 1, or 2).

0

Raises:

Type Description
ConnectorError

If not connected or publish fails.