Python Django AI Robotics Precision Agriculture

AI-Powered Precision Farming Robots: Autonomous Navigation, Variable-Rate Application, and an LLM Crop Advisor with Python

A fleet of Raspberry Pi robots follows GPS waypoint missions dispatched from Django, detects weeds in real time with a TensorFlow Lite model, controls a variable-rate sprayer over GPIO PWM, and publishes telemetry back over MQTT. Every night, a Celery task sends the accumulated field data to Claude, which returns a structured agronomist report with hotspot coordinates and prioritised action recommendations — no specialist ML infrastructure, no cloud vision quota per plant image.

1. What We're Building

A previous post covered disease detection with Claude Vision — one robot, one camera, one image per scan POSTed to Django. This post goes further: multiple robots, bidirectional communication, decisions made on the robot in real time, and AI operating at the field level rather than the plant level.

The system has three distinct intelligence layers:

  • Edge intelligence (on the robot) — a TensorFlow Lite weed-detection model runs at 6–8 fps on a Raspberry Pi 5 with a Camera Module 3. The robot decides how much herbicide to apply at each GPS position without waiting for a server round trip.
  • Fleet intelligence (Django + MQTT) — the server dispatches waypoint missions, tracks real-time robot positions on a Leaflet map, and persists every application event to PostgreSQL.
  • Field intelligence (Claude LLM) — a nightly Celery Beat task compiles the past seven days of weed density readings and spray events, summarises the field into a compact JSON payload, and asks Claude to produce a structured agronomist report: field health score, hotspot coordinates, and a ranked list of recommended actions with urgency labels.

Everything runs on a single Linux VPS: Django, Redis, a Mosquitto MQTT broker, and two Celery workers (one for general tasks, one dedicated to the AI advisor). No GPU, no proprietary IoT platform, no per-message cloud vision billing.


2. Architecture Overview

Raspberry Pi Robot(s)
  │  Camera Module 3 → TFLite weed classifier (6–8 fps)
  │  GPS receiver → current position (haversine path following)
  │  GPIO PWM → variable-rate sprayer relay
  │
  │  MQTT publish  → farm/robots/{id}/telemetry
  │                  (lat, lon, weed_density, spray_duty, battery)
  │  MQTT subscribe → farm/robots/{id}/missions
  │                  (waypoint list, action commands)
  ▼
Mosquitto MQTT Broker (TLS, port 8883)
  ▼
Django Management Command  ←→  PostgreSQL
  │  Persists ApplicationEvent rows
  │  Updates FarmRobot position + battery
  │  Pushes live update via Channels layer
  ▼
Redis  ←→  Celery workers  ←→  Django Channels
  │              │
  │              └── Nightly: run_crop_advisor task
  │                    Queries 7-day ApplicationEvents
  │                    Calls Claude API → AdvisorReport
  ▼
Browser dashboard
  WebSocket → live fleet map (Leaflet)
  REST API  → mission CRUD, advisor reports

The key design decision is using MQTT for robot communication rather than HTTP polling. Robots can receive mission updates immediately without polling, and telemetry flows continuously at 0.5 Hz without the overhead of opening a new TCP connection for every reading. The same Redis instance serves as both the Celery broker and the Django Channels layer, keeping the infrastructure footprint minimal.


3. Django Models

Create a farm app and define five models:

# farm/models.py
import uuid
from django.db import models


class FarmRobot(models.Model):
    id           = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name         = models.CharField(max_length=100)
    api_key      = models.CharField(max_length=64, unique=True, db_index=True)
    current_mission = models.ForeignKey(
        'Mission', null=True, blank=True, on_delete=models.SET_NULL, related_name='+'
    )
    current_lat  = models.DecimalField(max_digits=9, decimal_places=6, null=True, blank=True)
    current_lon  = models.DecimalField(max_digits=9, decimal_places=6, null=True, blank=True)
    battery_pct  = models.FloatField(null=True, blank=True)
    is_active    = models.BooleanField(default=True)
    last_seen    = models.DateTimeField(null=True, blank=True)

    def __str__(self):
        return self.name


