LangGraph + FetchHook: Triggering Stateful Agent Workflows from Webhooks

LangGraph is designed for complex, cyclic agent state machines. FetchHook provides the reliable event ingress needed to trigger these graphs from external services like Stripe or GitHub without a public server.

The Graph Trigger Primitive

python
import requests
from langgraph.graph import StateGraph

# 1. Pull events from FetchHook
def check_for_triggers():
    res = requests.get("https://api.fetchhook.app/api/v1/stash_123",
                       headers={"Authorization": "Bearer fh_xxx"})
    return res.json().get('events', [])

# 2. Feed event data into the Graph state
events = check_for_triggers()
for event in events:
    app.invoke({"input": event['payload'], "source": event['provider']})

#How do I trigger a LangGraph from a webhook?

LangGraph applications usually wait for user input via graph.invoke(input). By using FetchHook as an asynchronous mailbox, you can turn your graph into a 'Reactive Agent' that responds to external events. Instead of the graph sitting idle, a simple cron job or polling loop 'pulls' webhooks from FetchHook and executes graph.invoke() for every new event received. This allows your stateful agent to react to Stripe payments, GitHub issues, customer support tickets, or any webhook-driven event—all without exposing a public server.

#How do I map webhooks to LangGraph state?

Because LangGraph manages state with typed schemas, you need to map webhook payloads to your State definition. For example: A GitHub 'issue opened' webhook provides {title, body, user}, which maps to your IssueState schema. A Stripe 'payment succeeded' webhook provides {amount, customer_id}, which maps to your PaymentState schema. The mapping function normalizes external webhook JSON into your internal state structure, allowing the graph to process events consistently.

State Mapping Example

python
from typing import TypedDict
from langgraph.graph import StateGraph

# Define your state schema
class GitHubIssueState(TypedDict):
    issue_title: str
    issue_body: str
    repo_name: str
    assignee: str
    status: str

# Map webhook payload to state
def webhook_to_state(event):
    """Normalize GitHub webhook to LangGraph state."""
    payload = event['payload']

    return GitHubIssueState(
        issue_title=payload['issue']['title'],
        issue_body=payload['issue']['body'],
        repo_name=payload['repository']['name'],
        assignee=None,  # Will be set by graph
        status='new'
    )

# Pull webhook and trigger graph
events = check_for_triggers()
for event in events:
    if event['provider'] == 'github':
        initial_state = webhook_to_state(event)
        result = issue_triage_graph.invoke(initial_state)
        print(f"Issue processed: {result['status']}")

#What is a 'Reactive Agent' in LangGraph?

A Reactive Agent is a LangGraph workflow that triggers automatically in response to external events, rather than waiting for user input. The pattern: (1) External event occurs (Stripe payment, GitHub PR, etc.), (2) Event sent to FetchHook mailbox, (3) Polling loop pulls event from FetchHook, (4) Event payload mapped to LangGraph state, (5) Graph executes autonomously through its nodes, (6) Final state represents completed task. This allows you to build fully autonomous agents that react to the outside world—processing payments, triaging issues, generating reports—without human intervention.

Reactive Agent Loop

python
import time
from langgraph.graph import StateGraph

# Define your reactive graph
def build_payment_graph():
    graph = StateGraph()
    graph.add_node("validate_payment", validate_payment_node)
    graph.add_node("fulfill_order", fulfill_order_node)
    graph.add_node("send_confirmation", send_confirmation_node)

    graph.add_edge("validate_payment", "fulfill_order")
    graph.add_edge("fulfill_order", "send_confirmation")
    graph.set_entry_point("validate_payment")

    return graph.compile()

payment_graph = build_payment_graph()

# Reactive loop
def reactive_agent_loop():
    while True:
        # Pull events from FetchHook
        events = check_for_triggers()

        for event in events:
            if event['provider'] == 'stripe':
                # Map to state and invoke graph
                state = webhook_to_payment_state(event)
                result = payment_graph.invoke(state)
                print(f"Payment processed: {result}")

        # Wait before next poll
        time.sleep(30)

# Run autonomously
reactive_agent_loop()

#How do I use webhooks with cyclic LangGraph workflows?

LangGraph's cyclic workflows (graphs with loops) are perfect for multi-step processes triggered by webhooks. Example: A GitHub PR webhook triggers a code review graph that loops: Analyze code → Run tests → If tests fail, suggest fixes → Re-analyze → Repeat until tests pass → Approve PR. Each webhook provides the initial state, and the graph cycles through nodes until reaching a terminal condition. FetchHook ensures the triggering webhook is never lost, even if your agent crashes mid-loop—when it restarts, it pulls the event again and resumes processing.

Cyclic Code Review Graph

python
from langgraph.graph import StateGraph, END

class CodeReviewState(TypedDict):
    pr_url: str
    code_diff: str
    test_status: str
    iteration: int
    max_iterations: int

def analyze_code(state):
    # LLM analyzes code quality
    state['analysis'] = llm_review(state['code_diff'])
    return state

def run_tests(state):
    # Run automated tests
    state['test_status'] = execute_tests(state['pr_url'])
    return state

def should_continue(state):
    # Cycle until tests pass or max iterations
    if state['test_status'] == 'passed':
        return END
    if state['iteration'] >= state['max_iterations']:
        return END
    return "suggest_fixes"

