
Django REST API + Celery: Async AI Tasks Without Blocking Your Workers

LLM API calls take 3–30 seconds. Django's synchronous workers spend that entire time blocked — unable to serve any other request. At even modest concurrency this breaks down fast. The fix is to accept the job immediately, hand it to Celery, and let the client check back when it is ready. This guide builds the full pipeline from scratch.

1. The Problem with Synchronous LLM Calls

A typical Django deployment runs Gunicorn with 4–8 synchronous workers. Each worker handles exactly one request at a time. When a request triggers an LLM call that takes 15 seconds, that worker sits completely idle for the full 15 seconds, unable to serve any other request.

With 5 concurrent users each waiting on an LLM response, you have exhausted a 5-worker deployment. The 6th user gets a 503. The solution is not more workers — it is decoupling the accepting of work from the doing of work.

The Django worker's job is to accept the request and return a response in milliseconds. The Celery worker's job is to do the slow work. These are different machines, different processes, and they scale independently.
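To see how little headroom a synchronous setup leaves, here is a back-of-envelope sketch; the numbers (5 workers, 15-second calls) are the assumed figures from above, not measurements:

```python
# Capacity of a synchronous deployment under LLM load (assumed numbers)
workers = 5          # Gunicorn sync workers
llm_seconds = 15     # time each worker is blocked per LLM call

throughput = workers / llm_seconds   # sustainable requests per second
max_concurrent = workers             # request number 6 queues or gets a 503

print(f"{throughput:.2f} req/s, {max_concurrent} in-flight requests max")
```

Adding workers only moves the cliff; the async pattern below removes it.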

2. Architecture: The Two-Step Async Pattern

The flow has three distinct phases — submission, processing, and retrieval:

Phase 1 — Submission (milliseconds)
  Client → POST /api/ai-tasks/
         → Django creates AITask(status=pending) in DB
         → Dispatches Celery task with task.id
         → Returns 202 Accepted + { id, status: "pending" }

Phase 2 — Processing (seconds, off the request/response cycle)
  Redis broker → Celery worker picks up task
              → Calls LLM API
              → Saves result to AITask in DB
              → status = "done" | "failed"

Phase 3 — Retrieval (milliseconds, whenever the client is ready)
  Client → GET /api/ai-tasks/{id}/
         → Django reads AITask from DB
         → Returns { status, result, ... }

The Django API worker is never blocked on the LLM. It does two tiny DB operations and returns. All the slow work happens in a separate Celery worker process with its own concurrency settings.
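One way to keep this lifecycle honest is to write the legal status transitions down explicitly. A minimal sketch, using the status names from the phases above:

```python
# Status lifecycle implied by the three phases above (a sketch)
TRANSITIONS = {
    'pending':    {'processing'},
    'processing': {'done', 'failed', 'pending'},  # back to pending on retry
    'done':       set(),    # terminal
    'failed':     set(),    # terminal
}

def can_transition(old, new):
    """True if moving from `old` to `new` is a legal state change."""
    return new in TRANSITIONS.get(old, set())
```

A table like this is handy in tests: assert every status write in your task code is a legal transition, and stale-state bugs surface early.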


3. Install and Configure Celery

pip install celery redis

Run Redis locally with Docker if you don't have it already:

docker run -d --name redis -p 6379:6379 redis:7-alpine

Create the Celery application object:

# config/celery.py
import os
from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.base')

app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()

# config/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)

Add settings — keep these in settings/base.py and override URLs per environment:

# settings/base.py
CELERY_BROKER_URL          = env('REDIS_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND      = env('REDIS_URL', default='redis://localhost:6379/0')
CELERY_ACCEPT_CONTENT      = ['json']
CELERY_TASK_SERIALIZER     = 'json'
CELERY_RESULT_SERIALIZER   = 'json'
CELERY_TIMEZONE            = 'UTC'
CELERY_TASK_TRACK_STARTED  = True
CELERY_TASK_TIME_LIMIT     = 120   # hard kill after 120s
CELERY_TASK_SOFT_TIME_LIMIT = 90   # raises SoftTimeLimitExceeded at 90s

4. The AITask Model

Store task state in the database rather than relying solely on Celery's result backend. DB state survives Redis restarts, is queryable for analytics, and lets you track cost data (token usage) alongside the result:

# aitasks/models.py
import uuid
from django.db import models
from django.utils import timezone


class AITask(models.Model):
    PENDING    = 'pending'
    PROCESSING = 'processing'
    DONE       = 'done'
    FAILED     = 'failed'

    STATUS_CHOICES = [
        (PENDING,    'Pending'),
        (PROCESSING, 'Processing'),
        (DONE,       'Done'),
        (FAILED,     'Failed'),
    ]

    id            = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    prompt        = models.TextField()
    status        = models.CharField(max_length=20, choices=STATUS_CHOICES,
                                     default=PENDING, db_index=True)
    result        = models.TextField(blank=True)
    error         = models.TextField(blank=True)
    model_used    = models.CharField(max_length=80, blank=True)
    input_tokens  = models.PositiveIntegerField(null=True, blank=True)
    output_tokens = models.PositiveIntegerField(null=True, blank=True)
    created_at    = models.DateTimeField(auto_now_add=True)
    completed_at  = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['-created_at']

    def duration_seconds(self):
        if self.completed_at:
            return round((self.completed_at - self.created_at).total_seconds(), 2)
        return None

The input_tokens and output_tokens fields are invaluable in production — they let you run a cost report per task type, per user, or per time period without hitting the LLM provider's dashboard.
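As a sketch of what that enables, a per-task cost helper built on these fields might look like the following. The price table is an illustrative assumption (dollars per million tokens) and varies by model and provider:

```python
# Illustrative price table: (input, output) in $/M tokens, an assumption
PRICES = {'claude-sonnet-4-6': (3.00, 15.00)}

def task_cost_usd(model_used, input_tokens, output_tokens):
    """Rough USD cost of one task from its stored token counts.

    Unknown models price at zero; None token counts (task not done
    yet) are treated as zero.
    """
    price_in, price_out = PRICES.get(model_used, (0.0, 0.0))
    return ((input_tokens or 0) / 1_000_000 * price_in
            + (output_tokens or 0) / 1_000_000 * price_out)
```

Aggregating this over a queryset gives the cost reports shown in the production checklist later.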


5. Write the Celery Task

The task does three things: mark the job as processing, call the LLM, save the result (or error). Each state transition is a targeted update_fields save to avoid overwriting concurrent changes:

# aitasks/tasks.py
import anthropic
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from django.conf import settings
from django.utils import timezone
from .models import AITask

_claude = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_ai_task(self, task_id):
    try:
        task = AITask.objects.get(id=task_id)
    except AITask.DoesNotExist:
        return  # deleted before the worker picked it up

    task.status = AITask.PROCESSING
    task.save(update_fields=['status'])

    try:
        response = _claude.messages.create(
            model='claude-sonnet-4-6',
            max_tokens=1024,
            messages=[{'role': 'user', 'content': task.prompt}],
        )
        task.status        = AITask.DONE
        task.result        = response.content[0].text
        task.model_used    = response.model
        task.input_tokens  = response.usage.input_tokens
        task.output_tokens = response.usage.output_tokens
        task.completed_at  = timezone.now()
        task.save(update_fields=[
            'status', 'result', 'model_used',
            'input_tokens', 'output_tokens', 'completed_at',
        ])

    except SoftTimeLimitExceeded:
        task.status = AITask.FAILED
        task.error  = 'Task exceeded the 90-second time limit.'
        task.save(update_fields=['status', 'error'])

    except anthropic.RateLimitError as exc:
        # Don't mark as failed — requeue with a longer delay
        task.status = AITask.PENDING
        task.save(update_fields=['status'])
        raise self.retry(exc=exc, countdown=90)

    except Exception as exc:
        if self.request.retries >= self.max_retries:
            task.status = AITask.FAILED   # out of retries: terminal failure
            task.error  = str(exc)
            task.save(update_fields=['status', 'error'])
            raise
        # Reset to pending so the client never sees a stale state mid-retry
        task.status = AITask.PENDING
        task.save(update_fields=['status'])
        raise self.retry(exc=exc, countdown=30)

The RateLimitError handler is important: it resets the status to pending before retrying. If it were left as processing, the client would see a stale "processing" state that never resolves.


6. The API Endpoints

Two endpoints: one to submit a prompt and one to check status. The create endpoint returns 202 Accepted — not 200 OK — to signal that work is in progress, not complete:

# aitasks/serializers.py
from rest_framework import serializers
from .models import AITask


class AITaskCreateSerializer(serializers.ModelSerializer):
    class Meta:
        model  = AITask
        fields = ('prompt',)

    def validate_prompt(self, value):
        value = value.strip()
        if len(value) < 5:
            raise serializers.ValidationError('Prompt is too short.')
        if len(value) > 4000:
            raise serializers.ValidationError('Prompt must be 4,000 characters or fewer.')
        return value


class AITaskSerializer(serializers.ModelSerializer):
    duration = serializers.SerializerMethodField()

    class Meta:
        model  = AITask
        fields = ('id', 'status', 'result', 'error', 'model_used',
                  'input_tokens', 'output_tokens', 'created_at',
                  'completed_at', 'duration')
        read_only_fields = fields

    def get_duration(self, obj):
        return obj.duration_seconds()

# aitasks/views.py
from rest_framework import status
from rest_framework.generics import RetrieveAPIView
from rest_framework.response import Response
from rest_framework.throttling import AnonRateThrottle
from rest_framework.views import APIView
from .models import AITask
from .serializers import AITaskCreateSerializer, AITaskSerializer
from .tasks import process_ai_task


class AITaskCreateView(APIView):
    throttle_classes = [AnonRateThrottle]

    def post(self, request):
        serializer = AITaskCreateSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        task = serializer.save()

        # Fire and forget — returns in <10ms. If you enable ATOMIC_REQUESTS,
        # wrap this in transaction.on_commit() so the worker cannot pick up
        # the task before the row is committed.
        process_ai_task.delay(str(task.id))

        return Response(AITaskSerializer(task).data, status=status.HTTP_202_ACCEPTED)


class AITaskStatusView(RetrieveAPIView):
    queryset         = AITask.objects.all()
    serializer_class = AITaskSerializer
    lookup_field     = 'id'

# aitasks/urls.py
from django.urls import path
from .views import AITaskCreateView, AITaskStatusView

urlpatterns = [
    path('ai-tasks/',           AITaskCreateView.as_view(), name='ai-task-create'),
    path('ai-tasks/<uuid:id>/', AITaskStatusView.as_view(), name='ai-task-status'),
]

Quick test with curl:

# Submit
curl -s -X POST http://localhost:8000/api/ai-tasks/ \
  -H 'Content-Type: application/json' \
  -d '{"prompt": "Explain Django signals in one paragraph."}' | python -m json.tool

# Poll (replace with your task id)
curl -s http://localhost:8000/api/ai-tasks/<task-id>/ | python -m json.tool
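For scripts and integration tests, the same poll loop works in Python. A sketch: `fetch_status` is any zero-argument callable that returns the status endpoint's JSON, e.g. `lambda: requests.get(url).json()`:

```python
import time

def poll_until_done(fetch_status, max_wait=120.0, first_delay=1.0, cap=5.0):
    """Poll a status callable with exponential backoff until done/failed."""
    delay, waited = first_delay, 0.0
    while waited < max_wait:
        time.sleep(delay)
        waited += delay
        task = fetch_status()
        if task['status'] == 'done':
            return task
        if task['status'] == 'failed':
            raise RuntimeError(task.get('error') or 'Task failed')
        delay = min(delay * 1.5, cap)   # 1s, 1.5s, 2.25s, ... capped at 5s
    raise TimeoutError('Timed out waiting for AI response')
```

Taking the fetcher as a callable keeps the backoff logic testable without a running server; the browser version in the next section uses the same schedule.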

7. Frontend: Polling with Exponential Backoff

Polling every 500ms wastes requests when the LLM is taking 15 seconds. Exponential backoff — starting fast and slowing down — gives a responsive feel without hammering the server:

// Submit a prompt and wait for the result
async function askAI(prompt) {
    const res = await fetch('/api/ai-tasks/', {
        method:  'POST',
        headers: { 'Content-Type': 'application/json' },
        body:    JSON.stringify({ prompt }),
    });

    if (!res.ok) {
        const err = await res.json();
        throw new Error(err.prompt?.[0] || 'Submission failed');
    }

    const { id } = await res.json();
    return pollUntilDone(id);
}

async function pollUntilDone(taskId, maxWaitMs = 120_000) {
    const start = Date.now();
    let delay   = 1_000;   // first poll after 1 second

    while (Date.now() - start < maxWaitMs) {
        await sleep(delay);

        const task = await fetch(`/api/ai-tasks/${taskId}/`).then(r => r.json());

        if (task.status === 'done')   return task;
        if (task.status === 'failed') throw new Error(task.error || 'Task failed');

        // Back off: 1s → 1.5s → 2.25s → 3.4s → 5s (cap)
        delay = Math.min(delay * 1.5, 5_000);
    }

    throw new Error('Timed out waiting for AI response');
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Usage
askAI('What is memoisation in Python?')
    .then(task => console.log(task.result))
    .catch(err => console.error(err.message));

The cap at 5 seconds keeps the experience responsive. For tasks that reliably complete in under 10 seconds you can tighten the cap to 2–3 seconds.


8. Real-Time Updates Without Polling

Polling works well for most cases. Two alternatives are worth knowing:

Option A — Server-Sent Events (SSE)

Add a streaming endpoint that holds the connection open and pushes a status event when the task completes. The Celery task writes to Redis pub/sub; the SSE view subscribes:

# In the Celery task, after saving the done/failed state:
import redis, json
from django.conf import settings

_redis = redis.from_url(settings.CELERY_BROKER_URL)

_redis.publish(f'task:{task_id}', json.dumps({
    'status': task.status,
    'result': task.result,
    'error':  task.error,
}))

# SSE view — subscribes and streams the event to the client
import json, redis
from django.http import StreamingHttpResponse
from django.conf import settings

def task_sse(request, task_id):
    def event_stream():
        r  = redis.from_url(settings.CELERY_BROKER_URL)
        ps = r.pubsub()
        ps.subscribe(f'task:{task_id}')

        for message in ps.listen():
            if message['type'] == 'message':
                yield f'data: {message["data"].decode()}\n\n'
                break   # single event — close after result arrives

    response = StreamingHttpResponse(event_stream(), content_type='text/event-stream')
    response['Cache-Control']    = 'no-cache'
    response['X-Accel-Buffering'] = 'no'
    return response

Option B — Django Channels (WebSocket)

For apps that already use Django Channels, push the update over a WebSocket instead. In the Celery task, call channel_layer.group_send after the result is saved:

from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    f'task_{task_id}',
    {
        'type':   'task.update',
        'status': task.status,
        'result': task.result,
    }
)

SSE is simpler to add to an existing DRF project. WebSocket is the right choice if you already have Channels wired up or need bidirectional communication.


9. Retry Failed Tasks Gracefully

Celery's retry mechanism is good but needs a few refinements for LLM workloads:

@shared_task(bind=True, max_retries=3)
def process_ai_task(self, task_id):
    ...
    except anthropic.RateLimitError as exc:
        # Exponential backoff: 90s, 180s, 360s
        countdown = 90 * (2 ** self.request.retries)
        task.status = AITask.PENDING
        task.save(update_fields=['status'])
        raise self.retry(exc=exc, countdown=countdown)

    except anthropic.APIStatusError as exc:
        if exc.status_code >= 500:
            # Transient server error — retry
            raise self.retry(exc=exc, countdown=30)
        # 4xx client errors won't recover with retries — fail immediately
        task.status = AITask.FAILED
        task.error  = f'API error {exc.status_code}: {exc.message}'
        task.save(update_fields=['status', 'error'])

The distinction between 4xx and 5xx errors matters: a 400 Bad Request from the LLM provider will not resolve by retrying — it means your prompt is malformed. A 503 Service Unavailable will often resolve after a short wait.
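Distilled into a single predicate, the retry policy sketched above looks like this (note that in the handlers above a 429 actually arrives as RateLimitError rather than a raw status code; the predicate is useful where you classify codes directly):

```python
def should_retry(status_code: int) -> bool:
    """Retry only transient failures: 429 rate limits and 5xx server errors."""
    return status_code == 429 or status_code >= 500
```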

Always reset status to pending before retrying. A task stuck in processing forever is worse than a failed one because it never triggers user-facing error handling.


10. Separate Queues for Priority Work

Not all AI tasks are equal. A quick classification request should not queue behind a long document analysis. Use separate named queues and run worker pools with different concurrency settings for each:

# settings/base.py
CELERY_TASK_ROUTES = {
    'aitasks.tasks.process_quick_task': {'queue': 'ai_fast'},
    'aitasks.tasks.process_ai_task':    {'queue': 'ai_default'},
    'aitasks.tasks.process_bulk_task':  {'queue': 'ai_slow'},
}

# tasks.py — explicit queue on the decorator
@shared_task(bind=True, queue='ai_fast', max_retries=3)
def process_quick_task(self, task_id):
    ...   # short prompt, low max_tokens

@shared_task(bind=True, queue='ai_slow', max_retries=3)
def process_bulk_task(self, task_id):
    ...   # long document, high max_tokens, not time-sensitive

Run separate worker pools in your Procfile or supervisor config:

# Procfile (Heroku / Render)
web:         gunicorn config.wsgi:application
worker:      celery -A config worker -Q ai_default -c 2 --loglevel=info
worker_fast: celery -A config worker -Q ai_fast -c 4 --loglevel=info
worker_slow: celery -A config worker -Q ai_slow -c 1 --loglevel=info

The fast queue gets 4 concurrent workers; the slow queue gets 1 so bulk jobs never consume all your LLM API concurrency. Scale each independently based on observed queue depth.
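A sanity check worth automating: the sum of per-queue concurrency should stay under your provider's concurrent-request limit. The pool sizes and limit below are assumptions to adjust for your deployment:

```python
# Assumed worker pool sizes and provider limit; adjust to your deployment
pools = {'ai_fast': 4, 'ai_default': 2, 'ai_slow': 1}
provider_concurrency_limit = 10   # concurrent requests your API tier allows

max_in_flight = sum(pools.values())
assert max_in_flight <= provider_concurrency_limit, (
    f'{max_in_flight} workers can exceed the provider limit of '
    f'{provider_concurrency_limit} concurrent LLM calls'
)
```

Running this in CI against your actual Procfile numbers catches the case where scaling one pool silently pushes total concurrency past the provider's ceiling.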


11. Production Checklist

Monitor queues with Flower

Flower is Celery's real-time web UI. Run it as a separate process and restrict access to internal networks or behind basic auth:

pip install flower

celery -A config flower --port=5555 --basic-auth=admin:yourpassword

Set a result TTL

Task results stored in Redis accumulate forever without a TTL. Set a sensible expiry and periodically clean up old DB rows:

CELERY_RESULT_EXPIRES = 60 * 60 * 24   # 24 hours in Redis

# Periodic cleanup task in aitasks/tasks.py (run via Celery Beat)
@shared_task
def cleanup_old_tasks():
    from datetime import timedelta
    from django.utils import timezone
    from .models import AITask
    cutoff = timezone.now() - timedelta(days=30)
    AITask.objects.filter(created_at__lt=cutoff, status=AITask.DONE).delete()

Throttle at the API level

# settings/base.py
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES': {
        'anon': '10/hour',
        'user': '100/hour',
    }
}

Track cost per request

With input_tokens and output_tokens stored on every task, a daily cost report is a simple aggregation query:

from django.db.models import Sum
from datetime import date

today = AITask.objects.filter(
    status=AITask.DONE,
    completed_at__date=date.today(),
)
totals = today.aggregate(
    input=Sum('input_tokens'),
    output=Sum('output_tokens'),
)
# Sum() returns None when no rows match; guard before the arithmetic
input_tokens  = totals['input'] or 0
output_tokens = totals['output'] or 0
# claude-sonnet-4-6: $3/M input, $15/M output
cost = (input_tokens / 1_000_000 * 3) + (output_tokens / 1_000_000 * 15)
print(f"Today's AI cost: ${cost:.4f}")

Health check endpoint

Expose a lightweight endpoint that verifies the broker is reachable, so your load balancer can take unhealthy instances out of rotation:

from celery import current_app
from rest_framework.decorators import api_view
from rest_framework.response import Response

@api_view(['GET'])
def celery_health(request):
    try:
        # ping() returns one reply per responding worker; an empty list
        # means the broker is reachable but no workers are consuming
        replies = current_app.control.ping(timeout=1)
        if replies:
            return Response({'status': 'ok', 'workers': len(replies)})
        return Response({'status': 'degraded'}, status=503)
    except Exception:
        return Response({'status': 'degraded'}, status=503)

Wrapping Up

The pattern is simple once it clicks: Django accepts the job and returns a task ID in milliseconds; Celery does the slow work in the background; the client polls or subscribes for the result. Your Django workers stay free for other requests and your Celery workers scale independently based on AI workload.

The pieces that matter most in production are retrying with the right delay (rate limit errors need longer back-off than transient 5xx errors), resetting status to pending before any retry so the client never gets stuck, and separating queues so time-sensitive tasks are never blocked behind long-running bulk jobs.

Add token tracking from day one — retrofitting cost visibility into a production system that is already handling thousands of tasks per day is painful.