Django · Wagtail · Python · AI · RAG

RAG with Django: Chat Over Your Wagtail CMS Content

Fine-tuning a model on your CMS content is slow, expensive, and goes stale the moment an editor publishes a new page. Retrieval-Augmented Generation (RAG) is the better path: embed your Wagtail pages into a vector store, retrieve the relevant chunks at query time, and let an LLM compose a grounded, cited answer — all from a standard Django REST endpoint.

1. How RAG Works

RAG splits into two pipelines that run at different times:

  • Indexing pipeline (runs once, then on content change) — extract text from Wagtail pages, split into chunks, generate vector embeddings, store in a vector database.
  • Query pipeline (runs on every chat request) — embed the user's question, find the most similar chunks by cosine distance, inject them into a prompt, return the LLM's answer with source citations.

No training, no fine-tuning. The model never sees your content until a user asks a question — and it sees only the few chunks most relevant to that question. This keeps responses grounded, makes citations trivial, and lets editors update content without any redeployment.
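Stripped of the framework, the query pipeline is just a nearest-neighbour search over vectors. A toy, self-contained illustration (three-dimensional vectors and made-up chunks — real embeddings have hundreds to thousands of dimensions):

```python
import math


def cosine_distance(a, b):
    """1 - cosine similarity: 0 for identical directions, 2 for opposite."""
    dot = sum(x * y for x, y in zip(a, b))
    norms = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return 1 - dot / norms


# Toy "vector store": (chunk text, embedding) pairs
store = [
    ('We offer web design services.', [0.9, 0.1, 0.0]),
    ('Our office is in Berlin.',      [0.1, 0.9, 0.1]),
    ('Contact us by email.',          [0.0, 0.2, 0.9]),
]

# Pretend this vector came from embedding the user's question
question_embedding = [0.85, 0.15, 0.05]

# Rank chunks by distance and keep the closest — this is all "retrieval" is
ranked = sorted(store, key=lambda item: cosine_distance(question_embedding, item[1]))
top_chunk = ranked[0][0]
# top_chunk → 'We offer web design services.'
```

Everything that follows is this loop at scale, with OpenAI producing the vectors and pgvector doing the sort inside Postgres.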


2. Prerequisites

  • Wagtail 4.x or 5.x with PostgreSQL (pgvector requires Postgres)
  • An OpenAI API key (for embeddings)
  • An Anthropic API key (for the chat LLM) — or swap in OpenAI if you prefer
  • PostgreSQL 14+ with the pgvector extension available

If you are running Postgres locally via Docker, pgvector is available in the official pgvector/pgvector image:

docker run -d --name pgvector \
  -e POSTGRES_PASSWORD=postgres \
  -p 5432:5432 \
  pgvector/pgvector:pg16

3. Install Dependencies

pip install openai anthropic pgvector psycopg[binary] djangorestframework

Add to settings/base.py:

INSTALLED_APPS = [
    ...
    'rest_framework',
    'chat',          # the app we will create below
]

OPENAI_API_KEY    = env('OPENAI_API_KEY')
ANTHROPIC_API_KEY = env('ANTHROPIC_API_KEY')

4. Enable pgvector in PostgreSQL

pgvector must be enabled as an extension inside your database. Create a migration that runs the SQL for you so it works in CI and on fresh installs:

# chat/migrations/0001_enable_pgvector.py
from django.db import migrations

class Migration(migrations.Migration):
    dependencies = []

    operations = [
        migrations.RunSQL(
            sql='CREATE EXTENSION IF NOT EXISTS vector;',
            reverse_sql='DROP EXTENSION IF EXISTS vector;',
        )
    ]

5. The PageEmbedding Model

Each Wagtail page is split into chunks. Every chunk gets its own row with a 1536-dimensional embedding vector (matching OpenAI's text-embedding-3-small output size). We create an HNSW index for fast approximate nearest-neighbour search:

# chat/models.py
from django.db import models
from pgvector.django import VectorField, HnswIndex


class PageEmbedding(models.Model):
    page        = models.ForeignKey(
        'wagtailcore.Page', on_delete=models.CASCADE, related_name='embeddings'
    )
    chunk_index = models.PositiveIntegerField()
    content     = models.TextField()
    embedding   = VectorField(dimensions=1536)
    created_at  = models.DateTimeField(auto_now_add=True)

    class Meta:
        unique_together = ('page', 'chunk_index')
        indexes = [
            HnswIndex(
                name='page_embedding_hnsw_idx',
                fields=['embedding'],
                m=16,
                ef_construction=64,
                opclasses=['vector_cosine_ops'],
            )
        ]

    def __str__(self):
        return f'{self.page.title} — chunk {self.chunk_index}'

HNSW is a good fit here: it trades a little extra memory and index build time for fast approximate nearest-neighbour queries, which is worthwhile for a latency-sensitive chat endpoint.


6. Extract Page Content

Wagtail pages have different field layouts per page type. A helper that handles the most common cases — RichTextField, StreamField, and plain CharField/TextField attributes:

# chat/indexing.py
from wagtail.rich_text import get_text_for_indexing
from wagtail.fields import StreamField, RichTextField


def extract_page_text(page):
    """Return plain text from a Wagtail page's indexable fields."""
    specific = page.specific
    parts = [page.title]

    for field in specific._meta.get_fields():
        value = getattr(specific, field.name, None)
        if value is None:
            continue

        # StreamField: walk blocks, pulling text from rich-text and plain-string blocks
        if isinstance(field, StreamField):
            for block in value:
                bv = block.value
                if hasattr(bv, 'source'):
                    # RichTextBlock — strip the HTML
                    parts.append(get_text_for_indexing(bv.source))
                elif isinstance(bv, str) and bv.strip():
                    parts.append(bv)

        # RichTextField stored as HTML
        elif isinstance(field, RichTextField) and isinstance(value, str):
            parts.append(get_text_for_indexing(value))

    # Fallback: search_description is often a good summary
    if hasattr(specific, 'search_description') and specific.search_description:
        parts.insert(1, specific.search_description)

    return '\n\n'.join(filter(None, parts))

7. Chunk the Text

Embeddings work best on focused chunks of 200–600 words. Too short and you lose context; too long and the embedding averages over too many topics and retrieval quality drops. A simple word-based sliding window with overlap keeps sentences from being cut mid-thought:

# chat/indexing.py (continued)

def chunk_text(text, chunk_size=400, overlap=60):
    """Split text into overlapping word-level chunks."""
    words = text.split()
    if not words:
        return []

    chunks = []
    i = 0
    while i < len(words):
        chunk = ' '.join(words[i : i + chunk_size])
        chunks.append(chunk)
        if i + chunk_size >= len(words):
            break
        i += chunk_size - overlap

    return chunks

The 60-word overlap means the same sentence will appear in two adjacent chunks. This prevents the edge of a chunk from cutting relevant context in half, which would degrade retrieval.
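Running chunk_text on a tiny input with small parameters makes the overlap visible (the function is repeated here so the snippet runs standalone):

```python
def chunk_text(text, chunk_size=400, overlap=60):
    """Split text into overlapping word-level chunks."""
    words = text.split()
    if not words:
        return []
    chunks, i = [], 0
    while i < len(words):
        chunks.append(' '.join(words[i : i + chunk_size]))
        if i + chunk_size >= len(words):
            break
        i += chunk_size - overlap
    return chunks


# Ten words, chunks of six with a two-word overlap
chunks = chunk_text('one two three four five six seven eight nine ten',
                    chunk_size=6, overlap=2)
# chunks[0] → 'one two three four five six'
# chunks[1] → 'five six seven eight nine ten'  (shares 'five six' with chunk 0)
```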


8. Generate and Store Embeddings

One function to embed a single string, and one to index a complete page — deleting old chunks first so a re-index is always clean:

# chat/indexing.py (continued)
from openai import OpenAI
from django.conf import settings
from .models import PageEmbedding

_openai = OpenAI(api_key=settings.OPENAI_API_KEY)


def get_embedding(text):
    """Call OpenAI and return a 1536-dim list of floats."""
    response = _openai.embeddings.create(
        model='text-embedding-3-small',
        input=text.replace('\n', ' '),
    )
    return response.data[0].embedding


def index_page(page):
    """Extract, chunk, embed, and store a single Wagtail page."""
    text = extract_page_text(page)
    if not text.strip():
        return 0

    chunks = chunk_text(text)

    # Clear stale chunks before reindexing
    PageEmbedding.objects.filter(page=page).delete()

    to_create = []
    for i, chunk in enumerate(chunks):
        to_create.append(PageEmbedding(
            page=page,
            chunk_index=i,
            content=chunk,
            embedding=get_embedding(chunk),
        ))

    PageEmbedding.objects.bulk_create(to_create)
    return len(to_create)
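index_page makes one API call per chunk, which gets slow on large sites. The embeddings endpoint also accepts a list of inputs, so a batched variant cuts the round-trips. A sketch — the client is passed in rather than using the module-level _openai, so the batching logic can be exercised without network access:

```python
def get_embeddings_batch(client, texts, batch_size=100,
                         model='text-embedding-3-small'):
    """Embed many chunks, one API call per batch, preserving input order."""
    vectors = []
    for start in range(0, len(texts), batch_size):
        batch = [t.replace('\n', ' ') for t in texts[start:start + batch_size]]
        response = client.embeddings.create(model=model, input=batch)
        # The API returns one embedding per input, in input order
        vectors.extend(item.embedding for item in response.data)
    return vectors
```

index_page could then call get_embeddings_batch(_openai, chunks) and zip the returned vectors with the chunks when building PageEmbedding rows.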

9. Management Command: Index All Pages

Run this once to build the initial index, and again after any bulk content import:

# chat/management/commands/build_rag_index.py
from django.core.management.base import BaseCommand
from wagtail.models import Page
from chat.indexing import index_page


class Command(BaseCommand):
    help = 'Embed all live Wagtail pages and store them in PageEmbedding.'

    def add_arguments(self, parser):
        parser.add_argument('--page-type', help='Limit to a specific page type (dotted path)')

    def handle(self, *args, **options):
        qs = Page.objects.live().order_by('pk')

        page_type = options.get('page_type')
        if page_type:
            import importlib
            module, cls = page_type.rsplit('.', 1)
            model = getattr(importlib.import_module(module), cls)
            qs = model.objects.live().order_by('pk')

        total = qs.count()
        self.stdout.write(f'Indexing {total} pages...')

        for page in qs.iterator():
            n = index_page(page)
            self.stdout.write(f'  [{page.pk}] {page.title} — {n} chunks')

Run it:

python manage.py build_rag_index
# or limit to one page type:
python manage.py build_rag_index --page-type blog.models.BlogPage

10. Retrieve Relevant Chunks

Embed the user's question using the same model used at index time, then query the vector store for the closest chunks by cosine distance:

# chat/retrieval.py
from pgvector.django import CosineDistance
from .indexing import get_embedding
from .models import PageEmbedding


def retrieve(question, top_k=5):
    """Return the top_k most relevant PageEmbedding rows for the question."""
    q_embedding = get_embedding(question)

    return (
        PageEmbedding.objects
        .annotate(distance=CosineDistance('embedding', q_embedding))
        .select_related('page')
        .order_by('distance')[:top_k]
    )

CosineDistance returns values in [0, 2] — 0 is identical, 2 is opposite. Where relevant chunks actually land depends on the embedding model and your content, so calibrate the cut-off against real queries rather than trusting a fixed number. A distance threshold keeps clearly irrelevant context out of the prompt:

def retrieve(question, top_k=5, max_distance=0.4):
    q_embedding = get_embedding(question)

    return (
        PageEmbedding.objects
        .annotate(distance=CosineDistance('embedding', q_embedding))
        .filter(distance__lt=max_distance)
        .select_related('page')
        .order_by('distance')[:top_k]
    )

11. Build the Prompt and Call the LLM

The system prompt is the most important part of the RAG setup. It must tell the model to stay grounded in the provided context, admit when it does not know, and cite sources:

# chat/llm.py
import anthropic
from django.conf import settings

_claude = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)

SYSTEM_PROMPT = """You are a helpful assistant for this website. Answer the user's question
using ONLY the context passages provided below. Each passage is labelled with the page it
came from.

Rules:
- If the context does not contain enough information, say "I don't have enough information
  about that on this site" — do not guess or invent facts.
- Always mention which page(s) your answer is drawn from.
- Be concise. Prefer bullet points for multi-part answers.
- Never make up URLs — only cite page titles."""


def build_context(chunks):
    parts = []
    for chunk in chunks:
        parts.append(f'[Page: {chunk.page.title}]\n{chunk.content}')
    return '\n\n---\n\n'.join(parts)


def answer(question, chunks):
    """Return a plain-text answer grounded in the retrieved chunks."""
    context = build_context(chunks)

    response = _claude.messages.create(
        model='claude-sonnet-4-6',
        max_tokens=1024,
        system=SYSTEM_PROMPT,
        messages=[{
            'role': 'user',
            'content': f'Context:\n\n{context}\n\nQuestion: {question}',
        }],
    )
    return response.content[0].text

12. The Chat API Endpoint

Wire the retrieval and LLM steps together in a DRF view. Return the answer plus deduplicated source pages so the frontend can show citations:

# chat/views.py
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status, throttling
from .retrieval import retrieve
from .llm import answer


class ChatView(APIView):
    throttle_classes = [throttling.AnonRateThrottle]

    def post(self, request):
        question = request.data.get('question', '').strip()

        if not question:
            return Response(
                {'error': 'question is required'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        if len(question) > 500:
            return Response(
                {'error': 'question must be 500 characters or fewer'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        chunks = retrieve(question, top_k=5)

        if not chunks:
            return Response({
                'answer':  "I couldn't find relevant content to answer your question.",
                'sources': [],
            })

        reply = answer(question, chunks)

        # Deduplicate sources, preserve relevance order
        seen = set()
        sources = []
        for chunk in chunks:
            if chunk.page_id not in seen:
                seen.add(chunk.page_id)
                sources.append({
                    'title': chunk.page.title,
                    'url':   chunk.page.url,
                })

        return Response({'answer': reply, 'sources': sources})

Wire up the URLs:

# chat/urls.py
from django.urls import path
from .views import ChatView

urlpatterns = [
    path('chat/', ChatView.as_view(), name='rag-chat'),
]

# config/urls.py
urlpatterns += [
    path('api/', include('chat.urls')),
]

Test it with curl:

curl -X POST http://localhost:8000/api/chat/ \
  -H 'Content-Type: application/json' \
  -d '{"question": "What services do you offer?"}'

A successful response looks like:

{
  "answer": "Based on the Services page, the site offers...",
  "sources": [
    { "title": "Services", "url": "/services/" },
    { "title": "About Us", "url": "/about/" }
  ]
}

13. Streaming Responses with SSE

LLM responses can take 3–10 seconds for longer answers. Streaming via Server-Sent Events (SSE) lets the frontend start rendering as tokens arrive, which feels dramatically faster to the user. Django's StreamingHttpResponse handles this without async:

# chat/views.py (add alongside ChatView)
import json
from django.http import StreamingHttpResponse
from django.conf import settings
from django.views import View
import anthropic
from .retrieval import retrieve
from .llm import SYSTEM_PROMPT, build_context

_claude = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)


class ChatStreamView(View):
    def get(self, request):
        question = request.GET.get('q', '').strip()

        if not question or len(question) > 500:
            return StreamingHttpResponse(
                iter([f'data: {json.dumps({"error": "invalid question"})}\n\n']),
                content_type='text/event-stream',
            )

        chunks  = retrieve(question, top_k=5)
        context = build_context(chunks) if chunks else 'No relevant content found.'
        sources = []
        seen    = set()
        for c in chunks:
            if c.page_id not in seen:
                seen.add(c.page_id)
                sources.append({'title': c.page.title, 'url': c.page.url})

        def event_stream():
            # Emit sources first so the frontend can render them immediately
            yield f'data: {json.dumps({"sources": sources})}\n\n'

            with _claude.messages.stream(
                model='claude-sonnet-4-6',
                max_tokens=1024,
                system=SYSTEM_PROMPT,
                messages=[{
                    'role': 'user',
                    'content': f'Context:\n\n{context}\n\nQuestion: {question}',
                }],
            ) as stream:
                for token in stream.text_stream:
                    yield f'data: {json.dumps({"token": token})}\n\n'

            yield 'data: [DONE]\n\n'

        response = StreamingHttpResponse(
            event_stream(), content_type='text/event-stream'
        )
        response['Cache-Control'] = 'no-cache'
        response['X-Accel-Buffering'] = 'no'  # disable Nginx buffering
        return response

Wire it into chat/urls.py:

from .views import ChatView, ChatStreamView

urlpatterns = [
    path('chat/',        ChatView.as_view(),       name='rag-chat'),
    path('chat/stream/', ChatStreamView.as_view(), name='rag-chat-stream'),
]

Connect from JavaScript:

const es = new EventSource(`/api/chat/stream/?q=${encodeURIComponent(question)}`);

es.onmessage = (e) => {
  if (e.data === '[DONE]') { es.close(); return; }
  const payload = JSON.parse(e.data);
  if (payload.sources) renderSources(payload.sources);
  if (payload.token)   appendToken(payload.token);
};

14. Auto-Index on Page Publish

Hook into Wagtail's page_published and page_unpublished signals so the index stays current without cron jobs or manual intervention. Use Celery to run the embedding in the background — OpenAI API calls are too slow to block the publish request:

# chat/tasks.py
from celery import shared_task
from wagtail.models import Page
from .indexing import index_page
from .models import PageEmbedding


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def index_page_task(self, page_pk):
    try:
        page = Page.objects.get(pk=page_pk)
        index_page(page)
    except Exception as exc:
        raise self.retry(exc=exc)


@shared_task
def remove_page_embeddings(page_pk):
    PageEmbedding.objects.filter(page_id=page_pk).delete()

# chat/signals.py
from wagtail.signals import page_published, page_unpublished
from django.dispatch import receiver
from .tasks import index_page_task, remove_page_embeddings


@receiver(page_published)
def on_publish(sender, instance, **kwargs):
    index_page_task.delay(instance.pk)


@receiver(page_unpublished)
def on_unpublish(sender, instance, **kwargs):
    remove_page_embeddings.delay(instance.pk)

# chat/apps.py
from django.apps import AppConfig

class ChatConfig(AppConfig):
    name = 'chat'

    def ready(self):
        import chat.signals  # noqa: F401  — register signal handlers

15. Production Considerations

Rate limiting and cost control

Each chat request makes two API calls: one embedding (cheap, ~$0.00002 per question) and one LLM generation (much more, ~$0.003–0.015 depending on context size and model). Rate limit the endpoint aggressively in settings:

REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES': {
        'anon': '20/hour',
        'user': '100/hour',
    }
}

Cache repeated questions

Many users ask the same questions. Cache at the question level using Django's cache framework — a SHA256 hash of the normalised question string makes a safe cache key:

import hashlib
from django.core.cache import cache

def cached_answer(question, chunks):
    key = 'rag:' + hashlib.sha256(question.lower().encode()).hexdigest()
    result = cache.get(key)
    if result:
        return result
    result = answer(question, chunks)
    cache.set(key, result, timeout=3600)
    return result

Private content

The retrieval step searches all indexed chunks regardless of user session. If your Wagtail site has restricted pages (behind view restrictions or a paywall), filter the queryset in retrieve() to pages the current user can access before running the vector search.
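A sketch of that filtering using Wagtail's PageViewRestriction — accept_request checks passwords, login state, and group membership against the current request. The retrieve_for_request name and the request-passing convention are assumptions; adapt to however your access control actually works:

```python
# chat/retrieval.py — a permission-aware variant (a sketch, not a drop-in)
from pgvector.django import CosineDistance
from wagtail.models import PageViewRestriction

from .indexing import get_embedding
from .models import PageEmbedding


def retrieve_for_request(question, request, top_k=5):
    """Like retrieve(), but drops chunks from pages the requester cannot view."""
    q_embedding = get_embedding(question)
    qs = (
        PageEmbedding.objects
        .annotate(distance=CosineDistance('embedding', q_embedding))
        .select_related('page')
    )

    # Exclude every subtree whose view restriction the request fails;
    # descendants share the restricted page's treebeard path prefix
    for restriction in PageViewRestriction.objects.select_related('page'):
        if not restriction.accept_request(request):
            qs = qs.exclude(page__path__startswith=restriction.page.path)

    return qs.order_by('distance')[:top_k]
```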

Index freshness monitoring

Track the last indexed timestamp per page in PageEmbedding and expose a /api/rag-health/ endpoint that reports pages published after their last embed. A simple Celery beat task can alert you when the index drifts more than an hour behind live content.
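One way to compute that report, leaning on the embeddings related_name from the model above (the stale_pages helper and its module are hypothetical names):

```python
# chat/health.py — hypothetical module
from django.db.models import F, Max, Q
from wagtail.models import Page


def stale_pages():
    """Live pages published after their newest embedding (or never embedded)."""
    return (
        Page.objects.live()
        .annotate(last_embedded=Max('embeddings__created_at'))
        .filter(
            Q(last_embedded__isnull=True)
            | Q(last_published_at__gt=F('last_embedded'))
        )
    )
```

A /api/rag-health/ view can return stale_pages().count() plus the offending titles, and a Celery beat task can call the same function and alert when the count stays non-zero.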

Choose the right chunk size for your content

The 400-word default works well for blog posts and documentation. For product pages with lots of short bullet points, shrink to 150–200 words. For long legal or policy documents, go up to 600–800 words. Measure retrieval quality by checking whether the correct page appears in the top 3 results for a set of known test questions.
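A minimal harness for that measurement — retrieve_fn is whatever returns ranked page ids for a question (in this setup, a thin wrapper around retrieve()), and the test cases are pairs you write by hand:

```python
def top_k_hit_rate(test_cases, retrieve_fn, k=3):
    """Fraction of questions whose expected page id appears in the top-k results.

    test_cases: iterable of (question, expected_page_id) pairs.
    retrieve_fn: callable mapping a question to a ranked list of page ids.
    """
    cases = list(test_cases)
    hits = sum(1 for q, expected in cases if expected in retrieve_fn(q)[:k])
    return hits / len(cases)


# Example with a stubbed retriever standing in for the real vector search
fake_rankings = {
    'what services do you offer?': [12, 7, 3],
    'where is your office?':       [4, 9, 12],
}
rate = top_k_hit_rate(
    [('what services do you offer?', 12), ('where is your office?', 1)],
    lambda q: fake_rankings[q],
)
# rate == 0.5 — the second question's expected page never ranked
```

Re-run the same test set after each chunk-size change and keep the setting that maximises the hit rate.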


Wrapping Up

The full pipeline — extract, chunk, embed, retrieve, generate — is around 200 lines of Python with no specialised ML frameworks. pgvector keeps everything inside the Postgres database you already run, and Wagtail signals keep the index current without any manual intervention after the initial build.

The parts that repay the most attention in production are the system prompt (keep the model grounded in context), the distance threshold (filter out irrelevant chunks rather than injecting noise), and rate limiting (LLM generation costs add up fast at scale). Get those three right and you have a genuinely useful chat interface over your CMS content — one that stays accurate as editors update pages, with zero retraining required.