Deterministic Orchestration¶
AgentEnsemble can orchestrate purely deterministic (non-AI) workflows with the same DAG execution, parallel phases, callbacks, guardrails, metrics, and review gates that AI ensembles use -- but with zero LLM calls.
The simplest possible pipeline¶
Every task in a deterministic pipeline has a handler -- a Java lambda that receives a
TaskHandlerContext and returns a ToolResult:
import net.agentensemble.Ensemble;
import net.agentensemble.Task;
import net.agentensemble.ensemble.EnsembleOutput;
import net.agentensemble.tool.ToolResult;
EnsembleOutput output = Ensemble.run(
    Task.builder()
        .description("Compute checksum of input data")
        .expectedOutput("SHA-256 hex string")
        .handler(ctx -> ToolResult.success(sha256(inputData)))
        .build(),
    Task.builder()
        .description("Store checksum to database")
        .expectedOutput("Storage confirmation")
        .handler(ctx -> {
            db.store(inputData, sha256(inputData));
            return ToolResult.success("stored");
        })
        .build()
);
System.out.println(output.getRaw()); // "stored"
Ensemble.run(Task...) requires all tasks to have handlers. If any task lacks one, a
clear error message points to the offending task and suggests using
Ensemble.run(ChatModel, Task...) for AI tasks.
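The sha256(...) helper in the example above is not part of the framework. A minimal implementation using the JDK's MessageDigest might look like this (the class name Checksums is illustrative):

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.HexFormat;

public class Checksums {

    // Computes the SHA-256 digest of a UTF-8 string as lowercase hex.
    static String sha256(String data) {
        try {
            MessageDigest md = MessageDigest.getInstance("SHA-256");
            return HexFormat.of().formatHex(md.digest(data.getBytes(StandardCharsets.UTF_8)));
        } catch (NoSuchAlgorithmException e) {
            // Every JDK is required to ship SHA-256, so this is effectively unreachable.
            throw new IllegalStateException("SHA-256 unavailable", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(sha256("hello"));
    }
}
```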
Passing data between tasks¶
Use Task.builder().context(List.of(upstreamTask)) to declare a data dependency.
Inside the downstream handler, ctx.contextOutputs() delivers the prior task's output
as a TaskOutput whose raw string is accessed via .getRaw():
Task fetchTask = Task.builder()
    .description("Fetch product catalogue from REST API")
    .expectedOutput("JSON product list")
    .handler(ctx -> {
        String json = httpClient.get("https://api.example.com/products");
        return ToolResult.success(json);
    })
    .build();

Task parseTask = Task.builder()
    .description("Parse product list into structured records")
    .expectedOutput("CSV of product records")
    .context(List.of(fetchTask))
    .handler(ctx -> {
        String json = ctx.contextOutputs().get(0).getRaw();
        String csv = jsonToCsv(json);
        return ToolResult.success(csv);
    })
    .build();

Task storeTask = Task.builder()
    .description("Write CSV to data warehouse")
    .expectedOutput("Row count written")
    .context(List.of(parseTask))
    .handler(ctx -> {
        String csv = ctx.contextOutputs().get(0).getRaw();
        int rows = warehouse.bulkInsert(csv);
        return ToolResult.success(rows + " rows inserted");
    })
    .build();
EnsembleOutput output = Ensemble.run(fetchTask, parseTask, storeTask);
System.out.println(output.getRaw()); // "1234 rows inserted"
The full output of every task is accessible in output.getTaskOutputs():
for (TaskOutput taskOutput : output.getTaskOutputs()) {
    System.out.printf("  %s -> %s%n",
        taskOutput.getTaskDescription(),
        taskOutput.getRaw());
}
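The jsonToCsv(...) call above is left undefined; in practice you would use a real JSON library such as Jackson. Purely as an illustration, here is a naive converter for a flat JSON array of objects with string values only (no nesting or escape handling):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class JsonToCsvSketch {

    // Naive converter: flat array of objects, string values only.
    // Not a real JSON parser -- illustration for the example above.
    static String jsonToCsv(String json) {
        Matcher obj = Pattern.compile("\\{(.*?)\\}").matcher(json);
        Pattern field = Pattern.compile("\"([^\"]+)\"\\s*:\\s*\"([^\"]*)\"");
        List<String> header = new ArrayList<>();
        StringBuilder rows = new StringBuilder();
        while (obj.find()) {
            Matcher f = field.matcher(obj.group(1));
            List<String> row = new ArrayList<>();
            while (f.find()) {
                if (!header.contains(f.group(1))) header.add(f.group(1));
                row.add(f.group(2));
            }
            rows.append(String.join(",", row)).append('\n');
        }
        return String.join(",", header) + '\n' + rows;
    }

    public static void main(String[] args) {
        String json = "[{\"id\":\"1\",\"name\":\"Widget\"},{\"id\":\"2\",\"name\":\"Gadget\"}]";
        System.out.print(jsonToCsv(json));
    }
}
```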
Reading multiple upstream outputs¶
A task can declare multiple context dependencies. The outputs are delivered in declaration order:
Task serviceA = Task.builder()
    .description("Call service A")
    .handler(ctx -> ToolResult.success(clientA.getData()))
    .build();

Task serviceB = Task.builder()
    .description("Call service B")
    .handler(ctx -> ToolResult.success(clientB.getData()))
    .build();

Task aggregator = Task.builder()
    .description("Merge results from A and B")
    .context(List.of(serviceA, serviceB))
    .handler(ctx -> {
        String dataA = ctx.contextOutputs().get(0).getRaw(); // serviceA output
        String dataB = ctx.contextOutputs().get(1).getRaw(); // serviceB output
        return ToolResult.success(merge(dataA, dataB));
    })
    .build();
When aggregator is added to an ensemble alongside serviceA and serviceB, the
framework infers a PARALLEL workflow automatically -- serviceA and serviceB run
concurrently, and aggregator starts only after both complete:
EnsembleOutput output = Ensemble.builder()
    .task(serviceA)
    .task(serviceB)
    .task(aggregator)
    .build()
    .run();
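The merge(...) helper is whatever combination logic fits your domain and is not defined by the framework. A trivial stand-in that joins the two upstream payloads:

```java
public class MergeSketch {

    // Hypothetical merge for the aggregator example: joins the two
    // upstream payloads with a newline separator.
    static String merge(String dataA, String dataB) {
        return String.join("\n", dataA, dataB);
    }

    public static void main(String[] args) {
        System.out.println(merge("from-A", "from-B"));
    }
}
```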
Parallel independent tasks¶
For tasks with no data dependency on each other, declare them in an ensemble with
workflow(Workflow.PARALLEL):
EnsembleOutput output = Ensemble.builder()
    .task(Task.builder()
        .description("Send email notification")
        .handler(ctx -> { email.send(); return ToolResult.success("sent"); })
        .build())
    .task(Task.builder()
        .description("Post Slack message")
        .handler(ctx -> { slack.post(); return ToolResult.success("posted"); })
        .build())
    .task(Task.builder()
        .description("Update metrics dashboard")
        .handler(ctx -> { metrics.record(); return ToolResult.success("recorded"); })
        .build())
    .workflow(Workflow.PARALLEL)
    .build()
    .run();
All three tasks start immediately and run concurrently on virtual threads.
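Conceptually, this fan-out behaves like submitting each handler to a virtual-thread-per-task executor. The following is a sketch of that idea, not the framework's actual internals:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ParallelSketch {

    // Runs three independent stand-in handlers concurrently and returns
    // their results in submission order.
    static List<String> runAll() throws Exception {
        // Java 21: one cheap virtual thread per submitted task.
        try (ExecutorService exec = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Callable<String>> handlers = List.of(
                    () -> "sent",       // stand-in for email.send()
                    () -> "posted",     // stand-in for slack.post()
                    () -> "recorded"); // stand-in for metrics.record()
            List<String> results = new ArrayList<>();
            // invokeAll blocks until every task finishes; futures keep submission order.
            for (Future<String> f : exec.invokeAll(handlers)) {
                results.add(f.get());
            }
            return results;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAll());
    }
}
```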
Named workstreams with phases¶
Use phases when the pipeline has distinct stages with named logical groupings:
Task extract = Task.builder().description("Extract").handler(ctx -> ...).build();
Task transform = Task.builder().description("Transform")
    .context(List.of(extract)).handler(ctx -> ...).build();
Task load = Task.builder().description("Load")
    .context(List.of(transform)).handler(ctx -> ...).build();

Phase extractPhase = Phase.of("extract", extract);
Phase transformPhase = Phase.builder()
    .name("transform")
    .task(transform)
    .after(extractPhase)
    .build();
Phase loadPhase = Phase.builder()
    .name("load")
    .task(load)
    .after(transformPhase)
    .build();

EnsembleOutput output = Ensemble.builder()
    .phase(extractPhase)
    .phase(transformPhase)
    .phase(loadPhase)
    .build()
    .run();
// Per-phase results are accessible by name
List<TaskOutput> extractResults = output.getPhaseOutputs().get("extract");
List<TaskOutput> transformResults = output.getPhaseOutputs().get("transform");
List<TaskOutput> loadResults = output.getPhaseOutputs().get("load");
Independent phases (with no after() dependency between them) run in parallel automatically.
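The phase graph above is equivalent in spirit to chaining futures, where each stage starts only when its predecessor completes. This is a conceptual sketch, not the library's implementation:

```java
import java.util.concurrent.CompletableFuture;

public class PhaseSketch {

    // extract -> transform -> load: each stage runs only after its
    // predecessor's future completes, mirroring Phase.after(...).
    static String run() {
        CompletableFuture<String> extract = CompletableFuture.supplyAsync(() -> "raw");
        CompletableFuture<String> transform = extract.thenApply(data -> data + ":transformed");
        CompletableFuture<String> load = transform.thenApply(data -> data + ":loaded");
        return load.join();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```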
Observability: callbacks¶
All task lifecycle callbacks work on deterministic tasks exactly as they do for AI tasks:
EnsembleOutput output = Ensemble.builder()
    .task(fetchTask)
    .task(transformTask)
    .onTaskStart(e -> log.info("Starting: {}", e.taskDescription()))
    .onTaskComplete(e -> log.info("Completed: {} in {}",
        e.taskDescription(), e.duration()))
    .onTaskFailed(e -> log.error("Failed: {}", e.taskDescription(), e.cause()))
    .build()
    .run();
Guardrails on deterministic tasks¶
Input and output guardrails enforce pre/post conditions on handler tasks:
Task validate = Task.builder()
    .description("Validate incoming payload")
    .handler(ctx -> ToolResult.success(process(ctx.description())))
    .inputGuardrail(input -> {
        if (input.text().isBlank()) return GuardrailResult.failure("Input must not be blank");
        return GuardrailResult.success();
    })
    .outputGuardrail(output -> {
        if (output.text().length() > MAX_SIZE) return GuardrailResult.failure("Output too large");
        return GuardrailResult.success();
    })
    .build();
Handling failures¶
When a handler returns ToolResult.failure(...), the ensemble throws a
TaskExecutionException. The exception contains all outputs from tasks that completed
before the failure:
try {
    Ensemble.run(step1, step2, step3);
} catch (TaskExecutionException e) {
    log.error("Pipeline failed at task: {}", e.getTaskDescription());
    log.error("Completed before failure: {} tasks", e.getCompletedTaskOutputs().size());
}
Mixing deterministic and AI tasks¶
Deterministic and AI tasks compose freely in the same ensemble. Only tasks without a
handler require a ChatModel:
Task fetchData = Task.builder()
    .description("Fetch raw user feedback from API")
    .handler(ctx -> ToolResult.success(api.fetchFeedback()))
    .build();

Task summarize = Task.builder()
    .description("Summarise the feedback into three bullet points")
    .expectedOutput("Three bullet point summary")
    .context(List.of(fetchData))
    .build(); // AI task -- will use the ensemble-level model

Task storeResult = Task.builder()
    .description("Store summarised feedback to database")
    .context(List.of(summarize))
    .handler(ctx -> {
        db.store(ctx.contextOutputs().get(0).getRaw());
        return ToolResult.success("stored");
    })
    .build();

EnsembleOutput output = Ensemble.builder()
    .chatLanguageModel(model) // required for the AI task only
    .task(fetchData)
    .task(summarize)
    .task(storeResult)
    .build()
    .run();
Builder reference for deterministic-only ensembles¶
| Method | Required? | Notes |
|---|---|---|
| task(...).handler(...) | Yes (all tasks) | The functional handler to execute |
| chatLanguageModel(model) | No | Not needed when all tasks have handlers |
| workflow(Workflow.SEQUENTIAL) | No | Inferred as SEQUENTIAL when no context dependencies exist |
| workflow(Workflow.PARALLEL) | No | Use when tasks should run concurrently |
| onTaskStart(...) | No | Callback fired before each task |
| onTaskComplete(...) | No | Callback fired after each task succeeds |
| onTaskFailed(...) | No | Callback fired when a task fails |
| phase(...) | No | Use instead of task() for named workstreams |
| reviewHandler(...) | No | Human-in-the-loop review gates |