# Build cyclic graph
graph = StateGraph()
graph.add_node("analyze_code", analyze_code)
graph.add_node("run_tests", run_tests)
graph.add_node("suggest_fixes", suggest_fixes)

graph.add_edge("analyze_code", "run_tests")
graph.add_conditional_edges("run_tests", should_continue)
graph.add_edge("suggest_fixes", "analyze_code")  # Loop!

# Trigger from GitHub webhook
events = check_for_triggers()
for event in events:
    if event['payload']['action'] == 'opened':
        initial = webhook_to_review_state(event)
        result = graph.invoke(initial)  # Cycles until done

#Can I route different webhooks to different graphs?

Yes! A common pattern is to have multiple specialized graphs, each handling a different event type: Stripe webhooks → PaymentGraph (validate, fulfill, confirm), GitHub webhooks → CodeReviewGraph (analyze, test, merge), Support webhooks → TriageGraph (classify, route, respond). Use a router function to inspect the webhook provider and event type, then invoke the appropriate graph. This creates a modular, maintainable architecture where each graph is focused on a single domain.

Multi-Graph Router

python
# Define multiple specialized graphs
payment_graph = build_payment_graph()
code_review_graph = build_code_review_graph()
support_graph = build_support_triage_graph()

def route_and_invoke(event):
    """Route webhook to appropriate graph."""
    provider = event.get('provider', '').lower()
    event_type = event['payload'].get('type')

    # Route by provider and event type
    if provider == 'stripe':
        if event_type == 'payment_intent.succeeded':
            state = webhook_to_payment_state(event)
            return payment_graph.invoke(state)

    elif provider == 'github':
        if event_type == 'pull_request.opened':
            state = webhook_to_review_state(event)
            return code_review_graph.invoke(state)

    elif provider == 'intercom':
        if event_type == 'conversation.user.created':
            state = webhook_to_support_state(event)
            return support_graph.invoke(state)

    else:
        print(f"No graph for: {provider}/{event_type}")

# Pull and route
events = check_for_triggers()
for event in events:
    result = route_and_invoke(event)
    print(f"Graph completed: {result}")

#How do I handle long-running graphs with persistence?

LangGraph supports checkpointing for long-running workflows. If a graph takes hours or days (e.g., waiting for human approval, long-running ML jobs), use LangGraph's persistence to save state between nodes. Combined with FetchHook, this is powerful: (1) Webhook triggers graph, (2) Graph processes first nodes, (3) Graph pauses waiting for approval, (4) Approval comes via another webhook, (5) Graph resumes from checkpoint. This allows complex, multi-day workflows driven entirely by webhooks, with zero infrastructure management.

Long-Running Graph with Checkpoints

python
from langgraph.checkpoint import MemorySaver

# Configure graph with persistence
checkpointer = MemorySaver()
graph = build_approval_workflow_graph()
app = graph.compile(checkpointer=checkpointer)

# Initial webhook triggers graph
events = check_for_triggers()
for event in events:
    if event['payload']['type'] == 'order.created':
        config = {"configurable": {"thread_id": event['id']}}
        state = webhook_to_order_state(event)

        # Start graph, will pause at approval node
        result = app.invoke(state, config=config)
        print(f"Graph paused, awaiting approval")

# Later, approval webhook arrives
approval_events = check_for_triggers()
for event in approval_events:
    if event['payload']['type'] == 'approval.granted':
        # Resume graph from checkpoint
        thread_id = event['payload']['order_id']
        config = {"configurable": {"thread_id": thread_id}}

        # Graph resumes from where it left off
        result = app.invoke({"approved": True}, config=config)
        print(f"Graph completed: {result}")

#How do I handle errors in webhook-triggered graphs?

Wrap graph invocations in try-except blocks. If a node fails, log the error and decide: retry (invoke again), skip (mark event as failed), escalate (send alert). Because FetchHook persists events for 24 hours, you can implement retry logic: if graph fails, leave event in mailbox, retry on next poll. If it succeeds, event is consumed. This provides natural retry semantics without complex queue infrastructure.

Error Handling Pattern

python
import logging

def safe_graph_invoke(event, graph):
    """Invoke graph with error handling and retry logic."""
    try:
        state = webhook_to_state(event)
        result = graph.invoke(state)
        logging.info(f"Graph succeeded: {event['id']}")
        return result

    except Exception as e:
        logging.error(f"Graph failed: {event['id']}, error: {e}")

        # Retry logic: don't consume event, will retry on next poll
        # Event stays in FetchHook mailbox
        return None

# Main loop with retry
while True:
    events = check_for_triggers()

    for event in events:
        result = safe_graph_invoke(event, my_graph)

        if result is None:
            # Failed, event will be retried next poll
            continue

    time.sleep(60)

Agent Protocol Instruction

When integrating with LangGraph, treat FetchHook as the 'external_input' node that triggers graph execution. Pull events, normalize webhook payloads to match your state schema, and invoke the graph with the mapped state. For cyclic workflows, FetchHook's 24-hour persistence ensures no events are lost during long-running graph executions. Use conditional routing to direct different webhook types to specialized graphs. Implement error handling with retry logic leveraging FetchHook's natural event persistence.
All Resources
Verified for Agentic Workflowsv1.0.4