Parallel Workflows & Fan-Out
1. Why this matters
If two nodes don't depend on each other, running them one after another wastes wall-clock time. Examples:
- Get the weather AND the news AND the stock price for a city.
- Generate three blog drafts (one per LLM) and pick the best.
- Process every row of a list in parallel.
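LangGraph runs nodes concurrently whenever the graph schedules them in the same super-step (for example, several edges leaving one node), so you don't write any threading or async code yourself.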
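To see where the time goes, here is a plain-`asyncio` sketch, not LangGraph code. Three independent "API calls" are simulated with `asyncio.sleep` (the latencies are made-up assumptions) and timed once back to back and once concurrently with `asyncio.gather`. Sequential time is roughly the sum of the latencies; concurrent time is roughly the slowest single one.

```python
import asyncio
import time


async def fake_api_call(name: str, latency: float) -> str:
    # Stand-in for an I/O-bound call (weather, news, stock price):
    # the coroutine spends its time waiting, like a real HTTP request.
    await asyncio.sleep(latency)
    return f"{name}: done"


async def sequential(calls: dict[str, float]) -> list[str]:
    # Each call starts only after the previous one has finished.
    return [await fake_api_call(name, t) for name, t in calls.items()]


async def concurrent(calls: dict[str, float]) -> list[str]:
    # All calls start together, so their waits overlap.
    return await asyncio.gather(*(fake_api_call(name, t) for name, t in calls.items()))


def timed(coro_fn, calls):
    start = time.perf_counter()
    results = asyncio.run(coro_fn(calls))
    return results, time.perf_counter() - start


CALLS = {"weather": 0.2, "news": 0.3, "stock": 0.1}  # hypothetical latencies in seconds

seq_results, seq_time = timed(sequential, CALLS)
par_results, par_time = timed(concurrent, CALLS)
print(f"sequential: {seq_time:.2f}s  concurrent: {par_time:.2f}s")
```

Both versions return the same results in the same order; only the wall-clock time differs.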
2. Mental model
Two flavors of parallelism:
| Flavor | When known | How |
|---|---|---|
| Static fan-out | At build time | Add multiple edges from one source. Engine runs them in parallel. |
| Dynamic fan-out (Send API) | At run time, count depends on state | Conditional edge returns [Send("worker", item) for item in batch] |
```mermaid
flowchart LR
    subgraph Static
        A1[Start] --> X1[do A]
        A1 --> X2[do B]
        A1 --> X3[do C]
        X1 --> J1[Merge]
        X2 --> J1
        X3 --> J1
    end
    subgraph Dynamic [Dynamic fan-out via Send]
        A2[plan: produce N items] --> R{router returns N Sends}
        R --> W1[worker item 1]
        R --> W2[worker item 2]
        R --> W3[worker item N]
        W1 --> J2[reducer accumulates]
        W2 --> J2
        W3 --> J2
    end
```
3. Architecture / Flow
Important: when multiple branches write the same state key in the same super-step, that key needs a reducer to merge the writes. Without one, LangGraph does not silently pick a winner; it raises `InvalidUpdateError`.
```mermaid
flowchart TD
    S[plan_node<br/>produces queries list] --> R{conditional edge<br/>returns list of Send}
    R --> W1[worker query 1]
    R --> W2[worker query 2]
    R --> W3[worker query 3]
    W1 -->|results: result_1| AGG[state.results<br/>Annotated list, operator.add]
    W2 -->|results: result_2| AGG
    W3 -->|results: result_3| AGG
    AGG --> NEXT[next node sees<br/>combined results]
```
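The merge rule can be modeled in a few lines of plain Python. This is a hypothetical toy (`apply_step` and `InvalidUpdate` are made-up names, not LangGraph internals): a key without a reducer accepts a single write per super-step and raises otherwise, mirroring how current LangGraph versions reject concurrent writes to a plain key, while a key with a reducer such as `operator.add` folds every branch's write into the current value.

```python
import operator
from typing import Any, Callable, Optional


class InvalidUpdate(Exception):
    """Hypothetical stand-in for LangGraph's InvalidUpdateError."""


def apply_step(value: Any, writes: list, reducer: Optional[Callable[[Any, Any], Any]]) -> Any:
    """Fold one super-step's writes for a single state key (toy model)."""
    if not writes:
        return value  # nobody wrote this key during the step
    if reducer is None:
        # Plain key: two or more writes in one step are ambiguous, so reject them.
        if len(writes) > 1:
            raise InvalidUpdate(f"got {len(writes)} writes in one step; add a reducer")
        return writes[0]
    # Reducer key: fold each branch's contribution into the current value.
    for write in writes:
        value = reducer(value, write)
    return value


# Two parallel branches each returned {"answers": [...]} in the same step.
branch_writes = [["OpenAI: ..."], ["Anthropic: ..."]]

merged = apply_step([], branch_writes, operator.add)
print(merged)  # ['OpenAI: ...', 'Anthropic: ...']

try:
    apply_step([], branch_writes, reducer=None)
except InvalidUpdate as err:
    print("no reducer:", err)  # no reducer: got 2 writes in one step; add a reducer
```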
4. Core concepts
- Static fan-out: `add_edge("plan", "a"); add_edge("plan", "b")` → both `a` and `b` run after `plan` in the same super-step.
- Super-step: LangGraph's execution unit. All nodes in the same super-step run concurrently; their updates are merged before the next super-step starts.
- `Send(node_name, sub_state)`: schedules a node invocation with a custom state for that branch. Used for dynamic per-item parallelism.
- Fan-in: when parallel branches reconverge, the engine merges their state writes via reducers. A field written by several branches in one step without a reducer raises `InvalidUpdateError`.
- Order is not guaranteed: never assume branch A finishes before branch B.
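The super-step and fan-in rules above can be sketched as a toy scheduler in plain Python. Everything here (`EDGES`, `REDUCERS`, `run`, the node functions) is a hypothetical model for intuition, not LangGraph's actual engine: each step runs its scheduled nodes concurrently on a thread pool against one shared snapshot, applies the updates through reducers only after the whole step finishes, and schedules a fan-in node once even when several predecessors point at it.

```python
import operator
from concurrent.futures import ThreadPoolExecutor

# Hypothetical toy graph: each node lists the nodes it triggers next.
EDGES = {"START": ["a", "b"], "a": ["merge"], "b": ["merge"], "merge": []}
REDUCERS = {"log": operator.add}  # "log" is written by several branches


def node_a(state):
    return {"log": [f"a saw {len(state['log'])} entries"]}


def node_b(state):
    return {"log": [f"b saw {len(state['log'])} entries"]}


def node_merge(state):
    return {"summary": " | ".join(state["log"])}


NODES = {"a": node_a, "b": node_b, "merge": node_merge}


def run(state):
    scheduled = list(EDGES["START"])
    steps = []
    while scheduled:
        steps.append(scheduled)
        snapshot = dict(state)  # every node in this step reads the same snapshot
        with ThreadPoolExecutor() as pool:
            updates = list(pool.map(lambda name: NODES[name](snapshot), scheduled))
        # Apply updates only after the whole step has finished.
        for update in updates:
            for key, value in update.items():
                reducer = REDUCERS.get(key)
                state[key] = reducer(state.get(key, []), value) if reducer else value
        # Next step: union of successors, each scheduled once (fan-in).
        next_nodes = []
        for name in scheduled:
            for succ in EDGES[name]:
                if succ not in next_nodes:
                    next_nodes.append(succ)
        scheduled = next_nodes
    return state, steps


final, steps = run({"log": []})
print(steps)             # [['a', 'b'], ['merge']]
print(final["summary"])  # a saw 0 entries | b saw 0 entries
```

Both `a` and `b` report 0 entries because they read the snapshot taken before the step, not each other's writes.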
5. Code: minimal working example
Static parallel — call two models, combine:
```python
from typing import TypedDict, Annotated
from operator import add

from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage


class S(TypedDict):
    prompt: str
    answers: Annotated[list[str], add]  # reducer: append from each branch


openai_llm = ChatOpenAI(model="gpt-4o-mini")
anthropic_llm = ChatAnthropic(model="claude-haiku-4-5-20251001")


def ask_openai(state: S):
    r = openai_llm.invoke([HumanMessage(state["prompt"])])
    return {"answers": [f"OpenAI: {r.content}"]}


def ask_anthropic(state: S):
    r = anthropic_llm.invoke([HumanMessage(state["prompt"])])
    return {"answers": [f"Anthropic: {r.content}"]}


def combine(state: S):
    return {}  # downstream consumer just reads state.answers


b = StateGraph(S)
b.add_node("openai", ask_openai)
b.add_node("anthropic", ask_anthropic)
b.add_node("combine", combine)

# Fan-out: both run concurrently after START
b.add_edge(START, "openai")
b.add_edge(START, "anthropic")

# Fan-in: both feed into combine
b.add_edge("openai", "combine")
b.add_edge("anthropic", "combine")
b.add_edge("combine", END)

result = b.compile().invoke(
    {"prompt": "Define vector embeddings in one sentence.", "answers": []}
)
for a in result["answers"]:
    print(a)
```
6. Code: real-world pattern
Dynamic fan-out with Send — answer N sub-questions in parallel, then synthesize:
```python
from typing import TypedDict, Annotated
from operator import add

from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)


class State(TypedDict):
    question: str
    sub_questions: list[str]
    answers: Annotated[list[str], add]
    final: str


class WorkerState(TypedDict):
    question: str      # the parent question (optional)
    sub_question: str  # what this worker should answer


def plan(state: State):
    out = llm.invoke([HumanMessage(
        f"Break this into 3 distinct sub-questions, one per line:\n{state['question']}")])
    subs = [s.strip("-• ").strip() for s in out.content.splitlines() if s.strip()][:3]
    return {"sub_questions": subs}


# Conditional edge: returns a list of Send to fan out N workers
def dispatch(state: State):
    return [Send("answer", {"sub_question": q, "question": state["question"]})
            for q in state["sub_questions"]]


def answer(state: WorkerState):
    r = llm.invoke([HumanMessage(f"Answer briefly: {state['sub_question']}")])
    return {"answers": [r.content]}  # accumulated via reducer


def synthesize(state: State):
    joined = "\n\n".join(f"- {a}" for a in state["answers"])
    r = llm.invoke([HumanMessage(
        f"Synthesize a final answer to '{state['question']}' from:\n{joined}")])
    return {"final": r.content}


b = StateGraph(State)
b.add_node("plan", plan)
b.add_node("answer", answer)
b.add_node("synthesize", synthesize)
b.add_edge(START, "plan")
b.add_conditional_edges("plan", dispatch, ["answer"])  # fan-out
b.add_edge("answer", "synthesize")                     # fan-in (auto-join)
b.add_edge("synthesize", END)
graph = b.compile()

print(graph.invoke({
    "question": "How would I design a RAG system for legal documents?",
    "sub_questions": [], "answers": [], "final": "",
})["final"])
```
The Send(node, state) mechanism is what lets MapReduce-style work happen with a single conditional edge.
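Stripped of LLM calls, the same map-reduce shape fits in plain `asyncio`. This sketch is an analogy, not LangGraph: `Dispatch` is a hypothetical stand-in for `Send`, the planner just fabricates `n` placeholder items, and `asyncio.gather` plays the role of the super-step that runs every dispatched worker concurrently before the `operator.add` reducer concatenates their outputs.

```python
import asyncio
import operator
from dataclasses import dataclass
from functools import reduce


@dataclass
class Dispatch:
    """Hypothetical stand-in for Send: target node plus that branch's private input."""
    node: str
    arg: dict


def plan(state: dict) -> dict:
    # Stand-in for the LLM planner: the item count is only known at run time.
    return {"sub_questions": [f"part {i}" for i in range(state["n"])]}


def dispatch(state: dict) -> list[Dispatch]:
    # One worker invocation per item, each with its own input state.
    return [Dispatch("answer", {"sub_question": q}) for q in state["sub_questions"]]


async def answer(worker_state: dict) -> dict:
    await asyncio.sleep(0.01)  # pretend I/O, e.g. an LLM call
    return {"answers": [worker_state["sub_question"].upper()]}


WORKERS = {"answer": answer}


async def run(state: dict) -> dict:
    state = {**state, **plan(state), "answers": []}
    sends = dispatch(state)
    # Fan out: every dispatched invocation runs concurrently.
    updates = await asyncio.gather(*(WORKERS[s.node](s.arg) for s in sends))
    # Fan in: the operator.add reducer concatenates each worker's list.
    state["answers"] = reduce(operator.add, (u["answers"] for u in updates), state["answers"])
    state["final"] = "; ".join(state["answers"])  # stand-in for synthesize
    return state


result = asyncio.run(run({"n": 3}))
print(result["final"])  # PART 0; PART 1; PART 2
```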
7. Common pitfalls
- ❗ Parallel branches writing the same field without a reducer. LangGraph rejects two writes to a plain key in one super-step with `InvalidUpdateError`. Always use `Annotated[..., reducer]` on multi-writer fields.
- ❗ Assuming branch order. Completion order is non-deterministic. Don't write code that depends on "A finishes before B."
- ❗ `Send` workers mutating parent state directly. `Send` carries a sub-state for that branch only. The worker's return value is merged back via reducers, so the same multi-writer rules apply.
- ❗ Leaving out the possible destinations for `Send`. The simplest pattern is `add_conditional_edges("plan", dispatch_fn, ["worker_node_name"])`; the list tells LangGraph which nodes the router can reach, which it uses for validation and for drawing the graph.
- ❗ Over-parallelizing with rate-limited APIs. Fifty concurrent OpenAI calls will likely hit rate limits. Cap the fan-out with `config={"max_concurrency": N}` on `.invoke()` or `.stream()`, or attach a rate limiter to the chat model (e.g. `InMemoryRateLimiter` from `langchain_core.rate_limiters` via the `rate_limiter=` parameter).
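Inside LangGraph, `max_concurrency` in the run config is the knob. For concurrent code you write yourself, the same idea is a semaphore. The sketch below (plain `asyncio`, with a hypothetical `fake_llm_call` standing in for a rate-limited API) fans out 50 tasks but lets at most `max_concurrency` of them run at once, and records the peak number in flight to show the cap holds.

```python
import asyncio


async def bounded_fan_out(items, worker, max_concurrency: int):
    sem = asyncio.Semaphore(max_concurrency)
    in_flight = 0
    peak = 0

    async def guarded(item):
        nonlocal in_flight, peak
        async with sem:  # at most max_concurrency tasks get past this line at once
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await worker(item)
            finally:
                in_flight -= 1

    # All tasks are created up front; the semaphore decides how many actually run.
    results = await asyncio.gather(*(guarded(i) for i in items))
    return results, peak


async def fake_llm_call(item: int) -> int:
    await asyncio.sleep(0.01)  # stand-in for a rate-limited API request
    return item * 2


results, peak = asyncio.run(bounded_fan_out(range(50), fake_llm_call, max_concurrency=5))
print(len(results), peak)  # 50 5
```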
8. When to use vs not use
| Use parallel when | Use sequential when |
|---|---|
| Branches are independent | Each step needs the previous step's output |
| You can afford the concurrent API calls | You're rate-limited or budget-sensitive |
| You can write a reducer to combine outputs | The branches produce conflicting state |
| Fan-out count is known → static edges | — |
| Fan-out count depends on data → Send API | — |
9. Cheatsheet
```python
import operator
from typing import Annotated, TypedDict

from langgraph.graph import START
from langgraph.types import Send

# `b` is a StateGraph builder and `graph = b.compile()`, as in the examples above.

# STATIC FAN-OUT: add multiple edges
b.add_edge(START, "a")
b.add_edge(START, "b")
b.add_edge(START, "c")
b.add_edge("a", "merge")
b.add_edge("b", "merge")
b.add_edge("c", "merge")

# DYNAMIC FAN-OUT: Send API
def dispatch(state) -> list[Send]:
    return [Send("worker", {"item": x}) for x in state["batch"]]

b.add_conditional_edges("plan", dispatch, ["worker"])

# Multi-writer field: REDUCER REQUIRED
class S(TypedDict):
    results: Annotated[list, operator.add]  # or your own merge fn

# Limit concurrency at invoke
graph.invoke(state, config={"max_concurrency": 5})
```
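A custom merge function is any two-argument function that takes the current value and one branch's update and returns the merged value. The hypothetical `merge_scores` below keeps the best score per draft without mutating its inputs. The sketch stays in plain Python: `functools.reduce` imitates folding each branch's update in turn, and the reducer is read back from the `Annotated` metadata by hand.

```python
from functools import reduce
from typing import Annotated, TypedDict, get_type_hints


def merge_scores(left: dict[str, float], right: dict[str, float]) -> dict[str, float]:
    """Hypothetical reducer: keep the higher score per key; never mutate inputs."""
    merged = dict(left)
    for key, score in right.items():
        merged[key] = max(score, merged.get(key, float("-inf")))
    return merged


class S(TypedDict):
    scores: Annotated[dict[str, float], merge_scores]


# The merge function lives in the Annotated metadata; pull it out by hand here.
reducer = get_type_hints(S, include_extras=True)["scores"].__metadata__[0]

branch_updates = [{"draft_a": 0.7}, {"draft_b": 0.9}, {"draft_a": 0.8}]
print(reduce(reducer, branch_updates, {}))  # {'draft_a': 0.8, 'draft_b': 0.9}
```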
10. Q&A: recall test
- Q: What does "super-step" mean? A: A LangGraph execution round in which all currently scheduled nodes run concurrently. After they finish, their state updates are merged and the next super-step is scheduled.
- Q: When do you need `Send` instead of multiple `add_edge` calls? A: When the number of parallel branches depends on state (e.g., one worker per sub-question, where the count varies). Static `add_edge` can't express "make N branches at runtime."
- Q: What happens if two parallel branches return `{"x": "a"}` and `{"x": "b"}` with no reducer on `x`? A: LangGraph raises `InvalidUpdateError` for that super-step, because a plain key accepts only one value per step. Always define a reducer for multi-writer fields.
- Q: Can you rely on the order in which parallel branches finish? A: No, it's non-deterministic. Treat branches as a set, not a sequence.
- Q: How do you limit how many `Send` workers run concurrently? A: Pass `config={"max_concurrency": N}` to `.invoke()` or `.stream()`.
Quiz: Quick check
What you remember
Q1. What's the Send API for in LangGraph?
- Fan-out — dispatches the same node N times with different inputs, running them concurrently
- Sends a notification
- Forwards state to END
- Triggers an LLM call
Why: `Send("worker", item)` schedules the `worker` node to run with `item` as its input state. Returning a list of `Send`s fans out the graph, which is common for map-reduce patterns.
Q2. What's the benefit of parallel execution for LLM calls?
- LLM calls are I/O-bound; running 10 in parallel can approach a 10× speedup over running them serially
- Uses less RAM
- Cheaper per call
- More accurate
Why: Each LLM call spends most of its time waiting for the API. Concurrency overlaps the waits, so total wall time approaches that of the slowest single call rather than the sum of all calls.
Q3. How do you prevent parallel calls from overwriting each other's state?
- Use reducers (`Annotated[list, operator.add]`) so concurrent writes are merged instead of colliding
- Lock the state
- Don't run in parallel
- Use a database
Why: Without a reducer, LangGraph refuses concurrent writes to the same key and raises `InvalidUpdateError`. With a reducer, contributions from each parallel branch are combined deterministically.
Common doubts
How do I set up parallel processing of a list of items?
The map-reduce pattern: (1) one node emits a list of Send objects, fanning out to a worker node, (2) the worker processes one item and writes to a state field with operator.add reducer, (3) a collector node reads the accumulated results. LangGraph orchestrates the concurrency.
What's the right max_concurrency?
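It depends on the rate limits of your provider and account tier. For hosted LLM APIs such as OpenAI or Anthropic, a small value like 5 to 10 is a reasonable starting point; for your own service, size it to what your backend can handle. Start low and raise it while watching for rate-limit errors.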
Can parallel nodes communicate during execution?
No — that's a feature, not a bug. Each parallel branch runs independently against its own copy of state. They only "communicate" through reducers at merge time. Keeps things predictable.