Long-running executions
How to handle long-running executions on the Letta API
When agents need to execute multiple tool calls or perform complex operations (like deep research, data analysis, or multi-step workflows), processing time can vary significantly.
Letta supports various ways to handle long-running agents, so you can choose the approach that best fits your use case:
| Use Case | Duration | Recommendation | Key Benefits |
|---|---|---|---|
| Few-step invocations | < 2-3 minutes | Standard streaming with extended timeouts + keepalive pings | Simpler implementation, easy to add to standard streaming |
| Variable length runs | 3-10+ minutes | Background mode | Survives disconnects, resumable streams |
Background Mode with Resumable Streaming
Section titled “Background Mode with Resumable Streaming”Background mode decouples agent execution from your client connection. The agent processes your request on the server while streaming results to a persistent store, allowing you to reconnect and resume from any point — even if your application crashes or network fails.
curl --request POST \
  --url https://api.letta.com/v1/agents/$AGENT_ID/messages/stream \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Content-Type: application/json' \
  --data '{
    "messages": [
      {
        "role": "user",
        "content": "Run comprehensive analysis on this dataset"
      }
    ],
    "stream_tokens": true,
    "background": true
  }'
# Response stream includes run_id and seq_id for each chunk:
data: {"run_id":"run-123","seq_id":0,"message_type":"reasoning_message","reasoning":"Analyzing"}
data: {"run_id":"run-123","seq_id":1,"message_type":"reasoning_message","reasoning":" the dataset"}
data: {"run_id":"run-123","seq_id":2,"message_type":"tool_call","tool_call":{...}}
# ... stream continues
# Step 2: If disconnected, resume from last received seq_id
curl --request GET \
  --url https://api.letta.com/v1/runs/$RUN_ID/stream \
  --header 'Accept: text/event-stream' \
  --data '{"starting_after": 57}'

stream = client.agents.messages.create(
    agent_id=agent_state.id,
    messages=[
        {
            "role": "user",
            "content": "Run comprehensive analysis on this dataset"
        }
    ],
    streaming=True,
    background=True,
)

run_id = None
last_seq_id = None
for chunk in stream:
    if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"):
        run_id = chunk.run_id  # Save this to reconnect if your connection drops
        last_seq_id = chunk.seq_id  # Save this as your resumption point for cursor-based pagination
    print(chunk)

# If disconnected, resume from last received seq_id:
for chunk in client.runs.stream(run_id, starting_after=last_seq_id):
    print(chunk)

const stream = await client.agents.messages.create(agentState.id, {
  messages: [
    {
      role: "user",
      content: "Run comprehensive analysis on this dataset",
    },
  ],
  streaming: true,
  background: true,
});

let runId = null;
let lastSeqId = null;
for await (const chunk of stream) {
  // Use null checks rather than truthiness: seq_id 0 is a valid cursor
  if (chunk.run_id != null && chunk.seq_id != null) {
    runId = chunk.run_id; // Save this to reconnect if your connection drops
    lastSeqId = chunk.seq_id; // Save this as your resumption point for cursor-based pagination
  }
  console.log(chunk);
}

// If disconnected, resume from last received seq_id
for await (const chunk of client.runs.stream(runId, {
  starting_after: lastSeqId,
})) {
  console.log(chunk);
}

HITL in Background Mode
Section titled “HITL in Background Mode”When Human‑in‑the‑Loop (HITL) approval is enabled for a tool, your background stream may pause and emit an approval_request_message. In background mode, send the approval via a separate background stream and capture that stream’s run_id/seq_id.
# 1) Start background stream; capture approval request
curl --request POST \
  --url https://api.letta.com/v1/agents/$AGENT_ID/messages/stream \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Content-Type: application/json' \
  --data '{
    "messages": [{"role": "user", "content": "Do a sensitive operation"}],
    "stream_tokens": true,
    "background": true
  }'
