Custom Connectors¶
Machina's connector layer is designed to be trivial to extend. Most
CMMS integrations can be expressed as configuration of the built-in
GenericCmmsConnector; only exotic protocols (SOAP, OData, WebSockets)
or custom business logic call for writing a new class from scratch.
This page walks through both paths:
- Path A: Configure GenericCmmsConnector for any REST-based CMMS, picking an authentication strategy, a pagination strategy, and an optional JMESPath field mapping.
- Path B: Write a custom connector by satisfying the BaseConnector Protocol.
Both paths end at the same place: a connector object you can hand to an
Agent.
Which path do I pick?¶
| Your CMMS… | Use |
|---|---|
| Has a REST + JSON API, any auth + pagination style | Path A (configure GenericCmmsConnector) |
| Returns deeply nested response payloads | Path A with _fields JMESPath mapping |
| Uses OData (SAP PM), SOAP, gRPC, or WebSockets | Path B (custom class) |
| Requires custom business logic (rate limiting, caching, batch writes) | Path B |
| Needs to integrate two systems behind one connector | Path B |
Start with Path A. Drop down to Path B only when configuration can't express what you need.
Path A — Configure GenericCmmsConnector¶
The five capabilities¶
GenericCmmsConnector declares five CMMS capabilities that the agent
runtime discovers at startup:
| Capability | What it reads/writes |
|---|---|
| read_assets | Returns all known assets (paginated if configured) |
| read_work_orders | Returns work orders, optionally filtered by asset_id / status |
| create_work_order | POSTs a new work order to the CMMS |
| read_spare_parts | Returns spare parts, optionally filtered by asset_id / sku |
| read_maintenance_history | Returns completed work orders for a given asset |
Authentication strategies¶
Pick the one that matches your CMMS:
from machina.connectors.cmms import (
    BearerAuth,
    BasicAuth,
    ApiKeyHeaderAuth,
    NoAuth,
)

# Bearer token (default for most modern CMMS: UpKeep, MaintainX, Limble)
auth = BearerAuth(token="eyJhbGci...")

# HTTP Basic auth (older/on-prem deployments)
auth = BasicAuth(username="svc", password="...")

# API key in a custom header (e.g. X-API-Key)
auth = ApiKeyHeaderAuth(header_name="X-API-Key", value="k-123")

# No auth: public or intranet-only endpoints
auth = NoAuth()
For backwards compatibility, passing api_key="..." to the constructor
is still accepted and is equivalent to auth=BearerAuth(token=api_key).
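To make the strategies concrete, here is a minimal sketch of the headers each one is described as producing in the API Reference. The function names are hypothetical stand-ins written for illustration; they are not Machina's implementation of apply().

```python
import base64


def bearer_headers(token: str) -> dict[str, str]:
    # Authorization: Bearer <token>
    return {"Authorization": f"Bearer {token}"}


def basic_headers(username: str, password: str) -> dict[str, str]:
    # Authorization: Basic <base64(user:password)>, as in RFC 7617
    raw = f"{username}:{password}".encode("utf-8")
    return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}


def api_key_headers(header_name: str, value: str) -> dict[str, str]:
    # <header_name>: <value>, for APIs that avoid the Authorization header
    return {header_name: value}


def no_auth_headers() -> dict[str, str]:
    # Nothing is added for public or intranet-only endpoints
    return {}


example = basic_headers("Aladdin", "open sesame")
```

Basic auth only encodes the credentials, it does not encrypt them, so it should be paired with HTTPS.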
Pagination strategies¶
Pick the one that matches your CMMS's list endpoints:
from machina.connectors.cmms import (
    NoPagination,
    OffsetLimitPagination,
    PageNumberPagination,
    CursorPagination,
)

# Single-shot GET: response is the full list (default)
pagination = NoPagination()

# ?offset=X&limit=Y style (UpKeep, Limble, and most REST CMMS)
pagination = OffsetLimitPagination(
    limit_param="limit",
    offset_param="offset",
    page_size=100,
)

# ?page=N&per_page=M style (GitHub-style APIs)
pagination = PageNumberPagination(
    page_param="page",
    size_param="per_page",
    page_size=50,
    start_page=1,  # use 0 if your API is zero-indexed
)

# Opaque cursor token from the response
pagination = CursorPagination(
    cursor_param="cursor",
    cursor_response_path="next_cursor",  # JMESPath
    items_path="items",  # JMESPath
)
All strategies accept an optional items_path (JMESPath) for extracting
the list from a wrapped response like {"data": [...], "meta": {...}}.
CursorPagination requires it because it always walks a wrapped object.
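As an illustration of the offset/limit style, the sketch below walks a collection until it sees a short page, which is the stop rule the API Reference gives for OffsetLimitPagination. The fetch_page callable and the fake backend are assumptions standing in for a real HTTP GET; this is not the library's iterate() method.

```python
from typing import Any, Callable, Iterator

# Hypothetical stand-in for one HTTP GET: takes query params, returns one page.
FetchPage = Callable[[dict[str, int]], list[dict[str, Any]]]


def iterate_offset_limit(
    fetch_page: FetchPage,
    *,
    limit_param: str = "limit",
    offset_param: str = "offset",
    page_size: int = 100,
) -> Iterator[dict[str, Any]]:
    offset = 0
    while True:
        page = fetch_page({offset_param: offset, limit_param: page_size})
        yield from page
        if len(page) < page_size:  # short (or empty) page: end of collection
            return
        offset += page_size


# Fake backend holding 5 records, queried 2 at a time.
RECORDS = [{"id": f"A-{i}"} for i in range(5)]
calls: list[dict[str, int]] = []


def fake_fetch(params: dict[str, int]) -> list[dict[str, Any]]:
    calls.append(params)
    start = params["offset"]
    return RECORDS[start : start + params["limit"]]


items = list(iterate_offset_limit(fake_fetch, page_size=2))
```

One consequence of the short-page rule: when the collection size is an exact multiple of page_size, the loop issues one extra request that returns an empty page before stopping.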
Schema mapping¶
The schema_mapping parameter lets you bridge differences between what
the CMMS returns and what Machina expects, in two flavours:
Flat rename — top-level key rewrites:
cmms = GenericCmmsConnector(
    url="https://cmms.example.com/api",
    auth=BearerAuth(token="..."),
    schema_mapping={
        "assets": {"asset_id": "id", "display_name": "name"},
    },
)
JMESPath extraction — for deeply nested response items:
cmms = GenericCmmsConnector(
    url="https://cmms.example.com/api",
    auth=BearerAuth(token="..."),
    schema_mapping={
        "assets": {
            "_fields": {
                "id": "equipment.id",
                "name": "equipment.display_name",
                "criticality": "meta.criticality_class",
                "equipment_class_code": "meta.iso_code",
            },
        },
    },
)
The presence of the _fields sentinel switches the mapper into
JMESPath mode. Each mapping value is a JMESPath expression evaluated
against the raw item; missing paths are silently dropped.
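The two mapping modes can be sketched in plain Python as follows. This is an illustration under stated assumptions, not Machina's mapper: resolve_path is a hypothetical helper that handles only dotted paths (real JMESPath also supports indexes, filters, and functions), and the flat mode is assumed to rename CMMS keys to Machina keys while passing unmapped keys through.

```python
from typing import Any


def resolve_path(item: dict[str, Any], path: str) -> Any:
    """Follow a dotted path such as 'equipment.id'; None if any step is missing."""
    current: Any = item
    for key in path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current


def map_item(item: dict[str, Any], mapping: dict[str, Any]) -> dict[str, Any]:
    fields = mapping.get("_fields")
    if fields is not None:
        # Extraction mode: target field -> path into the raw item.
        resolved = {target: resolve_path(item, path) for target, path in fields.items()}
        # Paths that resolve to nothing are dropped rather than raising.
        return {key: value for key, value in resolved.items() if value is not None}
    # Flat mode (assumed direction): rename top-level keys, keep the rest as-is.
    return {mapping.get(key, key): value for key, value in item.items()}


raw = {
    "equipment": {"id": "P-201", "display_name": "Pump"},
    "meta": {"criticality_class": "A"},  # note: no iso_code
}
nested = map_item(raw, {"_fields": {
    "id": "equipment.id",
    "name": "equipment.display_name",
    "criticality": "meta.criticality_class",
    "equipment_class_code": "meta.iso_code",
}})
flat = map_item(
    {"asset_id": "P-201", "display_name": "Pump", "site": "North"},
    {"asset_id": "id", "display_name": "name"},
)
```

Because missing paths are dropped silently, a typo in a path shows up as an absent field rather than an error, so it is worth asserting on the mapped output in tests.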
Supported entity keys in schema_mapping: assets, work_orders,
spare_parts.
Putting it all together¶
A "modern CMMS" example combining Basic auth, offset/limit pagination with custom param names, and nested-response field extraction:
from machina.connectors.cmms import (
    BasicAuth,
    GenericCmmsConnector,
    OffsetLimitPagination,
)

cmms = GenericCmmsConnector(
    url="https://modern-cmms.example.com/v2",
    auth=BasicAuth(username="svc", password="s3cret"),
    pagination=OffsetLimitPagination(
        limit_param="size",
        offset_param="start",
        page_size=50,
        items_path="data",
    ),
    schema_mapping={
        "assets": {
            "_fields": {
                "id": "equipment.id",
                "name": "equipment.display_name",
                "criticality": "meta.criticality_class",
                "equipment_class_code": "meta.iso_code",
            },
        },
    },
)
For offline / demo scenarios, point data_dir at a directory containing
assets.json, work_orders.json, and spare_parts.json. The connector loads
them into memory without touching the network. The local-mode snippet in
the GenericCmmsConnector Example under API Reference shows the call.
Path B — Write a custom connector¶
For protocols and patterns GenericCmmsConnector can't express, write a
Python class that satisfies the BaseConnector Protocol.
The BaseConnector Protocol¶
Every connector must provide four things:
| Attribute | Signature | Purpose |
|---|---|---|
| capabilities | @property -> list[str] | Declares which operations this connector supports (e.g. ["read_assets", "read_work_orders"]). The agent uses this to discover capabilities at runtime and enable matching LLM tools. Concrete classes typically use a ClassVar[list[str]] class attribute, which satisfies the Protocol's @property via structural typing. |
| connect() | async def connect() -> None | Establish the underlying connection (open HTTP client, log into CMMS, subscribe to broker, …). |
| disconnect() | async def disconnect() -> None | Clean up the connection. Called by Agent.stop(). |
| health_check() | async def health_check() -> ConnectorHealth | Return a ConnectorHealth status the agent can use to decide whether the connector is usable. |
Beyond those four, implement whatever capability methods you declared.
A connector with capabilities = ["read_assets"] must also provide
async def read_assets(self, **kwargs) -> list[Asset].
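The structural-typing point can be checked with the standard library alone. In the sketch below, ConnectorLike is a hypothetical Protocol that mirrors the four members in the table; it is not Machina's BaseConnector, and health_check returns a plain string instead of ConnectorHealth to keep the example self-contained.

```python
import asyncio
from typing import ClassVar, Protocol, runtime_checkable


@runtime_checkable
class ConnectorLike(Protocol):
    """Hypothetical stand-in with the same four members as the table above."""

    @property
    def capabilities(self) -> list[str]: ...
    async def connect(self) -> None: ...
    async def disconnect(self) -> None: ...
    async def health_check(self) -> object: ...


class InMemoryConnector:
    # A plain class attribute is enough; no inheritance from the Protocol.
    capabilities: ClassVar[list[str]] = ["read_assets"]

    def __init__(self) -> None:
        self.connected = False

    async def connect(self) -> None:
        self.connected = True

    async def disconnect(self) -> None:
        self.connected = False

    async def health_check(self) -> object:
        return "healthy" if self.connected else "unhealthy"

    async def read_assets(self, **kwargs: object) -> list[dict[str, str]]:
        return [{"id": "P-201"}]


class NotAConnector:
    capabilities = ["read_assets"]  # lacks connect / disconnect / health_check


conn = InMemoryConnector()
asyncio.run(conn.connect())
is_connector = isinstance(conn, ConnectorLike)
is_rejected = not isinstance(NotAConnector(), ConnectorLike)
```

A runtime isinstance check against a Protocol only verifies that the member names exist, not their signatures or that the methods are async, so a static type checker remains the stronger guarantee.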
Minimal example¶
Here's a complete custom connector that reads assets from an imaginary "Acme CMMS" REST API:
from typing import Any, ClassVar

import httpx

from machina.connectors.base import ConnectorHealth, ConnectorStatus
from machina.domain.asset import Asset, AssetType, Criticality


class AcmeCmmsConnector:
    """Custom connector for the Acme CMMS REST API."""

    capabilities: ClassVar[list[str]] = ["read_assets"]

    def __init__(self, *, base_url: str, api_key: str) -> None:
        self.base_url = base_url.rstrip("/")
        self._api_key = api_key
        self._connected = False

    async def connect(self) -> None:
        # Real implementations typically verify reachability here
        self._connected = True

    async def disconnect(self) -> None:
        self._connected = False

    async def health_check(self) -> ConnectorHealth:
        status = ConnectorStatus.HEALTHY if self._connected else ConnectorStatus.UNHEALTHY
        return ConnectorHealth(status=status, message="")

    async def read_assets(self, **kwargs: Any) -> list[Asset]:
        async with httpx.AsyncClient(timeout=30.0) as client:
            resp = await client.get(
                f"{self.base_url}/equipment",
                headers={"Authorization": f"Bearer {self._api_key}"},
            )
            resp.raise_for_status()
            return [
                Asset(
                    id=str(item["tag"]),
                    name=item["description"],
                    type=AssetType.ROTATING_EQUIPMENT,
                    criticality=Criticality(item.get("criticality", "C")),
                    equipment_class_code=item.get("iso_code"),
                )
                for item in resp.json()
            ]
Register it with an agent the same way as any built-in connector:
from machina import Agent, Plant

agent = Agent(
    plant=Plant(name="Acme Plant 1"),
    connectors=[AcmeCmmsConnector(base_url="https://cmms.acme.com", api_key="…")],
    llm="openai:gpt-4o",
)
agent.run()
Testing conventions¶
Machina connectors follow a two-layer test strategy — unit tests for logic, integration tests for the HTTP surface. Use the built-in CMMS tests as canonical examples:
- tests/unit/test_generic_cmms.py: exercises GenericCmmsConnector in local mode, plus pure-logic tests for auth strategies, pagination strategies, and schema mapping. Pagination is tested against a small fake client (_FakeClient / _FakeResponse) rather than real httpx, so the tests stay fast and don't require network mocks.
- tests/integration/test_generic_cmms_rest.py: exercises the full REST path via pytest-httpx mock fixtures. Tests combine auth, pagination, and mapping end-to-end against a simulated CMMS API.
A minimal REST test looks like this:
import pytest

from machina.connectors.cmms import BearerAuth, GenericCmmsConnector

BASE_URL = "https://cmms.example.com/api"


@pytest.mark.asyncio
async def test_reads_assets(httpx_mock) -> None:
    conn = GenericCmmsConnector(url=BASE_URL, auth=BearerAuth(token="test"))
    httpx_mock.add_response(
        method="GET", url=f"{BASE_URL}/health", status_code=200, json={"status": "ok"}
    )
    httpx_mock.add_response(
        method="GET",
        url=f"{BASE_URL}/assets",
        json=[{"id": "P-201", "name": "Pump", "type": "rotating_equipment"}],
    )
    await conn.connect()
    assets = await conn.read_assets()
    assert len(assets) == 1
    assert assets[0].id == "P-201"
Never hit real external APIs from tests — always use pytest-httpx for
REST mocking or VCR for recorded responses.
Design rules (CLAUDE.md conventions)¶
- Always return domain entities. Never return raw API payloads to the agent runtime; normalize everything into Asset, WorkOrder, FailureMode, etc. This keeps the agent layer connector-agnostic.
- Everything async. Use async def for any I/O. The agent runtime uses asyncio.gather to call multiple connectors in parallel, so a sync call would block the event loop.
- Structured logging. Use structlog and include connector=, asset_id=, and operation= in every log line so operators can trace issues.
- Graceful degradation. Declare only the capabilities you actually support. If your CMMS doesn't expose spare parts, omit "read_spare_parts"; the agent will simply not offer that tool to the LLM.
API Reference¶
BaseConnector ¶
Bases: Protocol
Protocol that all Machina connectors must satisfy.
Connectors are the integration layer between Machina and external systems (CMMS, IoT, ERP, communication platforms, document stores).
ConnectorHealth ¶
Bases: BaseModel
Result of a connector health check.
ConnectorStatus ¶
Bases: StrEnum
Health status of a connector.
ConnectorRegistry ¶
Registry for discovering connectors by capability.
Connectors register themselves, and the agent (or MCP layer) can query which connectors support a given capability.
register ¶
Register a connector under the given name.
find_by_capability ¶
Return all connectors that declare the given capability.
Accepts either a Capability enum member (preferred) or a
raw string (deprecated; emits DeprecationWarning). Raw
strings that do not correspond to any known capability return an
empty list without raising, so callers may probe for optional
capabilities safely.
GenericCmmsConnector ¶
GenericCmmsConnector(*, url: str = '', api_key: str = '', data_dir: str | Path = '', schema_mapping: dict[str, dict[str, Any]] | None = None, auth: _AuthUnion | None = None, pagination: _PaginationUnion | None = None, endpoints: dict[str, dict[str, Any]] | None = None, yaml_mapping: GenericCmmsYamlConfig | None = None)
Configurable connector that wraps any REST-based CMMS.
Can also be pointed at local JSON files for offline / demo usage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| url | str | Base URL of the CMMS REST API (optional for local mode). | '' |
| api_key | str | Bearer token for authentication. Legacy shortcut, equivalent to auth=BearerAuth(token=api_key). | '' |
| data_dir | str \| Path | Path to a directory of JSON files used as a local data source. | '' |
| schema_mapping | dict[str, dict[str, Any]] \| None | Dictionary that maps CMMS field names to Machina field names. Supports two forms: a flat rename of top-level keys, and JMESPath extraction under the _fields key. | None |
| auth | _AuthUnion \| None | Authentication strategy for REST mode. Defaults to deriving a … | None |
| pagination | _PaginationUnion \| None | Pagination strategy for list-style REST endpoints. Defaults to NoPagination. | None |
Example
# Local mode with sample data
cmms = GenericCmmsConnector(data_dir="sample_data/cmms")
await cmms.connect()
assets = await cmms.read_assets()

# REST mode, legacy single-key auth
cmms = GenericCmmsConnector(
    url="https://cmms.example.com/api",
    api_key="...",
)

# REST mode, modern CMMS with Basic auth, offset/limit pagination
# and nested response format
from machina.connectors.cmms import (
    BasicAuth,
    OffsetLimitPagination,
)

cmms = GenericCmmsConnector(
    url="https://cmms.example.com/api",
    auth=BasicAuth(username="svc", password="..."),
    pagination=OffsetLimitPagination(
        limit_param="size",
        offset_param="start",
        page_size=50,
        items_path="data",
    ),
    schema_mapping={
        "assets": {
            "_fields": {
                "id": "equipment.id",
                "name": "equipment.display_name",
                "criticality": "meta.criticality_class",
            },
        },
    },
)
capabilities property ¶
Return capabilities based on configuration.
Base capabilities are always available. Optional capabilities are added when running in local mode (all supported) or when the corresponding endpoint is configured in REST mode.
connect async ¶
Establish connection or load local data files.
Raises:
| Type | Description |
|---|---|
| ConnectorError | If neither … |
| ConnectorAuthError | In REST mode, if no authentication strategy was supplied. |
read_failure_modes async ¶
Return all known failure modes.
read_work_orders async ¶
Read work orders, optionally filtered by asset or status.
create_work_order async ¶
Create a new work order.
In local mode the operation is idempotent on the work-order ID:
re-creating a WO whose ID already exists returns the existing
record rather than appending a duplicate. New work orders are
persisted back to work_orders.json so changes survive process
restarts (skipped when an inbound schema_mapping is configured;
see _persist_work_orders).
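A minimal sketch of the local-mode behaviour described above, idempotent creation keyed on the work-order ID plus write-back to a JSON file, might look like this. LocalWorkOrderStore, its file layout, and the dict-based work orders are assumptions for illustration; the connector's actual persistence logic is not shown in this page.

```python
import json
import tempfile
from pathlib import Path
from typing import Any


class LocalWorkOrderStore:
    """Hypothetical local-mode store; names and file layout are illustrative."""

    def __init__(self, path: Path) -> None:
        self.path = path
        self.work_orders: list[dict[str, Any]] = (
            json.loads(path.read_text()) if path.exists() else []
        )

    def create(self, work_order: dict[str, Any]) -> dict[str, Any]:
        for existing in self.work_orders:
            if existing["id"] == work_order["id"]:
                return existing  # same ID: return the stored record, no duplicate
        self.work_orders.append(work_order)
        # Persist so the new work order survives a process restart.
        self.path.write_text(json.dumps(self.work_orders, indent=2))
        return work_order


with tempfile.TemporaryDirectory() as tmp:
    store = LocalWorkOrderStore(Path(tmp) / "work_orders.json")
    first = store.create({"id": "WO-1", "description": "Replace bearing"})
    again = store.create({"id": "WO-1", "description": "Duplicate submit"})
    # Simulate a restart by loading a fresh store from the same file.
    reloaded = LocalWorkOrderStore(Path(tmp) / "work_orders.json")
    count_after_restart = len(reloaded.work_orders)
```

Returning the existing record on a repeated ID makes retries safe: an agent that resubmits after a timeout does not create a second work order.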
read_spare_parts async ¶
Read spare parts, optionally filtered.
read_maintenance_history async ¶
Return completed work orders for an asset (maintenance history).
get_work_order async ¶
Look up a single work order by ID.
update_work_order async ¶
update_work_order(work_order_id: str, *, status: WorkOrderStatus | None = None, assigned_to: str | None = None, description: str | None = None) -> WorkOrder
Update an existing work order.
In local mode the in-memory work order is mutated directly. In REST mode the configured endpoint is called and the work order is re-fetched to return fresh state.
Raises:
| Type | Description |
|---|---|
| ConnectorError | If the work order is not found, or the endpoint is not configured in REST mode. |
close_work_order async ¶
Transition a work order to CLOSED status.
cancel_work_order async ¶
Transition a work order to CANCELLED status.
read_maintenance_plans async ¶
Read preventive-maintenance plans.
In local mode returns plans loaded from maintenance_plans.json.
In REST mode fetches from the configured endpoint with pagination.
BearerAuth ¶
Bases: BaseModel
Bearer token in the Authorization header.
Produces Authorization: Bearer <token>, the default scheme used by
most modern CMMS REST APIs (UpKeep, MaintainX, Limble, Fiix).
apply ¶
Return headers with a Bearer <token> Authorization entry.
BasicAuth ¶
Bases: BaseModel
HTTP Basic authentication (RFC 7617).
Produces Authorization: Basic <base64(user:password)>. Common in
older or on-premise CMMS deployments (eMaint, some Infor EAM setups).
apply ¶
Return headers with a Basic <base64> Authorization entry.
ApiKeyHeaderAuth ¶
Bases: BaseModel
API key passed in a custom HTTP header.
Produces <header_name>: <value>. Used by CMMS APIs that prefer a
dedicated key header (e.g. X-API-Key, api-key) over the
Authorization header.
apply ¶
Return headers with the configured API key header.
NoAuth ¶
Bases: BaseModel
No authentication — public or intranet-only endpoints.
apply ¶
Return headers unchanged (no credentials added).
NoPagination ¶
Bases: BaseModel
Fetch every item in a single request.
Use this when the CMMS endpoint returns the entire collection in one
response. This is the default and preserves the behaviour of earlier
GenericCmmsConnector versions that did not support pagination.
iterate async ¶
iterate(client: AsyncClient, url: str, headers: dict[str, str], params: dict[str, str] | None = None) -> AsyncIterator[dict[str, Any]]
Perform a single GET and yield each item.
OffsetLimitPagination ¶
Bases: BaseModel
Paginate via ?offset=X&limit=Y query parameters.
Stops when a page returns fewer than page_size items (the typical
end-of-collection signal for offset-style APIs).
iterate async ¶
iterate(client: AsyncClient, url: str, headers: dict[str, str], params: dict[str, str] | None = None) -> AsyncIterator[dict[str, Any]]
Walk the collection by incrementing offset until a short page.
PageNumberPagination ¶
Bases: BaseModel
Paginate via ?page=N&per_page=M query parameters.
Stops when a page returns fewer than page_size items. Supports APIs
that number pages starting at either 0 or 1 via start_page.
iterate async ¶
iterate(client: AsyncClient, url: str, headers: dict[str, str], params: dict[str, str] | None = None) -> AsyncIterator[dict[str, Any]]
Walk the collection by incrementing the page number.
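The effect of start_page is easiest to see against a fake API. The sketch below is an assumption-laden illustration, not the library's iterate(): fetch_page stands in for one HTTP GET, and zero_indexed_api simulates a backend whose first page is page 0.

```python
from typing import Any, Callable, Iterator

# Hypothetical stand-in for one HTTP GET: takes query params, returns one page.
FetchPage = Callable[[dict[str, int]], list[dict[str, Any]]]


def iterate_pages(
    fetch_page: FetchPage,
    *,
    page_param: str = "page",
    size_param: str = "per_page",
    page_size: int = 50,
    start_page: int = 1,
) -> Iterator[dict[str, Any]]:
    page_number = start_page
    while True:
        page = fetch_page({page_param: page_number, size_param: page_size})
        yield from page
        if len(page) < page_size:  # short page: this was the last one
            return
        page_number += 1


RECORDS = [{"id": f"A-{i}"} for i in range(4)]


def zero_indexed_api(params: dict[str, int]) -> list[dict[str, Any]]:
    start = params["page"] * params["per_page"]
    return RECORDS[start : start + params["per_page"]]


# Correct setting for this backend: every record is returned.
items = list(iterate_pages(zero_indexed_api, page_size=3, start_page=0))
# Wrong setting: the first page is skipped without any error being raised.
skipped = list(iterate_pages(zero_indexed_api, page_size=3, start_page=1))
```

A mismatched start_page fails silently by dropping the first page, so comparing the item count against a known total is a useful check when wiring up a new CMMS.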
CursorPagination ¶
Bases: BaseModel
Paginate via opaque cursor tokens returned in the response body.
Each response is expected to contain a cursor value at
cursor_response_path (JMESPath) that is sent as the
cursor_param query parameter on the next request. Iteration
stops when the cursor is missing, empty, or None.
iterate async ¶
iterate(client: AsyncClient, url: str, headers: dict[str, str], params: dict[str, str] | None = None) -> AsyncIterator[dict[str, Any]]
Walk the collection by following cursor tokens.
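The cursor-following loop can be sketched as below. To stay dependency-free, the example reads plain top-level keys (items_key, cursor_key) where the real strategy takes JMESPath expressions, and fetch_body is a hypothetical stand-in for one HTTP GET that returns the decoded JSON body.

```python
from typing import Any, Callable, Iterator, Optional

# Hypothetical stand-in for one HTTP GET: receives the cursor (None on the
# first request) and returns the decoded JSON body.
FetchBody = Callable[[Optional[str]], dict[str, Any]]


def iterate_cursor(
    fetch_body: FetchBody,
    *,
    items_key: str = "items",
    cursor_key: str = "next_cursor",
) -> Iterator[dict[str, Any]]:
    cursor: Optional[str] = None
    while True:
        body = fetch_body(cursor)
        yield from body.get(items_key, [])
        cursor = body.get(cursor_key)
        if not cursor:  # missing, empty, or None: no more pages
            return


# Fake two-page API keyed by the cursor sent with each request.
PAGES: dict[Optional[str], dict[str, Any]] = {
    None: {"items": [{"id": "A-1"}, {"id": "A-2"}], "next_cursor": "c2"},
    "c2": {"items": [{"id": "A-3"}], "next_cursor": ""},
}
ids = [item["id"] for item in iterate_cursor(lambda cursor: PAGES[cursor])]
```

Unlike offset or page-number pagination, the loop never inspects the page length; termination depends entirely on the server omitting or emptying the cursor.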
Document store¶
The DocumentStoreConnector exposes the RAG pipeline (chunk ingestion,
hybrid retrieval, reranking) to the agent. DocumentChunk is the
public shape returned by search() — agents consume its content,
source, and page fields when building citations. See
Security → Source-Path Sanitisation for
the boundary that strips host filesystem paths from source before
the LLM sees them.
DocumentStoreConnector ¶
DocumentStoreConnector(*, paths: list[str | Path] | None = None, collection_name: str = 'machina_docs', chunk_size: int = 1000, chunk_overlap: int = 200, reranker_model: str | None = None, embedder: str | None = None)
Connector for local PDF/DOCX documents with RAG retrieval.
Ingests documents from one or more directories, splits them into chunks, embeds them in a vector store, and provides semantic search.
When langchain and chromadb are not installed, falls back to
a simple in-memory keyword search so the quickstart works without
heavy dependencies.
Each ingested file can carry structured metadata via a sidecar
<file>.meta.yaml or YAML frontmatter (for .md / .txt).
Metadata fields (asset_id, equipment_class_code, doc_type,
section_title) are indexed and can be used to filter the search
space before retrieval via the filters= kwarg.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| paths | list[str \| Path] \| None | List of directories or files to ingest. | None |
| collection_name | str | Name for the ChromaDB collection. | 'machina_docs' |
| chunk_size | int | Target size for text chunks (in characters). | 1000 |
| chunk_overlap | int | Overlap between consecutive chunks. | 200 |
| reranker_model | str \| None | Optional … | None |
| embedder | str \| None | Optional … | None |
Example
docs = DocumentStoreConnector(
    paths=["manuals/", "procedures/"],
    embedder="BAAI/bge-m3",
    reranker_model="BAAI/bge-reranker-base",
)
await docs.connect()
results = await docs.search(
    "bearing replacement", filters={"asset_id": "P-201"}
)
for chunk in results:
    print(f"[{chunk.source} p.{chunk.page}] {chunk.content[:100]}")
disconnect async ¶
Release resources.
All connect-time state must be cleared so a subsequent connect() cannot serve chunks from the previous corpus or keep a stale reranker handle.
search async ¶
search(query: str, *, top_k: int = 5, asset_id: str = '', filters: dict[str, Any] | None = None) -> list[DocumentChunk]
Search documents for passages relevant to the query.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| query | str | The search query. | required |
| top_k | int | Maximum number of results to return. | 5 |
| asset_id | str | Optional asset ID to scope the search. Shortcut for … | '' |
| filters | dict[str, Any] \| None | Optional metadata filter applied before retrieval. Supported keys: … | None |
Returns:
| Type | Description |
|---|---|
| list[DocumentChunk] | List of relevant document chunks, ranked by relevance. |
search_documents async ¶
search_documents(query: str = '', *, top_k: int = 5, asset_id: str = '', filters: dict[str, Any] | None = None, **kwargs: Any) -> list[DocumentChunk]
Alias for search, matching the declared capability name.
retrieve_section async ¶
Retrieve the full text of a specific page/section.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source | str | Document source path. | required |
| page | int | Page number. | required |
Returns:
| Type | Description |
|---|---|
| str | The text content of that section. |
DocumentChunk dataclass ¶
DocumentChunk(content: str, source: str = '', page: int = 0, score: float = 0.0, chunk_id: str = '', asset_id: str = '', equipment_class_code: str = '', doc_type: str = '', section_title: str = '', metadata: dict[str, Any] = dict(), parent_id: str = '', start_offset: int = 0, is_table: bool = False)
A retrieved passage from a document.
Since v0.3, content carries the full parent section after
parent-document retrieval rather than the small match passage that
was embedded. The match passage is still what the embedder /
BM25 / reranker scored — only the surface returned to the caller
expands to its parent so the LLM sees the full surrounding
context. Callers that previously sliced content for a short
passage should switch to keying on chunk_id instead.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| content | str | Text content of the chunk, typically the full parent section the matched passage was nested under. | required |
| source | str | File path or document name. | '' |
| page | int | Page number (if available). | 0 |
| score | float | Relevance score from the retriever. | 0.0 |
| chunk_id | str | Deterministic identifier for this chunk. | '' |
| asset_id | str | Domain asset id (e.g. …) | '' |
| equipment_class_code | str | ISO 14224 Annex A code if known. Internal use. | '' |
| doc_type | str | One of … | '' |
| section_title | str | Title of the section this chunk belongs to. | '' |
| metadata | dict[str, Any] | Raw metadata bag from the underlying document loader. | dict() |
| parent_id | str | Identifier of the section this chunk's match was extracted from. Joins back to a … | '' |
| start_offset | int | Character offset of the match inside its parent section body. Used for windowing oversized parents without a fragile substring search. | 0 |
| is_table | bool | | False |
DocumentMetadata defines the metadata schema attached to ingested
documents and consumed by pre-retrieval filtering (asset_id,
doc_type, equipment_class_code, section_title). See
connectors/document-store.md for the sidecar /
frontmatter ingestion paths.
DocumentMetadata dataclass ¶
DocumentMetadata(asset_id: str = '', equipment_class_code: str = '', doc_type: str = '', section_title: str = '', extra: dict[str, Any] = dict())
Structured metadata for a document or chunk.
All fields are optional. Empty strings mean "unknown" — they do not cause Chroma filters to reject the chunk.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| asset_id | str | Domain asset identifier (e.g. …) | '' |
| equipment_class_code | str | ISO 14224 Annex A Table A.4 code (e.g. …) | '' |
| doc_type | str | One of … | '' |
| section_title | str | Title of the section this chunk belongs to. Populated by section-aware chunking; empty by default. | '' |
| extra | dict[str, Any] | Any additional key/value pairs from the sidecar/frontmatter. | dict() |
to_chroma_dict ¶
Return a flat dict suitable for Chroma metadata fields.
Chroma's where= clause cannot match on empty strings reliably,
so empty fields are omitted from the output.
Free-form extra keys from sidecars / frontmatter are accepted
as filterable metadata but are sanitized first: non-scalar values
(lists, dicts, None) are dropped, strings are stripped of control
characters and capped at _MAX_METADATA_VALUE_LEN, and keys
that collide with reserved system fields are rejected. This keeps
a typo or hostile sidecar from injecting newlines / overlong
payloads / system-field overrides into downstream consumers (the
vector store, prompt context, or operator-facing logs).
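The sanitization rules above can be sketched as a small pure function. The cap of 256 characters, the RESERVED_KEYS set, and the choice to drop (rather than raise on) reserved keys are assumptions made for this example; the text does not give the real value of _MAX_METADATA_VALUE_LEN or the reserved names.

```python
from typing import Any

# Hypothetical values; the real cap and reserved names are not given in the text.
MAX_VALUE_LEN = 256
RESERVED_KEYS = {"asset_id", "equipment_class_code", "doc_type", "section_title"}


def sanitize_extra(extra: dict[str, Any]) -> dict[str, Any]:
    clean: dict[str, Any] = {}
    for key, value in extra.items():
        if key in RESERVED_KEYS:
            continue  # never let a sidecar override a system field
        if isinstance(value, (bool, int, float)):
            clean[key] = value
        elif isinstance(value, str):
            # Remove non-printable characters (newlines, tabs, escapes), then cap.
            text = "".join(ch for ch in value if ch.isprintable())[:MAX_VALUE_LEN]
            if text:  # empty strings are omitted, as for the typed fields
                clean[key] = text
        # lists, dicts, None and other non-scalars fall through and are dropped
    return clean


result = sanitize_extra({
    "vendor": "ACME\nCorp",   # newline is stripped
    "tags": ["a", "b"],       # non-scalar, dropped
    "rev": 3,
    "doc_type": "manual",     # reserved, dropped
    "note": None,             # dropped
    "blank": "",              # empty, omitted
})
capped = sanitize_extra({"summary": "x" * 1000})
```

Keeping the function pure (dict in, dict out) makes the rules easy to unit-test against hostile or malformed sidecars.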
merge ¶
Return a new metadata where non-empty fields in override win.
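The "non-empty fields in override win" rule can be illustrated with a small frozen dataclass. Meta is a hypothetical stand-in carrying a subset of the DocumentMetadata fields, and the variable names inferred and sidecar are illustrative only; this is a sketch of the merge rule, not the library's code.

```python
from dataclasses import dataclass, fields, replace


@dataclass(frozen=True)
class Meta:
    """Hypothetical stand-in with a subset of the DocumentMetadata fields."""

    asset_id: str = ""
    doc_type: str = ""
    section_title: str = ""

    def merge(self, override: "Meta") -> "Meta":
        # Take each field from override unless it is empty there.
        changes = {
            f.name: getattr(override, f.name)
            for f in fields(self)
            if getattr(override, f.name)
        }
        return replace(self, **changes)  # returns a new object, self is untouched


inferred = Meta(asset_id="P-201", doc_type="manual")
sidecar = Meta(doc_type="procedure", section_title="Bearing replacement")
merged = inferred.merge(sidecar)
```

Because empty strings mean "unknown", an override can add or replace values but cannot blank out a field that the base already has.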
from_dict classmethod ¶
Build a DocumentMetadata from a parsed YAML dict.
from_path classmethod ¶
Load metadata for path, combining sidecar, frontmatter, and inference.
Channel protocols¶
Any custom communication channel (a Slack alternative, a custom webhook
receiver, an SMS bridge) must accept an IncomingMessage value when the
agent invokes the registered handler. Agents type-annotate against this
shape; documenting it here makes the contract explicit for third-party
channel authors.
IncomingMessage dataclass ¶
IncomingMessage(text: str, chat_id: str = '', user_id: str = '', user_name: str = '', channel: str = 'telegram', raw: Any = None)
A message received from a communication channel.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| text | str | The message text. | required |
| chat_id | str | Identifier for the chat/conversation. | '' |
| user_id | str | Identifier for the sender. | '' |
| user_name | str | Display name of the sender. | '' |
| channel | str | Channel type (…) | 'telegram' |
| raw | Any | Raw platform-specific message object. | None |
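For third-party channel authors, a rough sketch of the contract might look like the following. The IncomingMessage dataclass is re-declared from the signature above so the example runs on its own; SmsBridge, on_webhook, the payload keys, and the async handler that returns a reply string are all assumptions for illustration, since this page does not specify how handlers are registered or what they return.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Awaitable, Callable


@dataclass
class IncomingMessage:
    # Fields and defaults copied from the signature above.
    text: str
    chat_id: str = ""
    user_id: str = ""
    user_name: str = ""
    channel: str = "telegram"
    raw: Any = None


# Assumed handler shape: async callable taking the message, returning a reply.
Handler = Callable[[IncomingMessage], Awaitable[str]]


class SmsBridge:
    """Hypothetical channel: turns a provider payload into an IncomingMessage."""

    def __init__(self, handler: Handler) -> None:
        self._handler = handler

    async def on_webhook(self, payload: dict[str, Any]) -> str:
        message = IncomingMessage(
            text=payload["body"],
            chat_id=payload["from"],
            user_id=payload["from"],
            channel="sms",
            raw=payload,  # keep the original payload for debugging
        )
        return await self._handler(message)


async def echo_handler(message: IncomingMessage) -> str:
    return f"[{message.channel}] {message.text}"


bridge = SmsBridge(echo_handler)
reply = asyncio.run(bridge.on_webhook({"from": "+15550100", "body": "P-201 status?"}))
```

Setting channel explicitly matters because the default is 'telegram'; a custom channel that leaves it unset would be misreported downstream.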