Robotic Crop Monitoring with Python: A Django Pipeline for Computer Vision Disease Detection

A small wheeled robot rolls through crop rows, captures plant images every few metres, and POSTs them to a Django API. Celery picks up each image, sends it to the Claude Vision API for disease analysis, and Django Channels pushes a real-time alert to the farm dashboard the moment something is wrong. No specialist ML infrastructure needed — just Python, Django, and an API key.

1. What We're Building

Crop disease can wipe out 30–50% of a harvest if it goes undetected for even a few days. Traditional scouting — a human walking rows with a clipboard — covers maybe 5 hectares per hour. An autonomous ground robot can cover the same area in 20 minutes, capturing high-resolution images at every plant without fatigue or missed rows.

This post builds the software side of that pipeline end to end:

  • Robot client — a Python script running on a Raspberry Pi 5 that captures images with picamera2, attaches GPS coordinates, and POSTs each scan to the server
  • Django REST API — receives scans, stores images in S3-compatible storage, queues analysis jobs
  • Celery worker — picks up queued jobs, encodes images, calls the Claude Vision API, parses the structured response, and creates DiseaseAlert records
  • Django Channels — WebSocket consumer that pushes new alerts to any connected dashboard client in real time
  • Farm dashboard — a minimal HTML/JS page showing a live alert feed and field zone status

Everything runs on a single Linux VPS for a small farm operation. Add Redis for Celery and Channels, and you're done. No GPU, no CUDA, no custom model training required.

2. Architecture Overview

The data flow through the system:

Raspberry Pi robot
  │  picamera2 captures image
  │  GPS coordinates attached
  │  POST /api/scans/  (multipart, API-key auth)
  ▼
Django REST API
  │  Validates request, stores image (S3/local)
  │  Creates CropScan record (status=pending)
  │  Enqueues analyse_crop_scan.delay(scan_id)
  ▼
Redis (Celery broker + Channels layer)
  ▼
Celery worker
  │  Loads image, base64-encodes it
  │  Calls Claude Vision API with structured prompt
  │  Parses JSON response → DiseaseAlert if disease found
  │  Updates CropScan.status = "complete"
  │  Sends WebSocket message via channel layer
  ▼
Django Channels WebSocket
  │  Broadcasts alert to dashboard group
  ▼
Browser dashboard
     Receives alert, updates UI in real time

3. Django Models: Robot, Zone, CropScan, DiseaseAlert

Create a new app farm and define four models:

# farm/models.py
import uuid
from django.db import models


class FarmRobot(models.Model):
    id          = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name        = models.CharField(max_length=100)
    api_key     = models.CharField(max_length=64, unique=True, db_index=True)
    is_active   = models.BooleanField(default=True)
    last_seen   = models.DateTimeField(null=True, blank=True)
    created_at  = models.DateTimeField(auto_now_add=True)

    def __str__(self):
        return self.name


class FieldZone(models.Model):
    """A named section of the farm (e.g. "Block A - Row 1-20")."""
    name        = models.CharField(max_length=100)
    crop_type   = models.CharField(max_length=100)
    area_ha     = models.DecimalField(max_digits=8, decimal_places=2, null=True, blank=True)
    notes       = models.TextField(blank=True)

    def __str__(self):
        return self.name


class CropScan(models.Model):
    STATUS_PENDING  = "pending"
    STATUS_RUNNING  = "running"
    STATUS_COMPLETE = "complete"
    STATUS_FAILED   = "failed"
    STATUS_CHOICES  = [
        (STATUS_PENDING,  "Pending"),
        (STATUS_RUNNING,  "Running"),
        (STATUS_COMPLETE, "Complete"),
        (STATUS_FAILED,   "Failed"),
    ]

    id          = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    robot       = models.ForeignKey(FarmRobot, on_delete=models.CASCADE, related_name="scans")
    zone        = models.ForeignKey(FieldZone, on_delete=models.SET_NULL, null=True, blank=True, related_name="scans")
    image       = models.ImageField(upload_to="scans/%Y/%m/%d/")
    latitude    = models.DecimalField(max_digits=9, decimal_places=6, null=True, blank=True)
    longitude   = models.DecimalField(max_digits=9, decimal_places=6, null=True, blank=True)
    status      = models.CharField(max_length=20, choices=STATUS_CHOICES, default=STATUS_PENDING, db_index=True)
    scanned_at  = models.DateTimeField(auto_now_add=True)
    analysed_at = models.DateTimeField(null=True, blank=True)
    raw_analysis = models.TextField(blank=True)

    class Meta:
        ordering = ["-scanned_at"]

    def __str__(self):
        return f"{self.robot.name} scan @ {self.scanned_at:%Y-%m-%d %H:%M}"


class DiseaseAlert(models.Model):
    SEVERITY_LOW    = "low"
    SEVERITY_MEDIUM = "medium"
    SEVERITY_HIGH   = "high"
    SEVERITY_CHOICES = [
        (SEVERITY_LOW,    "Low"),
        (SEVERITY_MEDIUM, "Medium"),
        (SEVERITY_HIGH,   "High"),
    ]

    scan            = models.OneToOneField(CropScan, on_delete=models.CASCADE, related_name="alert")
    disease_name    = models.CharField(max_length=200)
    severity        = models.CharField(max_length=10, choices=SEVERITY_CHOICES, db_index=True)
    confidence      = models.FloatField()          # 0.0 – 1.0
    affected_area_pct = models.FloatField(null=True, blank=True)
    recommended_action = models.TextField()
    created_at      = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ["-created_at"]

    def __str__(self):
        return f"{self.disease_name} ({self.severity}) — {self.scan}"

Run migrations and register with admin. The api_key field on FarmRobot is used for lightweight robot authentication — no session cookies needed on a headless device.
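Nothing generates api_key automatically. One option — a small helper (module and function names are illustrative) built on the standard library's secrets module, called once per robot from manage.py shell:

```python
# farm/keys.py  (sketch — module and helper name are illustrative)
import secrets


def generate_api_key() -> str:
    """Return a 64-character hex key, fitting FarmRobot.api_key (max_length=64)."""
    return secrets.token_hex(32)  # 32 random bytes -> 64 hex characters
```

Then FarmRobot.objects.create(name="rover-1", api_key=generate_api_key()) registers a robot; copy the key into the robot's environment file, since it's never shown again.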

# farm/admin.py
from django.contrib import admin
from .models import FarmRobot, FieldZone, CropScan, DiseaseAlert

@admin.register(FarmRobot)
class FarmRobotAdmin(admin.ModelAdmin):
    list_display = ["name", "is_active", "last_seen"]
    readonly_fields = ["last_seen"]

@admin.register(CropScan)
class CropScanAdmin(admin.ModelAdmin):
    list_display = ["robot", "zone", "status", "scanned_at"]
    list_filter  = ["status", "robot", "zone"]

@admin.register(DiseaseAlert)
class DiseaseAlertAdmin(admin.ModelAdmin):
    list_display = ["disease_name", "severity", "confidence", "created_at"]
    list_filter  = ["severity"]

admin.site.register(FieldZone)

4. REST API: Receiving Scans from the Robot

The robot POSTs a multipart form with the image, GPS coordinates, and zone ID. We authenticate via a static API key in the X-Robot-Key header — simple enough for an embedded device, auditable enough for a small farm.

# farm/serializers.py
from rest_framework import serializers
from .models import CropScan

class CropScanCreateSerializer(serializers.ModelSerializer):
    class Meta:
        model  = CropScan
        fields = ["zone", "image", "latitude", "longitude"]

# farm/views.py
from django.utils import timezone
from rest_framework.views import APIView
from rest_framework.response import Response
from rest_framework import status

from .models import FarmRobot, CropScan
from .serializers import CropScanCreateSerializer
from .tasks import analyse_crop_scan


class RobotScanUploadView(APIView):
    """
    POST /api/scans/
    Header: X-Robot-Key: <api_key>
    Body:   multipart/form-data  (image, zone, latitude, longitude)
    """

    def _authenticate_robot(self, request):
        key = request.headers.get("X-Robot-Key", "")
        try:
            robot = FarmRobot.objects.get(api_key=key, is_active=True)
            robot.last_seen = timezone.now()
            robot.save(update_fields=["last_seen"])
            return robot
        except FarmRobot.DoesNotExist:
            return None

    def post(self, request):
        robot = self._authenticate_robot(request)
        if not robot:
            return Response({"detail": "Invalid or inactive robot key."}, status=status.HTTP_401_UNAUTHORIZED)

        serializer = CropScanCreateSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        scan = serializer.save(robot=robot)
        analyse_crop_scan.delay(str(scan.id))

        return Response({"scan_id": str(scan.id), "status": "queued"}, status=status.HTTP_202_ACCEPTED)

# farm/urls.py
from django.urls import path
from .views import RobotScanUploadView

urlpatterns = [
    path("scans/", RobotScanUploadView.as_view(), name="scan-upload"),
]

# conf/urls.py  (add to project urlconf)
path("api/", include("farm.urls")),

The view returns 202 Accepted immediately. The robot doesn't wait for analysis to complete — it moves on to the next plant position while Celery does the heavy lifting in the background.
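You can exercise the endpoint from a laptop before the robot exists. A minimal sketch (URL, key, and file name are placeholders) that assembles the same multipart request the robot will send:

```python
# test_upload.py  (sketch — exercises POST /api/scans/ without robot hardware)

def build_scan_request(server_url, api_key, image_bytes, zone_id=None, lat=None, lon=None):
    """Assemble keyword arguments for requests.post matching the upload view's contract."""
    data = {}
    if zone_id:
        data["zone"] = zone_id
    if lat is not None and lon is not None:
        data["latitude"], data["longitude"] = str(lat), str(lon)
    return {
        "url": server_url,
        "files": {"image": ("scan.jpg", image_bytes, "image/jpeg")},
        "data": data,
        "headers": {"X-Robot-Key": api_key},
        "timeout": 15,
    }

# Fire it with requests, e.g.:
#   import requests
#   with open("sample_leaf.jpg", "rb") as f:
#       resp = requests.post(**build_scan_request(
#           "https://farm.example.com/api/scans/", "dev-key", f.read(), zone_id="1"))
#   # a healthy server answers 202 with {"scan_id": ..., "status": "queued"}
```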

5. Celery Task: Analysing Images with Claude Vision

The task encodes the stored image as base64, sends it to Claude's Vision API with a structured prompt requesting a JSON response, then parses the result into a DiseaseAlert record.

# requirements additions
anthropic>=0.40.0
celery>=5.3
redis>=5.0
pillow>=10.0

# settings.py additions
ANTHROPIC_API_KEY  = env("ANTHROPIC_API_KEY")
FARM_VISION_MODEL  = "claude-sonnet-4-20250514"    # any current vision-capable Claude model
CELERY_BROKER_URL  = env("REDIS_URL", default="redis://localhost:6379/0")
CELERY_RESULT_BACKEND = CELERY_BROKER_URL

# farm/tasks.py
import base64
import json
import logging
from datetime import datetime, timezone

import anthropic
from celery import shared_task
from django.conf import settings

from .models import CropScan, DiseaseAlert

logger = logging.getLogger(__name__)

ANALYSIS_PROMPT = """
You are an expert agricultural pathologist. Analyse the crop photograph and respond
with a single JSON object — no markdown, no prose — using exactly this schema:

{
  "disease_detected": true | false,
  "disease_name": "string or null",
  "severity": "low" | "medium" | "high" | null,
  "confidence": 0.0–1.0,
  "affected_area_pct": 0–100 or null,
  "symptoms": ["list", "of", "observed", "symptoms"],
  "recommended_action": "string or null"
}

If no disease is detected set disease_detected to false and all other fields to null
except symptoms (empty array) and confidence (your confidence that the plant is healthy).
""".strip()


@shared_task(bind=True, max_retries=3, default_retry_delay=30)
def analyse_crop_scan(self, scan_id: str):
    try:
        scan = CropScan.objects.get(id=scan_id)
    except CropScan.DoesNotExist:
        logger.error("analyse_crop_scan: scan %s not found", scan_id)
        return

    scan.status = CropScan.STATUS_RUNNING
    scan.save(update_fields=["status"])

    try:
        # Read image from storage and base64-encode it
        with scan.image.open("rb") as f:
            image_data = base64.standard_b64encode(f.read()).decode("utf-8")

        # Detect media type from file extension
        ext = scan.image.name.rsplit(".", 1)[-1].lower()
        media_type_map = {"jpg": "image/jpeg", "jpeg": "image/jpeg", "png": "image/png", "webp": "image/webp"}
        media_type = media_type_map.get(ext, "image/jpeg")

        client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)
        message = client.messages.create(
            model=settings.FARM_VISION_MODEL,
            max_tokens=512,
            messages=[
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "image",
                            "source": {
                                "type": "base64",
                                "media_type": media_type,
                                "data": image_data,
                            },
                        },
                        {"type": "text", "text": ANALYSIS_PROMPT},
                    ],
                }
            ],
        )

        raw = message.content[0].text
        scan.raw_analysis = raw
        result = json.loads(raw)

        if result.get("disease_detected"):
            DiseaseAlert.objects.update_or_create(
                scan=scan,
                defaults={
                    # .get(key, default) still returns None when the key is
                    # present with a JSON null, so fall back with `or` instead
                    "disease_name":       result.get("disease_name") or "Unknown",
                    "severity":           result.get("severity") or DiseaseAlert.SEVERITY_MEDIUM,
                    "confidence":         float(result.get("confidence") or 0.0),
                    "affected_area_pct":  result.get("affected_area_pct"),
                    "recommended_action": result.get("recommended_action") or "",
                },
            )
            _push_alert_to_dashboard(scan)

        scan.status = CropScan.STATUS_COMPLETE
        scan.analysed_at = datetime.now(tz=timezone.utc)
        scan.save(update_fields=["status", "analysed_at", "raw_analysis"])

    except Exception as exc:
        logger.exception("analyse_crop_scan failed for scan %s", scan_id)
        scan.status = CropScan.STATUS_FAILED
        scan.save(update_fields=["status"])
        raise self.retry(exc=exc)

The prompt asks Claude to return pure JSON only — no markdown fences, no prose. This is reliable with Claude models because you can be very explicit about output format. If you need extra resilience, wrap the json.loads() in a fallback that strips triple-backtick fences before parsing.
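A sketch of that fallback — strip any fences, and as a last resort parse the outermost brace-delimited span:

```python
# farm/tasks.py  (optional fallback — tolerate markdown fences around the JSON)
import json
import re

_FENCE_RE = re.compile(r"^```(?:json)?\s*|\s*```$")


def parse_model_json(raw: str) -> dict:
    """Parse the model's response, tolerating ```json fences and stray prose."""
    cleaned = _FENCE_RE.sub("", raw.strip())
    try:
        return json.loads(cleaned)
    except json.JSONDecodeError:
        # Last resort: parse the outermost {...} span
        start, end = cleaned.find("{"), cleaned.rfind("}")
        if start == -1 or end <= start:
            raise
        return json.loads(cleaned[start:end + 1])
```

Swap it in for the bare json.loads(raw) call in the task if you see occasional parse failures in the logs.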

update_or_create on DiseaseAlert means re-running analysis (e.g. after a model upgrade) safely overwrites the old result rather than creating a duplicate.

6. Django Channels: Real-Time Dashboard Alerts

Rather than making the dashboard poll for new alerts, we push them the moment Celery creates a DiseaseAlert. Django Channels gives us a WebSocket consumer backed by the same Redis instance already running for Celery.

# requirements additions
channels>=4.0
channels-redis>=4.2

# settings.py additions
INSTALLED_APPS += ["channels"]

ASGI_APPLICATION = "conf.asgi.application"

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [env("REDIS_URL", default="redis://localhost:6379/0")]},
    }
}

# conf/asgi.py
import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from farm.routing import websocket_urlpatterns

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "conf.settings.production")

application = ProtocolTypeRouter({
    "http":      get_asgi_application(),
    "websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
})

# farm/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

DASHBOARD_GROUP = "farm_dashboard"

class DashboardConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.channel_layer.group_add(DASHBOARD_GROUP, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(DASHBOARD_GROUP, self.channel_name)

    # Receive message sent by channel layer (from Celery task)
    async def farm_alert(self, event):
        await self.send(text_data=json.dumps(event["payload"]))

# farm/routing.py
from django.urls import path
from .consumers import DashboardConsumer

websocket_urlpatterns = [
    path("ws/dashboard/", DashboardConsumer.as_asgi()),
]

Now wire the Celery task to push to the channel layer when an alert is created:

# farm/tasks.py  (add this helper, called from analyse_crop_scan above)
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from .consumers import DASHBOARD_GROUP

def _push_alert_to_dashboard(scan):
    alert = scan.alert  # OneToOne reverse accessor
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        DASHBOARD_GROUP,
        {
            "type": "farm.alert",   # maps to DashboardConsumer.farm_alert()
            "payload": {
                "scan_id":          str(scan.id),
                "robot":            scan.robot.name,
                "zone":             scan.zone.name if scan.zone else None,
                "latitude":         float(scan.latitude)  if scan.latitude  is not None else None,
                "longitude":        float(scan.longitude) if scan.longitude is not None else None,
                "disease_name":     alert.disease_name,
                "severity":         alert.severity,
                "confidence":       round(alert.confidence, 2),
                "affected_area_pct": alert.affected_area_pct,
                "recommended_action": alert.recommended_action,
                "timestamp":        alert.created_at.isoformat(),
            },
        },
    )

async_to_sync from asgiref lets the synchronous Celery task call the async channel layer. No extra threads, no event loop management — asgiref handles it transparently.

7. Robot-Side Python: Capture and Upload

The robot runs a Python script on a Raspberry Pi 5 with a Camera Module 3. It captures an image, reads GPS from a connected USB receiver, and uploads the scan. The loop runs continuously while the robot is in the field.

# robot/scanner.py  (runs on Raspberry Pi)
import os
import io
import time
import logging
import requests
from picamera2 import Picamera2

# Optional: pip install gpsd-py3
try:
    import gpsd
    gpsd.connect()
    GPS_AVAILABLE = True
except Exception:
    GPS_AVAILABLE = False

logger = logging.getLogger(__name__)

SERVER_URL  = os.environ["FARM_SERVER_URL"]    # e.g. https://farm.example.com/api/scans/
ROBOT_KEY   = os.environ["ROBOT_API_KEY"]
ZONE_ID     = os.environ.get("ZONE_ID", "")    # set per mission
SCAN_INTERVAL_S = float(os.environ.get("SCAN_INTERVAL_S", "8"))


def get_gps():
    if not GPS_AVAILABLE:
        return None, None
    try:
        packet = gpsd.get_current()
        if packet.mode >= 2:
            return packet.lat, packet.lon
    except Exception:
        pass
    return None, None


def capture_image(camera: Picamera2) -> bytes:
    buf = io.BytesIO()
    camera.capture_file(buf, format="jpeg")
    buf.seek(0)
    return buf.read()


def upload_scan(image_bytes: bytes, lat, lon) -> bool:
    files = {"image": ("scan.jpg", image_bytes, "image/jpeg")}
    data  = {"zone": ZONE_ID} if ZONE_ID else {}   # omit entirely if unset — "" fails PK validation
    if lat is not None:
        data["latitude"]  = str(lat)
        data["longitude"] = str(lon)

    try:
        resp = requests.post(
            SERVER_URL,
            files=files,
            data=data,
            headers={"X-Robot-Key": ROBOT_KEY},
            timeout=15,
        )
        resp.raise_for_status()
        logger.info("Scan uploaded: %s", resp.json().get("scan_id"))
        return True
    except requests.RequestException as exc:
        logger.warning("Upload failed: %s", exc)
        return False


def run():
    camera = Picamera2()
    config = camera.create_still_configuration(main={"size": (2304, 1728)})
    camera.configure(config)
    camera.start()
    time.sleep(2)  # warm-up

    logger.info("Scanner started. Interval: %ss", SCAN_INTERVAL_S)

    while True:
        lat, lon = get_gps()
        image_bytes = capture_image(camera)
        upload_scan(image_bytes, lat, lon)
        time.sleep(SCAN_INTERVAL_S)


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run()

Run the scanner as a systemd service so it starts automatically when the robot powers on and restarts on failure:

# /etc/systemd/system/crop-scanner.service
[Unit]
Description=Crop Scanner
After=network-online.target
Wants=network-online.target

[Service]
User=pi
WorkingDirectory=/home/pi/scanner
EnvironmentFile=/home/pi/scanner/.env
ExecStart=/home/pi/scanner/.venv/bin/python scanner.py
Restart=on-failure
RestartSec=10

[Install]
WantedBy=multi-user.target

Enable and start it:

sudo systemctl enable --now crop-scanner.service

8. Dashboard: Live Map and Alert Feed

A minimal page connects to the WebSocket and renders incoming alerts into a feed. In a real deployment you'd overlay alerts on a field map (Leaflet.js works well), but the core WebSocket plumbing is the same.

<!-- farm/templates/farm/dashboard.html -->
<div id="alert-feed"></div>

<script>
const ws = new WebSocket(
  (location.protocol === "https:" ? "wss://" : "ws://") + location.host + "/ws/dashboard/"
);

ws.onmessage = function (event) {
  const alert = JSON.parse(event.data);
  const feed  = document.getElementById("alert-feed");

  const card = document.createElement("div");
  card.className = "alert-card severity-" + alert.severity;
  card.innerHTML = `
    <strong>${alert.disease_name}</strong>
    <span class="severity-badge">${alert.severity}</span>
    <div class="alert-meta">
      ${alert.zone ?? "Unknown zone"} · ${alert.robot}
      · ${Math.round(alert.confidence * 100)}% confidence
    </div>
    <div class="alert-action">${alert.recommended_action}</div>
    <time>${new Date(alert.timestamp).toLocaleTimeString()}</time>
  `;

  feed.prepend(card);
};

ws.onclose = function () {
  console.warn("Dashboard WebSocket closed — reconnecting in 5s");
  setTimeout(() => location.reload(), 5000);
};
</script>

Add a Django view to serve the template and protect it with @login_required so only authenticated farm staff see it. Note that AuthMiddlewareStack doesn't reject anything by itself — it only resolves the session and attaches the user to scope["user"] — so the consumer's connect() should close the connection when that user is anonymous.
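To keep that check in one place, a small guard the consumer can call (a sketch; the close code is an arbitrary app-level value):

```python
# farm/consumers.py  (addition — a guard the consumer calls in connect())

def scope_is_authenticated(scope) -> bool:
    """True when AuthMiddlewareStack resolved a logged-in user for this connection."""
    user = scope.get("user")
    return bool(user is not None and getattr(user, "is_authenticated", False))
```

In DashboardConsumer.connect(), check it before joining the group: if not scope_is_authenticated(self.scope), await self.close(code=4401) and return instead of calling group_add/accept.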

9. Production Checklist

Robot authentication

The static API key approach is fine for a closed farm network. If robots connect over the internet, rotate keys regularly and consider adding HMAC request signing — the robot hashes the request body + timestamp with a shared secret, and the Django view verifies it before trusting the payload.
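The signing scheme can be sketched with the standard library alone (header names and the 300-second skew window are illustrative choices):

```python
# shared by robot and server  (sketch — header names and skew window are illustrative)
import hashlib
import hmac
import time


def sign_request(secret: bytes, body: bytes, timestamp: str) -> str:
    """HMAC-SHA256 over timestamp + body; the robot sends this in e.g. X-Robot-Signature."""
    return hmac.new(secret, timestamp.encode() + body, hashlib.sha256).hexdigest()


def verify_request(secret: bytes, body: bytes, timestamp: str, signature: str,
                   max_skew_s: int = 300) -> bool:
    """Server side: reject stale timestamps (replay protection), then compare digests."""
    try:
        if abs(time.time() - float(timestamp)) > max_skew_s:
            return False
    except ValueError:
        return False
    expected = sign_request(secret, body, timestamp)
    return hmac.compare_digest(expected, signature)  # constant-time comparison
```

The Django view would read the timestamp and signature headers and call verify_request() before handing the payload to the serializer.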

Storage

Use django-storages with an S3-compatible backend (AWS S3, Cloudflare R2, MinIO) for production image storage. A robot scanning at 8-second intervals generates about 450 images per hour at ~2 MB each — roughly 0.9 GB/hr per robot, which adds up quickly across a fleet and a season. Set a lifecycle policy to move images older than 30 days to cold storage.
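A minimal configuration sketch, assuming django-storages ≥ 1.14 and Django's STORAGES setting (bucket name and env var names are placeholders):

```python
# settings.py  (sketch — bucket and env var names are placeholders)
STORAGES = {
    "default": {
        "BACKEND": "storages.backends.s3.S3Storage",
        "OPTIONS": {
            "bucket_name":  "farm-scans",
            "endpoint_url": env("S3_ENDPOINT_URL"),  # R2/MinIO-compatible endpoint
            "access_key":   env("S3_ACCESS_KEY"),
            "secret_key":   env("S3_SECRET_KEY"),
        },
    },
    "staticfiles": {
        "BACKEND": "django.contrib.staticfiles.storage.StaticFilesStorage",
    },
}
```

ImageField's upload_to="scans/%Y/%m/%d/" path structure carries over unchanged, which makes date-based lifecycle rules straightforward.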

Celery queue priority

Severity is only known after analysis, so you can't fast-track individual scans up front — but you can give analysis jobs their own queue so they aren't stuck behind unrelated work (cleanup, reports) on the default queue:

# farm/tasks.py
@shared_task(bind=True, queue="farm_priority", max_retries=3)
def analyse_crop_scan(self, scan_id: str):
    ...

# celery worker startup
# celery -A conf worker -Q farm_priority,celery --concurrency=4

Image pre-screening on the robot

Running a tiny on-device model (e.g. TensorFlow Lite on a Coral USB Accelerator) to classify images as "likely healthy" / "possible disease" before uploading cuts API calls by 80–90%. Only borderline or flagged images get sent to Claude Vision for detailed analysis.
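If maintaining a TFLite model is more than you want, even a crude colour heuristic filters the obvious cases. A dependency-free sketch operating on raw (r, g, b) tuples — the thresholds are guesses you would tune per crop and camera:

```python
# robot/prescreen.py  (sketch — thresholds are illustrative, tune per crop/camera)

def looks_suspicious(pixels, green_floor=0.55, brown_ceiling=0.10):
    """Heuristic pre-screen over an iterable of (r, g, b) tuples (0-255 each).

    Flags the frame for upload when healthy-green cover is low or
    brown/yellow lesion-like pixels are common.
    """
    green = brown = total = 0
    for r, g, b in pixels:
        total += 1
        if g > r and g > b:                      # dominantly green pixel
            green += 1
        elif r > 100 and g > 60 and b < 80:      # brownish/yellowish pixel
            brown += 1
    if total == 0:
        return True  # can't judge an empty frame; upload it to be safe
    return (green / total) < green_floor or (brown / total) > brown_ceiling
```

In scanner.py you would call it on a downsampled copy of each capture and skip upload_scan() when it returns False — erring towards uploading, since a missed disease costs far more than an API call.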

Rate limiting the API endpoint

Add DRF throttling on the scan upload endpoint to prevent a malfunctioning robot from flooding the queue:

# farm/views.py  (add to RobotScanUploadView)
from rest_framework.throttling import ScopedRateThrottle

class RobotScanUploadView(APIView):
    throttle_classes = [ScopedRateThrottle]
    throttle_scope   = "robot-scans"
    ...

# settings.py
REST_FRAMEWORK = {
    "DEFAULT_THROTTLE_RATES": {"robot-scans": "600/hour"},  # headroom over ~450 scans/hr at 8 s intervals
}

Offline buffering on the robot

Fields have poor connectivity. Buffer failed uploads to a local SQLite database on the Pi and retry in the background:

# robot/buffer.py  (sketch)
import sqlite3, json, pathlib

DB = pathlib.Path("/home/pi/scanner/buffer.db")

def init():
    with sqlite3.connect(DB) as conn:
        conn.execute("""
            CREATE TABLE IF NOT EXISTS pending (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                image_path TEXT,
                meta TEXT,
                created_at TEXT DEFAULT (datetime('now'))
            )
        """)

def push(image_path: str, meta: dict):
    with sqlite3.connect(DB) as conn:
        conn.execute("INSERT INTO pending (image_path, meta) VALUES (?,?)",
                     (image_path, json.dumps(meta)))

def flush(upload_fn):
    with sqlite3.connect(DB) as conn:
        rows = conn.execute("SELECT id, image_path, meta FROM pending ORDER BY id LIMIT 20").fetchall()
        for row_id, image_path, meta in rows:
            if upload_fn(image_path, json.loads(meta)):
                conn.execute("DELETE FROM pending WHERE id=?", (row_id,))

Scheduled cleanup

Add a Celery beat task to purge old CropScan records and orphaned images weekly to keep the database and storage bucket lean. Keep DiseaseAlert records indefinitely — they're your historical disease map and agronomist audit trail.
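The schedule itself is one settings entry — a sketch, where purge_old_scans is a task you would write in farm/tasks.py (filter complete CropScan rows older than your retention window, delete each image file, then the row):

```python
# settings.py  (sketch — purge_old_scans is a hypothetical task in farm/tasks.py)
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "purge-old-scans": {
        "task": "farm.tasks.purge_old_scans",
        "schedule": crontab(hour=3, minute=0, day_of_week="sunday"),  # weekly, Sunday 03:00
    },
}
```

Run beat alongside the worker (celery -A conf beat) or embed it with the worker's -B flag on a single-node deployment.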

Monitoring

Track these metrics with Flower (Celery) or Prometheus:

  • Scan queue depth — spikes mean the Celery worker is behind or has crashed
  • Vision API latency — Claude Vision calls should complete in under 4s for a single image; outliers indicate network issues
  • Disease detection rate — a sudden spike (or a suspicious zero) during a known disease season warrants investigation
  • Robot last-seen — alert if any FarmRobot.last_seen is more than 15 minutes old during an active scanning mission