Custom Connectors

Machina's connector layer is designed to be easy to extend. Most CMMS integrations can be expressed as configuration of the built-in GenericCmmsConnector; only non-REST protocols (SOAP, OData, gRPC, WebSockets) or custom business logic call for writing a new class from scratch.

This page walks through both paths:

  • Path A — Configure GenericCmmsConnector for any REST-based CMMS, picking an authentication strategy, a pagination strategy, and an optional JMESPath field mapping.
  • Path B — Write a custom connector by satisfying the BaseConnector Protocol.

Both paths end at the same place: a connector object you can hand to an Agent.

Which path do I pick?

Your CMMS… | Use
Has a REST + JSON API, any auth + pagination style | Path A (configure GenericCmmsConnector)
Returns deeply nested response payloads | Path A with _fields JMESPath mapping
Uses OData (SAP PM), SOAP, gRPC, or WebSockets | Path B (custom class)
Requires custom business logic (rate limiting, caching, batch writes) | Path B
Needs to integrate two systems behind one connector | Path B

Start with Path A. Drop down to Path B only when configuration can't express what you need.

Path A — Configure GenericCmmsConnector

The five capabilities

GenericCmmsConnector declares five CMMS capabilities that the agent runtime discovers at startup:

Capability | What it reads/writes
read_assets | Returns all known assets (paginated if configured)
read_work_orders | Returns work orders, optionally filtered by asset_id / status
create_work_order | POSTs a new work order to the CMMS
read_spare_parts | Returns spare parts, optionally filtered by asset_id / sku
read_maintenance_history | Returns completed work orders for a given asset

Authentication strategies

Pick the one that matches your CMMS:

from machina.connectors.cmms import (
    BearerAuth,
    BasicAuth,
    ApiKeyHeaderAuth,
    NoAuth,
)

# Bearer token (default for most modern CMMS — UpKeep, MaintainX, Limble)
auth = BearerAuth(token="eyJhbGci...")

# HTTP Basic auth (older/on-prem deployments)
auth = BasicAuth(username="svc", password="...")

# API key in a custom header (e.g. X-API-Key)
auth = ApiKeyHeaderAuth(header_name="X-API-Key", value="k-123")

# No auth — public or intranet-only endpoints
auth = NoAuth()

For backwards compatibility, passing api_key="..." to the constructor is still accepted and is equivalent to auth=BearerAuth(token=api_key).
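As a rough illustration of what these strategies do to a request, the sketch below mimics the documented apply(headers) behaviour with stand-in classes. BearerAuthSketch, BasicAuthSketch, and resolve_auth are hypothetical names and are not part of Machina; only the header formats and the rule that an explicit auth takes precedence over api_key come from this page.

```python
import base64
from dataclasses import dataclass


@dataclass(frozen=True)
class BearerAuthSketch:
    """Stand-in for BearerAuth: adds `Authorization: Bearer <token>`."""

    token: str

    def apply(self, headers: dict) -> dict:
        # Return a new dict so the caller's headers are not mutated.
        return {**headers, "Authorization": f"Bearer {self.token}"}


@dataclass(frozen=True)
class BasicAuthSketch:
    """Stand-in for BasicAuth: adds `Authorization: Basic <base64(user:password)>`."""

    username: str
    password: str

    def apply(self, headers: dict) -> dict:
        raw = f"{self.username}:{self.password}".encode("utf-8")
        encoded = base64.b64encode(raw).decode("ascii")
        return {**headers, "Authorization": f"Basic {encoded}"}


def resolve_auth(auth=None, api_key: str = ""):
    """Hypothetical helper: an explicit auth wins, api_key is the legacy fallback."""
    if auth is not None:
        return auth
    if api_key:
        return BearerAuthSketch(token=api_key)
    return None  # nothing configured; the caller decides how to react


headers = resolve_auth(api_key="k-123").apply({"Accept": "application/json"})
basic_headers = BasicAuthSketch("svc", "pw").apply({})
```

Returning a fresh dictionary from apply keeps each strategy free of side effects, which makes the strategies easy to unit test in isolation.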

Pagination strategies

Pick the one that matches your CMMS's list endpoints:

from machina.connectors.cmms import (
    NoPagination,
    OffsetLimitPagination,
    PageNumberPagination,
    CursorPagination,
)

# Single-shot GET — response is the full list (default)
pagination = NoPagination()

# ?offset=X&limit=Y style (UpKeep, Limble, and most REST CMMS)
pagination = OffsetLimitPagination(
    limit_param="limit",
    offset_param="offset",
    page_size=100,
)

# ?page=N&per_page=M style (GitHub-style APIs)
pagination = PageNumberPagination(
    page_param="page",
    size_param="per_page",
    page_size=50,
    start_page=1,  # use 0 if your API is zero-indexed
)

# Opaque cursor token from the response
pagination = CursorPagination(
    cursor_param="cursor",
    cursor_response_path="next_cursor",  # JMESPath
    items_path="items",                  # JMESPath
)

All strategies accept an optional items_path (JMESPath) for extracting the list from a wrapped response like {"data": [...], "meta": {...}}. CursorPagination requires it because it always walks a wrapped object.
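To make the offset/limit walk concrete, here is a minimal sketch of the loop under stated assumptions. It is not Machina's implementation: iterate_offset_limit, extract_items, and fake_fetch are hypothetical, the HTTP client is replaced by a plain callable, and items_path handles only simple dotted keys rather than full JMESPath.

```python
from __future__ import annotations

from typing import Any, Callable, Iterator


def extract_items(body: Any, items_path: str | None) -> list[dict[str, Any]]:
    """Pull the item list out of a response body (dotted keys only)."""
    if items_path is None:
        return list(body)
    node = body
    for key in items_path.split("."):
        if not isinstance(node, dict):
            return []
        node = node.get(key)
    return list(node) if isinstance(node, list) else []


def iterate_offset_limit(
    fetch: Callable[[dict[str, int]], Any],
    *,
    limit_param: str = "limit",
    offset_param: str = "offset",
    page_size: int = 100,
    items_path: str | None = None,
) -> Iterator[dict[str, Any]]:
    """Request pages until one comes back shorter than page_size."""
    offset = 0
    while True:
        body = fetch({limit_param: page_size, offset_param: offset})
        items = extract_items(body, items_path)
        yield from items
        if len(items) < page_size:
            return  # a short page signals the end of the collection
        offset += page_size


ROWS = [{"id": f"A-{n}"} for n in range(5)]
calls: list[int] = []


def fake_fetch(params: dict[str, int]) -> dict[str, Any]:
    """Simulate a CMMS that wraps its list in {"data": [...], "meta": {...}}."""
    calls.append(params["start"])
    start = params["start"]
    return {"data": ROWS[start : start + params["size"]], "meta": {}}


ids = [
    item["id"]
    for item in iterate_offset_limit(
        fake_fetch,
        limit_param="size",
        offset_param="start",
        page_size=2,
        items_path="data",
    )
]
```

With this stopping rule, a collection whose size is an exact multiple of page_size costs one extra request that returns an empty page.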

Schema mapping

The schema_mapping parameter lets you bridge differences between what the CMMS returns and what Machina expects, in two flavours:

Flat rename — top-level key rewrites:

cmms = GenericCmmsConnector(
    url="https://cmms.example.com/api",
    auth=BearerAuth(token="..."),
    schema_mapping={
        "assets": {"asset_id": "id", "display_name": "name"},
    },
)

JMESPath extraction — for deeply nested response items:

cmms = GenericCmmsConnector(
    url="https://cmms.example.com/api",
    auth=BearerAuth(token="..."),
    schema_mapping={
        "assets": {
            "_fields": {
                "id": "equipment.id",
                "name": "equipment.display_name",
                "criticality": "meta.criticality_class",
                "equipment_class_code": "meta.iso_code",
            },
        },
    },
)

The presence of the _fields sentinel switches the mapper into JMESPath mode. Each mapping value is a JMESPath expression evaluated against the raw item; missing paths are silently dropped.

Supported entity keys in schema_mapping: assets, work_orders, spare_parts.
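The two mapping flavours read in opposite directions, which is easy to miss: in a flat rename the keys are CMMS field names, while inside _fields the keys are Machina field names. The sketch below illustrates both modes. map_item and lookup are hypothetical helpers, lookup resolves only plain dotted paths as a stand-in for JMESPath, and passing unmapped keys through unchanged in flat mode is an assumption.

```python
from __future__ import annotations

from typing import Any


def lookup(item: dict[str, Any], path: str) -> Any:
    """Resolve a dotted path such as "equipment.id"; None if any key is missing."""
    node: Any = item
    for key in path.split("."):
        if not isinstance(node, dict) or key not in node:
            return None
        node = node[key]
    return node


def map_item(raw: dict[str, Any], mapping: dict[str, Any]) -> dict[str, Any]:
    """Apply one entity's mapping to one raw item."""
    if "_fields" in mapping:
        # Extraction mode: target name -> path into the raw item.
        out: dict[str, Any] = {}
        for target, path in mapping["_fields"].items():
            value = lookup(raw, path)
            if value is not None:  # missing paths are dropped
                out[target] = value
        return out
    # Flat mode: source name -> target name; other keys pass through (assumed).
    return {mapping.get(key, key): value for key, value in raw.items()}


flat = map_item(
    {"asset_id": "P-201", "display_name": "Pump", "site": "North"},
    {"asset_id": "id", "display_name": "name"},
)
nested = map_item(
    {"equipment": {"id": "P-201"}, "meta": {}},
    {"_fields": {"id": "equipment.id", "criticality": "meta.criticality_class"}},
)
```

Here nested contains only id, because meta.criticality_class does not resolve in the sample item.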

Putting it all together

A "modern CMMS" example combining Basic auth, offset/limit pagination with custom param names, and nested-response field extraction:

from machina.connectors.cmms import (
    BasicAuth,
    GenericCmmsConnector,
    OffsetLimitPagination,
)

cmms = GenericCmmsConnector(
    url="https://modern-cmms.example.com/v2",
    auth=BasicAuth(username="svc", password="s3cret"),
    pagination=OffsetLimitPagination(
        limit_param="size",
        offset_param="start",
        page_size=50,
        items_path="data",
    ),
    schema_mapping={
        "assets": {
            "_fields": {
                "id": "equipment.id",
                "name": "equipment.display_name",
                "criticality": "meta.criticality_class",
                "equipment_class_code": "meta.iso_code",
            },
        },
    },
)

For offline / demo scenarios, point data_dir at a directory of assets.json, work_orders.json, and spare_parts.json files — the connector will load them into memory without touching the network:

cmms = GenericCmmsConnector(data_dir="sample_data/cmms")
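If you do not have sample files yet, a small script can generate them. The sketch below is an assumption-laden starting point: it assumes each file holds a JSON array, it reuses the asset shape shown in the REST test on this page, and it leaves the other two collections empty. write_sample_data is a hypothetical helper, not a Machina function.

```python
import json
import tempfile
from pathlib import Path


def write_sample_data(data_dir: Path) -> Path:
    """Create the three JSON files that local mode is documented to read."""
    data_dir.mkdir(parents=True, exist_ok=True)
    samples = {
        # Asset fields copied from the test example; the full schema is not shown here.
        "assets.json": [{"id": "P-201", "name": "Pump", "type": "rotating_equipment"}],
        "work_orders.json": [],
        "spare_parts.json": [],
    }
    for filename, rows in samples.items():
        (data_dir / filename).write_text(json.dumps(rows, indent=2), encoding="utf-8")
    return data_dir


sample_dir = write_sample_data(Path(tempfile.mkdtemp()) / "cmms")
# With Machina installed, the directory can then be passed as data_dir=sample_dir.
```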

Path B — Write a custom connector

For protocols and patterns GenericCmmsConnector can't express, write a Python class that satisfies the BaseConnector Protocol.

The BaseConnector Protocol

Every connector must provide four things:

Attribute | Signature | Purpose
capabilities | @property -> list[str] | Declares which operations this connector supports (e.g. ["read_assets", "read_work_orders"]). The agent uses this to discover capabilities at runtime and enable matching LLM tools. Concrete classes typically use a ClassVar[list[str]] class attribute, which satisfies the Protocol's @property via structural typing.
connect() | async def connect() -> None | Establish the underlying connection (open HTTP client, log into CMMS, subscribe to broker, …).
disconnect() | async def disconnect() -> None | Clean up the connection. Called by Agent.stop().
health_check() | async def health_check() -> ConnectorHealth | Return a ConnectorHealth status the agent can use to decide whether the connector is usable.

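Beyond those four, implement a method for every capability you declare, named after the capability. A connector with capabilities = ["read_assets"] must also provide async def read_assets(self, **kwargs) -> list[Asset]. The API Reference below types capabilities as frozenset[Capability], and ConnectorRegistry.find_by_capability treats raw strings as deprecated, so prefer Capability members where your Machina version provides them.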
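Structural typing means a connector never has to inherit from anything. The sketch below shows the idea with a hypothetical ConnectorLike protocol that stands in for BaseConnector (it uses plain string capabilities, as in the table above), plus a hypothetical missing_capability_methods check that catches a declared capability with no matching method.

```python
from __future__ import annotations

import asyncio
from typing import ClassVar, Protocol, runtime_checkable


@runtime_checkable
class ConnectorLike(Protocol):
    """Hypothetical stand-in for BaseConnector, reduced to its four members."""

    @property
    def capabilities(self) -> list[str]: ...

    async def connect(self) -> None: ...

    async def disconnect(self) -> None: ...

    async def health_check(self) -> object: ...


class InMemoryConnector:
    # A plain class attribute stands in for the protocol's read-only property.
    capabilities: ClassVar[list[str]] = ["read_assets"]

    def __init__(self) -> None:
        self.connected = False

    async def connect(self) -> None:
        self.connected = True

    async def disconnect(self) -> None:
        self.connected = False

    async def health_check(self) -> str:
        return "healthy" if self.connected else "unhealthy"

    async def read_assets(self, **kwargs: object) -> list[dict[str, str]]:
        return [{"id": "P-201"}]


def missing_capability_methods(connector: ConnectorLike) -> list[str]:
    """List declared capabilities that have no callable method of the same name."""
    return [
        name
        for name in connector.capabilities
        if not callable(getattr(connector, name, None))
    ]


connector = InMemoryConnector()
asyncio.run(connector.connect())
status = asyncio.run(connector.health_check())
```

Note that isinstance against a runtime-checkable protocol only verifies that the members exist; it does not check signatures or return types, so a static type checker is still worth running.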

Minimal example

Here's a complete custom connector that reads assets from an imaginary "Acme CMMS" REST API:

from typing import Any, ClassVar

import httpx

from machina.connectors.base import ConnectorHealth, ConnectorStatus
from machina.domain.asset import Asset, AssetType, Criticality


class AcmeCmmsConnector:
    """Custom connector for the Acme CMMS REST API."""

    capabilities: ClassVar[list[str]] = ["read_assets"]

    def __init__(self, *, base_url: str, api_key: str) -> None:
        self.base_url = base_url.rstrip("/")
        self._api_key = api_key
        self._connected = False

    async def connect(self) -> None:
        # Real implementations typically verify reachability here
        self._connected = True

    async def disconnect(self) -> None:
        self._connected = False

    async def health_check(self) -> ConnectorHealth:
        status = ConnectorStatus.HEALTHY if self._connected else ConnectorStatus.UNHEALTHY
        return ConnectorHealth(status=status, message="")

    async def read_assets(self, **kwargs: Any) -> list[Asset]:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.get(
                f"{self.base_url}/equipment",
                headers={"Authorization": f"Bearer {self._api_key}"},
            )
            resp.raise_for_status()
        return [
            Asset(
                id=str(item["tag"]),
                name=item["description"],
                type=AssetType.ROTATING_EQUIPMENT,
                criticality=Criticality(item.get("criticality", "C")),
                equipment_class_code=item.get("iso_code"),
            )
            for item in resp.json()
        ]

Register it with an agent the same way as any built-in connector:

from machina import Agent, Plant

agent = Agent(
    plant=Plant(name="Acme Plant 1"),
    connectors=[AcmeCmmsConnector(base_url="https://cmms.acme.com", api_key="…")],
    llm="openai:gpt-4o",
)
agent.run()

Testing conventions

Machina connectors follow a two-layer test strategy — unit tests for logic, integration tests for the HTTP surface. Use the built-in CMMS tests as canonical examples:

  • tests/unit/test_generic_cmms.py — exercises GenericCmmsConnector in local mode, plus pure-logic tests for auth strategies, pagination strategies, and schema mapping. Pagination is tested against a small fake client (_FakeClient / _FakeResponse) rather than real httpx, so the tests stay fast and don't require network mocks.
  • tests/integration/test_generic_cmms_rest.py — exercises the full REST path via pytest-httpx mock fixtures. Tests combine auth, pagination, and mapping end-to-end against a simulated CMMS API.

A minimal REST test looks like this:

import pytest

from machina.connectors.cmms import BearerAuth, GenericCmmsConnector

BASE_URL = "https://cmms.example.com/api"


@pytest.mark.asyncio
async def test_reads_assets(httpx_mock) -> None:
    conn = GenericCmmsConnector(url=BASE_URL, auth=BearerAuth(token="test"))
    httpx_mock.add_response(
        method="GET", url=f"{BASE_URL}/health", status_code=200, json={"status": "ok"}
    )
    httpx_mock.add_response(
        method="GET",
        url=f"{BASE_URL}/assets",
        json=[{"id": "P-201", "name": "Pump", "type": "rotating_equipment"}],
    )
    await conn.connect()
    assets = await conn.read_assets()
    assert len(assets) == 1
    assert assets[0].id == "P-201"

Never hit real external APIs from tests — always use pytest-httpx for REST mocking or VCR for recorded responses.

Design rules (CLAUDE.md conventions)

  • Always return domain entities. Never return raw API payloads to the agent runtime — normalize everything into Asset, WorkOrder, FailureMode, etc. This keeps the agent layer connector-agnostic.
  • Everything async. Use async def for any I/O. The agent runtime uses asyncio.gather to call multiple connectors in parallel — a sync call would block the event loop.
  • Structured logging. Use structlog and include connector=, asset_id=, and operation= in every log line so operators can trace issues.
  • Graceful degradation. Declare only the capabilities you actually support. If your CMMS doesn't expose spare parts, omit "read_spare_parts" — the agent will simply not offer that tool to the LLM.
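The "everything async" rule can be illustrated with a short sketch. SlowConnector and read_all are hypothetical, and asyncio.sleep stands in for a network call; the point is only that awaiting inside each connector lets asyncio.gather overlap the waits.

```python
import asyncio
import time


class SlowConnector:
    """Hypothetical connector whose read takes `delay` seconds of I/O wait."""

    def __init__(self, name: str, delay: float) -> None:
        self.name = name
        self.delay = delay

    async def read_assets(self) -> list[str]:
        # asyncio.sleep yields to the event loop, like awaiting an HTTP response.
        await asyncio.sleep(self.delay)
        return [f"{self.name}-asset"]


async def read_all(connectors: list[SlowConnector]) -> list[str]:
    """Query every connector concurrently and flatten the results."""
    batches = await asyncio.gather(*(c.read_assets() for c in connectors))
    return [asset for batch in batches for asset in batch]


async def main() -> tuple[list[str], float]:
    start = time.perf_counter()
    found = await read_all([SlowConnector("cmms", 0.1), SlowConnector("erp", 0.1)])
    return found, time.perf_counter() - start


assets, elapsed = asyncio.run(main())
print(f"{assets} in {elapsed:.2f}s")
```

Replacing asyncio.sleep with the blocking time.sleep would serialize the two reads, because the event loop cannot switch tasks while a synchronous call is running.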

API Reference

BaseConnector

Bases: Protocol

Protocol that all Machina connectors must satisfy.

Connectors are the integration layer between Machina and external systems (CMMS, IoT, ERP, communication platforms, document stores).

capabilities property

capabilities: frozenset[Capability]

Typed set of actions this connector supports.

Example: frozenset({Capability.READ_ASSETS, Capability.CREATE_WORK_ORDER}).

connect async

connect() -> None

Establish a connection to the external system.

disconnect async

disconnect() -> None

Gracefully close the connection.

health_check async

health_check() -> ConnectorHealth

Check whether the external system is reachable and responsive.

ConnectorHealth

Bases: BaseModel

Result of a connector health check.

ConnectorStatus

Bases: StrEnum

Health status of a connector.

ConnectorRegistry

ConnectorRegistry()

Registry for discovering connectors by capability.

Connectors register themselves, and the agent (or MCP layer) can query which connectors support a given capability.

register

register(name: str, connector: BaseConnector) -> None

Register a connector under the given name.

get

get(name: str) -> BaseConnector | None

Retrieve a connector by name.

find_by_capability

find_by_capability(capability: Capability | str) -> list[tuple[str, BaseConnector]]

Return all connectors that declare the given capability.

Accepts either a Capability enum member (preferred) or a raw string (deprecated; emits DeprecationWarning). Raw strings that do not correspond to any known capability return an empty list without raising, so callers may probe for optional capabilities safely.

all

all() -> dict[str, BaseConnector]

Return all registered connectors.
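The lookup rules described for find_by_capability can be sketched as follows. RegistrySketch, the two-member Capability enum, and DemoCmms are hypothetical stand-ins written for illustration; the real ConnectorRegistry may be implemented differently.

```python
from __future__ import annotations

import warnings
from enum import Enum


class Capability(str, Enum):
    """Hypothetical two-member subset of the capability enum."""

    READ_ASSETS = "read_assets"
    CREATE_WORK_ORDER = "create_work_order"


class RegistrySketch:
    def __init__(self) -> None:
        self._connectors: dict[str, object] = {}

    def register(self, name: str, connector: object) -> None:
        self._connectors[name] = connector

    def find_by_capability(
        self, capability: Capability | str
    ) -> list[tuple[str, object]]:
        if not isinstance(capability, Capability):
            # Raw strings still work but are flagged as deprecated.
            warnings.warn(
                "pass a Capability member instead of a raw string",
                DeprecationWarning,
                stacklevel=2,
            )
            try:
                capability = Capability(capability)
            except ValueError:
                return []  # unknown capability: empty result, no exception
        return [
            (name, connector)
            for name, connector in self._connectors.items()
            if capability in getattr(connector, "capabilities", ())
        ]


class DemoCmms:
    capabilities = frozenset({Capability.READ_ASSETS})


registry = RegistrySketch()
registry.register("cmms", DemoCmms())
typed_hits = registry.find_by_capability(Capability.READ_ASSETS)
```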

GenericCmmsConnector

GenericCmmsConnector(*, url: str = '', api_key: str = '', data_dir: str | Path = '', schema_mapping: dict[str, dict[str, Any]] | None = None, auth: _AuthUnion | None = None, pagination: _PaginationUnion | None = None, endpoints: dict[str, dict[str, Any]] | None = None, yaml_mapping: GenericCmmsYamlConfig | None = None)

Configurable connector that wraps any REST-based CMMS.

Can also be pointed at local JSON files for offline / demo usage.

Parameters:

Name Type Description Default
url str

Base URL of the CMMS REST API (optional for local mode).

''
api_key str

Bearer token for authentication. Legacy shortcut — equivalent to auth=BearerAuth(token=api_key). Ignored when auth is provided.

''
data_dir str | Path

Path to a directory of JSON files used as a local data source.

''
schema_mapping dict[str, dict[str, Any]] | None

Dictionary that maps CMMS field names to Machina field names. Supports two forms:

  • Flat rename: {"assets": {"asset_id": "id"}} renames top-level keys in each raw item.
  • JMESPath extraction: {"assets": {"_fields": {"id": "equipment.id", "name": "meta.display_name"}}} extracts nested fields via JMESPath expressions.
None
auth _AuthUnion | None

Authentication strategy for REST mode. Defaults to deriving a BearerAuth from api_key when the latter is set. Use NoAuth explicitly for endpoints that require no credentials.

None
pagination _PaginationUnion | None

Pagination strategy for list-style REST endpoints. Defaults to NoPagination (single-shot GET), which preserves the behaviour of earlier versions.

None
Example
# Local mode with sample data
cmms = GenericCmmsConnector(data_dir="sample_data/cmms")
await cmms.connect()
assets = await cmms.read_assets()

# REST mode, legacy single-key auth
cmms = GenericCmmsConnector(
    url="https://cmms.example.com/api",
    api_key="...",
)

# REST mode, modern CMMS with Basic auth, offset/limit pagination
# and nested response format
from machina.connectors.cmms import (
    BasicAuth,
    OffsetLimitPagination,
)

cmms = GenericCmmsConnector(
    url="https://cmms.example.com/api",
    auth=BasicAuth(username="svc", password="..."),
    pagination=OffsetLimitPagination(
        limit_param="size",
        offset_param="start",
        page_size=50,
        items_path="data",
    ),
    schema_mapping={
        "assets": {
            "_fields": {
                "id": "equipment.id",
                "name": "equipment.display_name",
                "criticality": "meta.criticality_class",
            },
        },
    },
)

capabilities property

capabilities: frozenset[Capability]

Return capabilities based on configuration.

Base capabilities are always available. Optional capabilities are added when running in local mode (all supported) or when the corresponding endpoint is configured in REST mode.

connect async

connect() -> None

Establish connection or load local data files.

Raises:

Type Description
ConnectorError

If neither url nor data_dir is provided.

ConnectorAuthError

In REST mode, if no authentication strategy was supplied.

disconnect async

disconnect() -> None

Close the connection.

health_check async

health_check() -> ConnectorHealth

Check whether the connector is operational.

read_failure_modes async

read_failure_modes() -> list[FailureMode]

Return all known failure modes.

read_assets async

read_assets() -> list[Asset]

Return all known assets.

get_asset async

get_asset(asset_id: str) -> Asset | None

Look up a single asset by ID.

read_work_orders async

read_work_orders(*, asset_id: str = '', status: str = '') -> list[WorkOrder]

Read work orders, optionally filtered by asset or status.

create_work_order async

create_work_order(work_order: WorkOrder) -> WorkOrder

Create a new work order.

In local mode the operation is idempotent on the work-order ID: re-creating a WO whose ID already exists returns the existing record rather than appending a duplicate. New work orders are persisted back to work_orders.json so changes survive process restarts (skipped when an inbound schema_mapping is configured; see _persist_work_orders).
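The local-mode behaviour can be pictured with a small sketch. LocalWorkOrderStore is a hypothetical class that uses plain dictionaries instead of WorkOrder entities, and it leaves out the schema_mapping exception mentioned above.

```python
import json
import tempfile
from pathlib import Path


class LocalWorkOrderStore:
    """Hypothetical store: create() is idempotent on the work-order ID."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self.work_orders: list[dict] = (
            json.loads(path.read_text(encoding="utf-8")) if path.exists() else []
        )

    def create(self, work_order: dict) -> dict:
        for existing in self.work_orders:
            if existing["id"] == work_order["id"]:
                return existing  # same ID: hand back the stored record
        self.work_orders.append(work_order)
        # Persist so the new record survives a process restart.
        self.path.write_text(json.dumps(self.work_orders, indent=2), encoding="utf-8")
        return work_order


wo_path = Path(tempfile.mkdtemp()) / "work_orders.json"
store = LocalWorkOrderStore(wo_path)
first = store.create({"id": "WO-1", "title": "Replace bearing"})
second = store.create({"id": "WO-1", "title": "Duplicate submission"})
reloaded = LocalWorkOrderStore(wo_path)
```

Idempotency on the ID means a retried request cannot produce a duplicate work order, and the second call returns the record that was stored first.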

read_spare_parts async

read_spare_parts(*, asset_id: str = '', sku: str = '') -> list[SparePart]

Read spare parts, optionally filtered.

read_maintenance_history async

read_maintenance_history(asset_id: str) -> list[WorkOrder]

Return completed work orders for an asset (maintenance history).

get_work_order async

get_work_order(work_order_id: str) -> WorkOrder | None

Look up a single work order by ID.

update_work_order async

update_work_order(work_order_id: str, *, status: WorkOrderStatus | None = None, assigned_to: str | None = None, description: str | None = None) -> WorkOrder

Update an existing work order.

In local mode the in-memory work order is mutated directly. In REST mode the configured endpoint is called and the work order is re-fetched to return fresh state.

Raises:

Type Description
ConnectorError

If the work order is not found, or the endpoint is not configured in REST mode.

close_work_order async

close_work_order(work_order_id: str) -> WorkOrder

Transition a work order to CLOSED status.

cancel_work_order async

cancel_work_order(work_order_id: str) -> WorkOrder

Transition a work order to CANCELLED status.

read_maintenance_plans async

read_maintenance_plans() -> list[MaintenancePlan]

Read preventive-maintenance plans.

In local mode returns plans loaded from maintenance_plans.json. In REST mode fetches from the configured endpoint with pagination.

BearerAuth

Bases: BaseModel

Bearer token in the Authorization header.

Produces Authorization: Bearer <token>, the default scheme used by most modern CMMS REST APIs (UpKeep, MaintainX, Limble, Fiix).

apply

apply(headers: dict[str, str]) -> dict[str, str]

Return headers with a Bearer <token> Authorization entry.

BasicAuth

Bases: BaseModel

HTTP Basic authentication (RFC 7617).

Produces Authorization: Basic <base64(user:password)>. Common in older or on-premise CMMS deployments (eMaint, some Infor EAM setups).

apply

apply(headers: dict[str, str]) -> dict[str, str]

Return headers with a Basic <base64> Authorization entry.

ApiKeyHeaderAuth

Bases: BaseModel

API key passed in a custom HTTP header.

Produces <header_name>: <value>. Used by CMMS APIs that prefer a dedicated key header (e.g. X-API-Key, api-key) over the Authorization header.

apply

apply(headers: dict[str, str]) -> dict[str, str]

Return headers with the configured API key header.

NoAuth

Bases: BaseModel

No authentication — public or intranet-only endpoints.

apply

apply(headers: dict[str, str]) -> dict[str, str]

Return headers unchanged (no credentials added).

NoPagination

Bases: BaseModel

Fetch every item in a single request.

Use this when the CMMS endpoint returns the entire collection in one response. This is the default and preserves the behaviour of earlier GenericCmmsConnector versions that did not support pagination.

iterate async

iterate(client: AsyncClient, url: str, headers: dict[str, str], params: dict[str, str] | None = None) -> AsyncIterator[dict[str, Any]]

Perform a single GET and yield each item.

OffsetLimitPagination

Bases: BaseModel

Paginate via ?offset=X&limit=Y query parameters.

Stops when a page returns fewer than page_size items (the typical end-of-collection signal for offset-style APIs).

iterate async

iterate(client: AsyncClient, url: str, headers: dict[str, str], params: dict[str, str] | None = None) -> AsyncIterator[dict[str, Any]]

Walk the collection by incrementing offset until a short page.

PageNumberPagination

Bases: BaseModel

Paginate via ?page=N&per_page=M query parameters.

Stops when a page returns fewer than page_size items. Supports APIs that number pages starting at either 0 or 1 via start_page.

iterate async

iterate(client: AsyncClient, url: str, headers: dict[str, str], params: dict[str, str] | None = None) -> AsyncIterator[dict[str, Any]]

Walk the collection by incrementing the page number.

CursorPagination

Bases: BaseModel

Paginate via opaque cursor tokens returned in the response body.

Each response is expected to contain a cursor value at cursor_response_path (JMESPath) that is sent as the cursor_param query parameter on the next request. Iteration stops when the cursor is missing, empty, or None.

iterate async

iterate(client: AsyncClient, url: str, headers: dict[str, str], params: dict[str, str] | None = None) -> AsyncIterator[dict[str, Any]]

Walk the collection by following cursor tokens.
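A minimal sketch of the cursor walk, under stated assumptions: iterate_cursor and fake_fetch are hypothetical, the HTTP client is replaced by a callable, and both paths are treated as top-level keys rather than full JMESPath expressions.

```python
from __future__ import annotations

from typing import Any, Callable, Iterator


def iterate_cursor(
    fetch: Callable[[dict[str, str]], dict[str, Any]],
    *,
    cursor_param: str = "cursor",
    cursor_response_path: str = "next_cursor",
    items_path: str = "items",
) -> Iterator[dict[str, Any]]:
    """Follow cursor tokens until the response stops providing one."""
    params: dict[str, str] = {}  # the first request carries no cursor
    while True:
        body = fetch(params)
        yield from body.get(items_path, [])
        cursor = body.get(cursor_response_path)
        if not cursor:  # covers missing, empty string, and None
            return
        params = {cursor_param: cursor}


PAGES: dict[Any, dict[str, Any]] = {
    None: {"items": [{"id": "A-1"}, {"id": "A-2"}], "next_cursor": "c1"},
    "c1": {"items": [{"id": "A-3"}], "next_cursor": ""},
}
seen_cursors: list[Any] = []


def fake_fetch(params: dict[str, str]) -> dict[str, Any]:
    """Simulate a CMMS that returns one page per cursor value."""
    cursor = params.get("cursor")
    seen_cursors.append(cursor)
    return PAGES[cursor]


ids = [item["id"] for item in iterate_cursor(fake_fetch)]
```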

Document store

The DocumentStoreConnector exposes the RAG pipeline (chunk ingestion, hybrid retrieval, reranking) to the agent. DocumentChunk is the public shape returned by search() — agents consume its content, source, and page fields when building citations. See Security → Source-Path Sanitisation for the boundary that strips host filesystem paths from source before the LLM sees them.

DocumentStoreConnector

DocumentStoreConnector(*, paths: list[str | Path] | None = None, collection_name: str = 'machina_docs', chunk_size: int = 1000, chunk_overlap: int = 200, reranker_model: str | None = None, embedder: str | None = None)

Connector for local PDF/DOCX documents with RAG retrieval.

Ingests documents from one or more directories, splits them into chunks, embeds them in a vector store, and provides semantic search.

When langchain and chromadb are not installed, falls back to a simple in-memory keyword search so the quickstart works without heavy dependencies.

Each ingested file can carry structured metadata via a sidecar <file>.meta.yaml or YAML frontmatter (for .md / .txt). Metadata fields (asset_id, equipment_class_code, doc_type, section_title) are indexed and can be used to filter the search space before retrieval via the filters= kwarg.
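The idea of filtering on metadata before retrieval can be sketched with a toy keyword search. Everything here is hypothetical (Chunk, matches, keyword_search), the scoring is a plain term count, and strict equality on metadata values is an assumption; the connector's actual fallback and filter semantics are not shown on this page.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class Chunk:
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)


def matches(chunk: Chunk, filters: dict[str, Any] | None) -> bool:
    """True when every filter key equals the chunk's metadata value."""
    return all(chunk.metadata.get(k) == v for k, v in (filters or {}).items())


def keyword_search(
    chunks: list[Chunk],
    query: str,
    *,
    top_k: int = 5,
    filters: dict[str, Any] | None = None,
) -> list[Chunk]:
    """Narrow the candidate set by metadata first, then rank by term count."""
    terms = query.lower().split()
    scored: list[tuple[int, Chunk]] = []
    for chunk in chunks:
        if not matches(chunk, filters):
            continue  # filtered out before any scoring happens
        text = chunk.content.lower()
        score = sum(text.count(term) for term in terms)
        if score:
            scored.append((score, chunk))
    scored.sort(key=lambda pair: pair[0], reverse=True)
    return [chunk for _, chunk in scored[:top_k]]


corpus = [
    Chunk("Bearing replacement for pump", {"asset_id": "P-201"}),
    Chunk("Bearing replacement and bearing grease", {"asset_id": "P-305"}),
    Chunk("Electrical safety checklist", {"asset_id": "P-201"}),
]
scoped = keyword_search(corpus, "bearing replacement", filters={"asset_id": "P-201"})
unscoped = keyword_search(corpus, "bearing replacement")
```

Filtering first keeps a higher-scoring passage about a different asset from crowding out the passages that belong to the asset in question.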

Parameters:

Name Type Description Default
paths list[str | Path] | None

List of directories or files to ingest.

None
collection_name str

Name for the ChromaDB collection.

'machina_docs'
chunk_size int

Target size for text chunks (in characters).

1000
chunk_overlap int

Overlap between consecutive chunks.

200
reranker_model str | None

Optional sentence-transformers cross-encoder model name to rerank fused results (e.g. "BAAI/bge-reranker-base"). Requires the [docs-rag-rerank] extra.

None
embedder str | None

Optional sentence-transformers model name used to embed chunks into Chroma. When set (e.g. "BAAI/bge-m3"), requires the [docs-rag-rerank] extra (which pulls in sentence-transformers). If the model fails to load — extra absent, model not downloaded, GPU/CPU issue — the connector silently falls back to Chroma's default embedder so ingest does not crash.

None
Example
docs = DocumentStoreConnector(
    paths=["manuals/", "procedures/"],
    embedder="BAAI/bge-m3",
    reranker_model="BAAI/bge-reranker-base",
)
await docs.connect()
results = await docs.search(
    "bearing replacement", filters={"asset_id": "P-201"}
)
for chunk in results:
    print(f"[{chunk.source} p.{chunk.page}] {chunk.content[:100]}")

connect async

connect() -> None

Ingest documents and build the vector index.

disconnect async

disconnect() -> None

Release resources.

All connect-time state must be cleared so a subsequent connect() cannot serve chunks from the previous corpus or keep a stale reranker handle.

health_check async

health_check() -> ConnectorHealth

Check connector status.

search async

search(query: str, *, top_k: int = 5, asset_id: str = '', filters: dict[str, Any] | None = None) -> list[DocumentChunk]

Search documents for passages relevant to the query.

Parameters:

Name Type Description Default
query str

The search query.

required
top_k int

Maximum number of results to return.

5
asset_id str

Optional asset ID to scope the search. Shortcut for filters={"asset_id": asset_id}.

''
filters dict[str, Any] | None

Optional metadata filter applied before retrieval. Supported keys: asset_id, equipment_class_code, doc_type, section_title, plus any custom field stored in chunk metadata.

None

Returns:

Type Description
list[DocumentChunk]

List of relevant document chunks, ranked by relevance.

search_documents async

search_documents(query: str = '', *, top_k: int = 5, asset_id: str = '', filters: dict[str, Any] | None = None, **kwargs: Any) -> list[DocumentChunk]

Alias for search, matching the declared capability name.

retrieve_section async

retrieve_section(source: str, page: int) -> str

Retrieve the full text of a specific page/section.

Parameters:

Name Type Description Default
source str

Document source path.

required
page int

Page number.

required

Returns:

Type Description
str

The text content of that section.

DocumentChunk dataclass

DocumentChunk(content: str, source: str = '', page: int = 0, score: float = 0.0, chunk_id: str = '', asset_id: str = '', equipment_class_code: str = '', doc_type: str = '', section_title: str = '', metadata: dict[str, Any] = dict(), parent_id: str = '', start_offset: int = 0, is_table: bool = False)

A retrieved passage from a document.

Since v0.3, content carries the full parent section after parent-document retrieval rather than the small match passage that was embedded. The match passage is still what the embedder / BM25 / reranker scored — only the surface returned to the caller expands to its parent so the LLM sees the full surrounding context. Callers that previously sliced content for a short passage should switch to keying on chunk_id instead.

Parameters:

Name Type Description Default
content str

Text content of the chunk — typically the full parent section the matched passage was nested under.

required
source str

File path or document name.

''
page int

Page number (if available).

0
score float

Relevance score from the retriever.

0.0
chunk_id str

Deterministic identifier for this chunk.

''
asset_id str

Domain asset id (e.g. "P-201") if known.

''
equipment_class_code str

ISO 14224 Annex A code if known. Internal use.

''
doc_type str

One of manual, procedure, datasheet, troubleshooting, other.

''
section_title str

Title of the section this chunk belongs to.

''
metadata dict[str, Any]

Raw metadata bag from the underlying document loader.

dict()
parent_id str

Identifier of the section this chunk's match was extracted from. Joins back to a ParentSection.

''
start_offset int

Character offset of the match inside its parent section body. Used for windowing oversized parents without a fragile substring search.

0
is_table bool

True when this chunk represents an atomic table block extracted by the layout-aware parser. Retrieval and chunking never split it mid-row; the LLM prompt surfaces a [TABLE] tag so the model treats the content as structured rows / columns.

False
Example
from machina.connectors.docs.document_store import DocumentChunk

chunk = DocumentChunk("Pump P-201 bearing procedure", source="manual.pdf", page=42)

DocumentMetadata defines the metadata schema attached to ingested documents and consumed by pre-retrieval filtering (asset_id, doc_type, equipment_class_code, section_title). See connectors/document-store.md for the sidecar / frontmatter ingestion paths.

DocumentMetadata dataclass

DocumentMetadata(asset_id: str = '', equipment_class_code: str = '', doc_type: str = '', section_title: str = '', extra: dict[str, Any] = dict())

Structured metadata for a document or chunk.

All fields are optional. Empty strings mean "unknown" — they do not cause Chroma filters to reject the chunk.

Parameters:

Name Type Description Default
asset_id str

Domain asset identifier (e.g. "P-201").

''
equipment_class_code str

ISO 14224 Annex A Table A.4 code (e.g. "PU", "CO"). Internal use only — not surfaced in product positioning.

''
doc_type str

One of manual, procedure, datasheet, troubleshooting, other.

''
section_title str

Title of the section this chunk belongs to. Populated by section-aware chunking; empty by default.

''
extra dict[str, Any]

Any additional key/value pairs from the sidecar/frontmatter.

dict()

to_chroma_dict

to_chroma_dict() -> dict[str, Any]

Return a flat dict suitable for Chroma metadata fields.

Chroma's where= clause cannot match on empty strings reliably, so empty fields are omitted from the output.

Free-form extra keys from sidecars / frontmatter are accepted as filterable metadata but are sanitized first: non-scalar values (lists, dicts, None) are dropped, strings are stripped of control characters and capped at _MAX_METADATA_VALUE_LEN, and keys that collide with reserved system fields are rejected. This keeps a typo or hostile sidecar from injecting newlines / overlong payloads / system-field overrides into downstream consumers (the vector store, prompt context, or operator-facing logs).
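The sanitization rules above can be sketched as a single function. This is an illustration only: sanitize_extra is a hypothetical name, MAX_VALUE_LEN is a made-up value because the real _MAX_METADATA_VALUE_LEN is not shown here, the reserved key set is assumed, and reserved keys are silently skipped where the real method may reject them differently.

```python
from __future__ import annotations

import re
from typing import Any

MAX_VALUE_LEN = 256  # hypothetical cap; the real limit is not documented here
RESERVED_KEYS = {"asset_id", "equipment_class_code", "doc_type", "section_title"}  # assumed
_CONTROL_CHARS = re.compile(r"[\x00-\x1f\x7f]")


def sanitize_extra(extra: dict[str, Any]) -> dict[str, Any]:
    """Keep only safe scalar values from free-form sidecar metadata."""
    clean: dict[str, Any] = {}
    for key, value in extra.items():
        if key in RESERVED_KEYS:
            continue  # never let a sidecar override a system field
        if isinstance(value, str):
            # Strip control characters (newlines included), then cap the length.
            value = _CONTROL_CHARS.sub("", value)[:MAX_VALUE_LEN]
            if not value:
                continue  # empty strings are omitted
        elif not isinstance(value, (bool, int, float)):
            continue  # lists, dicts, and None are dropped
        clean[key] = value
    return clean


result = sanitize_extra(
    {
        "vendor": "Acme\nCorp",
        "tags": ["pump", "bearing"],
        "revision": 3,
        "asset_id": "OVERRIDE",
        "note": None,
        "long": "x" * 1000,
    }
)
```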

merge

merge(override: DocumentMetadata) -> DocumentMetadata

Return a new metadata where non-empty fields in override win.
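A minimal sketch of that merge rule, using a hypothetical three-field Meta dataclass in place of DocumentMetadata:

```python
from dataclasses import dataclass, fields, replace


@dataclass(frozen=True)
class Meta:
    """Hypothetical cut-down metadata record."""

    asset_id: str = ""
    doc_type: str = ""
    section_title: str = ""

    def merge(self, override: "Meta") -> "Meta":
        # Only non-empty override fields replace the base values.
        changes = {
            f.name: getattr(override, f.name)
            for f in fields(self)
            if getattr(override, f.name)
        }
        return replace(self, **changes)


base = Meta(asset_id="P-201", doc_type="manual")
merged = base.merge(Meta(doc_type="procedure"))
```

Because merge returns a new object, the base record stays usable as a default for other files.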

from_dict classmethod

from_dict(data: dict[str, Any]) -> DocumentMetadata

Build a DocumentMetadata from a parsed YAML dict.

from_path classmethod

from_path(path: Path) -> DocumentMetadata

Load metadata for path, combining sidecar, frontmatter, and inference.

Channel protocols

Any custom communication channel (a Slack alternative, a custom webhook receiver, an SMS bridge) must accept an IncomingMessage value when the agent invokes the registered handler. Agents type-annotate against this shape; documenting it here makes the contract explicit for third-party channel authors.

IncomingMessage dataclass

IncomingMessage(text: str, chat_id: str = '', user_id: str = '', user_name: str = '', channel: str = 'telegram', raw: Any = None)

A message received from a communication channel.

Parameters:

Name Type Description Default
text str

The message text.

required
chat_id str

Identifier for the chat/conversation.

''
user_id str

Identifier for the sender.

''
user_name str

Display name of the sender.

''
channel str

Channel type ("telegram", "slack", "email", "cli").

'telegram'
raw Any

Raw platform-specific message object.

None
Example
from machina.connectors.comms import IncomingMessage

msg = IncomingMessage("Check pump P-201", chat_id="123", user_name="Mario")
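For third-party channel authors, the sketch below shows one way a webhook bridge might build a message and hand it to a registered handler. It is illustrative only: IncomingMessageSketch mirrors the documented fields but is a local stand-in, and WebhookChannel, on_message, receive, and the payload keys are all hypothetical.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class IncomingMessageSketch:
    """Local stand-in with the same fields as IncomingMessage."""

    text: str
    chat_id: str = ""
    user_id: str = ""
    user_name: str = ""
    channel: str = "telegram"
    raw: Any = None


Handler = Callable[[IncomingMessageSketch], Awaitable[str]]


class WebhookChannel:
    """Hypothetical channel that turns webhook payloads into messages."""

    def __init__(self) -> None:
        self._handler: Handler | None = None

    def on_message(self, handler: Handler) -> None:
        self._handler = handler

    async def receive(self, payload: dict[str, Any]) -> str:
        if self._handler is None:
            raise RuntimeError("no handler registered")
        message = IncomingMessageSketch(
            text=payload["text"],
            chat_id=str(payload.get("conversation", "")),
            user_name=payload.get("sender", ""),
            channel="webhook",
            raw=payload,  # keep the platform payload for debugging
        )
        return await self._handler(message)


async def echo(message: IncomingMessageSketch) -> str:
    return f"{message.user_name}@{message.channel}: {message.text}"


channel = WebhookChannel()
channel.on_message(echo)
reply = asyncio.run(
    channel.receive({"text": "Check pump P-201", "conversation": 123, "sender": "Mario"})
)
```

Normalizing platform payloads into one message shape at the channel boundary keeps the handler independent of any particular messaging platform.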