
Parallel Workflows & Fan-Out

1. Why this matters

If two nodes don't depend on each other, running them sequentially wastes time and money. Examples:

  • Get the weather AND the news AND the stock price for a city.
  • Generate three blog drafts (one per LLM) and pick the best.
  • Process every row of a list in parallel.

LangGraph runs branches concurrently when the graph shape allows — no extra code.
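
To make the time saving concrete, here is a plain-Python sketch using only the standard library (it does not use LangGraph). The `fetch` function and its 0.2-second sleep are hypothetical stand-ins for a network call, and the three labels mirror the weather/news/stock example above.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fetch(label: str) -> str:
    """Hypothetical stand-in for an API call: sleeps to simulate network wait."""
    time.sleep(0.2)
    return f"{label} for Paris"

labels = ["weather", "news", "stock price"]

# Sequential: total time is roughly the sum of the waits.
t0 = time.perf_counter()
sequential = [fetch(x) for x in labels]
sequential_secs = time.perf_counter() - t0

# Concurrent: the waits overlap, so total time is close to the slowest call.
t0 = time.perf_counter()
with ThreadPoolExecutor(max_workers=len(labels)) as pool:
    parallel = list(pool.map(fetch, labels))  # map keeps input order
parallel_secs = time.perf_counter() - t0

print(f"sequential: {sequential_secs:.2f}s, concurrent: {parallel_secs:.2f}s")
```

Both passes return the same three strings; only the wall-clock time differs, because the threads spend their time waiting rather than computing.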

2. Mental model

Two flavors of parallelism:

Flavor | When known | How
Static fan-out | At build time | Add multiple edges from one source; the engine runs the targets in parallel.
Dynamic fan-out (Send API) | At run time (count depends on state) | A conditional edge returns [Send("worker", item) for item in batch].

flowchart LR
    subgraph Static
      A1[Start] --> X1[do A]
      A1 --> X2[do B]
      A1 --> X3[do C]
      X1 --> J1[Merge]
      X2 --> J1
      X3 --> J1
    end
    subgraph Dynamic-Send
      A2[plan: produce N items] --> R{router returns N Sends}
      R --> W1[worker item 1]
      R --> W2[worker item 2]
      R --> W3[worker item N]
      W1 --> J2[reducer accumulates]
      W2 --> J2
      W3 --> J2
    end

3. Architecture / Flow

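Important: when multiple branches write the same state key in the same super-step, that key needs a reducer to merge the writes. Without one, LangGraph does not silently pick a winner; it rejects the step with an InvalidUpdateError.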

flowchart TD
    S[plan_node<br/>produces queries list] --> R{conditional edge<br/>returns list of Send}
    R --> W1[worker query 1]
    R --> W2[query 2]
    R --> W3[query 3]
    W1 -->|results: result_1| AGG[state.results<br/>Annotated list, operator.add]
    W2 -->|results: result_2| AGG
    W3 -->|results: result_3| AGG
    AGG --> NEXT[next node sees<br/>combined results]
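
The fan-in merge in the diagram can be modelled in a few lines of plain Python. This is a simplified sketch, not LangGraph's real implementation: `merge_step` and its `reducers` argument are hypothetical names. It folds each branch update into the state with the key's reducer and rejects a key that is written twice without one, which mirrors the InvalidUpdateError LangGraph raises in that situation.

```python
from operator import add

def merge_step(state: dict, updates: list[dict], reducers: dict) -> dict:
    """Simplified model of a fan-in merge (hypothetical helper, not LangGraph code)."""
    merged = dict(state)
    written = set()  # keys already written during this step
    for update in updates:
        for key, value in update.items():
            if key in reducers:
                # Reducer present: fold the new value into the current one.
                merged[key] = reducers[key](merged[key], value)
            elif key in written:
                # No reducer and a second write: refuse instead of guessing.
                raise ValueError(f"multiple writes to {key!r} without a reducer")
            else:
                merged[key] = value
            written.add(key)
    return merged

state = {"results": [], "status": "new"}
updates = [{"results": ["result_1"]}, {"results": ["result_2"]}, {"results": ["result_3"]}]
merged = merge_step(state, updates, reducers={"results": add})
print(merged["results"])  # ['result_1', 'result_2', 'result_3']

try:
    merge_step(state, [{"status": "a"}, {"status": "b"}], reducers={"results": add})
except ValueError as exc:
    conflict = str(exc)
    print(conflict)  # multiple writes to 'status' without a reducer
```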

4. Core concepts

  • Static fan-out: calling add_edge("plan", "a") and add_edge("plan", "b") makes both a and b run after plan, in the same super-step.
  • Super-step — LangGraph's execution unit. All nodes in the same super-step run concurrently; their updates are merged before the next super-step.
  • Send(node_name, sub_state) — schedules a node invocation with a custom state for that branch. Used for dynamic per-item parallelism.
  • Fan-in — when parallel branches reconverge, the engine merges their state writes via reducers. Without a reducer for a multi-writer field → conflict.
  • Order is not guaranteed — never assume branch A finishes before branch B.
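
Because finish order varies, one way to get a stable result is to carry each item's position through the worker and sort at fan-in. The sketch below is plain Python (standard library only); `worker` and the random sleep are hypothetical stand-ins for a node with variable latency.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed

def worker(index: int, item: str) -> tuple[int, str]:
    """Hypothetical worker: returns its input position alongside the result."""
    time.sleep(random.uniform(0.01, 0.05))  # simulate variable latency
    return index, item.upper()

items = ["alpha", "beta", "gamma", "delta"]

with ThreadPoolExecutor(max_workers=len(items)) as pool:
    futures = [pool.submit(worker, i, item) for i, item in enumerate(items)]
    # as_completed yields in finish order, which can change from run to run.
    arrived = [f.result() for f in as_completed(futures)]

# Fan-in step: sort by the carried index to restore input order.
ordered = [result for _, result in sorted(arrived)]
print(ordered)  # ['ALPHA', 'BETA', 'GAMMA', 'DELTA']
```

In a graph, the same idea would mean each worker returns something like an (index, text) pair in its list update, and the downstream node sorts before using the results.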

5. Code — minimal working example

Static parallel — call two models, combine:

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from langchain_openai import ChatOpenAI
from langchain_anthropic import ChatAnthropic
from langchain_core.messages import HumanMessage

class S(TypedDict):
    prompt: str
    answers: Annotated[list[str], add]   # reducer: append from each branch

openai_llm = ChatOpenAI(model="gpt-4o-mini")
anthropic_llm = ChatAnthropic(model="claude-haiku-4-5-20251001")

def ask_openai(state: S):
    r = openai_llm.invoke([HumanMessage(state["prompt"])])
    return {"answers": [f"OpenAI: {r.content}"]}

def ask_anthropic(state: S):
    r = anthropic_llm.invoke([HumanMessage(state["prompt"])])
    return {"answers": [f"Anthropic: {r.content}"]}

def combine(state: S):
    return {}      # downstream consumer just reads state.answers

b = StateGraph(S)
b.add_node("openai", ask_openai)
b.add_node("anthropic", ask_anthropic)
b.add_node("combine", combine)

# Fan-out: both run concurrently after START
b.add_edge(START, "openai")
b.add_edge(START, "anthropic")
# Fan-in: both feed into combine
b.add_edge("openai", "combine")
b.add_edge("anthropic", "combine")
b.add_edge("combine", END)

result = b.compile().invoke({"prompt": "Define vector embeddings in one sentence.",
                              "answers": []})
for a in result["answers"]:
    print(a)

6. Code — real-world pattern

Dynamic fan-out with Send — answer N sub-questions in parallel, then synthesize:

from typing import TypedDict, Annotated
from operator import add
from langgraph.graph import StateGraph, START, END
from langgraph.types import Send
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage

llm = ChatOpenAI(model="gpt-4o-mini", temperature=0)

class State(TypedDict):
    question: str
    sub_questions: list[str]
    answers: Annotated[list[str], add]
    final: str

class WorkerState(TypedDict):
    question: str           # the parent question (optional)
    sub_question: str       # what this worker should answer

def plan(state: State):
    out = llm.invoke([HumanMessage(
        f"Break this into 3 distinct sub-questions, one per line:\n{state['question']}")])
    subs = [s.strip("-• ").strip() for s in out.content.splitlines() if s.strip()][:3]
    return {"sub_questions": subs}

# Conditional edge: returns a list of Send to fan out N workers
def dispatch(state: State):
    return [Send("answer", {"sub_question": q, "question": state["question"]})
            for q in state["sub_questions"]]

def answer(state: WorkerState):
    r = llm.invoke([HumanMessage(f"Answer briefly: {state['sub_question']}")])
    return {"answers": [r.content]}      # accumulated via reducer

def synthesize(state: State):
    joined = "\n\n".join(f"- {a}" for a in state["answers"])
    r = llm.invoke([HumanMessage(
        f"Synthesize a final answer to '{state['question']}' from:\n{joined}")])
    return {"final": r.content}

b = StateGraph(State)
b.add_node("plan", plan)
b.add_node("answer", answer)
b.add_node("synthesize", synthesize)

b.add_edge(START, "plan")
b.add_conditional_edges("plan", dispatch, ["answer"])   # fan-out
b.add_edge("answer", "synthesize")                       # fan-in (auto-join)
b.add_edge("synthesize", END)

graph = b.compile()
print(graph.invoke({
    "question": "How would I design a RAG system for legal documents?",
    "sub_questions": [], "answers": [], "final": "",
})["final"])

The Send(node, state) mechanism is what lets MapReduce-style work happen with a single conditional edge.
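
One fragile spot in the example is the `plan` node's parsing: it strips only "-" and "•", while models often reply with numbered lists. A slightly more tolerant parser might look like the sketch below; `parse_sub_questions` is a hypothetical helper, not part of LangGraph or LangChain.

```python
import re

# Leading list markers: runs of "-", "*", "•", or a number followed by "." or ")".
_MARKER = re.compile(r"^\s*(?:[-*•]+\s*|\d+[.)]\s+)")

def parse_sub_questions(text: str, limit: int = 3) -> list[str]:
    """Split model output into at most `limit` cleaned, non-empty lines."""
    cleaned = (_MARKER.sub("", line).strip() for line in text.splitlines())
    return [line for line in cleaned if line][:limit]

sample = (
    "1. What corpus is covered?\n"
    "2) How are citations chunked?\n"
    "\n"
    "- How is recall evaluated?\n"
    "• Extra line beyond the limit"
)
print(parse_sub_questions(sample))
# ['What corpus is covered?', 'How are citations chunked?', 'How is recall evaluated?']
```

The number of Send objects that `dispatch` returns equals the length of this list, so a parser that yields an empty list would fan out to zero workers.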

7. Common pitfalls

  • Parallel branches writing the same field without a reducer. LangGraph raises InvalidUpdateError when a key without a reducer receives more than one value in a super-step. Always use Annotated[..., reducer] on multi-writer fields.
  • Assuming branch order. It's non-deterministic. Don't write code that depends on "A finishes before B."
  • Send workers mutating parent state directly. Send carries a sub-state for that branch. The worker's return is merged back via reducers — same rules apply.
  • Forgetting the conditional edge needs path_map for Send. The simplest pattern is add_conditional_edges("plan", dispatch_fn, ["worker_node_name"]) — the list tells the validator which destinations are possible.
  • Over-parallelizing with rate-limited APIs. Fifty concurrent OpenAI calls can easily exceed your rate limits. Cap concurrency for the whole run with config={"max_concurrency": N} at invoke time, or process the items in smaller batches.
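
To illustrate what a concurrency cap does, the plain-Python sketch below runs 50 simulated calls through a pool limited to 5 workers and records the peak number in flight. It shows the effect of a cap only; it is not how LangGraph implements max_concurrency, and `call_api`, `stats`, and `MAX_CONCURRENCY` are hypothetical names.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor

stats = {"active": 0, "peak": 0}  # calls in flight now, and the highest seen
lock = threading.Lock()

def call_api(item: int) -> int:
    """Hypothetical rate-limited call; tracks how many run at once."""
    with lock:
        stats["active"] += 1
        stats["peak"] = max(stats["peak"], stats["active"])
    time.sleep(0.02)  # simulate network latency
    with lock:
        stats["active"] -= 1
    return item * 2

MAX_CONCURRENCY = 5  # illustrative cap; tune to your provider's limits

# 50 items, but never more than MAX_CONCURRENCY in flight at once.
with ThreadPoolExecutor(max_workers=MAX_CONCURRENCY) as pool:
    results = list(pool.map(call_api, range(50)))

print(f"processed {len(results)} items, peak concurrency {stats['peak']}")
```

All 50 items still complete; the cap trades some wall-clock time for staying under the provider's limit.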

8. When to use vs not use

Use parallel when | Use sequential when
Branches are independent | Each step needs the previous step's output
You can afford the concurrent API calls | You're rate-limited or budget-sensitive
You can write a reducer to combine outputs | The branches produce conflicting state

Within parallel, choose by when the fan-out count is known:

  • Fan-out count is known at build time → static edges
  • Fan-out count depends on data → Send API

9. Cheatsheet

# STATIC FAN-OUT — add multiple edges
b.add_edge(START, "a")
b.add_edge(START, "b")
b.add_edge(START, "c")
b.add_edge("a", "merge")
b.add_edge("b", "merge")
b.add_edge("c", "merge")

# DYNAMIC FAN-OUT — Send API
from langgraph.types import Send

def dispatch(state) -> list[Send]:
    return [Send("worker", {"item": x}) for x in state["batch"]]

b.add_conditional_edges("plan", dispatch, ["worker"])

# Multi-writer field — REDUCER REQUIRED
import operator
from typing import Annotated, TypedDict

class S(TypedDict):
    results: Annotated[list, operator.add]    # or your own merge fn

# Limit concurrency at invoke
graph.invoke(state, config={"max_concurrency": 5})
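
As an example of "your own merge fn", a reducer is a function that takes the current value and a branch's update and returns the merged value. The sketch below defines a hypothetical `merge_unique` reducer that drops duplicates, and simulates three branch updates with functools.reduce instead of running a graph.

```python
from functools import reduce
from typing import Annotated, TypedDict

def merge_unique(existing: list[str], new: list[str]) -> list[str]:
    """Custom reducer: append items from `new` that are not already present."""
    seen = set(existing)
    merged = list(existing)  # copy so the current value is not mutated
    for item in new:
        if item not in seen:
            seen.add(item)
            merged.append(item)
    return merged

class S(TypedDict):
    # Used in place of operator.add when branches may return duplicates.
    results: Annotated[list[str], merge_unique]

# Simulate three branch updates being folded into an empty starting value.
branch_updates = [["a", "b"], ["b", "c"], ["a", "d"]]
combined = reduce(merge_unique, branch_updates, [])
print(combined)  # ['a', 'b', 'c', 'd']
```

Returning a new list rather than mutating `existing` keeps the reducer free of side effects, which makes it easier to reason about when several branches update the same key.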

10. Q&A — recall test

  • Q: What does "super-step" mean? A: A LangGraph execution round in which all currently-scheduled nodes run concurrently. After they finish, their state updates are merged and the next super-step is scheduled.

  • Q: When do you need Send instead of multiple add_edge calls? A: When the number of parallel branches depends on state (e.g., one worker per sub-question, where the count varies). Static add_edge can't express "make N branches at runtime."

  • Q: What happens if two parallel branches return {"x": "a"} and {"x": "b"} with no reducer on x? A: LangGraph raises InvalidUpdateError, because a key without a reducer can receive only one value per super-step. Always define a reducer for multi-writer fields.

  • Q: Does order between parallel branches matter? A: No — it's non-deterministic. Treat branches as a set, not a sequence.

  • Q: How do you limit how many Send workers run concurrently? A: Pass config={"max_concurrency": N} to .invoke() or .stream().

Practice

What does this print?

Expected: True

# Parallel fan-out: process 10 items concurrently
items = list(range(10))
parallel_workers = len(items)
print(parallel_workers == 10)

Limit concurrency so you don't hit API rate limits

Expected: True

config = {"max_concurrency": 100}     # bug: 100 concurrent API calls can hit rate limits
safe_config = {"max_concurrency": 5}
print(safe_config["max_concurrency"] < config["max_concurrency"])

Quiz — Quick check

What you remember

Q1. What's the Send API for in LangGraph?

  • Fan-out — dispatches the same node N times with different inputs, running them concurrently
  • Sends a notification
  • Forwards state to END
  • Triggers an LLM call

Why: Send("worker", item) schedules the worker node to run with item as its input state. Returning a list of Send objects from a conditional edge fans out the graph, a common shape for map-reduce patterns.

Q2. What's the benefit of parallel execution for LLM calls?

  • LLM calls are I/O-bound; running 10 in parallel can be 10× faster than serial
  • Uses less RAM
  • Cheaper per call
  • More accurate

Why: Each LLM call spends most of its time waiting for the API. Concurrency overlaps the waits — total wall time approaches the time of one call.

Q3. How do you prevent parallel calls from overwriting each other's state?

  • Use reducers (Annotated[list, operator.add]) so concurrent writes are merged instead of overwriting
  • Lock the state
  • Don't run in parallel
  • Use a database

Why: Without a reducer, two writes to the same key in one super-step raise InvalidUpdateError. With a reducer, the contributions from each parallel branch are combined into a single value.

Common doubts

How do I set up parallel processing of a list of items?

The map-reduce pattern: (1) one node emits a list of Send objects, fanning out to a worker node, (2) the worker processes one item and writes to a state field with operator.add reducer, (3) a collector node reads the accumulated results. LangGraph orchestrates the concurrency.

What's the right max_concurrency?

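It depends on your rate limits, which vary by provider, account tier, and model. For hosted LLM APIs such as OpenAI or Anthropic, 5-10 is a reasonable starting point. For your own service, use as much as your backend can handle. Start low and increase while you stay clear of rate-limit errors.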

Can parallel nodes communicate during execution?

No — that's a feature, not a bug. Each parallel branch runs independently against its own copy of state. They only "communicate" through reducers at merge time. Keeps things predictable.