Skip to content

Cross-Ensemble Delegation Example

This example demonstrates two ensembles communicating over WebSocket: a kitchen ensemble that shares a task and a tool, and a room service ensemble that uses them.

Kitchen Ensemble (Provider)

// Kitchen-side tool that reports ingredient availability
AgentTool inventoryTool = new AbstractAgentTool() {
    @Override
    public String name() {
        return "check-inventory";
    }

    @Override
    public String description() {
        return "Check ingredient availability";
    }

    @Override
    protected ToolResult doExecute(String input) {
        // Canned answer for the example; in production, this would query a real inventory system
        String availability = "Available: " + input + " (3 portions)";
        return ToolResult.success(availability);
    }
};

// Web dashboard for the kitchen, configured with port 7329
WebDashboard dashboard = WebDashboard.builder()
    .port(7329)
    .build();

// The task the kitchen shares under the name "prepare-meal"
Task prepareMeal = Task.builder()
    .description("Prepare a meal as specified by the guest")
    .expectedOutput("Confirmation with preparation details and estimated time")
    .build();

// Assemble the kitchen ensemble: its own task, plus the shared task and the shared tool
Ensemble kitchen = Ensemble.builder()
    .chatLanguageModel(model)
    .task(Task.of("Manage kitchen operations"))
    .shareTask("prepare-meal", prepareMeal)
    .shareTool("check-inventory", inventoryTool)
    .webDashboard(dashboard)
    .build();

// Start the kitchen ensemble (same port value as the dashboard)
kitchen.start(7329);

Room Service Ensemble (Consumer)

// Configure connection to the kitchen
NetworkConfig config = NetworkConfig.builder()
    .ensemble("kitchen", "ws://localhost:7329/ws")
    .build();

NetworkClientRegistry registry = new NetworkClientRegistry(config);

try {
    // Build the room service ensemble with network capabilities:
    // the kitchen's shared task and shared tool are both passed as tools
    EnsembleOutput result = Ensemble.builder()
        .chatLanguageModel(model)
        .task(Task.builder()
            .description("Handle guest request: wagyu steak, medium-rare, room 403")
            .tools(
                NetworkTask.from("kitchen", "prepare-meal", registry),
                NetworkTool.from("kitchen", "check-inventory", registry))
            .build())
        .build()
        .run();

    // orElseThrow() fails loudly if no task produced an output
    System.out.println(result.lastCompletedOutput().orElseThrow().getRaw());
} finally {
    // Clean up even when the run or the output lookup throws,
    // so the registry and the kitchen ensemble are not left running
    registry.close();
    kitchen.stop();
}

Testing Without Network

// Stand-ins for the kitchen's shared task and tool; each is created with a fixed reply string,
// so no real network connection is needed
StubNetworkTask prepareMealStub = NetworkTask.stub(
    "kitchen",
    "prepare-meal",
    "Meal prepared: wagyu steak, medium-rare. Ready in 25 minutes.");

StubNetworkTool checkInventoryStub = NetworkTool.stub(
    "kitchen",
    "check-inventory",
    "Wagyu beef: 3 portions available");

// Same guest request as the networked example, wired to the stubs instead
Task guestRequest = Task.builder()
    .description("Handle guest request: wagyu steak, medium-rare, room 403")
    .tools(prepareMealStub, checkInventoryStub)
    .build();

EnsembleOutput result = Ensemble.builder()
    .chatLanguageModel(model)
    .task(guestRequest)
    .build()
    .run();

Using Recordings for Assertions

// Recording stand-in for the kitchen's "prepare-meal" task; it is inspected after the run
RecordingNetworkTask recorder = NetworkTask.recording("kitchen", "prepare-meal");

Ensemble.builder()
    .chatLanguageModel(model)
    .task(Task.builder()
        // The description names the dish so the "wagyu" assertion below has something to match
        .description("Handle guest request: wagyu steak, medium-rare, room 403")
        .tools(recorder)
        .build())
    .build()
    .run();

// Expect exactly one call to the recorder, and that its last request mentions the dish
assertThat(recorder.callCount()).isEqualTo(1);
assertThat(recorder.lastRequest()).contains("wagyu");

Transport SPI

The transport layer is pluggable. In the default (simple) mode, it uses in-process queues:

// Obtain the transport bound to the kitchen ensemble's inbox
Transport kitchenTransport = Transport.websocket("kitchen");

// Assemble the pieces of the work request, then the request itself
String requestId = UUID.randomUUID().toString();
DeliverySpec delivery = new DeliverySpec(DeliveryMethod.WEBSOCKET, null);
WorkRequest request = new WorkRequest(
    requestId,
    "room-service",
    "prepare-meal",
    "Wagyu steak, medium-rare, room 403",
    Priority.NORMAL,
    Duration.ofMinutes(30),
    delivery,
    null, null, null, null);

// Room service side: put the request into the kitchen's inbox
kitchenTransport.send(request);

// Kitchen side: blocking receive from its inbox, passing a 30-second Duration
WorkRequest received = kitchenTransport.receive(Duration.ofSeconds(30));

// Kitchen side: build a response that carries the received request's id, then deliver it
String outcome = "Meal prepared: wagyu steak, medium-rare. Ticket #4071.";
WorkResponse response = new WorkResponse(
    received.requestId(),
    "COMPLETED",
    outcome,
    null,
    25000L);

kitchenTransport.deliver(response);

The RequestQueue and ResultStore SPIs are also available independently:

// Request queue held in memory: enqueue under the "kitchen" name, then dequeue from the same name
String kitchenQueue = "kitchen";
RequestQueue queue = RequestQueue.inMemory();
queue.enqueue(kitchenQueue, request);
WorkRequest dequeued = queue.dequeue(kitchenQueue, Duration.ofSeconds(10));

// Result store held in memory: store the response under a key, then read it back by that key
String resultKey = "req-42";
ResultStore store = ResultStore.inMemory();
store.store(resultKey, response, Duration.ofHours(1));
WorkResponse retrieved = store.retrieve(resultKey);

Priority Queue

Use RequestQueue.priority() to order incoming work by priority, with optional aging to prevent starvation of low-priority requests.

// Priority queue with a 30-minute aging interval.
// A waiting LOW request is promoted to NORMAL after 30 min, to HIGH after 60 min,
// and to CRITICAL after 90 min.
PriorityWorkQueue queue = RequestQueue.priority(AgingPolicy.every(Duration.ofMinutes(30)));

// First request in: CRITICAL priority
WorkRequest vipOrder = new WorkRequest(
    "req-vip-1", "room-service", "prepare-meal",
    "Wagyu steak for penthouse suite", Priority.CRITICAL,
    null, null, null, null, null, null);
queue.enqueue("kitchen", vipOrder);

// Second request in: NORMAL priority
WorkRequest sandwichOrder = new WorkRequest(
    "req-4071", "room-service", "prepare-meal",
    "Club sandwich, room 205", Priority.NORMAL,
    null, null, null, null, null, null);
queue.enqueue("kitchen", sandwichOrder);

// The CRITICAL request comes out first
WorkRequest next = queue.dequeue("kitchen", Duration.ofSeconds(30));
// next.requestId() -> "req-vip-1"

// Queue status of the remaining request, for use in task_accepted responses
QueueStatus status = queue.queueStatus("kitchen", "req-4071");
// status.queuePosition()       -> 0 (it's next)
// status.estimatedCompletion()  -> PT30S