Django REST API + Celery: Async AI Tasks Without Blocking Your Workers
LLM API calls take 3–30 seconds. Django's synchronous workers spend that entire time blocked — unable to serve any other request. At even modest concurrency this breaks down fast. The fix is to accept the job immediately, hand it to Celery, and let the client check back when it is ready. This guide builds the full pipeline from scratch.
1. The Problem with Synchronous LLM Calls
A typical Django deployment runs Gunicorn with 4–8 synchronous workers. Each worker handles exactly one request at a time. When a request triggers an LLM call that takes 15 seconds, that worker is blocked on network I/O for the full 15 seconds, serving no other requests.
With 5 concurrent users each waiting on an LLM response, you have exhausted a 5-worker deployment. The 6th user gets a 503. The solution is not more workers — it is decoupling the accepting of work from the doing of work.
The Django worker's job is to accept the request and return a response in milliseconds. The Celery worker's job is to do the slow work. These are different machines, different processes, and they scale independently.
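To put numbers on it, here is a back-of-the-envelope capacity model (the 20 ms accept-and-enqueue figure is an assumption, not a measurement):

```python
def max_throughput(workers, seconds_per_request):
    # Each synchronous worker serves one request at a time, so peak
    # throughput is workers divided by time-per-request.
    return workers / seconds_per_request

# 5 workers, each blocked ~15 s per LLM call:
blocking = max_throughput(5, 15)        # ~0.33 requests/second total
# The same 5 workers doing only a ~20 ms accept-and-enqueue step:
async_accept = max_throughput(5, 0.02)  # ~250 requests/second
assert async_accept / blocking > 700    # roughly three orders of magnitude
```

The model ignores queueing effects, but the gap is large enough that the conclusion holds: moving the slow call off the request path changes the capacity class of the deployment.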
2. Architecture: The Two-Step Async Pattern
The flow has two distinct phases — submission and retrieval:
Phase 1 — Submission (milliseconds)
Client → POST /api/ai-tasks/
→ Django creates AITask(status=pending) in DB
→ Dispatches Celery task with task.id
→ Returns 202 Accepted + { id, status: "pending" }
Phase 2 — Processing (seconds, off the request/response cycle)
Redis broker → Celery worker picks up task
→ Calls LLM API
→ Saves result to AITask in DB
→ status = "done" | "failed"
Phase 3 — Retrieval (milliseconds, whenever the client is ready)
Client → GET /api/ai-tasks/{id}/
→ Django reads AITask from DB
→ Returns { status, result, ... }
The Django API worker is never blocked on the LLM. It does two tiny DB operations and returns. All the slow work happens in a separate Celery worker process with its own concurrency settings.
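Seen as data, the status field follows a small state machine. A plain-Python sketch of the transitions implied by the three phases (including the requeue-to-pending used later for rate-limit retries):

```python
# Allowed AITask status transitions, as implied by the flow above
ALLOWED_TRANSITIONS = {
    'pending':    {'processing'},
    'processing': {'done', 'failed', 'pending'},  # back to pending on retry
    'done':       set(),   # terminal
    'failed':     set(),   # terminal
}

def can_transition(current, new):
    return new in ALLOWED_TRANSITIONS.get(current, set())

assert can_transition('pending', 'processing')
assert can_transition('processing', 'done')
assert not can_transition('done', 'processing')  # results are immutable
```

Keeping the transitions this strict is what makes the polling contract reliable: a client that sees "done" or "failed" knows the record will never change again.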
3. Install and Configure Celery
pip install celery redis
(Redis serves as both broker and result backend here, and durable task state lives in the AITask model below, so the django-celery-results package isn't needed.)
Run Redis locally with Docker if you don't have it already:
docker run -d --name redis -p 6379:6379 redis:7-alpine
Create the Celery application object:
# config/celery.py
import os
from celery import Celery
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings.base')
app = Celery('myproject')
app.config_from_object('django.conf:settings', namespace='CELERY')
app.autodiscover_tasks()
# config/__init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
Add settings — keep these in settings/base.py and override URLs per environment:
# settings/base.py
CELERY_BROKER_URL = env('REDIS_URL', default='redis://localhost:6379/0')
CELERY_RESULT_BACKEND = env('REDIS_URL', default='redis://localhost:6379/0')
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'UTC'
CELERY_TASK_TRACK_STARTED = True
CELERY_TASK_TIME_LIMIT = 120 # hard kill after 120s
CELERY_TASK_SOFT_TIME_LIMIT = 90 # raises SoftTimeLimitExceeded at 90s
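With the broker settings in place, you can start a worker locally to verify the wiring (run it alongside runserver; the queue-specific worker commands come later):

```shell
celery -A config worker --loglevel=info
```

The startup banner lists the broker URL and every task autodiscovery found, which is a quick sanity check before wiring up the API.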
4. The AITask Model
Store task state in the database rather than relying solely on Celery's result backend. DB state survives Redis restarts, is queryable for analytics, and lets you track cost data (token usage) alongside the result:
# aitasks/models.py
import uuid

from django.db import models


class AITask(models.Model):
    PENDING = 'pending'
    PROCESSING = 'processing'
    DONE = 'done'
    FAILED = 'failed'
    STATUS_CHOICES = [
        (PENDING, 'Pending'),
        (PROCESSING, 'Processing'),
        (DONE, 'Done'),
        (FAILED, 'Failed'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    prompt = models.TextField()
    status = models.CharField(max_length=20, choices=STATUS_CHOICES,
                              default=PENDING, db_index=True)
    result = models.TextField(blank=True)
    error = models.TextField(blank=True)
    model_used = models.CharField(max_length=80, blank=True)
    input_tokens = models.PositiveIntegerField(null=True, blank=True)
    output_tokens = models.PositiveIntegerField(null=True, blank=True)
    created_at = models.DateTimeField(auto_now_add=True)
    completed_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['-created_at']

    def duration_seconds(self):
        if self.completed_at:
            return round((self.completed_at - self.created_at).total_seconds(), 2)
        return None
After adding the model, run python manage.py makemigrations aitasks followed by python manage.py migrate. The input_tokens and output_tokens fields are invaluable in production — they let you run a cost report per task type, per user, or per time period without hitting the LLM provider's dashboard.
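The duration helper is simple enough to check in isolation; here is a dependency-free mirror of duration_seconds (plain datetimes instead of model fields):

```python
from datetime import datetime, timedelta, timezone

def duration_seconds(created_at, completed_at):
    # Same logic as AITask.duration_seconds, minus the ORM
    if completed_at:
        return round((completed_at - created_at).total_seconds(), 2)
    return None

start = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
assert duration_seconds(start, start + timedelta(seconds=12.34)) == 12.34
assert duration_seconds(start, None) is None  # still pending or processing
```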
5. Write the Celery Task
The task does three things: mark the job as processing, call the LLM, save the result
(or error). Each state transition is a targeted update_fields save to avoid
overwriting concurrent changes:
# aitasks/tasks.py
import anthropic
from celery import shared_task
from celery.exceptions import SoftTimeLimitExceeded
from django.conf import settings
from django.utils import timezone

from .models import AITask

_claude = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)


@shared_task(bind=True, max_retries=3, default_retry_delay=60)
def process_ai_task(self, task_id):
    try:
        task = AITask.objects.get(id=task_id)
    except AITask.DoesNotExist:
        return  # deleted before the worker picked it up

    task.status = AITask.PROCESSING
    task.save(update_fields=['status'])

    try:
        response = _claude.messages.create(
            model='claude-sonnet-4-6',
            max_tokens=1024,
            messages=[{'role': 'user', 'content': task.prompt}],
        )
        task.status = AITask.DONE
        task.result = response.content[0].text
        task.model_used = response.model
        task.input_tokens = response.usage.input_tokens
        task.output_tokens = response.usage.output_tokens
        task.completed_at = timezone.now()
        task.save(update_fields=[
            'status', 'result', 'model_used',
            'input_tokens', 'output_tokens', 'completed_at',
        ])
    except SoftTimeLimitExceeded:
        task.status = AITask.FAILED
        task.error = 'Task exceeded the 90-second time limit.'
        task.save(update_fields=['status', 'error'])
    except anthropic.RateLimitError as exc:
        # Don't mark as failed — requeue with a longer delay
        task.status = AITask.PENDING
        task.save(update_fields=['status'])
        raise self.retry(exc=exc, countdown=90)
    except Exception as exc:
        if self.request.retries >= self.max_retries:
            # Out of retries: surface the failure to the client
            task.status = AITask.FAILED
            task.error = str(exc)
            task.save(update_fields=['status', 'error'])
            raise
        # Reset to pending so the client never sees a stale "processing"
        task.status = AITask.PENDING
        task.save(update_fields=['status'])
        raise self.retry(exc=exc, countdown=30)
The RateLimitError handler is important: it resets the status to
pending before retrying. If it were left as processing, the
client would see a stale "processing" state that never resolves.
6. The API Endpoints
Two endpoints: one to submit a prompt and one to check status. The create endpoint
returns 202 Accepted — not 200 OK — to signal that work
is in progress, not complete:
# aitasks/serializers.py
from rest_framework import serializers

from .models import AITask


class AITaskCreateSerializer(serializers.ModelSerializer):
    class Meta:
        model = AITask
        fields = ('prompt',)

    def validate_prompt(self, value):
        value = value.strip()
        if len(value) < 5:
            raise serializers.ValidationError('Prompt is too short.')
        if len(value) > 4000:
            raise serializers.ValidationError('Prompt must be 4,000 characters or fewer.')
        return value


class AITaskSerializer(serializers.ModelSerializer):
    duration = serializers.SerializerMethodField()

    class Meta:
        model = AITask
        fields = ('id', 'status', 'result', 'error', 'model_used',
                  'input_tokens', 'output_tokens', 'created_at',
                  'completed_at', 'duration')
        read_only_fields = fields

    def get_duration(self, obj):
        return obj.duration_seconds()
# aitasks/views.py
from rest_framework import status
from rest_framework.generics import RetrieveAPIView
from rest_framework.response import Response
from rest_framework.throttling import AnonRateThrottle
from rest_framework.views import APIView

from .models import AITask
from .serializers import AITaskCreateSerializer, AITaskSerializer
from .tasks import process_ai_task


class AITaskCreateView(APIView):
    throttle_classes = [AnonRateThrottle]

    def post(self, request):
        serializer = AITaskCreateSerializer(data=request.data)
        serializer.is_valid(raise_exception=True)
        task = serializer.save()
        # Fire and forget — returns in <10ms
        process_ai_task.delay(str(task.id))
        return Response(AITaskSerializer(task).data, status=status.HTTP_202_ACCEPTED)


class AITaskStatusView(RetrieveAPIView):
    queryset = AITask.objects.all()
    serializer_class = AITaskSerializer
    lookup_field = 'id'

# aitasks/urls.py
from django.urls import path

from .views import AITaskCreateView, AITaskStatusView

urlpatterns = [
    path('ai-tasks/', AITaskCreateView.as_view(), name='ai-task-create'),
    path('ai-tasks/<uuid:id>/', AITaskStatusView.as_view(), name='ai-task-status'),
]
Quick test with curl:
# Submit
curl -s -X POST http://localhost:8000/api/ai-tasks/ \
  -H 'Content-Type: application/json' \
  -d '{"prompt": "Explain Django signals in one paragraph."}' | python -m json.tool

# Poll (replace with your task id)
curl -s http://localhost:8000/api/ai-tasks/<task-id>/ | python -m json.tool
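For reference, the 202 body (and the first polls) look like this — illustrative values; the field set matches AITaskSerializer above:

```json
{
  "id": "00000000-0000-4000-8000-000000000000",
  "status": "pending",
  "result": "",
  "error": "",
  "model_used": "",
  "input_tokens": null,
  "output_tokens": null,
  "created_at": "2025-01-15T10:30:00Z",
  "completed_at": null,
  "duration": null
}
```

Once the worker finishes, the same GET returns status "done" with result, token counts, completed_at, and duration populated.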
7. Frontend: Polling with Exponential Backoff
Polling every 500ms wastes requests when the LLM is taking 15 seconds. Exponential backoff — starting fast and slowing down — gives a responsive feel without hammering the server:
// Submit a prompt and wait for the result
async function askAI(prompt) {
  const res = await fetch('/api/ai-tasks/', {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt }),
  });
  if (!res.ok) {
    const err = await res.json();
    throw new Error(err.prompt?.[0] || 'Submission failed');
  }
  const { id } = await res.json();
  return pollUntilDone(id);
}

async function pollUntilDone(taskId, maxWaitMs = 120_000) {
  const start = Date.now();
  let delay = 1_000; // first poll after 1 second
  while (Date.now() - start < maxWaitMs) {
    await sleep(delay);
    const task = await fetch(`/api/ai-tasks/${taskId}/`).then(r => r.json());
    if (task.status === 'done') return task;
    if (task.status === 'failed') throw new Error(task.error || 'Task failed');
    // Back off: 1s → 1.5s → 2.25s → 3.4s → 5s (cap)
    delay = Math.min(delay * 1.5, 5_000);
  }
  throw new Error('Timed out waiting for AI response');
}

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms));

// Usage
askAI('What is memoisation in Python?')
  .then(task => console.log(task.result))
  .catch(err => console.error(err.message));
The cap at 5 seconds keeps the experience responsive. For tasks that reliably complete in under 10 seconds you can tighten the cap to 2–3 seconds.
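The same schedule is easy to reason about numerically; here is a small Python sketch of the delay sequence (parameters mirror the JavaScript above):

```python
def backoff_delays(first=1.0, factor=1.5, cap=5.0, max_wait=120.0):
    # Reproduce the polling schedule: grow each delay by `factor`
    # until `cap`, stopping once the cumulative wait exceeds `max_wait`
    delays, total, delay = [], 0.0, first
    while total + delay <= max_wait:
        delays.append(round(delay, 2))
        total += delay
        delay = min(delay * factor, cap)
    return delays

sched = backoff_delays()
assert sched[:5] == [1.0, 1.5, 2.25, 3.38, 5.0]
assert len(sched) == 26  # ~26 polls over two minutes, vs 240 at a fixed 500 ms
```

The comparison in the last line is the whole argument for backoff: an order of magnitude fewer requests for a barely perceptible difference in latency.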
8. Real-Time Updates Without Polling
Polling works well for most cases. Two alternatives are worth knowing:
Option A — Server-Sent Events (SSE)
Add a streaming endpoint that holds the connection open and pushes a status event when the task completes. The Celery task writes to Redis pub/sub; the SSE view subscribes:
# In the Celery task, after saving the done/failed state:
import redis, json
from django.conf import settings

_redis = redis.from_url(settings.CELERY_BROKER_URL)
_redis.publish(f'task:{task_id}', json.dumps({
    'status': task.status,
    'result': task.result,
    'error': task.error,
}))

# SSE view — subscribes and streams the event to the client
import json, redis
from django.http import StreamingHttpResponse
from django.conf import settings


def task_sse(request, task_id):
    def event_stream():
        r = redis.from_url(settings.CELERY_BROKER_URL)
        ps = r.pubsub()
        ps.subscribe(f'task:{task_id}')
        for message in ps.listen():
            if message['type'] == 'message':
                yield f'data: {message["data"].decode()}\n\n'
                break  # single event — close after result arrives

    response = StreamingHttpResponse(event_stream(), content_type='text/event-stream')
    response['Cache-Control'] = 'no-cache'
    response['X-Accel-Buffering'] = 'no'
    return response
Option B — Django Channels (WebSocket)
For apps that already use Django Channels, push the update over a WebSocket instead.
In the Celery task, call channel_layer.group_send after the result is saved:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
    f'task_{task_id}',
    {
        'type': 'task.update',
        'status': task.status,
        'result': task.result,
    }
)
SSE is simpler to add to an existing DRF project. WebSocket is the right choice if you already have Channels wired up or need bidirectional communication. Either way, watch for one race: if the task finishes before the client subscribes, the published event is lost. Have the client check the task's status with a plain GET before opening the stream, and treat the push channel as an optimisation rather than the sole delivery path.
9. Retry Failed Tasks Gracefully
Celery's retry mechanism is good but needs a few refinements for LLM workloads:
@shared_task(bind=True, max_retries=3)
def process_ai_task(self, task_id):
    ...
    except anthropic.RateLimitError as exc:
        # Exponential backoff: 90s, 180s, 360s
        countdown = 90 * (2 ** self.request.retries)
        task.status = AITask.PENDING
        task.save(update_fields=['status'])
        raise self.retry(exc=exc, countdown=countdown)
    except anthropic.APIStatusError as exc:
        if exc.status_code >= 500:
            # Transient server error — retry
            raise self.retry(exc=exc, countdown=30)
        # 4xx client errors won't recover with retries — fail immediately
        task.status = AITask.FAILED
        task.error = f'API error {exc.status_code}: {exc.message}'
        task.save(update_fields=['status', 'error'])
The distinction between 4xx and 5xx errors matters: a 400 Bad Request from
the LLM provider will not resolve by retrying — it means your prompt is malformed. A
503 Service Unavailable will often resolve after a short wait.
Always reset status to pending before retrying. A task stuck in
processing forever is worse than a failed one because it never triggers
user-facing error handling.
10. Separate Queues for Priority Work
Not all AI tasks are equal. A quick classification request should not queue behind a long document analysis. Use separate named queues and run worker pools with different concurrency settings for each:
# settings/base.py
CELERY_TASK_ROUTES = {
    'aitasks.tasks.process_quick_task': {'queue': 'ai_fast'},
    'aitasks.tasks.process_ai_task': {'queue': 'ai_default'},
    'aitasks.tasks.process_bulk_task': {'queue': 'ai_slow'},
}

# tasks.py — explicit queue on the decorator
@shared_task(bind=True, queue='ai_fast', max_retries=3)
def process_quick_task(self, task_id):
    ...  # short prompt, low max_tokens


@shared_task(bind=True, queue='ai_slow', max_retries=3)
def process_bulk_task(self, task_id):
    ...  # long document, high max_tokens, not time-sensitive
Run separate worker pools in your Procfile or supervisor config:
# Procfile (Heroku / Render)
web: gunicorn config.wsgi:application
worker_fast: celery -A config worker -Q ai_fast -c 4 --loglevel=info
worker_slow: celery -A config worker -Q ai_slow -c 1 --loglevel=info
The fast queue's worker runs 4 concurrent processes (-c 4); the slow queue's runs just 1, so bulk jobs never consume all of your LLM API concurrency. Scale each independently based on observed queue depth.
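Under the hood, Celery maps a task's dotted name to a queue via the routes setting. A simplified mirror of that lookup makes the behaviour concrete (a sketch only; real routing also supports glob patterns and custom router classes):

```python
CELERY_TASK_ROUTES = {
    'aitasks.tasks.process_quick_task': {'queue': 'ai_fast'},
    'aitasks.tasks.process_ai_task': {'queue': 'ai_default'},
    'aitasks.tasks.process_bulk_task': {'queue': 'ai_slow'},
}

def resolve_queue(task_name, routes=CELERY_TASK_ROUTES, default='celery'):
    # Exact-name lookup; unrouted tasks fall back to the default queue
    return routes.get(task_name, {}).get('queue', default)

assert resolve_queue('aitasks.tasks.process_bulk_task') == 'ai_slow'
assert resolve_queue('some.other.task') == 'celery'
```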
11. Production Checklist
Monitor queues with Flower
Flower is Celery's real-time web UI. Run it as a separate process and restrict access to internal networks or behind basic auth:
pip install flower
celery -A config flower --port=5555 --basic_auth=admin:yourpassword
Set a result TTL
Task results stored in Redis accumulate forever without a TTL. Set a sensible expiry and periodically clean up old DB rows:
CELERY_RESULT_EXPIRES = 60 * 60 * 24 # 24 hours in Redis
# Periodic cleanup task (run via Celery Beat)
@shared_task
def cleanup_old_tasks():
    from django.utils import timezone
    from datetime import timedelta

    cutoff = timezone.now() - timedelta(days=30)
    AITask.objects.filter(created_at__lt=cutoff, status=AITask.DONE).delete()
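The "run via Celery Beat" note assumes a beat process and a schedule entry; a minimal version looks like this (the 03:00 UTC slot is an arbitrary choice), alongside a celery -A config beat process:

```python
# settings/base.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    'cleanup-old-ai-tasks': {
        'task': 'aitasks.tasks.cleanup_old_tasks',
        'schedule': crontab(hour=3, minute=0),  # daily at 03:00 UTC
    },
}
```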
Throttle at the API level
# settings/base.py
REST_FRAMEWORK = {
    'DEFAULT_THROTTLE_RATES': {
        'anon': '10/hour',
        'user': '100/hour',
    }
}
Track cost per request
With input_tokens and output_tokens stored on every task, a
daily cost report is a simple aggregation query:
from django.db.models import Sum
from datetime import date

today = AITask.objects.filter(
    status=AITask.DONE,
    completed_at__date=date.today(),
)
totals = today.aggregate(
    input=Sum('input_tokens'),
    output=Sum('output_tokens'),
)
# claude-sonnet-4-6: $3/M input, $15/M output
# Sum() yields None when no rows match, so default to 0
input_tokens = totals['input'] or 0
output_tokens = totals['output'] or 0
cost = (input_tokens / 1_000_000 * 3) + (output_tokens / 1_000_000 * 15)
print(f"Today's AI cost: ${cost:.4f}")
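The same arithmetic extracted into a reusable helper, with the per-million rates quoted above as defaults:

```python
def usage_cost_usd(input_tokens, output_tokens, in_per_m=3.0, out_per_m=15.0):
    # Cost = tokens / 1M * per-million rate, summed over both directions
    return input_tokens / 1_000_000 * in_per_m + output_tokens / 1_000_000 * out_per_m

assert usage_cost_usd(1_000_000, 0) == 3.0
assert round(usage_cost_usd(200_000, 50_000), 4) == 1.35
```

Keeping rates as parameters means a model or price change is a one-line update rather than a hunt through report code.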
Health check endpoint
Expose a lightweight endpoint that verifies the broker is reachable, so your load balancer can take unhealthy instances out of rotation:
from celery import current_app
from rest_framework.decorators import api_view
from rest_framework.response import Response


@api_view(['GET'])
def celery_health(request):
    try:
        # ping() returns a list of worker replies; the list is empty
        # (not an exception) when no workers are running, so check it
        replies = current_app.control.ping(timeout=1)
    except Exception:
        replies = []
    if replies:
        return Response({'status': 'ok'})
    return Response({'status': 'degraded'}, status=503)
Wrapping Up
The pattern is simple once it clicks: Django accepts the job and returns a task ID in milliseconds; Celery does the slow work in the background; the client polls or subscribes for the result. Your Django workers stay free for other requests and your Celery workers scale independently based on AI workload.
The pieces that matter most in production are retrying with the right delay (rate limit
errors need longer back-off than transient 5xx errors), resetting status to
pending before any retry so the client never gets stuck, and separating queues
so time-sensitive tasks are never blocked behind long-running bulk jobs.
Add token tracking from day one — retrofitting cost visibility into a production system that is already handling thousands of tasks per day is painful.