# Example stream output (approval request arrives):
data: {"run_id":"run-abc","seq_id":0,"message_type":"reasoning_message","reasoning":"..."}
data: {"run_id":"run-abc","seq_id":1,"message_type":"approval_request_message","id":"message-abc","tool_call":{"name":"sensitive_operation","arguments":"{...}","tool_call_id":"tool-xyz"}}
# 2) Approve in background; capture approval stream cursor (this creates a new run)
curl --request POST \
  --url https://api.letta.com/v1/agents/$AGENT_ID/messages/stream \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Content-Type: application/json' \
  --data '{
    "messages": [{"type": "approval", "approve": true, "approval_request_id": "message-abc"}],
    "stream_tokens": true,
    "background": true
  }'
# Example approval stream output (tool result arrives here):
data: {"run_id":"run-new","seq_id":0,"message_type":"tool_return_message","status":"success","tool_return":"..."}
# 3) Resume the approval stream's run to continue
curl --request GET \
  --url https://api.letta.com/v1/runs/$RUN_ID/stream \
  --header 'Accept: text/event-stream' \
  --data '{"starting_after": 0}'

# 1) Start background stream and capture approval request
stream = client.agents.messages.create(
    agent_id=agent.id,
    messages=[{"role": "user", "content": "Do a sensitive operation"}],
    streaming=True,
    background=True,
)

approval_request_id = None
orig_run_id = None
last_seq_id = 0
for chunk in stream:
    if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"):
        orig_run_id = chunk.run_id
        last_seq_id = chunk.seq_id
    if getattr(chunk, "message_type", None) == "approval_request_message":
        approval_request_id = chunk.id
        break

# 2) Approve in background; capture the approval stream cursor (this creates a new run)
approve = client.agents.messages.create(
    agent_id=agent.id,
    messages=[{"type": "approval", "approve": True, "approval_request_id": approval_request_id}],
    streaming=True,
    background=True,
)

run_id = None
approve_seq = 0
for chunk in approve:
    if hasattr(chunk, "run_id") and hasattr(chunk, "seq_id"):
        run_id = chunk.run_id
        approve_seq = chunk.seq_id
    if getattr(chunk, "message_type", None) == "tool_return_message":
        # Tool result arrives here on the approval stream
        break

# 3) Resume that run to read follow-up tokens
for chunk in client.runs.stream(run_id, starting_after=approve_seq):
    print(chunk)

// 1) Start background stream and capture approval request
const stream = await client.agents.messages.create(agent.id, {
  messages: [{ role: "user", content: "Do a sensitive operation" }],
  streaming: true,
  background: true,
});

let approvalRequestId: string | null = null;
let origRunId: string | null = null;
let lastSeqId = 0;
for await (const chunk of stream) {
  // Use null checks rather than truthiness: seq_id 0 is a valid cursor
  if (chunk.run_id != null && chunk.seq_id != null) {
    origRunId = chunk.run_id;
    lastSeqId = chunk.seq_id;
  }
  if (chunk.message_type === "approval_request_message") {
    approvalRequestId = chunk.id;
    break;
  }
}

// 2) Approve in background; capture the approval stream cursor (this creates a new run)
const approve = await client.agents.messages.create(agent.id, {
  messages: [{ type: "approval", approve: true, approval_request_id: approvalRequestId }],
  streaming: true,
  background: true,
});

let runId: string | null = null;
let approveSeq = 0;
for await (const chunk of approve) {
  if (chunk.run_id != null && chunk.seq_id != null) {
    runId = chunk.run_id;
    approveSeq = chunk.seq_id;
  }
  if (chunk.message_type === "tool_return_message") {
    // Tool result arrives here on the approval stream
    break;
  }
}

// 3) Resume that run to read follow-up tokens
const resume = await client.runs.stream(runId!, { starting_after: approveSeq });
for await (const chunk of resume) {
  console.log(chunk);
}

Discovering and Resuming Active Streams
Section titled “Discovering and Resuming Active Streams”When your application starts or recovers from a crash, you can check for any active background streams and resume them. This is particularly useful for:
- Application restarts: Resume processing after deployments or crashes
- Load balancing: Pick up streams started by other instances
- Monitoring: Check progress of long-running operations from different clients
# Step 1: Find active background streams for your agents
curl --request GET \
  --url https://api.letta.com/v1/runs/active \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Content-Type: application/json' \
  --data '{
    "agent_ids": ["agent-123", "agent-456"],
    "background": true
  }'