class Mission(models.Model):
    STATUS_DRAFT    = 'draft'
    STATUS_QUEUED   = 'queued'
    STATUS_RUNNING  = 'running'
    STATUS_COMPLETE = 'complete'
    STATUS_ABORTED  = 'aborted'
    STATUS_CHOICES  = [
        (STATUS_DRAFT,    'Draft'),
        (STATUS_QUEUED,   'Queued'),
        (STATUS_RUNNING,  'Running'),
        (STATUS_COMPLETE, 'Complete'),
        (STATUS_ABORTED,  'Aborted'),
    ]

    id           = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name         = models.CharField(max_length=200)
    robot        = models.ForeignKey(FarmRobot, on_delete=models.CASCADE, related_name='missions')
    status       = models.CharField(max_length=20, choices=STATUS_CHOICES, default=STATUS_DRAFT, db_index=True)
    created_at   = models.DateTimeField(auto_now_add=True)
    started_at   = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    notes        = models.TextField(blank=True)

    class Meta:
        ordering = ['-created_at']

    def __str__(self):
        return f"{self.name} ({self.get_status_display()})"


class Waypoint(models.Model):
    ACTION_SCAN   = 'scan'
    ACTION_SPRAY  = 'spray'
    ACTION_SAMPLE = 'sample'
    ACTION_CHOICES = [
        (ACTION_SCAN,   'Scan only'),
        (ACTION_SPRAY,  'Spray (variable rate)'),
        (ACTION_SAMPLE, 'Soil sample'),
    ]

    mission     = models.ForeignKey(Mission, on_delete=models.CASCADE, related_name='waypoints')
    sequence    = models.PositiveIntegerField()
    latitude    = models.DecimalField(max_digits=9, decimal_places=6)
    longitude   = models.DecimalField(max_digits=9, decimal_places=6)
    action      = models.CharField(max_length=20, choices=ACTION_CHOICES, default=ACTION_SCAN)
    target_rate = models.FloatField(null=True, blank=True)  # max L/ha for spray waypoints
    reached_at  = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['sequence']
        unique_together = [('mission', 'sequence')]

    def __str__(self):
        return f"WP{self.sequence} ({self.action}) @ {self.latitude},{self.longitude}"


class ApplicationEvent(models.Model):
    """One row per spray decision made by the robot at a GPS position."""
    robot        = models.ForeignKey(FarmRobot, on_delete=models.CASCADE, related_name='events')
    mission      = models.ForeignKey(Mission, on_delete=models.SET_NULL, null=True, blank=True)
    latitude     = models.DecimalField(max_digits=9, decimal_places=6)
    longitude    = models.DecimalField(max_digits=9, decimal_places=6)
    weed_density = models.FloatField()      # TFLite output, 0.0–1.0
    applied_rate = models.FloatField()      # actual PWM duty cycle, 0–100
    applied_at   = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [models.Index(fields=['robot', 'applied_at'])]


class AdvisorReport(models.Model):
    """LLM-generated agronomist report for a robot's 7-day field data."""
    robot            = models.ForeignKey(FarmRobot, on_delete=models.CASCADE, related_name='reports')
    date_range_start = models.DateField()
    date_range_end   = models.DateField()
    report_markdown  = models.TextField()
    recommendations  = models.JSONField()   # structured list from Claude
    tokens_used      = models.PositiveIntegerField()
    created_at       = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ['-created_at']

    def __str__(self):
        return f"{self.robot.name} report {self.date_range_start}–{self.date_range_end}"

4. Mission Planning API: Create and Dispatch Waypoint Missions

Operators create missions in the Django admin or via the REST API, attach an ordered list of waypoints, then dispatch. Dispatching changes the status to queued and publishes the waypoint list to the robot's MQTT topic.

# farm/serializers.py
from rest_framework import serializers
from .models import Mission, Waypoint


class WaypointSerializer(serializers.ModelSerializer):
    class Meta:
        model  = Waypoint
        fields = ['sequence', 'latitude', 'longitude', 'action', 'target_rate']


