
GraphRAG — Combining Knowledge Graphs with RAG for Richer, More Accurate AI Retrieval

A practical guide to GraphRAG in production. It covers why flat vector search fails on multi-hop questions and cross-document reasoning; the Microsoft GraphRAG architecture (entity extraction, relationship extraction, community detection with the Leiden algorithm, hierarchical summarization); an entity extraction pipeline built with the Anthropic SDK and spaCy; a Neo4j property graph with MERGE-based upserts and vector indexes; hybrid retrieval that combines ANN vector search with Cypher graph traversal; global query answering via community summaries and map-reduce synthesis; LangChain Neo4jGraph integration with GraphCypherQAChain; incremental graph updates with change detection; production patterns for graph freshness (TTL-based refresh, CDC-triggered updates); monitoring GraphRAG quality with faithfulness and entity coverage metrics; and a decision framework for choosing between standard RAG, GraphRAG, and hybrid approaches.

2026-05-29

Why Vector Search Alone Falls Short

Standard RAG works by embedding documents into a vector space and retrieving the top-k chunks most semantically similar to a query. For isolated factual lookups — "What is the refund policy?" or "Which API endpoint handles authentication?" — this approach performs well. But real enterprise knowledge rarely lives in isolated paragraphs.

  • Multi-hop questions. "Which engineers worked on projects that used the same database vendor as the one affected by the outage last quarter?" requires following relationships across multiple documents. No single chunk contains the full answer; vector similarity alone cannot chain across entities.
  • Implicit relationships. A document corpus accumulates implicit structure: people belong to teams, teams own services, services depend on databases. This graph exists in the text but is invisible to a flat vector index.
  • Global summaries. "What are the main architectural themes across all our design documents?" requires synthesising hundreds of documents. The top-k chunks retrieved by ANN search are a biased local sample, not a representative global view.
  • Semantic drift at retrieval time. Embedding similarity measures how close two passages are in topic and wording, not causal or taxonomic relationships. A document about "database connection pooling" may score lower than a tangentially related paragraph about "connection limits" even when the former is the authoritative source.

GraphRAG addresses these gaps by extracting a structured knowledge graph from the document corpus and using it to augment — not replace — vector retrieval. The result is a hybrid system that can answer both local lookups and global synthesis questions.
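
To make the multi-hop point concrete, here is a toy sketch in plain Python. The entity names and triples are invented for illustration, and each triple stands in for a fact that would live in a different document. The answer to "which engineer is connected to the outage?" only emerges by chaining three relationships, which is exactly what a top-k chunk lookup cannot do.

```python
from collections import deque

# Hypothetical toy graph: (source, predicate, target) triples that would
# each come from a different document in a real corpus.
TRIPLES = [
    ("alice", "WORKS_ON", "billing-service"),
    ("bob", "WORKS_ON", "search-service"),
    ("billing-service", "USES", "postgres"),
    ("search-service", "USES", "elasticsearch"),
    ("outage-q3", "AFFECTED", "postgres"),
]


def neighbours(node):
    """Yield nodes one hop away, ignoring edge direction."""
    for source, _, target in TRIPLES:
        if source == node:
            yield target
        elif target == node:
            yield source


def within_hops(start, max_hops):
    """Breadth-first search: map each reachable node to its hop distance."""
    seen = {start: 0}
    queue = deque([start])
    while queue:
        node = queue.popleft()
        if seen[node] == max_hops:
            continue  # do not expand beyond the hop budget
        for nxt in neighbours(node):
            if nxt not in seen:
                seen[nxt] = seen[node] + 1
                queue.append(nxt)
    return seen


reachable = within_hops("outage-q3", max_hops=3)
engineers = sorted(n for n in reachable if n in {"alice", "bob"})
print(engineers)  # ['alice']
```

The path is outage-q3 → postgres → billing-service → alice. Bob never appears because his service uses a different datastore.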

Note

This article covers the open-source GraphRAG stack: entity and relationship extraction with spaCy and the Anthropic SDK, graph storage in Neo4j, hybrid retrieval with the LangChain Neo4jGraph integration, and community detection from the Microsoft GraphRAG research. All examples use Python 3.11+ and Neo4j 5.x.

The GraphRAG Architecture

Microsoft's GraphRAG paper (2024) introduced a two-phase pipeline: an offline indexing phase that builds the knowledge graph from raw text, and an online query phase that selects between local and global retrieval strategies depending on query type.

Indexing Pipeline Overview

# GraphRAG indexing pipeline — four stages
#
# Stage 1: ENTITY EXTRACTION
#   Input:  raw documents (PDFs, markdown, HTML, plain text)
#   Process: LLM or NLP model extracts named entities (people, orgs,
#            technologies, concepts) and their textual descriptions
#   Output: entity list with descriptions and source document references
#
# Stage 2: RELATIONSHIP EXTRACTION
#   Input:  same documents + entity list from Stage 1
#   Process: LLM extracts (subject, predicate, object) triples
#            describing how entities relate to each other
#   Output: edge list with relationship types, weights, and descriptions
#
# Stage 3: COMMUNITY DETECTION
#   Input:  entity graph (nodes + edges from Stages 1–2)
#   Process: Leiden algorithm partitions the graph into communities
#            (dense subgraphs of closely related entities)
#   Output: hierarchical community assignments at multiple resolutions
#
# Stage 4: COMMUNITY SUMMARIZATION
#   Input:  entities and relationships within each community
#   Process: LLM synthesises a textual summary of each community's
#            key entities, themes, and inter-entity relationships
#   Output: community summary documents indexed in a vector store
#
# Query phase: LOCAL vs GLOBAL
#   LOCAL  queries → hybrid ANN vector search + Cypher graph traversal
#   GLOBAL queries → map-reduce over community summaries
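
The overview does not say how a query gets classified as local or global. One simple option is a keyword heuristic, sketched here. This is an assumption of the sketch, not part of Microsoft's pipeline: the cue list and the `classify_query` name are hypothetical, and a production system might use an LLM classifier instead.

```python
import re

# Hypothetical cue phrases that suggest a corpus-wide question.
GLOBAL_CUES = re.compile(
    r"\b(overall|across all|main themes?|summari[sz]e|trends?|in general)\b",
    re.IGNORECASE,
)


def classify_query(query: str) -> str:
    """Return 'global' for broad thematic questions, otherwise 'local'."""
    return "global" if GLOBAL_CUES.search(query) else "local"


print(classify_query("What are the main themes across all our design documents?"))
print(classify_query("Which API endpoint handles authentication?"))
```

Defaulting to local keeps the cheaper path as the fallback when no cue matches.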

Entity and Relationship Extraction

The first step is converting unstructured text into a structured graph. For high-volume corpora, a two-tier approach works well: use spaCy's transformer NER model for coarse entity detection, then an LLM for relationship extraction and entity disambiguation. This keeps token costs low while preserving accuracy.

Entity Extraction with spaCy + LLM Disambiguation

# pip install spacy anthropic
# python -m spacy download en_core_web_trf

import spacy
import anthropic
import json
from dataclasses import dataclass, field

nlp = spacy.load("en_core_web_trf")
client = anthropic.Anthropic()

@dataclass
class Entity:
    name: str
    type: str          # PERSON, ORG, TECHNOLOGY, CONCEPT, etc.
    description: str
    source_docs: list[str] = field(default_factory=list)

@dataclass
class Relationship:
    source: str        # entity name
    target: str        # entity name
    predicate: str     # e.g. "USES", "OWNS", "DEPENDS_ON", "LEADS"
    description: str
    weight: float = 1.0


ENTITY_PROMPT = """Extract all named entities from the following text.
For each entity, provide:
- name: canonical name (normalised, e.g. "PostgreSQL" not "postgres" or "PG")
- type: one of PERSON, ORGANIZATION, TECHNOLOGY, CONCEPT, PRODUCT, LOCATION
- description: 1-2 sentence description based on how it appears in this text

Text:
{text}

Return ONLY a JSON array of objects with keys: name, type, description.
Merge duplicate entities (same real-world referent, different surface forms)."""

RELATIONSHIP_PROMPT = """Given the following text and entity list, extract all
relationships between entities as (source, predicate, target, description) tuples.

Entities: {entities}

Text:
{text}

Use active-voice predicates like: USES, OWNS, MANAGES, DEPENDS_ON, INTEGRATES_WITH,
REPLACES, SUCCEEDS, LEADS, PART_OF, INSTANCE_OF, CAUSES, MONITORS.

Return ONLY a JSON array with keys: source, predicate, target, description, weight (0.1–1.0).
Only include relationships explicitly or strongly implied by the text."""


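def extract_entities(text: str, doc_id: str) -> list[Entity]:
    # Fast NER with spaCy to seed the LLM prompt with candidate mentions.
    # The 50k-character cap bounds memory and latency for the transformer
    # model; it is a tuning choice, not a hard spaCy limit.
    doc = nlp(text[:50000])
    # Keep only mentions inside the 8k-character window the LLM will see
    spacy_ents = sorted({ent.text for ent in doc.ents
                         if ent.start_char < 8000 and ent.label_ in
                         {"PERSON", "ORG", "GPE", "PRODUCT", "WORK_OF_ART"}})

    # Pass the spaCy candidates as hints; the LLM still decides the final
    # canonical names, types, and descriptions
    prompt = (
        ENTITY_PROMPT.format(text=text[:8000])
        + "\n\nCandidate entities detected by spaCy (hints, may be incomplete): "
        + json.dumps(spacy_ents)
    )
    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=2048,
        messages=[{"role": "user", "content": prompt}],
    )
    # Assumes the reply is bare JSON, as the prompt requests
    raw = json.loads(response.content[0].text)
    return [
        Entity(
            name=e["name"],
            type=e["type"],
            description=e["description"],
            source_docs=[doc_id],
        )
        for e in raw
    ]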


def extract_relationships(text: str, entities: list[Entity]) -> list[Relationship]:
    entity_names = [e.name for e in entities]
    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=2048,
        messages=[{
            "role": "user",
            "content": RELATIONSHIP_PROMPT.format(
                entities=json.dumps(entity_names),
                text=text[:8000],
            ),
        }],
    )
    raw = json.loads(response.content[0].text)
    return [
        Relationship(
            source=r["source"],
            target=r["target"],
            predicate=r["predicate"],
            description=r["description"],
            weight=float(r.get("weight", 1.0)),
        )
        for r in raw
        if r["source"] in entity_names and r["target"] in entity_names
    ]
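
Both extraction functions pass the model's reply straight to json.loads, which raises if the model wraps the array in a Markdown fence or adds a sentence around it. A defensive parser is one way to handle that. The helper name `parse_json_array` is hypothetical, and returning an empty list on failure is a design choice of this sketch; you may prefer to log and retry.

```python
import json
import re

FENCE = "`" * 3  # a Markdown code fence, built here to keep this example readable
FENCED_BLOCK = re.compile(FENCE + r"(?:json)?\s*(.*?)" + FENCE, re.DOTALL)


def parse_json_array(reply: str) -> list:
    """Best-effort extraction of a JSON array from an LLM reply."""
    # Prefer the contents of a fenced block if the model added one
    fenced = FENCED_BLOCK.search(reply)
    candidate = fenced.group(1) if fenced else reply
    # Narrow to the outermost [...] span to drop surrounding chatter
    start, end = candidate.find("["), candidate.rfind("]")
    if start == -1 or end < start:
        return []
    try:
        parsed = json.loads(candidate[start:end + 1])
    except json.JSONDecodeError:
        return []
    return parsed if isinstance(parsed, list) else []
```

Swapping `json.loads(response.content[0].text)` for a call like this keeps one malformed reply from aborting a whole batch.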

Note

For large corpora (10k+ documents), run entity extraction in parallel batches using asyncio or a task queue. Use a separate deduplication pass to merge entities that refer to the same real-world object across documents — simple string normalisation handles ~80% of cases; the remaining 20% can be resolved with an embedding-similarity merge step.
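
The string-normalisation pass mentioned above can be sketched as follows. It works on plain dicts rather than the Entity dataclass so the example is self-contained, and the alias table is a hypothetical stand-in for a curated, domain-specific list. The embedding-similarity step for the harder cases is not shown.

```python
import re
from collections import defaultdict

# Hypothetical alias table; in practice this would be curated per domain.
ALIASES = {"postgres": "postgresql", "pg": "postgresql", "k8s": "kubernetes"}


def normalise(name: str) -> str:
    """Lowercase, drop punctuation and whitespace, then apply aliases."""
    key = re.sub(r"[^a-z0-9]+", "", name.lower())
    return ALIASES.get(key, key)


def merge_by_name(entities: list[dict]) -> list[dict]:
    """Merge entities whose normalised names collide."""
    groups = defaultdict(list)
    for ent in entities:
        groups[normalise(ent["name"])].append(ent)

    merged = []
    for members in groups.values():
        # Keep the entry with the longest description, plus the union
        # of source documents across all duplicates
        best = max(members, key=lambda e: len(e["description"]))
        docs = sorted({d for e in members for d in e["source_docs"]})
        merged.append({**best, "source_docs": docs})
    return merged
```

Keeping the longest description mirrors the rule used by the MERGE upsert later in the article.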

Building the Property Graph in Neo4j

Neo4j's property graph model fits this data well: nodes represent entities, typed relationships connect them, and both can hold arbitrary properties, all queried with Cypher. Neo4j 5.11 and later also include native vector indexes, enabling ANN search directly in the graph without a separate vector store.

Schema Setup and MERGE-based Upserts

# pip install neo4j anthropic sentence-transformers

from neo4j import GraphDatabase
from sentence_transformers import SentenceTransformer
import numpy as np

NEO4J_URI      = "bolt://localhost:7687"
NEO4J_USER     = "neo4j"
NEO4J_PASSWORD = "your-password"   # use env var in production

driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
embedder = SentenceTransformer("all-MiniLM-L6-v2")  # 384-dim, fast


def setup_schema(session) -> None:
    """Create indexes and constraints once at startup."""
    # Uniqueness constraint — prevents duplicate entity nodes
    session.run("""
        CREATE CONSTRAINT entity_name IF NOT EXISTS
        FOR (e:Entity) REQUIRE e.name IS UNIQUE
    """)
    # Full-text index for keyword search
    session.run("""
        CREATE FULLTEXT INDEX entity_description IF NOT EXISTS
        FOR (e:Entity) ON EACH [e.name, e.description]
    """)
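    # Vector index for ANN search. Vector indexes arrived in Neo4j 5.11;
    # the CREATE VECTOR INDEX command was added in a later 5.x release,
    # so check your server version.
    session.run("""
        CREATE VECTOR INDEX entity_embeddings IF NOT EXISTS
        FOR (e:Entity) ON e.embedding
        OPTIONS {indexConfig: {
            `vector.dimensions`: 384,
            `vector.similarity_function`: 'cosine'
        }}
    """)
    # Global retrieval queries a 'community_embeddings' index over
    # Community nodes, so create it here as well
    session.run("""
        CREATE VECTOR INDEX community_embeddings IF NOT EXISTS
        FOR (c:Community) ON c.embedding
        OPTIONS {indexConfig: {
            `vector.dimensions`: 384,
            `vector.similarity_function`: 'cosine'
        }}
    """)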


def upsert_entity(session, entity: "Entity") -> None:
    embedding = embedder.encode(
        f"{entity.name}: {entity.description}"
    ).tolist()

    # apoc.coll.toSet in the query below requires the APOC plugin
    session.run(
        """
        MERGE (e:Entity {name: $name})
        ON CREATE SET
            e.type        = $type,
            e.description = $description,
            e.embedding   = $embedding,
            e.source_docs = $source_docs,
            e.created_at  = datetime()
        ON MATCH SET
            e.description = CASE WHEN size($description) > size(e.description)
                            THEN $description ELSE e.description END,
            e.source_docs = apoc.coll.toSet(e.source_docs + $source_docs),
            e.embedding   = $embedding,
            e.updated_at  = datetime()
        """,
        name=entity.name,
        type=entity.type,
        description=entity.description,
        embedding=embedding,
        source_docs=entity.source_docs,
    )


def upsert_relationship(session, rel: "Relationship") -> None:
    # MERGE prevents duplicate edges between the same pair
    session.run(
        """
        MATCH (src:Entity {name: $source})
        MATCH (tgt:Entity {name: $target})
        MERGE (src)-[r:RELATES_TO {predicate: $predicate}]->(tgt)
        ON CREATE SET
            r.description = $description,
            r.weight      = $weight,
            r.created_at  = datetime()
        ON MATCH SET
            r.weight      = (r.weight + $weight) / 2.0,
            r.updated_at  = datetime()
        """,
        source=rel.source,
        target=rel.target,
        predicate=rel.predicate,
        description=rel.description,
        weight=rel.weight,
    )


def ingest_document(entities: list, relationships: list) -> None:
    # One session is enough. Every statement is idempotent (IF NOT EXISTS,
    # MERGE), so each runs as its own auto-commit transaction. Running
    # session.run() while an explicit transaction is open would fail.
    with driver.session() as session:
        setup_schema(session)

        # Entities first, so relationship MATCHes can find both endpoints
        for entity in entities:
            upsert_entity(session, entity)

        for rel in relationships:
            upsert_relationship(session, rel)
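
Issuing one query per entity means one network round trip per entity. For larger ingests, a common Cypher pattern is to send rows in batches with UNWIND. The sketch below is a simplified variant under stated assumptions: it overwrites properties instead of applying the ON CREATE / ON MATCH rules above, it omits embeddings, and `batch_upsert` and `chunks` are hypothetical helper names. The `session` argument is any object with a Neo4j-style `run(query, **params)` method.

```python
from typing import Iterator

# Simplified batch upsert: one MERGE per row, all rows in a single query
BATCH_UPSERT = """
UNWIND $rows AS row
MERGE (e:Entity {name: row.name})
SET e.type        = row.type,
    e.description = row.description,
    e.source_docs = row.source_docs
"""


def chunks(rows: list[dict], size: int) -> Iterator[list[dict]]:
    """Yield consecutive slices of at most `size` rows."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def batch_upsert(session, rows: list[dict], batch_size: int = 500) -> int:
    """Send rows in batches; returns the number of queries issued."""
    sent = 0
    for batch in chunks(rows, batch_size):
        session.run(BATCH_UPSERT, rows=batch)
        sent += 1
    return sent
```

With a batch size of 500, 1,200 entities become three queries instead of 1,200.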

Community Detection with the Leiden Algorithm

Community detection partitions the entity graph into clusters of densely interconnected nodes. In the context of GraphRAG, each community represents a coherent thematic cluster — e.g., all entities related to "data ingestion infrastructure" or "authentication and authorisation". The Leiden algorithm is preferred over Louvain because it guarantees well-connected communities and avoids the disconnected-community problem.
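
Leiden searches for a partition that maximises a quality function. For the modularity-style partition type used in this article, that function is, up to a normalisation constant, modularity with a resolution parameter gamma: Q = sum over communities of [ L_c / m - gamma * (d_c / 2m)^2 ], where m is the edge count, L_c the number of edges inside community c, and d_c the total degree of its nodes. The pure-Python sketch below only evaluates Q for a given partition on an unweighted toy graph; it does not implement Leiden itself.

```python
from collections import Counter


def modularity(edges, community, gamma=1.0):
    """Q = sum_c [ L_c / m - gamma * (d_c / 2m)^2 ] for an undirected graph."""
    m = len(edges)
    internal = Counter()  # L_c: edges with both endpoints in community c
    degree = Counter()    # d_c: total degree of the nodes in community c
    for u, v in edges:
        degree[community[u]] += 1
        degree[community[v]] += 1
        if community[u] == community[v]:
            internal[community[u]] += 1
    return sum(
        internal[c] / m - gamma * (degree[c] / (2 * m)) ** 2
        for c in degree
    )


# Two triangles joined by a single bridge edge (c-d)
EDGES = [("a", "b"), ("b", "c"), ("a", "c"),
         ("d", "e"), ("e", "f"), ("d", "f"),
         ("c", "d")]
two_clusters = {"a": 0, "b": 0, "c": 0, "d": 1, "e": 1, "f": 1}
one_cluster = {node: 0 for node in two_clusters}

q_split = modularity(EDGES, two_clusters)   # 5/14, about 0.357
q_merged = modularity(EDGES, one_cluster)   # 0.0
```

Splitting along the bridge scores higher than lumping everything together. Raising gamma penalises large communities more, which is why higher resolution values yield more, smaller communities.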

Community Detection and Summary Generation

# pip install leidenalg igraph

import igraph as ig
import leidenalg
from neo4j import GraphDatabase
import anthropic
import json

client = anthropic.Anthropic()


def load_graph_from_neo4j(driver) -> ig.Graph:
    """Export the entity graph from Neo4j into an igraph object."""
    with driver.session() as session:
        # elementId() replaces the id() function, which is deprecated in Neo4j 5
        nodes_result = session.run("MATCH (e:Entity) RETURN e.name AS name, elementId(e) AS id")
        nodes = [(r["id"], r["name"]) for r in nodes_result]

        edges_result = session.run("""
            MATCH (a:Entity)-[r:RELATES_TO]->(b:Entity)
            RETURN elementId(a) AS src, elementId(b) AS tgt, r.weight AS weight
        """)
        edges = [(r["src"], r["tgt"], r["weight"]) for r in edges_result]

    node_id_map = {neo4j_id: idx for idx, (neo4j_id, _) in enumerate(nodes)}
    node_names  = [name for _, name in nodes]

    g = ig.Graph(directed=False)
    g.add_vertices(len(nodes))
    g.vs["name"] = node_names

    edge_list   = [(node_id_map[src], node_id_map[tgt]) for src, tgt, _ in edges]
    edge_weights = [w for _, _, w in edges]
    g.add_edges(edge_list)
    g.es["weight"] = edge_weights
    return g


def detect_communities(g: ig.Graph, resolution: float = 1.0) -> dict[str, int]:
    """Run Leiden algorithm and return entity → community_id mapping."""
    partition = leidenalg.find_partition(
        g,
        leidenalg.RBConfigurationVertexPartition,
        weights=g.es["weight"],
        resolution_parameter=resolution,
        n_iterations=10,
        seed=42,
    )
    return {g.vs[i]["name"]: community_id
            for community_id, members in enumerate(partition)
            for i in members}


COMMUNITY_SUMMARY_PROMPT = """You are a technical documentation analyst.
Summarise the following cluster of related entities and their relationships
into a coherent thematic overview.

Entities in this community:
{entities}

Key relationships:
{relationships}

Write a 2-3 paragraph summary covering:
1. The primary theme or technical domain of this community
2. The most important entities and their roles
3. Key dependencies, workflows, or interactions between entities

Be specific and technical. This summary will be used to answer user questions."""


def summarise_community(
    driver,
    community_id: int,
    entity_names: list[str],
) -> str:
    with driver.session() as session:
        entities_result = session.run(
            "MATCH (e:Entity) WHERE e.name IN $names "
            "RETURN e.name AS name, e.type AS type, e.description AS desc",
            names=entity_names,
        )
        entities_text = "\n".join(
            f"- {r['name']} ({r['type']}): {r['desc']}"
            for r in entities_result
        )

        rels_result = session.run(
            """
            MATCH (a:Entity)-[r:RELATES_TO]->(b:Entity)
            WHERE a.name IN $names AND b.name IN $names
            RETURN a.name AS src, r.predicate AS pred, b.name AS tgt, r.description AS desc
            LIMIT 50
            """,
            names=entity_names,
        )
        rels_text = "\n".join(
            f"- {r['src']} {r['pred']} {r['tgt']}: {r['desc']}"
            for r in rels_result
        )

    response = client.messages.create(
        model="claude-haiku-4-5",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": COMMUNITY_SUMMARY_PROMPT.format(
                entities=entities_text,
                relationships=rels_text,
            ),
        }],
    )
    return response.content[0].text


def store_community_summary(driver, community_id: int, summary: str, entity_names: list[str]) -> None:
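    # Loading the model on every call is slow; in production, load it once
    # at module level and reuse it
    from sentence_transformers import SentenceTransformer
    embedder = SentenceTransformer("all-MiniLM-L6-v2")
    embedding = embedder.encode(summary).tolist()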

    with driver.session() as session:
        session.run(
            """
            MERGE (c:Community {id: $community_id})
            SET c.summary   = $summary,
                c.embedding = $embedding,
                c.members   = $entity_names,
                c.updated_at = datetime()
            """,
            community_id=community_id,
            summary=summary,
            embedding=embedding,
            entity_names=entity_names,
        )

Hybrid Retrieval: Vector Search + Graph Traversal

GraphRAG's power comes from combining two retrieval strategies. For local queries — questions about specific entities, relationships, or events — ANN vector search finds the seed entities, and Cypher graph traversal expands the context by following typed relationships. For global queries — thematic summaries, broad comparisons — the community summary index provides a pre-synthesised view of the corpus.

Local Query: ANN Seed + Cypher Expansion

from neo4j import GraphDatabase
from sentence_transformers import SentenceTransformer
import anthropic

driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_USER, NEO4J_PASSWORD))
embedder = SentenceTransformer("all-MiniLM-L6-v2")
client = anthropic.Anthropic()


def local_retrieval(query: str, top_k: int = 5, hops: int = 2) -> list[dict]:
    """
    Phase 1: ANN search finds the k most relevant seed entities.
    Phase 2: Cypher traversal expands up to 'hops' relationship hops
             from each seed to gather connected context.
    """
    query_embedding = embedder.encode(query).tolist()

    with driver.session() as session:
        # Phase 1: ANN vector search for seed entities
        seed_result = session.run(
            """
            CALL db.index.vector.queryNodes('entity_embeddings', $k, $embedding)
            YIELD node AS e, score
            RETURN e.name AS name, e.description AS description,
                   e.type AS type, score
            ORDER BY score DESC
            """,
            k=top_k,
            embedding=query_embedding,
        )
        seeds = [dict(r) for r in seed_result]
        seed_names = [s["name"] for s in seeds]

        if not seed_names:
            return []

        # Phase 2: Cypher graph expansion — follow relationships outward
        expansion_result = session.run(
            f"""
            MATCH (seed:Entity) WHERE seed.name IN $seed_names
            CALL {{
                WITH seed
                MATCH path = (seed)-[r:RELATES_TO*1..{hops}]-(neighbor:Entity)
                RETURN neighbor, relationships(path) AS rels,
                       length(path) AS distance
            }}
            RETURN DISTINCT
                neighbor.name        AS name,
                neighbor.description AS description,
                neighbor.type        AS type,
                min(distance)        AS distance,
                collect(DISTINCT {{
                    predicate:   last(rels).predicate,
                    description: last(rels).description
                }}) AS edges
            ORDER BY distance ASC
            LIMIT 30
            """,
            seed_names=seed_names,
        )
        neighbors = [dict(r) for r in expansion_result]

    return seeds + neighbors


def answer_local_query(query: str) -> str:
    context_nodes = local_retrieval(query)
    context_text = "\n\n".join(
        f"Entity: {n['name']} ({n['type']})\n"
        f"Description: {n['description']}\n"
        + (f"Connected via: {n.get('edges', [])}" if n.get("edges") else "")
        for n in context_nodes
    )

    response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": (
                f"Answer the following question using only the knowledge graph context provided.\n\n"
                f"Context (entities and relationships from the knowledge graph):\n{context_text}\n\n"
                f"Question: {query}"
            ),
        }],
    )
    return response.content[0].text
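
Because local_retrieval returns seeds + neighbors, an entity that is both a seed and a neighbour of another seed can appear twice in the prompt. One way to tidy the context before formatting it is sketched below. The `merge_context` helper and its `limit` default are hypothetical; the dict keys match the ones returned by the queries above.

```python
def merge_context(seeds: list[dict], neighbours: list[dict], limit: int = 20) -> list[dict]:
    """Deduplicate by entity name; seeds rank first, then nearer neighbours."""
    by_name: dict[str, dict] = {}
    for node in seeds:
        # Seeds sit at distance 0 from themselves
        by_name.setdefault(node["name"], {**node, "distance": 0})
    for node in neighbours:
        # A seed that reappears as a neighbour keeps its seed entry
        by_name.setdefault(node["name"], node)
    # sorted() is stable, so ties keep their original retrieval order
    ranked = sorted(by_name.values(), key=lambda n: n.get("distance", 0))
    return ranked[:limit]


seeds = [{"name": "PostgreSQL", "score": 0.9}, {"name": "Billing", "score": 0.8}]
neighbours = [
    {"name": "Billing", "distance": 1},
    {"name": "Alice", "distance": 2},
    {"name": "Payments", "distance": 1},
]
context = merge_context(seeds, neighbours)
print([n["name"] for n in context])
# ['PostgreSQL', 'Billing', 'Payments', 'Alice']
```

The `limit` cap also gives you a simple handle on prompt size.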

Global Query: Map-Reduce over Community Summaries

def global_retrieval(query: str, top_communities: int = 10) -> list[str]:
    """
    For broad thematic questions, search community summaries instead of
    individual entity nodes. Community summaries are pre-synthesised
    overviews of dense entity clusters — better for global reasoning.
    """
    query_embedding = embedder.encode(query).tolist()

    with driver.session() as session:
        result = session.run(
            """
            CALL db.index.vector.queryNodes('community_embeddings', $k, $embedding)
            YIELD node AS c, score
            RETURN c.summary AS summary, c.members AS members, score
            ORDER BY score DESC
            """,
            k=top_communities,
            embedding=query_embedding,
        )
        return [r["summary"] for r in result]


def answer_global_query(query: str) -> str:
    """
    Map phase: retrieve top community summaries.
    Reduce phase: synthesise a final answer from the summaries.
    This two-step approach prevents the LLM context from being
    overwhelmed by raw entity data when answering broad questions.
    """
    community_summaries = global_retrieval(query)

    # Map step: extract the points in each community summary that bear on the question
    map_prompt = (
        "Given the following community summary and the user question, "
        "extract the most relevant points from this summary that help "
        "answer the question. If the summary is not relevant, return an empty string.\n\n"
        "Question: {query}\n\nSummary:\n{summary}"
    )
    relevant_excerpts = []
    for summary in community_summaries:
        response = client.messages.create(
            model="claude-haiku-4-5",
            max_tokens=512,
            messages=[{
                "role": "user",
                "content": map_prompt.format(query=query, summary=summary),
            }],
        )
        excerpt = response.content[0].text.strip()
        if excerpt:
            relevant_excerpts.append(excerpt)

    if not relevant_excerpts:
        return "No relevant information found in the knowledge graph."

    # Reduce step: synthesise final answer
    reduce_context = "\n\n---\n\n".join(relevant_excerpts)
    reduce_response = client.messages.create(
        model="claude-sonnet-4-6",
        max_tokens=1024,
        messages=[{
            "role": "user",
            "content": (
                f"Synthesise a comprehensive answer to the following question "
                f"based on the extracted knowledge graph insights below.\n\n"
                f"Question: {query}\n\nInsights:\n{reduce_context}"
            ),
        }],
    )
    return reduce_response.content[0].text
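
answer_global_query runs its map calls one after another, yet each call is independent of the others. A possible refinement is to run them concurrently. The sketch below takes the map and reduce steps as plain callables (hypothetical `map_fn` and `reduce_fn` parameters) so it can be exercised without an API key; in the article's pipeline they would wrap the two client.messages.create calls.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Callable


def map_reduce(
    query: str,
    summaries: list[str],
    map_fn: Callable[[str, str], str],
    reduce_fn: Callable[[str, list[str]], str],
    max_workers: int = 4,
) -> str:
    """Run map_fn over summaries concurrently, then reduce the non-empty results."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # pool.map preserves input order, so excerpts stay aligned with summaries
        excerpts = list(pool.map(lambda s: map_fn(query, s).strip(), summaries))

    relevant = [e for e in excerpts if e]
    if not relevant:
        return "No relevant information found in the knowledge graph."
    return reduce_fn(query, relevant)


# Stand-ins for the LLM calls, for illustration only
def fake_map(query: str, summary: str) -> str:
    return summary.upper() if "auth" in summary else ""


def fake_reduce(query: str, excerpts: list[str]) -> str:
    return " | ".join(excerpts)


print(map_reduce("q", ["auth service", "billing", "oauth flow"], fake_map, fake_reduce))
# AUTH SERVICE | OAUTH FLOW
```

Threads suit this workload because the time is spent waiting on network responses. Keep `max_workers` within your API rate limits.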

LangChain Neo4j Integration

If you are already using LangChain, its Neo4j integration pairs the Neo4jGraph wrapper with GraphCypherQAChain, a chain that translates natural language questions into Cypher queries using an LLM, executes them against Neo4j, and returns the results as context for answer generation.

# pip install langchain langchain-anthropic langchain-community neo4j

from langchain_community.graphs import Neo4jGraph
from langchain.chains import GraphCypherQAChain
from langchain_anthropic import ChatAnthropic

# Connect LangChain to Neo4j
graph = Neo4jGraph(
    url=NEO4J_URI,
    username=NEO4J_USER,
    password=NEO4J_PASSWORD,
)

# Refresh the schema so the LLM knows what node labels and
# relationship types exist — call this after bulk ingestion
graph.refresh_schema()

# Inspect what the LLM will see
print(graph.schema)
# Example output:
# Node properties are the following:
# Entity {name: STRING, type: STRING, description: STRING, embedding: LIST}
# Community {id: INTEGER, summary: STRING, members: LIST}
# Relationship properties are the following:
# RELATES_TO {predicate: STRING, description: STRING, weight: FLOAT}
# ...

llm = ChatAnthropic(model="claude-sonnet-4-6", temperature=0)

chain = GraphCypherQAChain.from_llm(
    llm=llm,
    graph=graph,
    verbose=True,               # log the generated Cypher query
    return_intermediate_steps=True,
    allow_dangerous_requests=True,  # required in langchain >= 0.2.0
)

# Ask a multi-hop question
result = chain.invoke({
    "query": "Which teams own services that depend on PostgreSQL, "
             "and what incidents have affected those services?"
})
print(result["result"])
# The chain generates Cypher like:
#   MATCH (t:Entity {type: 'ORGANIZATION'})-[:RELATES_TO {predicate: 'OWNS'}]
#         ->(s:Entity {type: 'TECHNOLOGY'})-[:RELATES_TO {predicate: 'DEPENDS_ON'}]
#         ->(db:Entity {name: 'PostgreSQL'})
#   OPTIONAL MATCH (s)-[:RELATES_TO {predicate: 'AFFECTED_BY'}]->(i:Entity {type: 'CONCEPT'})
#   RETURN t.name AS team, s.name AS service, i.name AS incident


# For RAG-style use, combine Cypher results with vector search context
from langchain.chains.combine_documents import create_stuff_documents_chain
from langchain_core.prompts import ChatPromptTemplate

prompt = ChatPromptTemplate.from_template("""
Answer the question using the graph context and document context below.

Graph context (from Cypher query):
{graph_context}

Document context (from vector search):
{document_context}

Question: {question}
""")

Note

GraphCypherQAChain works best when the graph schema is clean and the LLM has been primed with a few-shot Cypher example for your specific domain. For production use, validate generated Cypher before execution with a read-only Neo4j user that has MATCH permissions only — never expose a write-enabled connection to a chain that generates arbitrary queries.
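
As a second line of defence alongside the read-only database user, generated Cypher can be screened before execution. The sketch below is a deliberately conservative keyword check, not a Cypher parser: the `is_read_only` name and the keyword list are assumptions of this example, it will reject some harmless queries (for instance, every APOC call), and it is no substitute for database-level permissions.

```python
import re

# Clauses and procedure prefixes that can modify data or schema
WRITE_PATTERN = re.compile(
    r"\b(CREATE|MERGE|DELETE|DETACH|SET|REMOVE|DROP|LOAD\s+CSV|FOREACH)\b"
    r"|\bCALL\s+(apoc|dbms|db\.create)",
    re.IGNORECASE,
)

# Single- and double-quoted string literals, allowing backslash escapes
SINGLE_QUOTED = r"'(?:[^'\\]|\\.)*'"
DOUBLE_QUOTED = r'"(?:[^"\\]|\\.)*"'
STRING_LITERAL = re.compile(SINGLE_QUOTED + "|" + DOUBLE_QUOTED)


def is_read_only(cypher: str) -> bool:
    """Conservative check: reject any query containing a write keyword."""
    # Blank out string literals so a value such as 'SET' is not flagged
    stripped = STRING_LITERAL.sub("''", cypher)
    return WRITE_PATTERN.search(stripped) is None


print(is_read_only("MATCH (e:Entity {name: 'PostgreSQL'}) RETURN e.name"))  # True
print(is_read_only("MATCH (e:Entity) DETACH DELETE e"))                     # False
```

Rejecting a query here is cheap; the database role remains the authoritative guard.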

Incremental Graph Updates and Freshness

A static knowledge graph goes stale. Production deployments need a strategy for keeping entity descriptions, relationships, and community summaries current as the underlying document corpus changes. Two patterns cover most cases: TTL-based scheduled refresh for slowly-changing corpora, and CDC-triggered incremental updates for near-real-time knowledge bases.
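
For the TTL-based pattern, the core check is whether a document's last-indexed timestamp is older than a chosen time-to-live. A minimal sketch, assuming timestamps are stored as ISO-8601 strings and that naive timestamps are in UTC; the `is_stale` name and the seven-day TTL in the usage lines are illustrative choices, not recommendations.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def is_stale(last_indexed_iso: str, ttl: timedelta,
             now: Optional[datetime] = None) -> bool:
    """True when the stored ISO-8601 timestamp is older than the TTL."""
    now = now or datetime.now(timezone.utc)
    last = datetime.fromisoformat(last_indexed_iso)
    if last.tzinfo is None:
        # Treat naive timestamps as UTC
        last = last.replace(tzinfo=timezone.utc)
    return now - last > ttl


reference = datetime(2026, 5, 29, tzinfo=timezone.utc)
print(is_stale("2026-05-01T00:00:00", timedelta(days=7), now=reference))        # True
print(is_stale("2026-05-28T00:00:00+00:00", timedelta(days=7), now=reference))  # False
```

A scheduled job can run this over Document nodes and re-ingest only the stale ones.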

Change Detection and Incremental Reindexing

import hashlib
import datetime
from neo4j import GraphDatabase

def document_fingerprint(content: str) -> str:
    return hashlib.sha256(content.encode()).hexdigest()


def needs_reindex(driver, doc_id: str, content_hash: str) -> bool:
    """Returns True if this document has changed since last indexing."""
    with driver.session() as session:
        result = session.run(
            "MATCH (d:Document {id: $id}) RETURN d.content_hash AS hash",
            id=doc_id,
        )
        record = result.single()
        if record is None:
            return True  # new document
        return record["hash"] != content_hash


def mark_indexed(driver, doc_id: str, content_hash: str) -> None:
    with driver.session() as session:
        session.run(
            """
            MERGE (d:Document {id: $id})
            SET d.content_hash = $hash,
                d.last_indexed  = $ts
            """,
            id=doc_id,
            hash=content_hash,
            ts=datetime.datetime.now(datetime.timezone.utc).isoformat(),
        )


def incremental_ingest(driver, documents: list[dict]) -> dict:
    """
    Process only changed documents, reuse existing graph nodes
    for unchanged ones. Returns counts of new/updated/skipped docs.
    """
    stats = {"new": 0, "updated": 0, "skipped": 0}

    for doc in documents:
        doc_id   = doc["id"]
        content  = doc["content"]
        content_hash = document_fingerprint(content)

        if not needs_reindex(driver, doc_id, content_hash):
            stats["skipped"] += 1
            continue

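        # Remove this document from the provenance of its relationships and
        # delete edges that no other document supports. This assumes each
        # RELATES_TO edge carries a source_docs list; upsert_relationship as
        # written earlier does not set one, so it must be extended for this
        # cleanup to take effect.
        with driver.session() as session:
            session.run(
                """
                MATCH ()-[r:RELATES_TO]->()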
                WHERE $doc_id IN r.source_docs
                WITH r, [x IN r.source_docs WHERE x <> $doc_id] AS remaining
                SET r.source_docs = remaining
                WITH r WHERE size(r.source_docs) = 0
                DELETE r
                """,
                doc_id=doc_id,
            )

        entities      = extract_entities(content, doc_id)
        relationships = extract_relationships(content, entities)

        for entity in entities:
            with driver.session() as session:
                upsert_entity(session, entity)
        for rel in relationships:
            with driver.session() as session:
                upsert_relationship(session, rel)

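        # Check whether the Document node already exists before marking it
        # indexed; afterwards it always exists and every doc looks "updated"
        with driver.session() as session:
            is_new = session.run(
                "MATCH (d:Document {id: $id}) RETURN count(d) AS n",
                id=doc_id,
            ).single()["n"] == 0
        mark_indexed(driver, doc_id, content_hash)
        stats["new" if is_new else "updated"] += 1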

    return stats


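# Scheduled refresh: run via Airflow or cron.
# After an incremental update, re-run community detection when enough of
# the graph has changed. The check below uses the number of new or updated
# documents as a rough proxy for changed entities, with a 5% threshold.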
def should_rerun_communities(stats: dict, total_entities: int) -> bool:
    changed = stats["new"] + stats["updated"]
    return changed / max(total_entities, 1) > 0.05

Monitoring GraphRAG Quality

GraphRAG introduces new failure modes beyond standard RAG. Entity extraction errors propagate into the graph, corrupt relationships, and degrade retrieval quality silently. Monitor graph health with metrics that are specific to the graph layer, not just the LLM response layer.

# metrics.py — Prometheus metrics for GraphRAG
from prometheus_client import Counter, Gauge, Histogram

# Graph structure metrics
GRAPH_ENTITY_COUNT = Gauge(
    "graphrag_entity_total",
    "Total number of entities in the knowledge graph",
    ["entity_type"],
)
GRAPH_RELATIONSHIP_COUNT = Gauge(
    "graphrag_relationship_total",
    "Total number of relationships in the knowledge graph",
)
GRAPH_COMMUNITY_COUNT = Gauge(
    "graphrag_community_total",
    "Total number of communities",
)
ENTITY_COVERAGE_RATIO = Gauge(
    "graphrag_entity_coverage_ratio",
    "Fraction of documents that have at least one entity extracted",
)

# Retrieval quality metrics
GRAPH_RETRIEVAL_LATENCY = Histogram(
    "graphrag_retrieval_duration_seconds",
    "End-to-end GraphRAG retrieval latency (ANN + Cypher)",
    ["query_type"],  # local | global
    buckets=[0.05, 0.1, 0.25, 0.5, 1.0, 2.0, 5.0],
)
RETRIEVED_ENTITY_COUNT = Histogram(
    "graphrag_retrieved_entities",
    "Number of entities retrieved per query",
    ["query_type"],
    buckets=[1, 2, 5, 10, 20, 50, 100],
)
EMPTY_RETRIEVAL_RATE = Counter(
    "graphrag_empty_retrieval_total",
    "Queries that returned zero entities from the graph",
    ["query_type"],
)


def collect_graph_metrics(driver) -> None:
    """Populate graph structure metrics — run every 5 minutes."""
    with driver.session() as session:
        entity_result = session.run(
            "MATCH (e:Entity) RETURN e.type AS type, count(*) AS cnt"
        )
        for row in entity_result:
            GRAPH_ENTITY_COUNT.labels(entity_type=row["type"]).set(row["cnt"])

        rel_count = session.run(
            "MATCH ()-[r:RELATES_TO]->() RETURN count(r) AS cnt"
        ).single()["cnt"]
        GRAPH_RELATIONSHIP_COUNT.set(rel_count)

        community_count = session.run(
            "MATCH (c:Community) RETURN count(c) AS cnt"
        ).single()["cnt"]
        GRAPH_COMMUNITY_COUNT.set(community_count)

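        # Entity coverage: documents that yielded >= 1 entity.
        # Assumes each Document node stores an entity_names list, which
        # mark_indexed as written earlier does not set.
        coverage = session.run("""
            MATCH (d:Document)
            WITH count(d) AS total,
                 sum(CASE WHEN size(coalesce(d.entity_names, [])) > 0
                          THEN 1 ELSE 0 END) AS covered
            WHERE total > 0
            RETURN toFloat(covered) / total AS ratio
        """).single()
        # No row comes back when the graph has no Document nodes yet
        if coverage is not None:
            ENTITY_COVERAGE_RATIO.set(coverage["ratio"])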
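
The metrics above cover graph structure and retrieval, but not faithfulness. As a rough starting point, you can check how many of the known entities mentioned in an answer were actually present in the retrieved context. This is a crude proxy based on substring matching, not an established faithfulness metric, and the `entity_grounding` name is hypothetical.

```python
def entity_grounding(answer: str, known_entities: list[str],
                     retrieved: list[str]) -> float:
    """Share of known entities mentioned in the answer that were also retrieved."""
    text = answer.lower()
    mentioned = {e for e in known_entities if e.lower() in text}
    if not mentioned:
        return 1.0  # nothing to ground; treat as vacuously supported
    retrieved_set = {e.lower() for e in retrieved}
    grounded = {e for e in mentioned if e.lower() in retrieved_set}
    return len(grounded) / len(mentioned)


score = entity_grounding(
    answer="The billing service uses PostgreSQL and Redis.",
    known_entities=["PostgreSQL", "Redis", "Kafka"],
    retrieved=["PostgreSQL", "Billing"],
)
print(score)  # 0.5
```

A low score suggests the model drew on entities that retrieval never supplied, which is worth sampling and reviewing by hand.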

Standard RAG vs GraphRAG vs Hybrid — When to Use Each

GraphRAG adds significant complexity to your retrieval stack. The engineering cost is only justified when your query patterns actually require it. Use this framework to make the right call.

  • Use standard vector RAG when: Questions are factual lookups against isolated document chunks. The corpus is small (<10k documents) and entities rarely appear across multiple documents. Fast iteration is more important than maximum accuracy. Budget and operational complexity are primary constraints.
  • Use GraphRAG when: Questions require multi-hop reasoning ("who works with whom on what"). The corpus has high entity density — technical docs, codebases, knowledge bases, research papers. Global synthesis queries ("summarise all architecture decisions across the corpus") are common. You need explainable retrieval — Cypher traversal paths are inspectable.
  • Use hybrid (vector RAG + graph) when: Most queries are local lookups but a subset require multi-hop reasoning. You want to start with standard RAG and incrementally add graph retrieval for query types that underperform. The document corpus contains both structured (graph-extractable) and unstructured (chunk-only) content.
  • Operational cost to consider: GraphRAG adds two new systems to operate (a graph database and a community detection pipeline). Entity and relationship extraction costs up to two LLM calls per document with the pipeline shown here, and fewer if short documents are batched into one prompt. Community re-detection scales roughly as O(n log n) in entity count and should run after every significant corpus update. Budget 3–5× the infrastructure cost of standard RAG for a production GraphRAG deployment.
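
To put rough numbers on the LLM side of that cost, you can count the calls made by the pipeline as written in this article: one entity call and one relationship call per document, one summary call per community, and at query time one map call per retrieved community plus one reduce call. The function names and the example figures below are illustrative only.

```python
def indexing_llm_calls(num_docs: int, num_communities: int,
                       calls_per_doc: int = 2) -> int:
    """Entity + relationship calls per document, plus one summary per community."""
    return num_docs * calls_per_doc + num_communities


def global_query_llm_calls(top_communities: int = 10) -> int:
    """Upper bound: one map call per community summary, plus one reduce call."""
    return top_communities + 1


print(indexing_llm_calls(num_docs=10_000, num_communities=300))  # 20300
print(global_query_llm_calls())                                   # 11
```

Multiplying these counts by your per-call token usage and pricing gives a first-order budget before you commit to a full index build.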

Note

The Microsoft GraphRAG library provides a full pipeline implementation that can run against Azure OpenAI or local models via Ollama. For teams already using Azure AI Foundry, the managed GraphRAG service reduces operational overhead significantly. For self-hosted deployments, the custom pipeline in this article gives you full control over extraction prompts, graph schema, and community granularity.