# Returns: [{"run_id": "run-abc", "agent_id": "agent-123", "status": "processing", ...}]

# Step 2: Resume streaming from the beginning (or any specified seq_id)
# starting_after 0 starts from the beginning; batch_size fetches historical chunks in larger batches
curl --request GET \
  --url https://api.letta.com/v1/runs/$RUN_ID/stream \
  --header 'Accept: text/event-stream' \
  --data '{"starting_after": 0, "batch_size": 1000}'

# Find and resume active background streams
active_runs = client.runs.active(
    agent_ids=["agent-123", "agent-456"],
    background=True,
)

if active_runs:
    # Resume the first active stream from the beginning
    run = active_runs[0]
    print(f"Resuming stream for run {run.id}, status: {run.status}")

    stream = client.runs.stream(
        run_id=run.id,
        starting_after=0,  # Start from beginning
        batch_size=1000,  # Fetch historical chunks in larger batches
    )

    # Each historical chunk is streamed one at a time, followed by new chunks as they become available
    for chunk in stream:
        print(chunk)

// Find and resume active background streams
const activeRuns = await client.runs.active({
  agentIds: ["agent-123", "agent-456"],
  background: true,
});

if (activeRuns.length > 0) {
  // Resume the first active stream from the beginning
  const run = activeRuns[0];
  console.log(`Resuming stream for run ${run.id}, status: ${run.status}`);

  const stream = await client.runs.stream(run.id, {
    starting_after: 0, // Start from beginning
    batch_size: 1000, // Fetch historical chunks in larger batches
  });

  // Each historical chunk is streamed one at a time, followed by new chunks as they become available
  for await (const chunk of stream) {
    console.log(chunk);
  }
}

Alternate option: Standard Streaming with Keepalive Pings and Longer Timeouts
Section titled “Alternate option: Standard Streaming with Keepalive Pings and Longer Timeouts”For operations under 10 minutes that need real-time updates without the complexity of background processing. Configure keepalive pings and timeouts to maintain stable connections:
curl --request POST \
  --url https://api.letta.com/v1/agents/$AGENT_ID/messages/stream \
  --header "Authorization: Bearer $LETTA_API_KEY" \
  --header 'Content-Type: application/json' \
  --data '{
    "messages": [
      {
        "role": "user",
        "content": "Execute this long-running analysis"
      }
    ],
    "include_pings": true
  }'

# Configure client with extended timeout
from letta_client import Letta
import os

client = Letta(api_key=os.getenv("LETTA_API_KEY"))

# Enable pings to prevent timeout during long operations
stream = client.agents.messages.create(
    agent_id=agent_state.id,
    messages=[
        {
            "role": "user",
            "content": "Execute this long-running analysis"
        }
    ],
    streaming=True,
    stream_tokens=True,
    include_pings=True,  # Sends periodic keepalive messages
    request_options={"timeout_in_seconds": 600},  # 10 min timeout
)

# Process the stream (pings will keep connection alive)
for chunk in stream:
    if chunk.message_type == "ping":
        # Keepalive ping received, connection is still active
        continue
    print(chunk)

// Configure client with extended timeout
import Letta from "@letta-ai/letta-client";

const client = new Letta({
  apiKey: process.env.LETTA_API_KEY,
});

// Enable pings to prevent timeout during long operations
const stream = await client.agents.messages.create(agentState.id, {
  messages: [
    {
      role: "user",
      content: "Execute this long-running analysis"
    }
  ],
  streaming: true,
  stream_tokens: true,
  include_pings: true, // Sends periodic keepalive messages
  timeout_in_seconds: 600, // 10 minutes timeout in seconds
});

// Process the stream (pings will keep connection alive)
for await (const chunk of stream) {
  if (chunk.message_type === "ping") {
    // Keepalive ping received, connection is still active
    continue;
  }
  console.log(chunk);
}

Configuration Guidelines
Section titled “Configuration Guidelines”| Parameter | Purpose | When to Use |
|---|---|---|
| Timeout in seconds | Extends request timeout beyond 60s default | Set to 1.5x your expected max duration |
| Include pings | Sends keepalive messages every ~30s | Enable for operations with long gaps between outputs |