class MissionSerializer(serializers.ModelSerializer):
    waypoints = WaypointSerializer(many=True)

    class Meta:
        model  = Mission
        fields = ['id', 'name', 'robot', 'status', 'waypoints', 'notes', 'created_at']
        read_only_fields = ['id', 'status', 'created_at']

    def create(self, validated_data):
        waypoints_data = validated_data.pop('waypoints')
        mission = Mission.objects.create(**validated_data)
        for wp_data in waypoints_data:
            Waypoint.objects.create(mission=mission, **wp_data)
        return mission
# farm/views.py
import json
import paho.mqtt.publish as publish
from django.conf import settings
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated

from .models import Mission, FarmRobot
from .serializers import MissionSerializer


class MissionViewSet(viewsets.ModelViewSet):
    queryset = Mission.objects.select_related('robot').prefetch_related('waypoints')
    serializer_class = MissionSerializer
    permission_classes = [IsAuthenticated]

    @action(detail=True, methods=['post'])
    def dispatch(self, request, pk=None):
        mission = self.get_object()
        if mission.status != Mission.STATUS_DRAFT:
            return Response(
                {'detail': f'Mission is {mission.status}, not draft.'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        waypoints = list(
            mission.waypoints.values('sequence', 'latitude', 'longitude', 'action', 'target_rate')
        )

        payload = json.dumps({
            'mission_id': str(mission.id),
            'waypoints':  [
                {
                    'seq':    wp['sequence'],
                    'lat':    float(wp['latitude']),
                    'lon':    float(wp['longitude']),
                    'action': wp['action'],
                    'target_rate': wp['target_rate'],
                }
                for wp in waypoints
            ],
        })

        topic = f"farm/robots/{mission.robot.id}/missions"
        publish.single(
            topic,
            payload=payload,
            hostname=settings.MQTT_BROKER_HOST,
            port=8883,
            tls=settings.MQTT_TLS_CONTEXT,
            auth={'username': settings.MQTT_USERNAME, 'password': settings.MQTT_PASSWORD},
        )

        mission.status = Mission.STATUS_QUEUED
        mission.robot.current_mission = mission
        mission.save(update_fields=['status'])
        mission.robot.save(update_fields=['current_mission'])

        return Response({'status': 'queued', 'mission_id': str(mission.id)})

paho.mqtt.publish.single is a fire-and-forget helper — it opens a connection, publishes one message, and closes. For high-frequency publishing (like telemetry), keep a persistent client connection; for infrequent mission dispatches, single is clean and stateless.


5. Edge AI: On-Device Weed Detection with TensorFlow Lite

The fundamental advantage of running inference on the robot is latency and cost. Sending every camera frame to a cloud vision API at 6 fps would cost thousands per day per robot and add 300–500 ms of network latency to every spray decision. A MobileNetV2-based TFLite model runs inference in 60–120 ms on a Raspberry Pi 5 with no network dependency at all.

You can fine-tune a MobileNetV2 weed classifier with a dataset like CropDeep or DeepWeeds using TensorFlow and convert to TFLite for deployment. The inference wrapper below is model-agnostic — swap the path and adjust INPUT_SIZE to match your trained model.

# robot/weed_detector.py
import numpy as np

try:
    from tflite_runtime.interpreter import Interpreter
except ImportError:
    from tensorflow.lite.python.interpreter import Interpreter  # fallback for dev

MODEL_PATH   = "/home/pi/models/weed_detector.tflite"
INPUT_SIZE   = (224, 224)   # must match training resolution
WEED_CLASS   = 1            # index of the "weed" class in model output


class WeedDetector:
    def __init__(self, model_path: str = MODEL_PATH):
        self.interpreter = Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self._input  = self.interpreter.get_input_details()[0]
        self._output = self.interpreter.get_output_details()[0]

    def predict(self, frame: np.ndarray) -> float:
        """
        frame : H×W×3 uint8 array (RGB or BGR — match your training pipeline).
        Returns weed probability 0.0–1.0.
        """
        import cv2
        resized = cv2.resize(frame, INPUT_SIZE)
        # MobileNetV2 expects float32 normalised to [-1, 1]
        inp = ((resized.astype(np.float32) / 127.5) - 1.0)[np.newaxis, ...]

        self.interpreter.set_tensor(self._input['index'], inp)
        self.interpreter.invoke()

        scores = self.interpreter.get_tensor(self._output['index'])[0]
        return float(scores[WEED_CLASS])

Install the lightweight runtime on the Pi — no full TensorFlow required:

pip install tflite-runtime opencv-python-headless

For even faster inference, enable the NNAPI delegate (uses the Pi 5's Neural Processing Unit) by adding experimental_delegates=[load_delegate('libnnapi.so')] to the Interpreter constructor. Latency typically drops 30–50% versus CPU-only inference on compatible operations.


6. Variable-Rate Sprayer Control via GPIO PWM

The sprayer is a 12 V solenoid valve driven by a relay board connected to a Raspberry Pi GPIO pin. PWM duty cycle controls pulse width, which controls flow rate through the valve — higher weed density, higher duty cycle, more herbicide applied per square metre.

# robot/sprayer.py
import RPi.GPIO as GPIO

SPRAY_PIN    = 18    # BCM numbering
PWM_FREQ_HZ  = 50    # 50 Hz works well for solenoid valves; avoid audible freq
MIN_RATE_LHA = 20.0  # minimum application rate — never spray below this
MAX_RATE_LHA = 200.0 # maximum application rate at full weed density
DENSITY_FLOOR = 0.15  # density below this: skip spray entirely


class Sprayer:
    def __init__(self):
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(SPRAY_PIN, GPIO.OUT)
        self._pwm = GPIO.PWM(SPRAY_PIN, PWM_FREQ_HZ)
        self._pwm.start(0)

    def set_rate(self, weed_density: float) -> float:
        """
        Map weed_density [0, 1] → duty cycle [0, 100].
        Returns the duty cycle applied.
        """
        if weed_density < DENSITY_FLOOR:
            self._pwm.ChangeDutyCycle(0)
            return 0.0

        # Linear interpolation between MIN and MAX rate
        rate = MIN_RATE_LHA + (MAX_RATE_LHA - MIN_RATE_LHA) * weed_density
        duty = (rate - MIN_RATE_LHA) / (MAX_RATE_LHA - MIN_RATE_LHA) * 100.0
        duty = max(0.0, min(100.0, duty))
        self._pwm.ChangeDutyCycle(duty)
        return duty

    def off(self):
        self._pwm.ChangeDutyCycle(0)

    def cleanup(self):
        self.off()
        self._pwm.stop()
        GPIO.cleanup()

The DENSITY_FLOOR threshold prevents low-confidence detections from triggering the sprayer — a plant that scores 0.12 is probably healthy; no need to spray. Calibrate MIN_RATE_LHA and MAX_RATE_LHA against your sprayer's flow-rate curve at the operating pressure you're running.

For the path-following logic that decides when to move to the next waypoint, use the haversine formula to measure distance to the target coordinate:

# robot/navigation.py
import math

EARTH_RADIUS_M = 6_371_000
ARRIVAL_THRESHOLD_M = 1.5  # metres — consider waypoint reached


def haversine_metres(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two GPS coordinates in metres."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi       = math.radians(lat2 - lat1)
    dlambda    = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def waypoint_reached(current_lat: float, current_lon: float, wp: dict) -> bool:
    dist = haversine_metres(current_lat, current_lon, wp['lat'], wp['lon'])
    return dist <= ARRIVAL_THRESHOLD_M

7. MQTT Telemetry Bridge: Robot ↔ Django

MQTT gives us bidirectional communication with very low overhead. The robot publishes telemetry at 0.5 Hz and subscribes to its own mission topic. Django subscribes to all robot telemetry topics and persists the data.

7a. Robot: Telemetry Publisher and Mission Subscriber

# robot/main.py
import os, json, time, logging
import paho.mqtt.client as mqtt
from picamera2 import Picamera2
from weed_detector import WeedDetector
from sprayer import Sprayer
from navigation import waypoint_reached

try:
    import gpsd; gpsd.connect(); GPS_OK = True
except Exception: GPS_OK = False

logger = logging.getLogger(__name__)

BROKER   = os.environ["MQTT_BROKER"]
ROBOT_ID = os.environ["ROBOT_ID"]
API_KEY  = os.environ["ROBOT_API_KEY"]

TELEMETRY_TOPIC = f"farm/robots/{ROBOT_ID}/telemetry"
MISSION_TOPIC   = f"farm/robots/{ROBOT_ID}/missions"


def get_gps():
    if not GPS_OK:
        return None, None
    try:
        pkt = gpsd.get_current()
        return (pkt.lat, pkt.lon) if pkt.mode >= 2 else (None, None)
    except Exception:
        return None, None


def read_battery_pct():
    # Read from ADC or /sys/class/power_supply — implementation is hardware-specific
    return None


def run():
    state = {
        "waypoints":   [],
        "wp_index":    0,
        "mission_id":  None,
        "running":     True,
    }

    def on_mission(client, userdata, msg):
        payload = json.loads(msg.payload)
        userdata["mission_id"] = payload["mission_id"]
        userdata["waypoints"]  = payload["waypoints"]
        userdata["wp_index"]   = 0
        logger.info("Mission received: %s (%d waypoints)", payload["mission_id"], len(payload["waypoints"]))

    client = mqtt.Client(userdata=state)
    client.username_pw_set(ROBOT_ID, API_KEY)
    client.tls_set()
    client.on_message = on_mission
    client.connect(BROKER, 8883)
    client.subscribe(MISSION_TOPIC)
    client.loop_start()

    detector = WeedDetector()
    sprayer  = Sprayer()
    camera   = Picamera2()
    camera.configure(camera.create_still_configuration(main={"size": (1920, 1080)}))
    camera.start()
    time.sleep(2)

    try:
        while state["running"]:
            lat, lon = get_gps()
            frame    = camera.capture_array()
            density  = detector.predict(frame)

            waypoints = state["waypoints"]
            wp_index  = state["wp_index"]
            duty      = 0.0

            if waypoints and wp_index < len(waypoints):
                current_wp = waypoints[wp_index]
                if current_wp["action"] == "spray":
                    duty = sprayer.set_rate(density)
                if lat and lon and waypoint_reached(lat, lon, current_wp):
                    state["wp_index"] += 1
                    logger.info("Waypoint %d reached", wp_index)
            else:
                sprayer.off()

            payload = json.dumps({
                "robot_id":     ROBOT_ID,
                "mission_id":   state["mission_id"],
                "lat":          lat,
                "lon":          lon,
                "weed_density": round(density, 3),
                "spray_duty":   round(duty, 1),
                "battery_pct":  read_battery_pct(),
                "wp_index":     state["wp_index"],
                "ts":           time.time(),
            })
            client.publish(TELEMETRY_TOPIC, payload, qos=0)
            time.sleep(2)
    finally:
        sprayer.cleanup()
        client.loop_stop()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run()

7b. Django: MQTT Subscriber Management Command

A Django management command runs as a long-lived process (a systemd service on the VPS), subscribing to all robot telemetry topics and persisting data.

# farm/management/commands/mqtt_subscriber.py
import json, logging
import paho.mqtt.client as mqtt
from django.core.management.base import BaseCommand
from django.conf import settings
from django.utils import timezone
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

from farm.models import FarmRobot, ApplicationEvent

logger = logging.getLogger(__name__)
FLEET_GROUP = "farm_fleet"


class Command(BaseCommand):
    help = "Subscribe to MQTT broker and persist robot telemetry"

    def handle(self, *args, **options):
        client = mqtt.Client()
        client.username_pw_set(settings.MQTT_USERNAME, settings.MQTT_PASSWORD)
        client.tls_set()
        client.on_connect = lambda c, u, f, rc: c.subscribe("farm/robots/+/telemetry", qos=0)
        client.on_message = self._on_message
        client.connect(settings.MQTT_BROKER_HOST, 8883)
        self.stdout.write("MQTT subscriber running...")
        client.loop_forever()

    def _on_message(self, client, userdata, msg):
        try:
            data     = json.loads(msg.payload)
            robot_id = data["robot_id"]
            robot    = FarmRobot.objects.get(id=robot_id, is_active=True)

            density = float(data.get("weed_density", 0))
            duty    = float(data.get("spray_duty", 0))

            if duty > 0:
                ApplicationEvent.objects.create(
                    robot=robot,
                    mission_id=data.get("mission_id"),
                    latitude=data["lat"],
                    longitude=data["lon"],
                    weed_density=density,
                    applied_rate=duty,
                )

            robot.current_lat = data.get("lat")
            robot.current_lon = data.get("lon")
            robot.battery_pct = data.get("battery_pct")
            robot.last_seen   = timezone.now()
            robot.save(update_fields=["current_lat", "current_lon", "battery_pct", "last_seen"])

            channel_layer = get_channel_layer()
            async_to_sync(channel_layer.group_send)(
                FLEET_GROUP,
                {
                    "type":    "fleet.update",
                    "payload": {
                        "robot_id":     str(robot.id),
                        "name":         robot.name,
                        "lat":          data.get("lat"),
                        "lon":          data.get("lon"),
                        "battery_pct":  data.get("battery_pct"),
                        "weed_density": density,
                        "spray_duty":   duty,
                        "wp_index":     data.get("wp_index"),
                    },
                },
            )
        except FarmRobot.DoesNotExist:
            logger.warning("Unknown robot: %s", data.get("robot_id"))
        except Exception:
            logger.exception("Error processing telemetry: %s", msg.payload[:300])

Run the subscriber as a systemd service on the VPS alongside the Django application. It's a simple loop — if it crashes, systemd restarts it and MQTT handles message durability via QoS levels and the broker's persistent session.


8. AI Crop Advisor: Analysing Field Data with Claude

The fundamental insight here is that an LLM is excellent at reasoning over structured summaries — not pixel-level perception. Rather than sending individual plant images for cloud inference, we send a compact JSON summary of seven days of weed density readings and spray events. Claude synthesises patterns across hundreds of GPS points that no human agronomist could easily spot, and returns a machine-parseable report with prioritised recommendations.

A Celery Beat task runs this analysis every night at 02:00 for each active robot:

# farm/tasks.py
import json
import logging
from datetime import date, timedelta

import anthropic
from celery import shared_task
from django.conf import settings

from .models import FarmRobot, ApplicationEvent, AdvisorReport

logger = logging.getLogger(__name__)

ADVISOR_SYSTEM = """You are an expert agronomist and precision agriculture consultant.
You will receive a JSON summary of a robot's 7-day field activity.
Respond with a single JSON object — no markdown fences, no prose outside the JSON:

{
  "field_health_score": 0-100,
  "summary": "2–3 sentence plain-English field health summary",
  "hotspots": [
    {"lat": 0.0, "lon": 0.0, "radius_m": 10, "description": "..."}
  ],
  "recommendations": [
    {
      "action": "spot_spray | re_scan | soil_sample | notify_agronomist | adjust_mission",
      "zone_description": "...",
      "urgency": "low | medium | high | critical",
      "rationale": "..."
    }
  ],
  "report_markdown": "full agronomist report in markdown (headings, bullet points)"
}"""


@shared_task
def run_crop_advisor(robot_id: str):
    try:
        robot = FarmRobot.objects.get(id=robot_id, is_active=True)
    except FarmRobot.DoesNotExist:
        logger.error("run_crop_advisor: robot %s not found", robot_id)
        return

    end   = date.today()
    start = end - timedelta(days=7)

    events = list(
        ApplicationEvent.objects.filter(
            robot=robot,
            applied_at__date__gte=start,
            applied_at__date__lte=end,
        ).values("latitude", "longitude", "weed_density", "applied_rate", "applied_at")
    )

    if not events:
        logger.info("No events for robot %s in date range %s–%s", robot.name, start, end)
        return

    total           = len(events)
    high_density    = [e for e in events if e["weed_density"] > 0.6]
    medium_density  = [e for e in events if 0.3 <= e["weed_density"] <= 0.6]
    avg_density     = sum(e["weed_density"] for e in events) / total
    total_spray_pct = sum(e["applied_rate"] for e in events) / total

    # Send at most 30 hotspot coordinates to keep the prompt lean
    hotspot_sample = [
        {
            "lat":          float(e["latitude"]),
            "lon":          float(e["longitude"]),
            "weed_density": round(float(e["weed_density"]), 3),
            "ts":           e["applied_at"].isoformat(),
        }
        for e in high_density[:30]
    ]

    field_summary = {
        "robot":               robot.name,
        "period":              f"{start} to {end}",
        "total_readings":      total,
        "avg_weed_density":    round(avg_density, 3),
        "high_density_count":  len(high_density),
        "medium_density_count": len(medium_density),
        "avg_spray_duty":      round(total_spray_pct, 1),
        "high_density_hotspots": hotspot_sample,
    }

    client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)
    response = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=2048,
        system=ADVISOR_SYSTEM,
        messages=[
            {"role": "user", "content": json.dumps(field_summary, default=str)},
        ],
    )

    raw    = response.content[0].text
    result = json.loads(raw)

    AdvisorReport.objects.create(
        robot=robot,
        date_range_start=start,
        date_range_end=end,
        report_markdown=result.get("report_markdown", ""),
        recommendations=result.get("recommendations", []),
        tokens_used=response.usage.input_tokens + response.usage.output_tokens,
    )
    logger.info(
        "AdvisorReport created for %s: health=%s, recs=%d, tokens=%d",
        robot.name,
        result.get("field_health_score"),
        len(result.get("recommendations", [])),
        response.usage.input_tokens + response.usage.output_tokens,
    )

Schedule the nightly run with Celery Beat:

# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "nightly-crop-advisor": {
        "task":     "farm.tasks.run_crop_advisor_all_robots",
        "schedule": crontab(hour=2, minute=0),
    },
}

