// ARTICLE · JUN 5, 2026

Testing Async Django Code: Celery, Channels & WebSockets.

Most Django test suites pass because they only test the easy parts — the request/response loop. The bits that page on-call at 2am — a Celery task that didn't retry, a WebSocket consumer that dropped a message, a Channels group that delivered to no one — are exactly the bits we don't test, because testing them is fiddly. This is a practical guide to the patterns that work: eager-mode Celery tests for unit-level confidence, WebsocketCommunicator for Channels consumers, and the small handful of pytest-asyncio tricks that make async tests stop being flaky.


Why Async Testing Is Different

A normal Django view is a function: input request, output response. The test is obvious — call the view, inspect the response. Async Django breaks every part of that contract. A Celery task is fired off and may execute on another machine, minutes from now. A Channels consumer doesn't return — it stays alive for the lifetime of a connection, exchanging messages with a client over a duplex pipe. There's no "the response" to assert against, because there might be six responses, or none, or one in five seconds.

The temptation in every team I've seen is to skip these tests entirely and "rely on integration." The integration tests then become flaky, time out, or quietly stop running. Better is to draw clear lines: test the task body as a function, test the consumer as a state machine, and integration-test only the wiring. The patterns below are what fall out of that distinction.
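To make "test the consumer as a state machine" concrete, here is a library-free sketch that uses plain asyncio queues in place of a WebSocket. The EchoRoom class, its message shapes, and the helper names are all hypothetical and not part of Channels. The point is only the shape of the test: feed one input, then assert on however many outputs arrive (including none), with a timeout on every wait.

```python
import asyncio


class EchoRoom:
    """Hypothetical stand-in for a consumer: messages in, messages out."""

    def __init__(self):
        self.inbox = asyncio.Queue()
        self.outbox = asyncio.Queue()
        self.joined = False

    async def run(self):
        while True:
            msg = await self.inbox.get()
            if msg["type"] == "join":
                self.joined = True
                await self.outbox.put({"type": "joined"})
            elif msg["type"] == "say" and self.joined:
                # One input may fan out into several outputs.
                await self.outbox.put({"type": "ack"})
                await self.outbox.put({"type": "text", "text": msg["text"]})
            elif msg["type"] == "leave":
                return
            # A "say" before "join" is dropped: zero outputs.


async def receive(room, timeout=0.5):
    """Wait for one output, failing fast instead of hanging."""
    return await asyncio.wait_for(room.outbox.get(), timeout)


async def receives_nothing(room, timeout=0.05):
    """Return True if no output shows up within the timeout."""
    try:
        await asyncio.wait_for(room.outbox.get(), timeout)
    except asyncio.TimeoutError:
        return True
    return False


async def scenario():
    room = EchoRoom()
    runner = asyncio.create_task(room.run())

    await room.inbox.put({"type": "say", "text": "too early"})
    silent_before_join = await receives_nothing(room)

    await room.inbox.put({"type": "join"})
    joined = await receive(room)

    await room.inbox.put({"type": "say", "text": "hello"})
    replies = [await receive(room), await receive(room)]

    await room.inbox.put({"type": "leave"})
    await asyncio.wait_for(runner, 0.5)
    return silent_before_join, joined, replies
```

Running asyncio.run(scenario()) exercises three states (before join, joined, left) and checks both "several replies" and "no reply", which is the same structure the Channels tests later in this post follow with a real communicator.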


Testing Celery: Eager Mode Is the Workhorse

A Celery task is a regular Python function with a .delay() handle. The most useful insight is that you usually don't want to test Celery — you want to test the task body. Eager mode runs the task synchronously, in-process, the moment you call .delay(). Combine that with a single pytest fixture and you can unit-test tasks like any other function.

# conftest.py
import pytest

@pytest.fixture
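def celery_eager(settings):
    """Run Celery tasks in-process. Use for unit tests where the task body
       is what matters; do NOT use this for tests that exercise the broker.

       Assumes the Celery app reads its config from Django settings with
       namespace="CELERY" and has not loaded it yet; if it already has, set
       task_always_eager and task_eager_propagates on app.conf instead."""
    settings.CELERY_TASK_ALWAYS_EAGER = True
    settings.CELERY_TASK_EAGER_PROPAGATES = True       # raise in tests, don't swallow
    settings.CELERY_BROKER_URL = "memory://"           # never connect to Redis/RMQ
    yield

# tasks.py
from celery import shared_task
from .models import Invoice, EmailLog
from .exceptions import TransientSMTPError   # assumed location of the project's retryable SMTP error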

@shared_task(bind=True, max_retries=3, default_retry_delay=10)
def send_invoice_email(self, invoice_id):
    invoice = Invoice.objects.get(pk=invoice_id)
    try:
        EmailLog.send(invoice)
    except TransientSMTPError as exc:
        raise self.retry(exc=exc)
    invoice.mark_emailed()
    return invoice.id

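# test_tasks.py
from billing.tasks import send_invoice_email
from billing.tests.factories import InvoiceFactory   # assumed factory location

def test_send_invoice_email_marks_invoice(db, celery_eager, mailoutbox):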
    invoice = InvoiceFactory()
    result = send_invoice_email.delay(invoice.id)        # runs synchronously
    assert result.get() == invoice.id                    # task return value
    invoice.refresh_from_db()
    assert invoice.emailed_at is not None
    assert len(mailoutbox) == 1

Testing Celery: Mocking the Call Site

The other half of Celery testing is the opposite problem: a view fires a task, and you want to assert that the task was enqueued with the right arguments, without actually running it. The cleanest tool is patching the task itself.

from unittest.mock import patch

def test_view_enqueues_invoice_email(client, db):
    invoice = InvoiceFactory()
    with patch("billing.tasks.send_invoice_email.delay") as mock_delay:
        client.post(f"/invoices/{invoice.id}/email/")
    mock_delay.assert_called_once_with(invoice.id)

This is the right shape 90% of the time: the view's job is to enqueue, not to email. Test the view and the task as one unit in eager mode and every request to the view also triggers an SMTP attempt, so you end up writing tests around side effects you don't actually want.

Retries

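Retries are tricky because self.retry() raises celery.exceptions.Retry, which Celery itself catches. Don't count on that exception reaching your test through .delay(): depending on the Celery version, the eager runner may re-run the task in-process until max_retries is exhausted and then raise the original error instead. A more predictable pattern is to patch the task's retry method so it raises Retry, call the task body directly, and assert on that.

import pytest
from unittest.mock import patch
from celery.exceptions import Retry

def test_send_invoice_email_retries_on_transient_smtp(db):
    invoice = InvoiceFactory()
    with patch("billing.tasks.EmailLog.send", side_effect=TransientSMTPError), \
         patch("billing.tasks.send_invoice_email.retry", side_effect=Retry()) as mock_retry:
        with pytest.raises(Retry):
            send_invoice_email(invoice.id)               # call the task body directly
    mock_retry.assert_called_once()                      # the retry decision was made
    invoice.refresh_from_db()
    assert invoice.emailed_at is None                    # didn't mark prematurely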

Chained workflows: chord, group, chain

Eager mode handles chain and group reasonably. chord needs a result backend that supports it; for tests, set CELERY_RESULT_BACKEND="cache+memory://". The pattern is the same — invoke the workflow, call .get(), assert on the side-effects:

from celery import chain

def test_invoice_workflow(db, celery_eager):
    invoice = InvoiceFactory()
    workflow = chain(
        validate_invoice.s(invoice.id),
        generate_pdf.s(),
        send_invoice_email.s(),
    )
    workflow.apply_async().get()
    invoice.refresh_from_db()
    assert invoice.status == "sent"
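The reason generate_pdf.s() and send_invoice_email.s() take no explicit arguments is that a chain passes each task's return value to the next signature as its first argument. A plain-Python model of that data flow is sketched here; the three functions are hypothetical stand-ins rather than real Celery tasks, and the sketch assumes every step returns the invoice id.

```python
from functools import reduce

calls = []  # records which step ran and what argument it received


def validate_invoice(invoice_id):
    calls.append(("validate", invoice_id))
    return invoice_id


def generate_pdf(invoice_id):
    calls.append(("pdf", invoice_id))
    return invoice_id


def send_invoice_email(invoice_id):
    calls.append(("email", invoice_id))
    return invoice_id


def run_chain(first_arg, *steps):
    """Model of a chain: feed each step's result into the next step."""
    return reduce(lambda result, step: step(result), steps, first_arg)


final = run_chain(7, validate_invoice, generate_pdf, send_invoice_email)
```

If a step in the real chain returns something other than the id (a PDF path, say), the next task receives that value instead, which is exactly the kind of wiring mistake an eager chain test surfaces.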

Testing Channels Consumers

Channels ships WebsocketCommunicator — a test client that speaks WebSocket against your consumer in-process, with no network and no ASGI server. You drive the consumer with send_json_to / receive_json_from and assert on what comes back. It's the single best testing primitive in the Channels stack.

# consumers.py
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from channels.db import database_sync_to_async
from .models import Message   # assumed location of the chat Message model

class ChatConsumer(AsyncJsonWebsocketConsumer):
    async def connect(self):
        self.room = self.scope["url_route"]["kwargs"]["room"]
        await self.channel_layer.group_add(self.room, self.channel_name)
        await self.accept()

    async def disconnect(self, code):
        await self.channel_layer.group_discard(self.room, self.channel_name)

    async def receive_json(self, content, **kwargs):
        await database_sync_to_async(Message.objects.create)(
            room=self.room, text=content["text"], user_id=self.scope["user"].id,
        )
        await self.channel_layer.group_send(
            self.room, {"type": "chat.message", "text": content["text"]},
        )

    async def chat_message(self, event):
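        await self.send_json({"text": event["text"]})

# test_consumers.py
import pytest
from channels.db import database_sync_to_async
from channels.testing import WebsocketCommunicator
from chat.consumers import ChatConsumer

@pytest.mark.asyncio
async def test_chat_message_round_trip(db_async):
    # Factories are synchronous, so create the user in a worker thread.
    user = await database_sync_to_async(UserFactory.create)()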
    comm = WebsocketCommunicator(
        ChatConsumer.as_asgi(),
        path="/ws/chat/room-42/",
    )
    comm.scope["url_route"] = {"kwargs": {"room": "room-42"}}
    comm.scope["user"] = user

    connected, _ = await comm.connect()
    assert connected

    await comm.send_json_to({"text": "hello"})
    reply = await comm.receive_json_from(timeout=2)
    assert reply == {"text": "hello"}

    await comm.disconnect()

Channels + Database from Async

Django's ORM is synchronous at its core. Calling its blocking methods directly from an async def raises SynchronousOnlyOperation. Channels gives you two escape hatches: database_sync_to_async (usable as a decorator or as a wrapper) for the consumer code, and a small fixture tweak for the tests.

# conftest.py: async-safe DB fixture
import pytest
from channels.db import database_sync_to_async

@pytest.fixture
def db_async(transactional_db):
    """Use this in async tests instead of `db`. The `transactional_db`
       fixture commits for real and flushes between tests, which is required
       because database_sync_to_async runs queries in a worker thread whose
       connection cannot see rows inside the test's uncommitted transaction."""
    yield

# Helper for tests
@database_sync_to_async
def _count_messages(room):
    return Message.objects.filter(room=room).count()

@pytest.mark.asyncio
async def test_message_is_persisted(db_async):
    # ... open communicator, send a message ...
    assert await _count_messages("room-42") == 1
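Why the transactional fixture matters: database_sync_to_async runs ORM calls in a worker thread, Django connections are per-thread, and a second connection cannot see rows that are still inside another connection's open transaction. Here is a minimal stdlib sketch of the same effect, with sqlite3 and asyncio.to_thread as stand-ins. The table and helper names are hypothetical, and this is SQLite rather than whatever database your project uses.

```python
import asyncio
import os
import sqlite3
import tempfile


def count_messages(path):
    """Runs in a worker thread with its own connection, like a wrapped ORM call."""
    conn = sqlite3.connect(path)
    try:
        return conn.execute("SELECT COUNT(*) FROM message").fetchone()[0]
    finally:
        conn.close()


async def demo():
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "test.sqlite3")
        test_conn = sqlite3.connect(path)
        test_conn.execute("CREATE TABLE message (text TEXT)")
        test_conn.commit()

        # Like a savepoint-style fixture: write inside a transaction
        # that has not been committed yet.
        test_conn.execute("INSERT INTO message VALUES ('hello')")
        seen_before_commit = await asyncio.to_thread(count_messages, path)

        # Like transactional_db: the write is actually committed.
        test_conn.commit()
        seen_after_commit = await asyncio.to_thread(count_messages, path)

        test_conn.close()
        return seen_before_commit, seen_after_commit
```

The worker thread counts zero rows until the commit happens, which is the situation an async test is in when its setup data lives only in an uncommitted test transaction.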

Testing Channels Routing

The routing layer (asgi.py + routing.py) is the part that's most likely to silently break in production. The consumer might be perfect; if the URL pattern is wrong, no one ever connects to it. A direct routing test is one fixture and three lines:

from channels.testing import WebsocketCommunicator
from config.asgi import application                  # the actual ASGI router

@pytest.mark.asyncio
async def test_chat_route_reachable(db_async):
    comm = WebsocketCommunicator(application, "/ws/chat/room-42/")
    connected, _ = await comm.connect()
    assert connected
    await comm.disconnect()

This exercises the real ProtocolTypeRouter, AuthMiddlewareStack, and URLRouter. If anyone breaks them — a missing slash, a misordered middleware, an unimported consumer — this test fails before you ship it.
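To see why a single slash matters, here is a stdlib-only sketch of the matching step. It assumes a routing pattern in the style commonly passed to re_path() in routing.py; the exact regex and the resolve_room helper are hypothetical, and the real router does more than one regex match.

```python
import re

# Hypothetical pattern, in the style used with re_path() in routing.py.
CHAT_ROUTE = re.compile(r"^ws/chat/(?P<room>[\w-]+)/$")


def resolve_room(path):
    """Return the room captured from the path, or None if nothing matches."""
    match = CHAT_ROUTE.match(path.lstrip("/"))
    return match.group("room") if match else None


with_slash = resolve_room("/ws/chat/room-42/")
without_slash = resolve_room("/ws/chat/room-42")
wrong_prefix = resolve_room("/ws/chats/room-42/")
```

Only the first path resolves to a room. The other two return None, which in production looks like a client that can never connect, and that is what the routing test above is there to catch.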


Testing Group Sends Without a Browser

The other thing that breaks silently is group_send. The consumer subscribes to a group on connect, your task or signal handler does group_send on some external event, and the consumer's handler should receive the message and forward it to the client. There's no browser in your test, so you connect one communicator as the client and call the channel layer directly to play the external sender.

import pytest
from channels.layers import get_channel_layer
from channels.testing import WebsocketCommunicator
from config.asgi import application

@pytest.mark.asyncio
async def test_external_broadcast_reaches_client(db_async):
    comm = WebsocketCommunicator(application, "/ws/chat/room-42/")
    connected, _ = await comm.connect()
    assert connected

    # Simulate something outside the consumer (a Celery task, a signal) firing
    # a group_send. We use the same channel layer the consumer subscribed to.
    layer = get_channel_layer()
    await layer.group_send(
        "room-42",
        {"type": "chat.message", "text": "system: server restarting in 30s"},
    )

    reply = await comm.receive_json_from(timeout=2)
    assert reply["text"].startswith("system:")
    await comm.disconnect()

Eager Mode vs Real Broker: When to Pay the Cost

Eager mode is great for unit tests and bad for everything beyond unit tests. Anything that depends on the actual broker behaviour — concurrency, visibility timeouts, prefetch limits, late acks, dead-letter queues — is invisible in eager mode because the task runs in the same process. For those, you need a real worker.

| Test type | Setup | What it catches |
| --- | --- | --- |
| Unit (task body) | Eager mode | Logic bugs, DB writes, retry decisions, return values |
| Unit (call site) | patch(...delay) | "This view enqueues the right task with the right args" |
| Integration (worker) | Real broker + worker | Routing, queue selection, acks, concurrency, retries with delay |
| Smoke (deploy) | End-to-end | "The task ran end-to-end after we shipped" |

For the integration layer, spin up a real worker in a fixture using pytest-celery, or use a Docker Compose service that boots Redis plus a worker. Run those tests in CI on a separate, slower job: they're worth running on every PR, but they don't belong in the fast feedback loop.


CI Configuration That Doesn't Flake

A few small things, together, kill most async-test flakiness:

  • Pin asyncio_mode = "auto" in pyproject.toml so every async test doesn't need @pytest.mark.asyncio.
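  • Set explicit timeouts on every receive_json_from and every .get() on a Celery result. receive_json_from defaults to one second, which is easy to trip on a loaded CI runner, and result.get() defaults to waiting forever, which is how 4-hour CI hangs happen.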
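  • Run parallel tests in separate event loops. pytest-xdist works: each worker is its own process with its own event loop and its own in-memory channel layer. If tests share an external layer such as Redis, --dist loadscope keeps each module's tests on one worker, which reduces cross-talk through shared channel layer state.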
  • Mock time.sleep if your code uses it for backoff. Real sleeps are how 200ms tests turn into 4-minute ones.
  • Use InMemoryChannelLayer in CI for unit/component tests; reserve Redis for the integration job.
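For the time.sleep point in the list above, a minimal sketch: patch time.sleep, then assert on the delays that were requested instead of waiting for them. fetch_with_backoff and its parameters are hypothetical and not taken from any library.

```python
import time
from unittest.mock import patch


def fetch_with_backoff(fetch, attempts=4, base_delay=0.5):
    """Call fetch() until it succeeds, doubling the sleep after each failure."""
    for attempt in range(attempts):
        try:
            return fetch()
        except ConnectionError:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * 2 ** attempt)


def run_with_fake_sleep():
    outcomes = iter([ConnectionError(), ConnectionError(), "ok"])

    def flaky_fetch():
        outcome = next(outcomes)
        if isinstance(outcome, Exception):
            raise outcome
        return outcome

    # Replace time.sleep for the duration of the call; nothing really sleeps.
    with patch("time.sleep") as fake_sleep:
        result = fetch_with_backoff(flaky_fetch)
    delays = [call.args[0] for call in fake_sleep.call_args_list]
    return result, delays
```

The test finishes immediately and still verifies the backoff schedule, which is usually the behaviour you care about. If the code under test does `from time import sleep`, patch that name in the module under test instead.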

# pyproject.toml: sensible defaults
[tool.pytest.ini_options]
DJANGO_SETTINGS_MODULE = "config.settings.test"
asyncio_mode = "auto"
addopts = "-p no:cacheprovider --tb=short --strict-markers"
markers = [
    "celery_integration: real broker required; runs in slow CI job only",
]

# config/settings/test.py: the async-safe minimum
from .base import *  # noqa

CELERY_TASK_ALWAYS_EAGER     = True
CELERY_TASK_EAGER_PROPAGATES = True
CELERY_BROKER_URL            = "memory://"
CELERY_RESULT_BACKEND        = "cache+memory://"

CHANNEL_LAYERS = {
    "default": {"BACKEND": "channels.layers.InMemoryChannelLayer"},
}

# Disable real outbound services in tests
EMAIL_BACKEND = "django.core.mail.backends.locmem.EmailBackend"

Production Checklist

  • Three Celery settings, every time: ALWAYS_EAGER, EAGER_PROPAGATES, BROKER_URL="memory://". Missing the middle one silently swallows exceptions.
  • Test the task body and the call site separately. Eager mode for the body; patch(...delay) for the view that enqueues it.
  • Assert celery.exceptions.Retry directly when testing retry paths. Don't wait for max_retries to exhaust — you're not testing Celery's retry counter, you're testing your retry decision.
  • Use WebsocketCommunicator against the consumer class for unit tests, and against the ASGI application for routing tests. Both. The first is fast, the second is honest about what users actually hit.
  • Set timeouts everywhere. receive_json_from(timeout=2), result.get(timeout=5). A result.get() with no timeout waits forever, and in CI that is a 6-hour pager.
  • Use InMemoryChannelLayer in tests. The Redis layer is for production; the in-memory layer is for tests. Always.
  • Use transactional_db for async tests. Queries run through database_sync_to_async use a different connection and cannot see data held in the savepoint-based db fixture's open transaction; the transactional flavour commits for real.
  • Two CI jobs: a fast unit job (eager, in-memory) and a slower integration job (real Redis, real worker, marked tests). Block merges on the first; surface the second.
  • Mock time.sleep in any code path that backs off. Real sleeps kill iteration speed.
  • Test the routing layer once. A single WebsocketCommunicator(application, "/ws/…/") check catches the entire class of "we forgot to register the consumer" bugs.

The pattern that ties all of this together is simple: treat async code as code that happens to run asynchronously, not as a separate world. The task body is a function. The consumer is a state machine. Test each one as itself, then write one or two integration tests that prove they're wired together. That's the difference between a bug that pages you at 2am on Tuesday and a bug your suite catches before you ship.
