AI-Powered Precision Farming Robots: Autonomous Navigation, Variable-Rate Application, and an LLM Crop Advisor with Python
A fleet of Raspberry Pi robots follows GPS waypoint missions dispatched from Django, detects weeds in real time with a TensorFlow Lite model, controls a variable-rate sprayer over GPIO PWM, and publishes telemetry back over MQTT. Every night, a Celery task sends the accumulated field data to Claude, which returns a structured agronomist report with hotspot coordinates and prioritised action recommendations — no specialist ML infrastructure, no cloud vision quota per plant image.
1. What We're Building
A previous post covered disease detection with Claude Vision — one robot, one camera, one image per scan POSTed to Django. This post goes further: multiple robots, bidirectional communication, decisions made on the robot in real time, and AI operating at the field level rather than the plant level.
The system has three distinct intelligence layers:
- Edge intelligence (on the robot) — a TensorFlow Lite weed-detection model runs at 6–8 fps on a Raspberry Pi 5 with a Camera Module 3. The robot decides how much herbicide to apply at each GPS position without waiting for a server round trip.
- Fleet intelligence (Django + MQTT) — the server dispatches waypoint missions, tracks real-time robot positions on a Leaflet map, and persists every application event to PostgreSQL.
- Field intelligence (Claude LLM) — a nightly Celery Beat task compiles the past seven days of weed density readings and spray events, summarises the field into a compact JSON payload, and asks Claude to produce a structured agronomist report: field health score, hotspot coordinates, and a ranked list of recommended actions with urgency labels.
Everything runs on a single Linux VPS: Django, Redis, a Mosquitto MQTT broker, and two Celery workers (one for general tasks, one dedicated to the AI advisor). No GPU, no proprietary IoT platform, no per-message cloud vision billing.
2. Architecture Overview
Raspberry Pi Robot(s)
  │  Camera Module 3 → TFLite weed classifier (6–8 fps)
  │  GPS receiver    → current position (haversine path following)
  │  GPIO PWM        → variable-rate sprayer relay
  │
  │  MQTT publish   → farm/robots/{id}/telemetry
  │                   (lat, lon, weed_density, spray_duty, battery)
  │  MQTT subscribe → farm/robots/{id}/missions
  │                   (waypoint list, action commands)
  ▼
Mosquitto MQTT Broker (TLS, port 8883)
  ▼
Django Management Command ←→ PostgreSQL
  │  Persists ApplicationEvent rows
  │  Updates FarmRobot position + battery
  │  Pushes live update via Channels layer
  ▼
Redis ←→ Celery workers ←→ Django Channels
  │            │
  │            └── Nightly: run_crop_advisor task
  │                  Queries 7-day ApplicationEvents
  │                  Calls Claude API → AdvisorReport
  ▼
Browser dashboard
  WebSocket → live fleet map (Leaflet)
  REST API  → mission CRUD, advisor reports
The key design decision is using MQTT for robot communication rather than HTTP polling. Robots can receive mission updates immediately without polling, and telemetry flows continuously at 0.5 Hz without the overhead of opening a new TCP connection for every reading. The same Redis instance serves as both the Celery broker and the Django Channels layer, keeping the infrastructure footprint minimal.
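The topic layout in the diagram can be captured in a few small helpers. This is a minimal sketch using only the standard library; the function names are hypothetical and not part of the project code, and only the `farm/robots/{id}/telemetry` and `farm/robots/{id}/missions` patterns come from the diagram above.

```python
from typing import Optional, Tuple

TOPIC_PREFIX = "farm/robots"
CHANNELS = ("telemetry", "missions")


def telemetry_topic(robot_id: str) -> str:
    """Topic a robot publishes its telemetry on."""
    return f"{TOPIC_PREFIX}/{robot_id}/telemetry"


def mission_topic(robot_id: str) -> str:
    """Topic a robot subscribes to for mission dispatches."""
    return f"{TOPIC_PREFIX}/{robot_id}/missions"


def parse_topic(topic: str) -> Optional[Tuple[str, str]]:
    """Split 'farm/robots/<id>/<channel>' into (robot_id, channel), or None."""
    parts = topic.split("/")
    if len(parts) != 4 or parts[:2] != ["farm", "robots"]:
        return None
    robot_id, channel = parts[2], parts[3]
    if not robot_id or channel not in CHANNELS:
        return None
    return robot_id, channel
```

Parsing the robot ID out of the topic gives the server a second source for it besides the `robot_id` field in the payload, which is useful if you later restrict each robot's broker account to its own topics.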
3. Django Models
Create a farm app and define five models:
# farm/models.py
import uuid

from django.db import models


class FarmRobot(models.Model):
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=100)
    api_key = models.CharField(max_length=64, unique=True, db_index=True)
    current_mission = models.ForeignKey(
        'Mission', null=True, blank=True, on_delete=models.SET_NULL, related_name='+'
    )
    current_lat = models.DecimalField(max_digits=9, decimal_places=6, null=True, blank=True)
    current_lon = models.DecimalField(max_digits=9, decimal_places=6, null=True, blank=True)
    battery_pct = models.FloatField(null=True, blank=True)
    is_active = models.BooleanField(default=True)
    last_seen = models.DateTimeField(null=True, blank=True)

    def __str__(self):
        return self.name


class Mission(models.Model):
    STATUS_DRAFT = 'draft'
    STATUS_QUEUED = 'queued'
    STATUS_RUNNING = 'running'
    STATUS_COMPLETE = 'complete'
    STATUS_ABORTED = 'aborted'
    STATUS_CHOICES = [
        (STATUS_DRAFT, 'Draft'),
        (STATUS_QUEUED, 'Queued'),
        (STATUS_RUNNING, 'Running'),
        (STATUS_COMPLETE, 'Complete'),
        (STATUS_ABORTED, 'Aborted'),
    ]

    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    name = models.CharField(max_length=200)
    robot = models.ForeignKey(FarmRobot, on_delete=models.CASCADE, related_name='missions')
    status = models.CharField(max_length=20, choices=STATUS_CHOICES, default=STATUS_DRAFT, db_index=True)
    created_at = models.DateTimeField(auto_now_add=True)
    started_at = models.DateTimeField(null=True, blank=True)
    completed_at = models.DateTimeField(null=True, blank=True)
    notes = models.TextField(blank=True)

    class Meta:
        ordering = ['-created_at']

    def __str__(self):
        return f"{self.name} ({self.get_status_display()})"


class Waypoint(models.Model):
    ACTION_SCAN = 'scan'
    ACTION_SPRAY = 'spray'
    ACTION_SAMPLE = 'sample'
    ACTION_CHOICES = [
        (ACTION_SCAN, 'Scan only'),
        (ACTION_SPRAY, 'Spray (variable rate)'),
        (ACTION_SAMPLE, 'Soil sample'),
    ]

    mission = models.ForeignKey(Mission, on_delete=models.CASCADE, related_name='waypoints')
    sequence = models.PositiveIntegerField()
    latitude = models.DecimalField(max_digits=9, decimal_places=6)
    longitude = models.DecimalField(max_digits=9, decimal_places=6)
    action = models.CharField(max_length=20, choices=ACTION_CHOICES, default=ACTION_SCAN)
    target_rate = models.FloatField(null=True, blank=True)  # max L/ha for spray waypoints
    reached_at = models.DateTimeField(null=True, blank=True)

    class Meta:
        ordering = ['sequence']
        unique_together = [('mission', 'sequence')]

    def __str__(self):
        return f"WP{self.sequence} ({self.action}) @ {self.latitude},{self.longitude}"


class ApplicationEvent(models.Model):
    """One row per spray decision made by the robot at a GPS position."""
    robot = models.ForeignKey(FarmRobot, on_delete=models.CASCADE, related_name='events')
    mission = models.ForeignKey(Mission, on_delete=models.SET_NULL, null=True, blank=True)
    latitude = models.DecimalField(max_digits=9, decimal_places=6)
    longitude = models.DecimalField(max_digits=9, decimal_places=6)
    weed_density = models.FloatField()  # TFLite output, 0.0–1.0
    applied_rate = models.FloatField()  # actual PWM duty cycle, 0–100
    applied_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        indexes = [models.Index(fields=['robot', 'applied_at'])]


class AdvisorReport(models.Model):
    """LLM-generated agronomist report for a robot's 7-day field data."""
    robot = models.ForeignKey(FarmRobot, on_delete=models.CASCADE, related_name='reports')
    date_range_start = models.DateField()
    date_range_end = models.DateField()
    report_markdown = models.TextField()
    recommendations = models.JSONField()  # structured list from Claude
    tokens_used = models.PositiveIntegerField()
    created_at = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ['-created_at']

    def __str__(self):
        return f"{self.robot.name} report {self.date_range_start}–{self.date_range_end}"
4. Mission Planning API: Create and Dispatch Waypoint Missions
Operators create missions in the Django admin or via the REST API, attach an ordered list of waypoints, then dispatch. Dispatching changes the status to queued and publishes the waypoint list to the robot's MQTT topic.
# farm/serializers.py
from rest_framework import serializers

from .models import Mission, Waypoint


class WaypointSerializer(serializers.ModelSerializer):
    class Meta:
        model = Waypoint
        fields = ['sequence', 'latitude', 'longitude', 'action', 'target_rate']


class MissionSerializer(serializers.ModelSerializer):
    waypoints = WaypointSerializer(many=True)

    class Meta:
        model = Mission
        fields = ['id', 'name', 'robot', 'status', 'waypoints', 'notes', 'created_at']
        read_only_fields = ['id', 'status', 'created_at']

    def create(self, validated_data):
        # Nested writes are not automatic in DRF: create the mission first,
        # then one Waypoint row per entry in the submitted list.
        waypoints_data = validated_data.pop('waypoints')
        mission = Mission.objects.create(**validated_data)
        for wp_data in waypoints_data:
            Waypoint.objects.create(mission=mission, **wp_data)
        return mission
# farm/views.py
import json

import paho.mqtt.publish as publish
from django.conf import settings
from rest_framework import viewsets, status
from rest_framework.decorators import action
from rest_framework.response import Response
from rest_framework.permissions import IsAuthenticated

from .models import Mission, FarmRobot
from .serializers import MissionSerializer


class MissionViewSet(viewsets.ModelViewSet):
    queryset = Mission.objects.select_related('robot').prefetch_related('waypoints')
    serializer_class = MissionSerializer
    permission_classes = [IsAuthenticated]

    # The method must not be named `dispatch`: that name belongs to
    # APIView.dispatch(), which routes every request to the view.
    # url_path keeps the endpoint at /missions/{id}/dispatch/.
    @action(detail=True, methods=['post'], url_path='dispatch')
    def dispatch_mission(self, request, pk=None):
        mission = self.get_object()
        if mission.status != Mission.STATUS_DRAFT:
            return Response(
                {'detail': f'Mission is {mission.status}, not draft.'},
                status=status.HTTP_400_BAD_REQUEST,
            )

        waypoints = list(
            mission.waypoints.values('sequence', 'latitude', 'longitude', 'action', 'target_rate')
        )
        payload = json.dumps({
            'mission_id': str(mission.id),
            'waypoints': [
                {
                    'seq': wp['sequence'],
                    'lat': float(wp['latitude']),   # Decimal is not JSON-serialisable
                    'lon': float(wp['longitude']),
                    'action': wp['action'],
                    'target_rate': wp['target_rate'],
                }
                for wp in waypoints
            ],
        })

        topic = f"farm/robots/{mission.robot.id}/missions"
        publish.single(
            topic,
            payload=payload,
            hostname=settings.MQTT_BROKER_HOST,
            port=8883,
            tls=settings.MQTT_TLS_CONTEXT,
            auth={'username': settings.MQTT_USERNAME, 'password': settings.MQTT_PASSWORD},
        )

        # Only mark the mission queued once the publish call has returned.
        mission.status = Mission.STATUS_QUEUED
        mission.robot.current_mission = mission
        mission.save(update_fields=['status'])
        mission.robot.save(update_fields=['current_mission'])
        return Response({'status': 'queued', 'mission_id': str(mission.id)})
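paho.mqtt.publish.single is a one-shot helper: it opens a connection, publishes one message, and closes. For high-frequency publishing (like telemetry), keep a persistent client connection; for infrequent mission dispatches, single is clean and stateless. As called here it uses the defaults qos=0 and retain=False, so a robot that is offline at dispatch time never sees the mission. Pass qos=1 and retain=True if the broker should hold the latest mission for it (the robot should then ignore a mission_id it is already running).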
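On the receiving side, the robot can sanity-check the dispatched payload before acting on it. The following is a minimal sketch, not the project's actual handler: `parse_mission` is a hypothetical helper, and the field names (`mission_id`, `waypoints`, `seq`, `lat`, `lon`, `action`) are the ones the dispatch view above publishes.

```python
import json

VALID_ACTIONS = {"scan", "spray", "sample"}  # mirrors Waypoint.ACTION_CHOICES


def parse_mission(raw: bytes) -> dict:
    """Decode a mission message; raise ValueError if it is malformed."""
    try:
        payload = json.loads(raw)  # JSONDecodeError is a ValueError subclass
        mission_id = payload["mission_id"]
        # Sort by sequence so the robot never depends on list order.
        waypoints = sorted(payload["waypoints"], key=lambda wp: wp["seq"])
        for wp in waypoints:
            if wp["action"] not in VALID_ACTIONS:
                raise ValueError(f"unknown action {wp['action']!r}")
            if not (-90 <= wp["lat"] <= 90 and -180 <= wp["lon"] <= 180):
                raise ValueError(f"waypoint {wp['seq']} is out of range")
    except (KeyError, TypeError) as exc:
        raise ValueError(f"malformed mission payload: {exc!r}") from exc
    if not waypoints:
        raise ValueError("mission has no waypoints")
    return {"mission_id": str(mission_id), "waypoints": waypoints}
```

Rejecting a bad payload up front is cheaper than discovering a missing key halfway across a field.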
5. Edge AI: On-Device Weed Detection with TensorFlow Lite
The fundamental advantage of running inference on the robot is latency and cost. Sending every camera frame to a cloud vision API at 6 fps would cost thousands per day per robot and add 300–500 ms of network latency to every spray decision. A MobileNetV2-based TFLite model runs inference in 60–120 ms on a Raspberry Pi 5 with no network dependency at all.
You can fine-tune a MobileNetV2 weed classifier with a dataset like CropDeep or DeepWeeds using TensorFlow and convert to TFLite for deployment. The inference wrapper below is model-agnostic: swap the path and adjust INPUT_SIZE to match your trained model.
# robot/weed_detector.py
import cv2
import numpy as np

try:
    from tflite_runtime.interpreter import Interpreter
except ImportError:
    from tensorflow.lite.python.interpreter import Interpreter  # fallback for dev

MODEL_PATH = "/home/pi/models/weed_detector.tflite"
INPUT_SIZE = (224, 224)  # must match training resolution
WEED_CLASS = 1           # index of the "weed" class in model output


class WeedDetector:
    def __init__(self, model_path: str = MODEL_PATH):
        self.interpreter = Interpreter(model_path=model_path)
        self.interpreter.allocate_tensors()
        self._input = self.interpreter.get_input_details()[0]
        self._output = self.interpreter.get_output_details()[0]

    def predict(self, frame: np.ndarray) -> float:
        """
        frame : H×W×3 uint8 array (RGB or BGR; match your training pipeline).
        Returns weed probability 0.0–1.0.
        """
        resized = cv2.resize(frame, INPUT_SIZE)
        # A float MobileNetV2 expects float32 normalised to [-1, 1];
        # [np.newaxis, ...] adds the batch dimension.
        inp = ((resized.astype(np.float32) / 127.5) - 1.0)[np.newaxis, ...]
        self.interpreter.set_tensor(self._input['index'], inp)
        self.interpreter.invoke()
        scores = self.interpreter.get_tensor(self._output['index'])[0]
        return float(scores[WEED_CLASS])
Install the lightweight runtime on the Pi — no full TensorFlow required:
pip install tflite-runtime opencv-python-headless
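For faster inference, note that the Raspberry Pi 5 has no built-in neural processing unit, and NNAPI is an Android API, so there is no libnnapi.so to load on Raspberry Pi OS. The usual route is an external accelerator such as a Coral USB Edge TPU: quantise and compile the model for the Edge TPU, then pass experimental_delegates=[load_delegate('libedgetpu.so.1')] to the Interpreter constructor. How much latency drops depends on the model and on how many of its operations the accelerator supports.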
6. Variable-Rate Sprayer Control via GPIO PWM
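The sprayer is a 12 V solenoid valve driven by a relay board connected to a Raspberry Pi GPIO pin. PWM duty cycle controls pulse width, which controls flow rate through the valve: the higher the weed density, the higher the duty cycle and the more herbicide applied per square metre. Because the output switches 50 times a second, the board needs a solid-state relay or MOSFET stage; a mechanical relay cannot follow that rate.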
# robot/sprayer.py
# On a Raspberry Pi 5 the classic RPi.GPIO package does not work;
# install the rpi-lgpio drop-in, which provides the same module name.
import RPi.GPIO as GPIO

SPRAY_PIN = 18         # BCM numbering
PWM_FREQ_HZ = 50       # 50 Hz works well for solenoid valves; avoid audible freq
MIN_RATE_LHA = 20.0    # minimum application rate: never spray below this
MAX_RATE_LHA = 200.0   # maximum application rate at full weed density
DENSITY_FLOOR = 0.15   # density below this: skip spray entirely


class Sprayer:
    def __init__(self):
        GPIO.setmode(GPIO.BCM)
        GPIO.setup(SPRAY_PIN, GPIO.OUT)
        self._pwm = GPIO.PWM(SPRAY_PIN, PWM_FREQ_HZ)
        self._pwm.start(0)

    def set_rate(self, weed_density: float) -> float:
        """
        Map weed_density [0, 1] → duty cycle [0, 100].
        Returns the duty cycle applied.
        """
        if weed_density < DENSITY_FLOOR:
            self._pwm.ChangeDutyCycle(0)
            return 0.0
        # Linear interpolation between MIN and MAX rate
        rate = MIN_RATE_LHA + (MAX_RATE_LHA - MIN_RATE_LHA) * weed_density
        # With this linear rate-to-duty mapping the result equals
        # weed_density * 100; replace this line with your valve's
        # calibration curve if its response is not linear.
        duty = (rate - MIN_RATE_LHA) / (MAX_RATE_LHA - MIN_RATE_LHA) * 100.0
        duty = max(0.0, min(100.0, duty))
        self._pwm.ChangeDutyCycle(duty)
        return duty

    def off(self):
        self._pwm.ChangeDutyCycle(0)

    def cleanup(self):
        self.off()
        self._pwm.stop()
        GPIO.cleanup()
The DENSITY_FLOOR threshold prevents low-confidence detections from triggering the sprayer: a frame that scores 0.12 is probably weed-free, so there is no need to spray. Calibrate MIN_RATE_LHA and MAX_RATE_LHA against your sprayer's flow-rate curve at the operating pressure you're running.
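One way to do that calibration is to measure the valve's flow at a few duty cycles and interpolate between them. The sketch below is illustrative only: the `CALIBRATION` table holds made-up numbers, and the function names, forward speed, and swath width are assumptions that do not appear in the original code. The flow formula itself is the standard sprayer relation L/min = L/ha × km/h × m / 600.

```python
from bisect import bisect_left

# Hypothetical bench measurements: (duty cycle %, measured flow in L/min).
CALIBRATION = [(0.0, 0.0), (25.0, 0.10), (50.0, 0.22), (75.0, 0.31), (100.0, 0.40)]


def required_flow_lpm(rate_lha: float, speed_kmh: float, swath_m: float) -> float:
    """Flow needed to hit an application rate at a given speed and swath."""
    return rate_lha * speed_kmh * swath_m / 600.0


def duty_for_flow(flow_lpm: float, table=CALIBRATION) -> float:
    """Piecewise-linear lookup of the duty cycle that yields flow_lpm."""
    flows = [flow for _, flow in table]
    if flow_lpm <= flows[0]:
        return table[0][0]
    if flow_lpm >= flows[-1]:
        return table[-1][0]  # valve is saturated; cannot deliver more
    i = bisect_left(flows, flow_lpm)
    (d0, f0), (d1, f1) = table[i - 1], table[i]
    return d0 + (d1 - d0) * (flow_lpm - f0) / (f1 - f0)
```

For example, `duty_for_flow(required_flow_lpm(rate, speed, swath))` could replace the linear duty calculation in `Sprayer.set_rate` once you have real measurements for your valve.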
For the path-following logic that decides when to move to the next waypoint, use the haversine formula to measure distance to the target coordinate:
# robot/navigation.py
import math

EARTH_RADIUS_M = 6_371_000
ARRIVAL_THRESHOLD_M = 1.5  # metres: consider waypoint reached


def haversine_metres(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance between two GPS coordinates in metres."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlambda = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlambda / 2) ** 2
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def waypoint_reached(current_lat: float, current_lon: float, wp: dict) -> bool:
    dist = haversine_metres(current_lat, current_lon, wp['lat'], wp['lon'])
    return dist <= ARRIVAL_THRESHOLD_M
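Distance alone says when a waypoint is reached, not which way to steer. A path follower would normally pair it with the initial great-circle bearing to the target. The sketch below is an addition under that assumption; `initial_bearing_deg` and `heading_error_deg` are hypothetical helpers, not part of the original navigation module, and the robot's current heading would have to come from a compass or from successive GPS fixes.

```python
import math


def initial_bearing_deg(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Initial great-circle bearing from point 1 to point 2, in degrees [0, 360)."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dlambda = math.radians(lon2 - lon1)
    y = math.sin(dlambda) * math.cos(phi2)
    x = (math.cos(phi1) * math.sin(phi2)
         - math.sin(phi1) * math.cos(phi2) * math.cos(dlambda))
    return math.degrees(math.atan2(y, x)) % 360.0


def heading_error_deg(current_heading: float, target_bearing: float) -> float:
    """Signed smallest turn in degrees, in [-180, 180); positive means turn right."""
    return (target_bearing - current_heading + 180.0) % 360.0 - 180.0
```

Feeding `heading_error_deg` into whatever steering controller the chassis uses keeps the turn direction correct across the 0°/360° wrap.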
7. MQTT Telemetry Bridge: Robot ↔ Django
MQTT gives us bidirectional communication with very low overhead. The robot publishes telemetry at 0.5 Hz and subscribes to its own mission topic. Django subscribes to all robot telemetry topics and persists the data.
7a. Robot: Telemetry Publisher and Mission Subscriber
# robot/main.py
import os, json, time, logging

import paho.mqtt.client as mqtt
from picamera2 import Picamera2

from weed_detector import WeedDetector
from sprayer import Sprayer
from navigation import waypoint_reached

try:
    import gpsd
    gpsd.connect()
    GPS_OK = True
except Exception:
    GPS_OK = False

logger = logging.getLogger(__name__)

BROKER = os.environ["MQTT_BROKER"]
ROBOT_ID = os.environ["ROBOT_ID"]
API_KEY = os.environ["ROBOT_API_KEY"]
TELEMETRY_TOPIC = f"farm/robots/{ROBOT_ID}/telemetry"
MISSION_TOPIC = f"farm/robots/{ROBOT_ID}/missions"


def get_gps():
    if not GPS_OK:
        return None, None
    try:
        pkt = gpsd.get_current()
        # mode >= 2 means at least a 2D fix
        return (pkt.lat, pkt.lon) if pkt.mode >= 2 else (None, None)
    except Exception:
        return None, None


def read_battery_pct():
    # Read from ADC or /sys/class/power_supply; implementation is hardware-specific
    return None


def run():
    state = {
        "waypoints": [],
        "wp_index": 0,
        "mission_id": None,
        "running": True,
    }

    def on_connect(client, userdata, flags, rc):
        # Subscribe here so the subscription is restored after every reconnect.
        client.subscribe(MISSION_TOPIC)

    def on_mission(client, userdata, msg):
        payload = json.loads(msg.payload)
        if payload["mission_id"] == userdata["mission_id"]:
            return  # duplicate delivery of the mission already in progress
        userdata["mission_id"] = payload["mission_id"]
        userdata["waypoints"] = payload["waypoints"]
        userdata["wp_index"] = 0
        logger.info("Mission received: %s (%d waypoints)", payload["mission_id"], len(payload["waypoints"]))

    # Written against the paho-mqtt 1.x callback API. With paho-mqtt 2.x, pass
    # mqtt.CallbackAPIVersion.VERSION1 as the first argument to Client().
    client = mqtt.Client(userdata=state)
    client.username_pw_set(ROBOT_ID, API_KEY)
    client.tls_set()
    client.on_connect = on_connect
    client.on_message = on_mission
    client.connect(BROKER, 8883)
    client.loop_start()

    detector = WeedDetector()
    sprayer = Sprayer()
    camera = Picamera2()
    camera.configure(camera.create_still_configuration(main={"size": (1920, 1080)}))
    camera.start()
    time.sleep(2)  # let exposure and white balance settle

    try:
        while state["running"]:
            lat, lon = get_gps()
            frame = camera.capture_array()
            density = detector.predict(frame)

            waypoints = state["waypoints"]
            wp_index = state["wp_index"]
            duty = 0.0

            if waypoints and wp_index < len(waypoints):
                current_wp = waypoints[wp_index]
                if current_wp["action"] == "spray":
                    duty = sprayer.set_rate(density)
                # Compare against None: 0.0 is a valid latitude or longitude.
                if lat is not None and lon is not None and waypoint_reached(lat, lon, current_wp):
                    state["wp_index"] += 1
                    logger.info("Waypoint %d reached", wp_index)
            else:
                sprayer.off()

            payload = json.dumps({
                "robot_id": ROBOT_ID,
                "mission_id": state["mission_id"],
                "lat": lat,
                "lon": lon,
                "weed_density": round(density, 3),
                "spray_duty": round(duty, 1),
                "battery_pct": read_battery_pct(),
                "wp_index": state["wp_index"],
                "ts": time.time(),
            })
            client.publish(TELEMETRY_TOPIC, payload, qos=0)
            # 0.5 Hz loop: one frame, one spray decision and one telemetry
            # message per cycle. Shorten the sleep to act on more frames.
            time.sleep(2)
    finally:
        sprayer.cleanup()
        camera.stop()
        client.loop_stop()


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run()
7b. Django: MQTT Subscriber Management Command
A Django management command runs as a long-lived process (a systemd service on the VPS), subscribing to all robot telemetry topics and persisting data.
# farm/management/commands/mqtt_subscriber.py
import json, logging

import paho.mqtt.client as mqtt
from django.core.management.base import BaseCommand
from django.conf import settings
from django.db import close_old_connections
from django.utils import timezone
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

from farm.models import FarmRobot, ApplicationEvent

logger = logging.getLogger(__name__)
FLEET_GROUP = "farm_fleet"


class Command(BaseCommand):
    help = "Subscribe to MQTT broker and persist robot telemetry"

    def handle(self, *args, **options):
        # Written against the paho-mqtt 1.x callback API. With paho-mqtt 2.x, pass
        # mqtt.CallbackAPIVersion.VERSION1 as the first argument to Client().
        client = mqtt.Client()
        client.username_pw_set(settings.MQTT_USERNAME, settings.MQTT_PASSWORD)
        client.tls_set()
        # Subscribing in on_connect restores the subscription after a reconnect.
        client.on_connect = lambda c, u, f, rc: c.subscribe("farm/robots/+/telemetry", qos=0)
        client.on_message = self._on_message
        client.connect(settings.MQTT_BROKER_HOST, 8883)
        self.stdout.write("MQTT subscriber running...")
        client.loop_forever()

    def _on_message(self, client, userdata, msg):
        # Long-lived process: discard database connections that have timed out.
        close_old_connections()
        try:
            data = json.loads(msg.payload)
            robot_id = data["robot_id"]
            robot = FarmRobot.objects.get(id=robot_id, is_active=True)

            density = float(data.get("weed_density", 0))
            duty = float(data.get("spray_duty", 0))
            lat, lon = data.get("lat"), data.get("lon")

            # Record an event only when the valve was open and the robot had
            # a GPS fix; latitude/longitude are NOT NULL on ApplicationEvent.
            if duty > 0 and lat is not None and lon is not None:
                ApplicationEvent.objects.create(
                    robot=robot,
                    mission_id=data.get("mission_id"),
                    latitude=lat,
                    longitude=lon,
                    weed_density=density,
                    applied_rate=duty,
                )

            robot.current_lat = lat
            robot.current_lon = lon
            robot.battery_pct = data.get("battery_pct")
            robot.last_seen = timezone.now()
            robot.save(update_fields=["current_lat", "current_lon", "battery_pct", "last_seen"])

            channel_layer = get_channel_layer()
            async_to_sync(channel_layer.group_send)(
                FLEET_GROUP,
                {
                    "type": "fleet.update",
                    "payload": {
                        "robot_id": str(robot.id),
                        "name": robot.name,
                        "lat": lat,
                        "lon": lon,
                        "battery_pct": data.get("battery_pct"),
                        "weed_density": density,
                        "spray_duty": duty,
                        "wp_index": data.get("wp_index"),
                    },
                },
            )
        except FarmRobot.DoesNotExist:
            logger.warning("Unknown robot: %s", data.get("robot_id"))
        except Exception:
            logger.exception("Error processing telemetry: %s", msg.payload[:300])
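Run the subscriber as a systemd service on the VPS alongside the Django application. It's a simple loop: if it crashes, systemd restarts it. As written, both sides use QoS 0 and the default clean session, so telemetry published while the subscriber is down is lost, which is usually acceptable for 2-second position updates. If every spray event must be recorded, publish and subscribe at QoS 1 and connect the subscriber with a fixed client ID and clean_session=False so the broker queues messages for it while it is offline.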
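Because the subscriber accepts whatever arrives on the telemetry topics, it can help to validate each message in a pure function before touching the database. This is a minimal sketch of that idea; `parse_telemetry` is a hypothetical helper, and the range checks (density 0 to 1, duty 0 to 100) are taken from the comments on the ApplicationEvent model.

```python
import json
from typing import Optional


def parse_telemetry(raw: bytes) -> Optional[dict]:
    """Return a cleaned telemetry dict, or None if the message should be dropped."""
    try:
        data = json.loads(raw)
        robot_id = str(data["robot_id"])
        density = float(data.get("weed_density", 0.0))
        duty = float(data.get("spray_duty", 0.0))
    except (ValueError, KeyError, TypeError, AttributeError):
        return None
    if not (0.0 <= density <= 1.0 and 0.0 <= duty <= 100.0):
        return None

    lat, lon = data.get("lat"), data.get("lon")
    has_fix = (
        isinstance(lat, (int, float)) and isinstance(lon, (int, float))
        and -90 <= lat <= 90 and -180 <= lon <= 180
    )
    return {
        "robot_id": robot_id,
        "weed_density": density,
        "spray_duty": duty,
        "lat": lat if has_fix else None,
        "lon": lon if has_fix else None,
        # Only an open valve at a known position becomes an ApplicationEvent.
        "store_event": has_fix and duty > 0,
    }
```

Keeping this logic free of Django imports also makes it easy to unit-test without a broker or a database.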
8. AI Crop Advisor: Analysing Field Data with Claude
The key point is that an LLM is good at reasoning over structured summaries, not pixel-level perception. Rather than sending individual plant images for cloud inference, we send a compact JSON summary of seven days of weed density readings and spray events: aggregate statistics plus up to 30 high-density coordinates. Claude looks for patterns in that summary that would be tedious to spot by hand, and returns a machine-parseable report with prioritised recommendations.
A Celery Beat task runs this analysis every night at 02:00 for each active robot:
# farm/tasks.py
import json
import logging
from datetime import date, timedelta

import anthropic
from celery import shared_task
from django.conf import settings

from .models import FarmRobot, ApplicationEvent, AdvisorReport

logger = logging.getLogger(__name__)

ADVISOR_SYSTEM = """You are an expert agronomist and precision agriculture consultant.
You will receive a JSON summary of a robot's 7-day field activity.
Respond with a single JSON object, with no markdown fences and no prose outside the JSON:
{
  "field_health_score": 0-100,
  "summary": "2–3 sentence plain-English field health summary",
  "hotspots": [
    {"lat": 0.0, "lon": 0.0, "radius_m": 10, "description": "..."}
  ],
  "recommendations": [
    {
      "action": "spot_spray | re_scan | soil_sample | notify_agronomist | adjust_mission",
      "zone_description": "...",
      "urgency": "low | medium | high | critical",
      "rationale": "..."
    }
  ],
  "report_markdown": "full agronomist report in markdown (headings, bullet points)"
}"""


@shared_task
def run_crop_advisor(robot_id: str):
    try:
        robot = FarmRobot.objects.get(id=robot_id, is_active=True)
    except FarmRobot.DoesNotExist:
        logger.error("run_crop_advisor: robot %s not found", robot_id)
        return

    end = date.today()
    # Both bounds are inclusive, so the window is the 7 previous days
    # plus whatever has been recorded today so far.
    start = end - timedelta(days=7)

    events = list(
        ApplicationEvent.objects.filter(
            robot=robot,
            applied_at__date__gte=start,
            applied_at__date__lte=end,
        ).values("latitude", "longitude", "weed_density", "applied_rate", "applied_at")
    )
    if not events:
        logger.info("No events for robot %s in date range %s–%s", robot.name, start, end)
        return

    total = len(events)
    high_density = [e for e in events if e["weed_density"] > 0.6]
    medium_density = [e for e in events if 0.3 <= e["weed_density"] <= 0.6]
    avg_density = sum(e["weed_density"] for e in events) / total
    avg_spray_duty = sum(e["applied_rate"] for e in events) / total

    # Send at most 30 hotspot coordinates, highest density first,
    # to keep the prompt lean
    worst_first = sorted(high_density, key=lambda e: e["weed_density"], reverse=True)
    hotspot_sample = [
        {
            "lat": float(e["latitude"]),
            "lon": float(e["longitude"]),
            "weed_density": round(float(e["weed_density"]), 3),
            "ts": e["applied_at"].isoformat(),
        }
        for e in worst_first[:30]
    ]

    field_summary = {
        "robot": robot.name,
        "period": f"{start} to {end}",
        "total_readings": total,
        "avg_weed_density": round(avg_density, 3),
        "high_density_count": len(high_density),
        "medium_density_count": len(medium_density),
        "avg_spray_duty": round(avg_spray_duty, 1),
        "high_density_hotspots": hotspot_sample,
    }

    client = anthropic.Anthropic(api_key=settings.ANTHROPIC_API_KEY)
    response = client.messages.create(
        model="claude-opus-4-7",
        max_tokens=2048,
        system=ADVISOR_SYSTEM,
        messages=[
            {"role": "user", "content": json.dumps(field_summary, default=str)},
        ],
    )

    raw = response.content[0].text
    result = json.loads(raw)
    tokens_used = response.usage.input_tokens + response.usage.output_tokens

    AdvisorReport.objects.create(
        robot=robot,
        date_range_start=start,
        date_range_end=end,
        report_markdown=result.get("report_markdown", ""),
        recommendations=result.get("recommendations", []),
        tokens_used=tokens_used,
    )
    logger.info(
        "AdvisorReport created for %s: health=%s, recs=%d, tokens=%d",
        robot.name,
        result.get("field_health_score"),
        len(result.get("recommendations", [])),
        tokens_used,
    )
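If a field produces many more than 30 high-density readings, neighbouring points can be merged before they go into the prompt. The sketch below is one possible approach and not what the task above does: it bins events into square grid cells and ranks cells by mean density. `grid_hotspots`, the 10 m cell size, and the flat-earth metres-per-degree approximation are all assumptions chosen for illustration; the input keys match the `.values(...)` query above.

```python
import math
from collections import defaultdict

M_PER_DEG_LAT = 111_320.0  # approximate metres per degree of latitude


def grid_hotspots(events, cell_m: float = 10.0, top_n: int = 30):
    """Group events into square cells and return the densest cells first."""
    if not events:
        return []
    # One longitude scale for the whole field keeps the grid consistent.
    ref_lat = float(events[0]["latitude"])
    m_per_deg_lon = M_PER_DEG_LAT * math.cos(math.radians(ref_lat))

    cells = defaultdict(list)
    for e in events:
        lat, lon = float(e["latitude"]), float(e["longitude"])
        key = (math.floor(lat * M_PER_DEG_LAT / cell_m),
               math.floor(lon * m_per_deg_lon / cell_m))
        cells[key].append((lat, lon, float(e["weed_density"])))

    summary = []
    for points in cells.values():
        n = len(points)
        summary.append({
            "lat": round(sum(p[0] for p in points) / n, 6),
            "lon": round(sum(p[1] for p in points) / n, 6),
            "mean_density": round(sum(p[2] for p in points) / n, 3),
            "readings": n,
        })
    summary.sort(key=lambda c: c["mean_density"], reverse=True)
    return summary[:top_n]
```

Each returned cell carries a reading count, which gives the model a rough sense of how much evidence sits behind each hotspot.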
Schedule the nightly run with Celery Beat:
# settings.py
from celery.schedules import crontab

CELERY_BEAT_SCHEDULE = {
    "nightly-crop-advisor": {
        "task": "farm.tasks.run_crop_advisor_all_robots",
        "schedule": crontab(hour=2, minute=0),
    },
}

# farm/tasks.py: add a wrapper that fans out to each active robot
@shared_task
def run_crop_advisor_all_robots():
    for robot in FarmRobot.objects.filter(is_active=True):
        run_crop_advisor.delay(str(robot.id))
At typical farm scale (one robot, 500 readings/day, 30 hotspot coordinates), the field_summary payload is under 5 KB. The Claude API call costs around 0.3–0.5p per robot per night, though the exact figure depends on the model you choose and the length of the report. Compare that to cloud vision billing on every camera frame: at 6 fps the cost difference is three to four orders of magnitude.
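The gap is easiest to see by counting requests. The short calculation below uses the 6 fps frame rate from this section and the 6 operating hours per day assumed later in the production checklist; it compares request counts only and deliberately models no prices, since those vary by provider and model.

```python
def frames_per_day(fps: int, hours: float) -> int:
    """Number of camera frames captured in one operating day."""
    return int(fps * 3600 * hours)


FPS = 6                    # frame rate quoted for on-device inference
HOURS_PER_DAY = 6          # operating hours assumed in the checklist
ADVISOR_CALLS_PER_DAY = 1  # one nightly report per robot

frames = frames_per_day(FPS, HOURS_PER_DAY)
print(f"{frames:,} frames/day vs {ADVISOR_CALLS_PER_DAY} advisor call/day")
```

A single advisor request costs more than a single vision request would, which is why the cost gap is smaller than the raw request ratio.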
9. Real-Time Fleet Dashboard with Django Channels
The dashboard connects over WebSocket and receives robot position updates every two seconds from the MQTT subscriber command. Leaflet.js renders a live map with a marker per robot that moves as telemetry arrives.
# farm/consumers.py
import json

from channels.generic.websocket import AsyncWebsocketConsumer

FLEET_GROUP = "farm_fleet"


class FleetConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        await self.channel_layer.group_add(FLEET_GROUP, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(FLEET_GROUP, self.channel_name)

    # Called for group messages with "type": "fleet.update"
    async def fleet_update(self, event):
        await self.send(text_data=json.dumps(event["payload"]))


# farm/routing.py
from django.urls import path

from .consumers import FleetConsumer

websocket_urlpatterns = [
    path("ws/fleet/", FleetConsumer.as_asgi()),
]
The Django view that serves the dashboard page:
# farm/views.py: serve the dashboard page
from django.contrib.auth.decorators import login_required
from django.shortcuts import render

from .models import FarmRobot, AdvisorReport


@login_required
def fleet_dashboard(request):
    robots = FarmRobot.objects.filter(is_active=True)
    reports = AdvisorReport.objects.select_related('robot').order_by('-created_at')[:5]
    return render(request, 'farm/dashboard.html', {'robots': robots, 'reports': reports})
The JavaScript connects to the WebSocket and updates Leaflet markers in place, so the marker moves on the map as the robot moves in the field. New markers are created only once per robot ID; subsequent updates call setLatLng:
// Simplified; see the full template in farm/templates/farm/dashboard.html
const map = L.map('map').setView([51.5, -1.5], 15);
const markers = {};
const ws = new WebSocket(`wss://${location.host}/ws/fleet/`);

ws.onmessage = (evt) => {
  const d = JSON.parse(evt.data);
  // Compare against null: 0 is a valid latitude or longitude.
  if (d.lat == null || d.lon == null) return;
  if (markers[d.robot_id]) {
    markers[d.robot_id].setLatLng([d.lat, d.lon]);
  } else {
    markers[d.robot_id] = L.marker([d.lat, d.lon])
      .bindPopup(`<b>${d.name}</b><br>Density: ${d.weed_density}`)
      .addTo(map);
  }
};

// Crude reconnect: reload the page five seconds after the socket closes.
ws.onclose = () => setTimeout(() => location.reload(), 5000);
10. Production Checklist
MQTT security
Enable TLS (port 8883) and per-robot username/password authentication in mosquitto.conf. Generate robot credentials at provisioning time and store them in the robot's .env file. Rotate keys on a schedule or when a robot is decommissioned; treat MQTT credentials the same as API keys.
Offline mission buffering
Fields have intermittent connectivity. The robot should buffer the received mission in a local JSON file so it can resume after a network dropout without waiting for a new dispatch:
# robot/mission_store.py
import json, pathlib

MISSION_FILE = pathlib.Path("/home/pi/scanner/current_mission.json")


def save(payload: dict):
    MISSION_FILE.write_text(json.dumps(payload))


def load() -> dict | None:
    if MISSION_FILE.exists():
        return json.loads(MISSION_FILE.read_text())
    return None
On startup, call mission_store.load() before connecting to MQTT. If a cached mission exists and its mission_id matches the server's current dispatch, resume from the last reported wp_index.
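Two details are worth sketching here: writing the cache so that a power cut mid-write is unlikely to leave a truncated file, and deciding where to resume. The helpers below are hypothetical additions (`save_atomic` and `resume_index` are not in the original module), and how the robot learns the server's current mission ID and last reported index is left open.

```python
import json
import os
import tempfile
from pathlib import Path
from typing import Optional


def save_atomic(path: Path, payload: dict) -> None:
    """Write JSON to a temp file in the same directory, then rename it into place."""
    path.parent.mkdir(parents=True, exist_ok=True)
    fd, tmp = tempfile.mkstemp(dir=path.parent, suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as fh:
            json.dump(payload, fh)
            fh.flush()
            os.fsync(fh.fileno())  # push the bytes to disk before renaming
        os.replace(tmp, path)      # atomic on POSIX filesystems
    except BaseException:
        if os.path.exists(tmp):
            os.unlink(tmp)
        raise


def resume_index(cached: Optional[dict], server_mission_id: str,
                 last_reported: int) -> Optional[int]:
    """Waypoint index to resume from, or None if the cached mission is stale."""
    if not cached or cached.get("mission_id") != server_mission_id:
        return None
    # Clamp in case the reported index is out of range for this mission.
    return max(0, min(last_reported, len(cached.get("waypoints", []))))
```

A return value of None signals that the robot should wait for a fresh dispatch rather than replay an outdated mission.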
TFLite model updates
Serve the model file from Django storage. On robot startup, compare the local model's MD5 hash against a /api/models/current/ endpoint. If a newer version exists, download it before starting inference. This gives you over-the-air model updates without SSH access to each robot.
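The hash comparison itself needs only the standard library. This is a minimal sketch: `file_md5` and `needs_update` are hypothetical names, and fetching the remote hash from the endpoint and downloading the new file are deliberately left out because the response format of /api/models/current/ is not specified here.

```python
import hashlib
from pathlib import Path


def file_md5(path: Path, chunk_size: int = 1 << 20) -> str:
    """Hex MD5 of a file, read in chunks so large models don't fill RAM."""
    digest = hashlib.md5()
    with path.open("rb") as fh:
        for chunk in iter(lambda: fh.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def needs_update(local_model: Path, remote_md5: str) -> bool:
    """True if the local model is missing or differs from the server's hash."""
    if not local_model.exists():
        return True
    return file_md5(local_model) != remote_md5.strip().lower()
```

MD5 is adequate for detecting a changed file; if the download channel is not trusted, a SHA-256 digest or a signature check would be the safer choice.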
ApplicationEvent data volume
A robot running 6 hours/day at 2-second intervals generates up to ~10,800 rows per day (one for every telemetry message in which the sprayer was active). For a 10-robot fleet that's up to 108,000 rows/day. Add a Celery Beat task to archive and purge ApplicationEvent rows older than 90 days to keep query performance stable. If you need long-term spatial analytics, migrate to TimescaleDB and use continuous aggregates to pre-compute per-zone weed density statistics.
Advisor report quality
A few things significantly improve Claude's recommendations:
- Include crop type in the prompt — "This is a winter wheat field" changes the list of plausible weeds and the recommended herbicide window
- Include historical reports — pass the previous week's field_health_score and summary as context so Claude can comment on trends, not just the current snapshot
- Parse and validate the JSON response — use Pydantic to validate the structured output before writing to the database; reject and retry if the model returns malformed JSON or an invalid urgency value
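The first two points amount to adding a couple of fields to the user message. A minimal sketch, assuming a hypothetical `build_advisor_message` helper: note that the AdvisorReport model shown earlier does not store field_health_score or summary, so passing the previous values would require persisting them first.

```python
import json
from typing import Optional


def build_advisor_message(field_summary: dict, crop_type: str,
                          previous: Optional[dict] = None) -> str:
    """Return the JSON user message with crop type and optional history added."""
    message = dict(field_summary)  # copy so the caller's dict is untouched
    message["crop_type"] = crop_type
    if previous:
        message["previous_report"] = {
            "field_health_score": previous.get("field_health_score"),
            "summary": previous.get("summary"),
        }
    return json.dumps(message, default=str)
```

The result would replace `json.dumps(field_summary, default=str)` in the `messages` argument of the API call.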
# farm/schemas.py: validate advisor output with Pydantic
from typing import Literal

from pydantic import BaseModel, Field


class Hotspot(BaseModel):
    lat: float
    lon: float
    radius_m: float = 10.0
    description: str


class Recommendation(BaseModel):
    action: Literal['spot_spray', 're_scan', 'soil_sample', 'notify_agronomist', 'adjust_mission']
    zone_description: str
    urgency: Literal['low', 'medium', 'high', 'critical']
    rationale: str


class AdvisorOutput(BaseModel):
    field_health_score: int = Field(ge=0, le=100)
    summary: str
    hotspots: list[Hotspot]
    recommendations: list[Recommendation]
    report_markdown: str
In run_crop_advisor, replace result = json.loads(raw) with:
from .schemas import AdvisorOutput
result = AdvisorOutput.model_validate_json(raw).model_dump()
# The output is validated, then converted back to plain dicts so that the
# existing result.get(...) calls and the JSONField assignment keep working.
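The "reject and retry" part can be a small loop around the API call. The sketch below uses a stdlib-only stand-in validator so it runs on its own; in the real task you would call `AdvisorOutput.model_validate_json` instead and catch whichever exception your validator raises. `validate_report` and `fetch_valid_report` are hypothetical names, and `fetch_raw` stands for any zero-argument callable that returns the model's raw text.

```python
import json

URGENCIES = {"low", "medium", "high", "critical"}


def validate_report(raw: str) -> dict:
    """Stand-in validator: raises ValueError on malformed or invalid output."""
    data = json.loads(raw)  # JSONDecodeError is a ValueError subclass
    if not isinstance(data, dict):
        raise ValueError("top-level JSON value must be an object")
    score = data.get("field_health_score")
    if not isinstance(score, int) or not 0 <= score <= 100:
        raise ValueError(f"invalid field_health_score: {score!r}")
    for rec in data.get("recommendations", []):
        if not isinstance(rec, dict) or rec.get("urgency") not in URGENCIES:
            raise ValueError(f"invalid recommendation: {rec!r}")
    return data


def fetch_valid_report(fetch_raw, attempts: int = 3) -> dict:
    """Call fetch_raw() until its output validates, up to `attempts` times."""
    last_error = None
    for _ in range(attempts):
        try:
            return validate_report(fetch_raw())
        except ValueError as exc:
            last_error = exc
    raise RuntimeError(f"no valid report after {attempts} attempts") from last_error
```

Capping the attempts matters because every retry is another billed API call.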
Robot health monitoring
Add a Celery Beat task that runs every 5 minutes and alerts if any active robot's last_seen is more than 10 minutes old during a scheduled mission window. A silent robot during an active mission usually means a crash, a stuck process, or a dead battery, which is worth catching immediately rather than discovering at the end of the day.
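The staleness check at the heart of that task is a pure function over last_seen timestamps. A minimal sketch, assuming a hypothetical `stale_robots` helper; in the real task the mapping would be built from FarmRobot rows restricted to robots with a running mission, and the alert channel is up to you.

```python
from datetime import datetime, timedelta, timezone


def stale_robots(last_seen: dict, now: datetime,
                 max_age: timedelta = timedelta(minutes=10)) -> list:
    """Names of robots whose last telemetry is missing or older than max_age."""
    return sorted(
        name for name, seen in last_seen.items()
        if seen is None or now - seen > max_age
    )


now = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
fleet = {
    "alpha": now - timedelta(minutes=2),
    "bravo": now - timedelta(minutes=25),
    "charlie": None,  # never reported
}
print(stale_robots(fleet, now))
```

Treating a robot that has never reported as stale catches units that failed before sending their first message.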