# farm/tasks.py — add a wrapper that fans out to each active robot
@shared_task
def run_crop_advisor_all_robots():
    for robot in FarmRobot.objects.filter(is_active=True):
        run_crop_advisor.delay(str(robot.id))

At typical farm scale (one robot, 500 readings/day, 30 hotspot coordinates), the field_summary payload is under 5 KB. The Claude API call costs around 0.3–0.5p per robot per night. Compare that to cloud vision billing on every camera frame — at 6 fps the cost difference is three to four orders of magnitude.


9. Real-Time Fleet Dashboard with Django Channels

The dashboard connects over WebSocket and receives robot position updates every two seconds from the MQTT subscriber command. Leaflet.js renders a live map with a marker per robot that moves as telemetry arrives.

# farm/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

FLEET_GROUP = "farm_fleet"


class FleetConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.channel_layer.group_add(FLEET_GROUP, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(FLEET_GROUP, self.channel_name)

    async def fleet_update(self, event):
        await self.send(text_data=json.dumps(event["payload"]))
# farm/routing.py
from django.urls import path
from .consumers import FleetConsumer

websocket_urlpatterns = [
    path("ws/fleet/", FleetConsumer.as_asgi()),
]

The browser-side Leaflet integration:

# farm/views.py — serve the dashboard page
from django.contrib.auth.decorators import login_required
from django.shortcuts import render
from .models import FarmRobot, AdvisorReport

@login_required
def fleet_dashboard(request):
    robots  = FarmRobot.objects.filter(is_active=True)
    reports = AdvisorReport.objects.select_related('robot').order_by('-created_at')[:5]
    return render(request, 'farm/dashboard.html', {'robots': robots, 'reports': reports})

The JavaScript connects to the WebSocket and updates Leaflet markers in place — the marker moves on the map as the robot moves in the field. New markers are created only once per robot ID; subsequent updates call setLatLng:

# Simplified — see full template in farm/templates/farm/dashboard.html
# JavaScript pseudocode embedded in the template:
#
#   const map     = L.map('map').setView([51.5, -1.5], 15);
#   const markers = {};
#   const ws      = new WebSocket(`wss://${location.host}/ws/fleet/`);
#
#   ws.onmessage = (evt) => {
#       const d = JSON.parse(evt.data);
#       if (!d.lat || !d.lon) return;
#       if (markers[d.robot_id]) {
#           markers[d.robot_id].setLatLng([d.lat, d.lon]);
#       } else {
#           markers[d.robot_id] = L.marker([d.lat, d.lon])
#               .bindPopup(`<b>${d.name}</b><br>Density: ${d.weed_density}`)
#               .addTo(map);
#       }
#   };
#
#   ws.onclose = () => setTimeout(() => location.reload(), 5000);

10. Production Checklist

MQTT security

Enable TLS (port 8883) and per-robot username/password authentication in mosquitto.conf. Generate robot credentials at provisioning time and store them in the robot's .env file. Rotate keys on a schedule or when a robot is decommissioned — treat MQTT credentials the same as API keys.

Offline mission buffering

Fields have intermittent connectivity. The robot should buffer the received mission in a local JSON file so it can resume after a network dropout without waiting for a new dispatch:

# robot/mission_store.py
import json, pathlib

MISSION_FILE = pathlib.Path("/home/pi/scanner/current_mission.json")

def save(payload: dict):
    MISSION_FILE.write_text(json.dumps(payload))

def load() -> dict | None:
    if MISSION_FILE.exists():
        return json.loads(MISSION_FILE.read_text())
    return None

On startup, call mission_store.load() before connecting to MQTT. If a cached mission exists and its mission_id matches the server's current dispatch, resume from the last reported wp_index.

TFLite model updates

Serve the model file from Django storage. On robot startup, compare the local model's MD5 hash against a /api/models/current/ endpoint. If a newer version exists, download it before starting inference. This gives you over-the-air model updates without SSH access to each robot.

ApplicationEvent data volume

A robot running 6 hours/day at 2-second intervals generates ~10,800 rows per day. For a 10-robot fleet that's 100,000+ rows/day. Add a Celery Beat task to archive and purge ApplicationEvent rows older than 90 days to keep query performance stable. If you need long-term spatial analytics, migrate to TimescaleDB and use continuous aggregates to pre-compute per-zone weed density statistics.

Advisor report quality

A few things significantly improve Claude's recommendations:

  • Include crop type in the prompt — "This is a winter wheat field" changes the list of plausible weeds and the recommended herbicide window
  • Include historical reports — pass the previous week's field_health_score and summary as context so Claude can comment on trends, not just the current snapshot
  • Parse and validate the JSON response — use Pydantic to validate the structured output before writing to the database; reject and retry if the model returns malformed JSON or an invalid urgency value
# farm/schemas.py — validate advisor output with Pydantic
from typing import Literal
from pydantic import BaseModel, Field


class Hotspot(BaseModel):
    lat:         float
    lon:         float
    radius_m:    float = 10.0
    description: str


class Recommendation(BaseModel):
    action:           Literal['spot_spray', 're_scan', 'soil_sample', 'notify_agronomist', 'adjust_mission']
    zone_description: str
    urgency:          Literal['low', 'medium', 'high', 'critical']
    rationale:        str


class AdvisorOutput(BaseModel):
    field_health_score: int = Field(ge=0, le=100)
    summary:            str
    hotspots:           list[Hotspot]
    recommendations:    list[Recommendation]
    report_markdown:    str

In run_crop_advisor, replace result = json.loads(raw) with:

from .schemas import AdvisorOutput

result = AdvisorOutput.model_validate_json(raw)
# result.recommendations is now a validated list of Recommendation objects

Robot health monitoring

Add a Celery Beat task that runs every 5 minutes and alerts if any active robot's last_seen is more than 10 minutes old during a scheduled mission window. A silent robot during an active mission usually means a crash, a stuck process, or a dead battery — worth catching immediately rather than discovering at end of day.