Workflows

Workflows are declarative trigger-step pipelines for repeatable maintenance processes (alarm response, predictive routines, scheduled checks). They live alongside the agent's free-form LLM reasoning — use a workflow when the steps are well-defined and need to be deterministic; use the agent's tool-calling loop when the path is open-ended.

For end-to-end examples see the alarm_to_workorder built-in and examples/alarm_to_workorder.

Definition

Workflow

Workflow dataclass

Workflow(name: str, description: str = '', trigger: Trigger | str = '', steps: list[Step] = list())

A named, trigger-driven sequence of Step objects.

Workflows define the complete pipeline for a maintenance process — from trigger event to final notification.

Parameters:

  • name (str, required): Human-readable workflow name.
  • description (str, default ''): What this workflow does.
  • trigger (Trigger | str, default ''): Trigger specification or simple event-type string.
  • steps (list[Step], default list()): Ordered list of steps to execute.

Example
from machina.workflows import Step, Workflow, Trigger, TriggerType

predictive = Workflow(
    name="Predictive Maintenance",
    trigger=Trigger(type=TriggerType.ALARM),
    steps=[
        Step("diagnose", action="failure_analyzer.diagnose"),
        Step("create_wo", action="work_order_factory.create"),
    ],
)

step_names property

step_names: list[str]

Return the ordered list of step names.

get_step

get_step(name: str) -> Step | None

Look up a step by name. Returns None if no step has that name.
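
The two accessors above can be illustrated with a minimal sketch. The Step and Workflow classes below are simplified stand-ins written for this example (only the fields needed here), not the machina implementation, and the "first match wins" lookup is an assumption.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class Step:
    """Stand-in with only the fields this sketch needs."""

    name: str
    action: str = ""


@dataclass
class Workflow:
    """Stand-in mirroring the documented name/steps fields."""

    name: str
    steps: list[Step] = field(default_factory=list)

    @property
    def step_names(self) -> list[str]:
        # Names come back in declaration order.
        return [step.name for step in self.steps]

    def get_step(self, name: str) -> Step | None:
        # Assumption: the first step with a matching name wins.
        return next((s for s in self.steps if s.name == name), None)


wf = Workflow(
    name="Predictive Maintenance",
    steps=[
        Step("diagnose", action="failure_analyzer.diagnose"),
        Step("create_wo", action="work_order_factory.create"),
    ],
)
```

Here wf.step_names lists the two step names in order, and wf.get_step("missing") yields None rather than raising.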

Step

Step dataclass

Step(name: str, action: str = '', description: str = '', prompt: str = '', template: str = '', depends_on: list[str] = list(), on_error: ErrorPolicy = ErrorPolicy.STOP, retries: int = 0, guard: GuardCondition | None = None, timeout_seconds: float | None = None, inputs: dict[str, Any] = dict(), is_write: bool | None = None)

A single step within a Workflow.

Each step maps to an action (a connector method, domain service call, or LLM reasoning invocation) and may carry a prompt template for LLM-powered steps.

Parameters:

  • name (str, required): Short identifier for the step (e.g. "diagnose_rules").
  • action (str, default ''): Dot-path to the action (e.g. "cmms.read_work_orders").
  • description (str, default ''): Human-readable explanation of what this step does.
  • prompt (str, default ''): Optional prompt template for agent.reason steps. Placeholders in {step_name} format reference outputs from prior steps.
  • template (str, default ''): Optional message template for notification steps.
  • depends_on (list[str], default list()): List of step names whose output this step requires.
  • on_error (ErrorPolicy, default STOP): What to do if this step fails.
  • retries (int, default 0): Number of retry attempts (only meaningful when on_error is ErrorPolicy.RETRY).
  • guard (GuardCondition | None, default None): Optional guard condition; if it returns False, the step is skipped.
  • timeout_seconds (float | None, default None): Optional per-step timeout.
  • inputs (dict[str, Any], default dict()): Explicit input mappings using template variables, e.g. {"asset_id": "{trigger.asset_id}"}.

Example
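# Assumes ErrorPolicy is exported from machina.workflows alongside Step.
from machina.workflows import ErrorPolicy, Step

Step(
    "diagnose_rules",
    action="failure_analyzer.diagnose",
    description="Rule-based diagnosis using failure taxonomy",
    on_error=ErrorPolicy.RETRY,
    retries=2,
)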

Trigger

Trigger dataclass

Trigger(type: TriggerType = TriggerType.MANUAL, filter: dict[str, Any] = dict())

Describes what event starts a workflow.

Parameters:

  • type (TriggerType, default MANUAL): The kind of event (alarm, schedule, manual, condition).
  • filter (dict[str, Any], default dict()): Optional key-value filters to narrow matching events. For example, {"severity": ["warning", "critical"]} means the workflow triggers only for those alarm severities.

Example
from machina.workflows import Trigger, TriggerType
Trigger(type=TriggerType.ALARM, filter={"severity": ["critical"]})

matches

matches(event: dict[str, Any]) -> bool

Return True if event satisfies this trigger's filter.

Each key in self.filter must be present in event and the event value must be contained in the filter's list of allowed values. If no filter is set, every event matches.
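
The matching rule described above can be sketched as a standalone function. This is an illustration of the stated rule, not the library's code: the function name and the handling of a scalar (non-list) filter value are assumptions.

```python
from typing import Any


def matches(filter_: dict[str, Any], event: dict[str, Any]) -> bool:
    """Return True if `event` satisfies every key in `filter_`."""
    for key, allowed in filter_.items():
        if key not in event:
            return False
        # Assumption: a scalar filter value is treated as a one-item list.
        if isinstance(allowed, (list, tuple, set)):
            allowed_values = allowed
        else:
            allowed_values = [allowed]
        if event[key] not in allowed_values:
            return False
    # An empty filter never enters the loop, so every event matches.
    return True


severity_filter = {"severity": ["warning", "critical"]}
```

With severity_filter, an event carrying "severity": "critical" matches, while an event with "severity": "info" or with no severity key at all does not.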

TriggerType

Bases: StrEnum

Event types that can start a workflow.

GuardCondition

GuardCondition dataclass

GuardCondition(check: Callable[[dict[str, Any]], bool] = lambda _ctx: True, description: str = '')

A condition evaluated between steps to decide whether to proceed.

Parameters:

  • check (Callable[[dict[str, Any]], bool], default lambda _ctx: True): Callable that receives the current WorkflowContext dict and returns True to proceed or False to skip the guarded step.
  • description (str, default ''): Human-readable explanation of the guard.

Example
GuardCondition(
    check=lambda ctx: ctx.get("diagnose", {}).get("confidence") == "high",
    description="Only proceed if diagnosis confidence is high",
)
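
To make the skip decision concrete, here is a minimal sketch of how a guard might be evaluated against the context dict. GuardCondition below is a stand-in dataclass mirroring the documented signature, and should_run is a hypothetical helper, not an engine method.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Any, Callable


@dataclass
class GuardCondition:
    """Stand-in mirroring the documented signature."""

    check: Callable[[dict[str, Any]], bool] = lambda _ctx: True
    description: str = ""


def should_run(guard: GuardCondition | None, ctx: dict[str, Any]) -> bool:
    # A step without a guard always runs; otherwise the guard decides.
    return guard is None or guard.check(ctx)


guard = GuardCondition(
    check=lambda ctx: ctx.get("diagnose", {}).get("confidence") == "high",
    description="Only proceed if diagnosis confidence is high",
)
```

Because the check uses ctx.get with a default, a context in which the diagnose step never ran makes the guard return False instead of raising a KeyError.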

ErrorPolicy

Bases: StrEnum

What to do when a step fails.
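
As a rough illustration of how on_error and retries could interact, the sketch below uses a stand-in enum with only the two members this page names (STOP and RETRY). The member values, the run_step helper, the reading of retries as "extra attempts after the first", and the RuntimeError raised on exhaustion are all assumptions made for the example.

```python
from enum import Enum
from typing import Any, Callable


class ErrorPolicy(str, Enum):
    """Stand-in listing only the two members this page names."""

    STOP = "stop"    # value is an assumption
    RETRY = "retry"  # value is an assumption


def run_step(action: Callable[[], Any], on_error: ErrorPolicy, retries: int = 0) -> Any:
    # Assumption: `retries` counts extra attempts after the first one,
    # and is ignored unless the policy is RETRY.
    attempts = 1 + (retries if on_error is ErrorPolicy.RETRY else 0)
    last_error = None
    for _ in range(attempts):
        try:
            return action()
        except Exception as exc:  # broad on purpose for the sketch
            last_error = exc
    raise RuntimeError(f"step failed after {attempts} attempt(s)") from last_error


calls = {"n": 0}


def flaky() -> str:
    """Fails on the first two calls, then succeeds."""
    calls["n"] += 1
    if calls["n"] < 3:
        raise ValueError("transient")
    return "ok"
```

Under these assumptions, run_step(flaky, ErrorPolicy.RETRY, retries=2) succeeds on the third call, whereas the same action under ErrorPolicy.STOP fails after a single attempt.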

Execution

WorkflowEngine

The engine that walks a Workflow, resolves template placeholders, dispatches each step's action, and applies the per-step error policy. Sandbox mode is configurable via the engine's sandbox attribute; when the engine is owned by an Agent, the agent's sandbox property propagates here automatically.

WorkflowEngine

WorkflowEngine(*, registry: ConnectorRegistry | None = None, tracer: ActionTracer | None = None, llm: Any = None, services: dict[str, Any] | None = None, sandbox: bool = False)

Executes Workflow definitions step by step.

The engine resolves {step_name} and {trigger.*} template variables, dispatches each step's action to the appropriate handler, and records every step in the ActionTracer.

Parameters:

  • registry (ConnectorRegistry | None, default None): Connector registry for looking up connectors by capability.
  • tracer (ActionTracer | None, default None): Action tracer for observability.
  • llm (Any, default None): Optional LLM provider for agent.reason steps.
  • services (dict[str, Any] | None, default None): Optional dict mapping service names to callables, e.g. {"failure_analyzer": analyzer_instance}.
  • sandbox (bool, default False): If True, write actions are logged but not executed; read-only actions still run normally.

Example
import asyncio
from machina.workflows import WorkflowEngine

async def main():  # await is only valid inside a coroutine
    engine = WorkflowEngine(registry=registry, tracer=tracer)
    return await engine.execute(my_workflow, {"asset_id": "P-201"})

result = asyncio.run(main())
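
The dot-path dispatch and sandbox behaviour described for the engine can be sketched in isolation. Everything below is illustrative: dispatch is a hypothetical helper, FailureAnalyzer is a toy service, and the shape of the value returned for a sandboxed write is an assumption (the documentation only states that such writes are logged and not executed).

```python
from typing import Any


class FailureAnalyzer:
    """Hypothetical service, defined only for this sketch."""

    def diagnose(self, asset_id: str) -> dict[str, Any]:
        return {"asset_id": asset_id, "confidence": "high"}


def dispatch(
    action: str,
    services: dict[str, Any],
    inputs: dict[str, Any],
    *,
    is_write: bool = False,
    sandbox: bool = False,
) -> Any:
    """Call the method named by a "service.method" dot-path."""
    service_name, _, method_name = action.partition(".")
    if sandbox and is_write:
        # Sandbox mode: describe the write instead of performing it.
        return {"sandboxed": True, "action": action, "inputs": inputs}
    method = getattr(services[service_name], method_name)
    return method(**inputs)


services = {"failure_analyzer": FailureAnalyzer()}
diagnosis = dispatch("failure_analyzer.diagnose", services, {"asset_id": "P-201"})
```

In this sketch a read action still runs when sandbox=True, because only calls flagged is_write are short-circuited.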

execute async

execute(workflow: Workflow, trigger_event: dict[str, Any] | None = None) -> WorkflowResult

Execute a workflow from start to finish.

Parameters:

  • workflow (Workflow, required): The workflow definition.
  • trigger_event (dict[str, Any] | None, default None): The event data that triggered the workflow.

Returns:

  • WorkflowResult: A WorkflowResult with per-step results.

Raises:

  • WorkflowError: If a step fails with ErrorPolicy.STOP.

WorkflowContext

The mutable bag of data threaded through one workflow execution. Stores the trigger event and the output of every completed step, and exposes two template resolution methods — resolve() for free-text fields where the result must be a string, and resolve_input_value() for step inputs where complex outputs (dicts, objects) should flow through unchanged.

WorkflowContext

WorkflowContext(trigger_event: dict[str, Any] | None = None)

Mutable bag of data passed through a workflow execution.

Stores the trigger event and the output of every completed step, keyed by step name. Two template-resolution methods are provided:

  • resolve: interpolates {step_name} and {step_name.field} placeholders into a string. Used for free-text fields such as Step.prompt and Step.template, where the result must be a string.
  • resolve_input_value: like resolve, but when the entire template is a single {key} placeholder, it returns the referenced raw value (object, dict, list) instead of its str repr. The engine uses it for Step.inputs, so workflow steps can pass complex outputs (e.g. a WorkOrder instance from a factory) downstream without coercion.

trigger property

trigger: dict[str, Any]

The trigger event that started the workflow.

steps property

steps: dict[str, Any]

All step outputs collected so far.

set_step_output

set_step_output(step_name: str, output: Any) -> None

Record the output of a completed step.

get

get(key: str, default: Any = None) -> Any

Look up a value by step name, or return default.

as_dict

as_dict() -> dict[str, Any]

Flat dict with trigger and all step outputs.
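
The accessors above can be pictured with a small stand-in class. This is a sketch of the documented surface only; in particular, the layout returned by as_dict (trigger event under a "trigger" key, step outputs at the top level keyed by step name) is an assumption inferred from the template and guard examples on this page.

```python
from __future__ import annotations

from typing import Any


class WorkflowContext:
    """Stand-in mirroring the documented accessors."""

    def __init__(self, trigger_event: dict[str, Any] | None = None) -> None:
        self._trigger: dict[str, Any] = dict(trigger_event or {})
        self._steps: dict[str, Any] = {}

    @property
    def trigger(self) -> dict[str, Any]:
        return self._trigger

    @property
    def steps(self) -> dict[str, Any]:
        return self._steps

    def set_step_output(self, step_name: str, output: Any) -> None:
        self._steps[step_name] = output

    def get(self, key: str, default: Any = None) -> Any:
        return self._steps.get(key, default)

    def as_dict(self) -> dict[str, Any]:
        # Assumed layout: {"trigger": {...}, "<step name>": <output>, ...}
        return {"trigger": self._trigger, **self._steps}


ctx = WorkflowContext({"asset_id": "P-201"})
ctx.set_step_output("diagnose", {"confidence": "high"})
```

A guard such as lambda ctx: ctx.get("diagnose", {}).get("confidence") == "high" would work against the dict produced by ctx.as_dict() in this layout.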

resolve

resolve(template: str) -> str

Interpolate {key} placeholders in template, returning a string.

Supported patterns:

  • {trigger.field} — value from the trigger event.
  • {step_name} — full output of a prior step (str-coerced).
  • {step_name.field} — nested field access (dict or object).

Unresolved placeholders are left as-is so the missing substitution is visible in the rendered string, and a structured warning is logged at the call site.
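
The three placeholder patterns can be approximated with a regular expression and a dotted-path lookup. The sketch below is one possible way to get the described behaviour, not the library's implementation: the function takes the context as a plain dict, the regex is an assumption about what counts as a placeholder, and the warning log is omitted.

```python
import re
from typing import Any

_PLACEHOLDER = re.compile(r"\{([A-Za-z_][\w.]*)\}")
_MISSING = object()


def lookup(data: dict[str, Any], path: str) -> Any:
    """Walk a dotted path through dict keys or object attributes."""
    current: Any = data
    for part in path.split("."):
        if isinstance(current, dict):
            current = current.get(part, _MISSING)
        else:
            current = getattr(current, part, _MISSING)
        if current is _MISSING:
            return _MISSING
    return current


def resolve(template: str, data: dict[str, Any]) -> str:
    def substitute(match: re.Match) -> str:
        value = lookup(data, match.group(1))
        # Leave unresolved placeholders untouched so the gap stays visible.
        return match.group(0) if value is _MISSING else str(value)

    return _PLACEHOLDER.sub(substitute, template)


data = {
    "trigger": {"asset_id": "P-201"},
    "diagnose": {"confidence": "high"},
}
message = resolve("Asset {trigger.asset_id}: confidence {diagnose.confidence}", data)
```

A template that references a step which never ran, such as "{notify.status}", comes back unchanged in this sketch, mirroring the "left as-is" rule above.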

resolve_input_value

resolve_input_value(template: Any) -> Any

Resolve a step-input value, preserving raw object types.

When template is a string whose entire value is a single {key} placeholder, return the referenced raw value (object, dict, list, …) rather than its str repr. This lets steps pass complex outputs downstream — e.g. a WorkOrder produced by work_order_factory.create can flow into cmms.create_work_order(work_order=…) without coercion.

For templates with text surrounding the placeholder (or multiple placeholders), fall back to resolve so the result is a fully interpolated string, matching the existing behaviour for prompt and template fields.

When the referenced placeholder is absent (the step was skipped, never ran, or the field does not exist), this method returns None rather than the literal template string. An explicit None is far safer than a silent str-coercion of "{generate_work_order}" into a downstream kwarg; it lets connectors fail loudly with a clear type error instead of receiving a meaningless string. A structured warning is logged with the unresolved expression so the failure is observable.

Non-string values are returned as-is so callers that already pass raw objects in Step.inputs are not surprised.
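
The four cases above (single placeholder, mixed text, missing reference, non-string input) can be sketched as follows. This is an illustrative reimplementation over a plain dict, with a hypothetical WorkOrder stand-in; the placeholder regex is an assumption and the warning log is omitted.

```python
import re
from dataclasses import dataclass
from typing import Any

_PLACEHOLDER = re.compile(r"\{([A-Za-z_][\w.]*)\}")
_MISSING = object()


@dataclass
class WorkOrder:
    """Hypothetical stand-in for a factory-produced object."""

    asset_id: str
    priority: str = "high"


def lookup(data: dict[str, Any], path: str) -> Any:
    """Walk a dotted path through dict keys or object attributes."""
    current: Any = data
    for part in path.split("."):
        if isinstance(current, dict):
            current = current.get(part, _MISSING)
        else:
            current = getattr(current, part, _MISSING)
        if current is _MISSING:
            return _MISSING
    return current


def resolve_input_value(template: Any, data: dict[str, Any]) -> Any:
    if not isinstance(template, str):
        return template  # raw objects pass through unchanged
    whole = _PLACEHOLDER.fullmatch(template)
    if whole:
        # Exactly one placeholder and nothing else: keep the raw value.
        value = lookup(data, whole.group(1))
        return None if value is _MISSING else value

    # Surrounding text or several placeholders: interpolate into a string.
    def substitute(match: re.Match) -> str:
        value = lookup(data, match.group(1))
        return match.group(0) if value is _MISSING else str(value)

    return _PLACEHOLDER.sub(substitute, template)


order = WorkOrder(asset_id="P-201")
data = {"trigger": {"asset_id": "P-201"}, "create_wo": order}
```

Here resolve_input_value("{create_wo}", data) hands back the same WorkOrder object rather than its string form, and a reference to a step that never ran yields None.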

Results

WorkflowResult

WorkflowResult dataclass

WorkflowResult(workflow_name: str, trigger_event: dict[str, Any] = dict(), step_results: list[StepResult] = list(), success: bool = True, duration_ms: float = 0.0)

Outcome of a full workflow execution.

Parameters:

  • workflow_name (str, required): Name of the workflow.
  • trigger_event (dict[str, Any], default dict()): The event dict that started the workflow.
  • step_results (list[StepResult], default list()): Results for every step executed.
  • success (bool, default True): True if all steps completed without fatal error.
  • duration_ms (float, default 0.0): Total wall-clock time in milliseconds.

steps property

steps: list[StepResult]

Alias for step_results (used by examples).

duration_seconds property

duration_seconds: float

Duration in seconds (convenience wrapper over duration_ms).

StepResult

StepResult dataclass

StepResult(step_name: str, output: Any = None, success: bool = True, error: str | None = None, duration_ms: float = 0.0, skipped: bool = False)

Outcome of a single step execution.

Parameters:

  • step_name (str, required): Name of the step that produced this result.
  • output (Any, default None): Arbitrary output data returned by the step action.
  • success (bool, default True): Whether the step completed without error.
  • error (str | None, default None): Error message if success is False.
  • duration_ms (float, default 0.0): Wall-clock time in milliseconds.
  • skipped (bool, default False): Whether the step was skipped (guard or error policy).

name property

name: str

Alias for step_name (used by examples).
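
To show how the result objects and their aliases fit together, here is a short sketch using stand-in dataclasses that copy the documented fields. failed_steps is a hypothetical helper for this example, and the sample step names and error text are made up.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any


@dataclass
class StepResult:
    """Stand-in with the documented fields."""

    step_name: str
    output: Any = None
    success: bool = True
    error: str | None = None
    duration_ms: float = 0.0
    skipped: bool = False

    @property
    def name(self) -> str:
        return self.step_name


@dataclass
class WorkflowResult:
    """Stand-in with the documented fields (trigger_event omitted)."""

    workflow_name: str
    step_results: list[StepResult] = field(default_factory=list)
    success: bool = True
    duration_ms: float = 0.0

    @property
    def steps(self) -> list[StepResult]:
        return self.step_results

    @property
    def duration_seconds(self) -> float:
        return self.duration_ms / 1000.0


def failed_steps(result: WorkflowResult) -> list[str]:
    # Skipped steps are reported separately, so they are excluded here.
    return [s.name for s in result.steps if not s.success and not s.skipped]


result = WorkflowResult(
    workflow_name="Predictive Maintenance",
    step_results=[
        StepResult("diagnose", output={"confidence": "low"}, duration_ms=900.0),
        StepResult("create_wo", skipped=True),
        StepResult("notify", success=False, error="channel unavailable", duration_ms=600.0),
    ],
    success=False,
    duration_ms=1500.0,
)
```

For this sample, failed_steps(result) returns only "notify", and result.duration_seconds is 1.5.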

DiagnosisResult

Wrapper returned by FailureAnalyzer.diagnose(...) when invoked through a workflow step. Iterable and indexable like a list, but exposes primary / primary_code / codes accessors so downstream steps (e.g. work_order_factory.create) can pluck the top-ranked failure mode without writing list-comprehension boilerplate inside templates.

DiagnosisResult dataclass

DiagnosisResult(matches: list[dict[str, Any]] = list())

Workflow-friendly wrapper around a ranked list of failure modes.

Returned by FailureAnalyzer.diagnose when called from the workflow engine (i.e. with kwargs). Carries the full ranked candidate list and exposes a primary_code attribute that the workflow template engine can resolve via {analyze_alarm.primary_code}: a plain str suitable for WorkOrder.failure_mode, which requires str | None.

Iterates, indexes, and stringifies in ways the rest of the workflow (notification templates, audit logs) expects, so existing callers that treat the diagnosis as a list-of-dicts keep working.

primary property

primary: dict[str, Any] | None

Top-ranked failure mode as a dict, or None when no match.

primary_code property

primary_code: str | None

Code of the top-ranked failure mode as a str, or None when there is no match. Suitable for WorkOrder.failure_mode.

codes property

codes: list[str]

All matching failure-mode codes, in rank order.
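
A wrapper with these accessors could look roughly like the sketch below. It is a stand-in, not the machina class: the "code" key inside each match dict and the sample failure-mode codes are assumptions made for illustration.

```python
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Iterator


@dataclass
class DiagnosisResult:
    """Stand-in wrapper around a ranked list of failure-mode dicts."""

    matches: list[dict[str, Any]] = field(default_factory=list)

    # List-like behaviour so existing list-of-dicts callers keep working.
    def __iter__(self) -> Iterator[dict[str, Any]]:
        return iter(self.matches)

    def __len__(self) -> int:
        return len(self.matches)

    def __getitem__(self, index: int) -> dict[str, Any]:
        return self.matches[index]

    @property
    def primary(self) -> dict[str, Any] | None:
        return self.matches[0] if self.matches else None

    @property
    def primary_code(self) -> str | None:
        # Assumption: each match stores its identifier under "code".
        top = self.primary
        return None if top is None else str(top["code"])

    @property
    def codes(self) -> list[str]:
        return [str(match["code"]) for match in self.matches]


diagnosis = DiagnosisResult(
    matches=[
        {"code": "BEARING_WEAR", "score": 0.82},   # hypothetical codes
        {"code": "MISALIGNMENT", "score": 0.41},
    ]
)
```

With this shape, a template such as {analyze_alarm.primary_code} resolves through plain attribute access, and an empty result yields None for both primary and primary_code.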