Workflows¶
Workflows are declarative trigger-step pipelines for repeatable maintenance processes (alarm response, predictive routines, scheduled checks). They live alongside the agent's free-form LLM reasoning — use a workflow when the steps are well-defined and need to be deterministic; use the agent's tool-calling loop when the path is open-ended.
For end-to-end examples see the alarm_to_workorder built-in and examples/alarm_to_workorder.
Definition¶
Workflow¶
Workflow
dataclass
¶
A named, trigger-driven sequence of :class:Step objects.
Workflows define the complete pipeline for a maintenance process — from trigger event to final notification.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Human-readable workflow name. |
required |
description
|
str
|
What this workflow does. |
''
|
trigger
|
Trigger | str
|
Trigger specification or simple event-type string. |
''
|
steps
|
list[Step]
|
Ordered list of steps to execute. |
list()
|
Example
Step¶
Step
dataclass
¶
Step(name: str, action: str = '', description: str = '', prompt: str = '', template: str = '', depends_on: list[str] = list(), on_error: ErrorPolicy = ErrorPolicy.STOP, retries: int = 0, guard: GuardCondition | None = None, timeout_seconds: float | None = None, inputs: dict[str, Any] = dict(), is_write: bool | None = None)
A single step within a :class:Workflow.
Each step maps to an action (a connector method, domain service call, or LLM reasoning invocation) and may carry a prompt template for LLM-powered steps.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
name
|
str
|
Short identifier for the step (e.g. |
required |
action
|
str
|
Dot-path to the action (e.g. |
''
|
description
|
str
|
Human-readable explanation of what this step does. |
''
|
prompt
|
str
|
Optional prompt template for |
''
|
template
|
str
|
Optional message template for notification steps. |
''
|
depends_on
|
list[str]
|
List of step names whose output this step requires. |
list()
|
on_error
|
ErrorPolicy
|
What to do if this step fails. |
STOP
|
retries
|
int
|
Number of retry attempts (only meaningful when
|
0
|
guard
|
GuardCondition | None
|
Optional guard condition — if it returns |
None
|
timeout_seconds
|
float | None
|
Optional per-step timeout. |
None
|
inputs
|
dict[str, Any]
|
Explicit input mappings using template variables, e.g.
|
dict()
|
Example
Trigger¶
Trigger
dataclass
¶
Describes what event starts a workflow.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
type
|
TriggerType
|
The kind of event (alarm, schedule, manual, condition). |
MANUAL
|
filter
|
dict[str, Any]
|
Optional key-value filters to narrow matching events.
For example |
dict()
|
matches ¶
Return True if event satisfies this trigger's filter.
Each key in self.filter must be present in event and the
event value must be contained in the filter's list of allowed
values. If no filter is set, every event matches.
TriggerType¶
TriggerType ¶
Bases: StrEnum
Event types that can start a workflow.
GuardCondition¶
GuardCondition
dataclass
¶
A condition evaluated between steps to decide whether to proceed.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
check
|
Callable[[dict[str, Any]], bool]
|
Callable that receives the current
:class: |
lambda _ctx: True
|
description
|
str
|
Human-readable explanation of the guard. |
''
|
Example
ErrorPolicy¶
ErrorPolicy ¶
Bases: StrEnum
What to do when a step fails.
Execution¶
WorkflowEngine¶
The engine that walks a Workflow, resolves template placeholders, dispatches each step's action, and applies the per-step error policy. Sandbox mode is configurable via the engine's sandbox attribute; when the engine is owned by an Agent, the agent's sandbox property propagates here automatically.
WorkflowEngine ¶
WorkflowEngine(*, registry: ConnectorRegistry | None = None, tracer: ActionTracer | None = None, llm: Any = None, services: dict[str, Any] | None = None, sandbox: bool = False)
Executes :class:Workflow definitions step by step.
The engine resolves {step_name} and {trigger.*} template
variables, dispatches each step's action to the appropriate
handler, and records every step in the :class:ActionTracer.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
registry
|
ConnectorRegistry | None
|
Connector registry for looking up connectors by capability. |
None
|
tracer
|
ActionTracer | None
|
Action tracer for observability. |
None
|
llm
|
Any
|
Optional LLM provider for |
None
|
services
|
dict[str, Any] | None
|
Optional dict mapping service names to callables,
e.g. |
None
|
sandbox
|
bool
|
If |
False
|
Example
execute
async
¶
Execute a workflow from start to finish.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
workflow
|
Workflow
|
The workflow definition. |
required |
trigger_event
|
dict[str, Any] | None
|
The event data that triggered the workflow. |
None
|
Returns:
| Name | Type | Description |
|---|---|---|
A |
WorkflowResult
|
class: |
Raises:
| Type | Description |
|---|---|
WorkflowError
|
If a step fails with :attr: |
WorkflowContext¶
The mutable bag of data threaded through one workflow execution. Stores the trigger event and the output of every completed step, and exposes two template resolution methods — resolve() for free-text fields where the result must be a string, and resolve_input_value() for step inputs where complex outputs (dicts, objects) should flow through unchanged.
WorkflowContext ¶
Mutable bag of data passed through a workflow execution.
Stores the trigger event and the output of every completed step, keyed by step name. Two template-resolution methods are provided:
- :meth:
resolve— interpolate{step_name}and{step_name.field}placeholders into a string. Used for free-text fields like :attr:Step.promptand :attr:Step.templatewhere the result must be a string. - :meth:
resolve_input_value— like :meth:resolve, but when the entire template is a single{key}placeholder, return the referenced raw value (object, dict, list) instead of itsstrrepr. Used by the engine for :attr:Step.inputs, so workflow steps can pass complex outputs (e.g. aWorkOrderinstance from a factory) downstream without coercion.
set_step_output ¶
Record the output of a completed step.
resolve ¶
Interpolate {key} placeholders in template, returning a string.
Supported patterns:
{trigger.field}— value from the trigger event.{step_name}— full output of a prior step (str-coerced).{step_name.field}— nested field access (dict or object).
Unresolved placeholders are left as-is so the missing substitution is visible in the rendered string, and a structured warning is logged at the call site.
resolve_input_value ¶
Resolve a step-input value, preserving raw object types.
When template is a string whose entire value is a single
{key} placeholder, return the referenced raw value (object,
dict, list, …) rather than its str repr. This lets steps
pass complex outputs downstream — e.g. a WorkOrder produced
by work_order_factory.create can flow into
cmms.create_work_order(work_order=…) without coercion.
For templates with text surrounding the placeholder (or
multiple placeholders), fall back to :meth:resolve so the
result is a fully interpolated string — matching the existing
behaviour for prompt and template fields.
When the referenced placeholder is absent (the step was
skipped, never ran, or the field does not exist), this method
returns None rather than the literal template string. An
explicit None is far safer than a silent str-coercion of
"{generate_work_order}" into a downstream kwarg; it lets
connectors fail loudly with a clear type error instead of
receiving a meaningless string. A structured warning is logged
with the unresolved expression so the failure is observable.
Non-string values are returned as-is so callers that already
pass raw objects in Step.inputs are not surprised.
Results¶
WorkflowResult¶
WorkflowResult
dataclass
¶
WorkflowResult(workflow_name: str, trigger_event: dict[str, Any] = dict(), step_results: list[StepResult] = list(), success: bool = True, duration_ms: float = 0.0)
Outcome of a full workflow execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
workflow_name
|
str
|
Name of the workflow. |
required |
trigger_event
|
dict[str, Any]
|
The event dict that started the workflow. |
dict()
|
step_results
|
list[StepResult]
|
Results for every step executed. |
list()
|
success
|
bool
|
|
True
|
duration_ms
|
float
|
Total wall-clock time in milliseconds. |
0.0
|
StepResult¶
StepResult
dataclass
¶
StepResult(step_name: str, output: Any = None, success: bool = True, error: str | None = None, duration_ms: float = 0.0, skipped: bool = False)
Outcome of a single step execution.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
step_name
|
str
|
Name of the step that produced this result. |
required |
output
|
Any
|
Arbitrary output data returned by the step action. |
None
|
success
|
bool
|
Whether the step completed without error. |
True
|
error
|
str | None
|
Error message if |
None
|
duration_ms
|
float
|
Wall-clock time in milliseconds. |
0.0
|
skipped
|
bool
|
Whether the step was skipped (guard or error policy). |
False
|
DiagnosisResult¶
Wrapper returned by FailureAnalyzer.diagnose(...) when invoked through a workflow step. Iterable and indexable like a list, but exposes primary / primary_code / codes accessors so downstream steps (e.g. work_order_factory.create) can pluck the top-ranked failure mode without writing list-comprehension boilerplate inside templates.
DiagnosisResult
dataclass
¶
Workflow-friendly wrapper around a ranked list of failure modes.
Returned by :meth:FailureAnalyzer.diagnose when called from the
workflow engine (i.e. with kwargs). Carries the full ranked
candidate list AND exposes a primary_code attribute that the
workflow template engine can resolve via {analyze_alarm.primary_code}
— a plain str suitable for WorkOrder.failure_mode which
requires str | None.
Iterates, indexes, and stringifies in ways the rest of the workflow (notification templates, audit logs) expects, so existing callers that treat the diagnosis as a list-of-dicts keep working.