Automated Greenhouse Control with Python: MQTT Sensors, PID Climate Control, and a Django Dashboard
Temperature drifts two degrees above the setpoint. Within seconds a vent motor opens, a misting valve pulses, and the live dashboard turns amber — all driven by a Python PID controller reading MQTT sensor data, with Django storing every reading and Django Channels streaming it to the browser. No cloud service, no proprietary hub. Just Python, a Raspberry Pi, and an MQTT broker on your own server.
1. What We're Building
A commercial greenhouse needs temperature, humidity, CO₂, and light levels held within tight bands — a 5°C temperature swing can cut tomato yield by 15%. Manual monitoring around the clock is impractical. Industrial SCADA systems exist but cost tens of thousands of pounds. This post builds the same thing for the price of a few Raspberry Pis and a VPS.
The finished system does the following automatically:
- Reads temperature, humidity, CO₂, and soil moisture from sensor nodes every 10 seconds
- Runs a PID controller in Python that calculates how far each value is from its setpoint and outputs a corrective signal
- Sends actuator commands — open vent, run irrigation pump, switch grow light — to Raspberry Pi GPIO pins via MQTT
- Persists every reading to Django's database for historical analysis and audit trails
- Streams live readings and actuator states to a browser dashboard via WebSocket
- Fires an alert if any value exceeds a danger threshold or if a sensor goes silent
2. Architecture Overview
Sensor nodes (Raspberry Pi Zero 2W)
│ DHT22 temp/humidity, MH-Z19 CO₂, capacitive soil sensor
│ Publish every 10s to MQTT topics:
│ greenhouse/zone/1/temperature
│ greenhouse/zone/1/humidity
│ greenhouse/zone/1/co2
│ greenhouse/zone/1/soil_moisture
▼
Mosquitto MQTT Broker (same VPS as Django)
▼
Django management command: mqttsubscriber
│ Subscribes to greenhouse/#
│ Writes SensorReading rows to PostgreSQL
│ Pushes raw payload to Redis pub/sub
▼
Redis
├── Celery broker (control loop task queue)
└── Channel layer (WebSocket fanout)
▼
Celery Beat (every 10s)
│ Reads latest SensorReading per zone
│ Runs PID.compute() for each controlled variable
│ Publishes actuator command back to MQTT:
│ greenhouse/zone/1/actuators/vent
│ greenhouse/zone/1/actuators/irrigation
│ greenhouse/zone/1/actuators/light
│ Saves ActuatorState to DB
│ Pushes to channel layer → dashboard WebSocket
▼
Actuator node (Raspberry Pi 3B)
│ Subscribes to greenhouse/zone/1/actuators/#
│ Drives GPIO pins → relay board → motors/valves/lights
▼
Browser dashboard
Receives live readings and actuator states via WebSocket
The MQTT broker acts as the nervous system. Sensor nodes and actuator nodes are completely decoupled — they only speak MQTT. Django is the brain: it stores history, runs the control logic via Celery, and serves the dashboard.
3. MQTT Broker and Sensor Nodes
Install Mosquitto on the server
sudo apt install mosquitto mosquitto-clients
sudo systemctl enable --now mosquitto
For a local network greenhouse you can run unauthenticated on port 1883. For anything internet-facing, enable TLS and password authentication — see the Mosquitto docs for the full setup. The production checklist at the end covers the TLS configuration.
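A quick smoke test with the bundled clients confirms the broker relays messages before any Python is involved (the topic and payload simply mirror the scheme used later in this post):
# Terminal 1: watch everything under greenhouse/
mosquitto_sub -h localhost -t 'greenhouse/#' -v
# Terminal 2: publish a fake reading
mosquitto_pub -h localhost -t greenhouse/zone/1/temperature -m '{"value": 21.5, "node": "test"}'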
Sensor node Python script
Each sensor node is a Raspberry Pi Zero 2W with a DHT22 (temp/humidity), MH-Z19B CO₂ sensor, and a capacitive soil moisture sensor on an ADC (MCP3008). The node publishes a JSON payload to each topic every 10 seconds.
# sensor_node/publisher.py
import os
import json
import time
import logging
import board
import adafruit_dht
import paho.mqtt.client as mqtt
# MH-Z19B via serial (pip install mh-z19)
import mh_z19
# MCP3008 ADC for soil moisture (pip install adafruit-circuitpython-mcp3xxx)
import busio
import digitalio
import adafruit_mcp3xxx.mcp3008 as MCP
from adafruit_mcp3xxx.analog_in import AnalogIn
BROKER = os.environ["MQTT_BROKER"] # e.g. 192.168.1.10
ZONE_ID = os.environ["ZONE_ID"] # e.g. "1"
NODE_ID = os.environ["NODE_ID"] # e.g. "node-a"
INTERVAL = int(os.environ.get("INTERVAL", "10"))
logger = logging.getLogger(__name__)
def setup_soil_sensor():
spi = busio.SPI(clock=board.SCK, MISO=board.MISO, MOSI=board.MOSI)
cs = digitalio.DigitalInOut(board.D5)
mcp = MCP.MCP3008(spi, cs)
return AnalogIn(mcp, MCP.P0)
def read_soil_pct(chan) -> float:
    # AnalogIn.value scales the MCP3008's 10-bit reading to 0–65535; dry ~52000, wet ~15000 (calibrate per sensor)
raw = chan.value
pct = max(0.0, min(100.0, (52000 - raw) / 370))
return round(pct, 1)
def run():
dht = adafruit_dht.DHT22(board.D4)
soil = setup_soil_sensor()
client = mqtt.Client(client_id=NODE_ID)
client.connect(BROKER, 1883, keepalive=60)
client.loop_start()
base_topic = f"greenhouse/zone/{ZONE_ID}"
while True:
try:
temp = round(dht.temperature, 1)
humidity = round(dht.humidity, 1)
co2_data = mh_z19.read_all()
co2 = co2_data.get("co2", 0) if co2_data else 0
soil_pct = read_soil_pct(soil)
for subtopic, value in [
("temperature", temp),
("humidity", humidity),
("co2", co2),
("soil_moisture", soil_pct),
]:
payload = json.dumps({"value": value, "node": NODE_ID})
client.publish(f"{base_topic}/{subtopic}", payload, qos=1)
logger.info("Published: temp=%.1f hum=%.1f co2=%d soil=%.1f", temp, humidity, co2, soil_pct)
except RuntimeError as exc:
# DHT22 occasionally misreads — log and skip
logger.warning("Sensor read error: %s", exc)
time.sleep(INTERVAL)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
run()
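The node is configured entirely through environment variables, so the same script can be dropped onto every Pi. A first manual run might look like this (the address and IDs are examples); in production, wrap it in a systemd unit like the subscriber's in section 5:
MQTT_BROKER=192.168.1.10 ZONE_ID=1 NODE_ID=node-a python publisher.py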
4. Django Models: Greenhouse, Zone, SensorReading, ActuatorState
# greenhouse/models.py
from django.db import models
class Greenhouse(models.Model):
name = models.CharField(max_length=100)
location = models.CharField(max_length=200, blank=True)
def __str__(self):
return self.name
class ClimateZone(models.Model):
greenhouse = models.ForeignKey(Greenhouse, on_delete=models.CASCADE, related_name="zones")
name = models.CharField(max_length=100)
crop = models.CharField(max_length=100, blank=True)
# Setpoints
temp_setpoint = models.FloatField(default=22.0) # °C
humidity_setpoint = models.FloatField(default=70.0) # %
co2_setpoint = models.FloatField(default=800.0) # ppm
soil_setpoint = models.FloatField(default=60.0) # % moisture
# Danger thresholds — triggers alert regardless of PID
temp_max = models.FloatField(default=35.0)
temp_min = models.FloatField(default=10.0)
humidity_max = models.FloatField(default=90.0)
co2_max = models.FloatField(default=1500.0)
def __str__(self):
return f"{self.greenhouse.name} / {self.name}"
class SensorReading(models.Model):
SENSOR_TEMPERATURE = "temperature"
SENSOR_HUMIDITY = "humidity"
SENSOR_CO2 = "co2"
SENSOR_SOIL_MOISTURE = "soil_moisture"
SENSOR_CHOICES = [
(SENSOR_TEMPERATURE, "Temperature (°C)"),
(SENSOR_HUMIDITY, "Humidity (%)"),
(SENSOR_CO2, "CO₂ (ppm)"),
(SENSOR_SOIL_MOISTURE, "Soil Moisture (%)"),
]
zone = models.ForeignKey(ClimateZone, on_delete=models.CASCADE, related_name="readings")
sensor_type = models.CharField(max_length=20, choices=SENSOR_CHOICES, db_index=True)
value = models.FloatField()
node_id = models.CharField(max_length=50, blank=True)
recorded_at = models.DateTimeField(auto_now_add=True, db_index=True)
class Meta:
ordering = ["-recorded_at"]
indexes = [models.Index(fields=["zone", "sensor_type", "-recorded_at"])]
def __str__(self):
return f"{self.zone} {self.sensor_type}={self.value} @ {self.recorded_at:%H:%M:%S}"
class ActuatorState(models.Model):
ACTUATOR_VENT = "vent"
ACTUATOR_IRRIGATION = "irrigation"
ACTUATOR_LIGHT = "light"
ACTUATOR_HEATER = "heater"
ACTUATOR_CHOICES = [
(ACTUATOR_VENT, "Vent"),
(ACTUATOR_IRRIGATION, "Irrigation"),
(ACTUATOR_LIGHT, "Grow Light"),
(ACTUATOR_HEATER, "Heater"),
]
zone = models.ForeignKey(ClimateZone, on_delete=models.CASCADE, related_name="actuator_states")
actuator_type = models.CharField(max_length=20, choices=ACTUATOR_CHOICES, db_index=True)
output = models.FloatField() # 0.0 (off) – 1.0 (full on); relays use 0 or 1
pid_error = models.FloatField() # raw error fed to PID (for dashboarding)
commanded_at = models.DateTimeField(auto_now_add=True)
class Meta:
ordering = ["-commanded_at"]
get_latest_by = "commanded_at"
The compound index on (zone, sensor_type, -recorded_at) makes the
"latest reading per zone per sensor type" query — which the control loop runs every
10 seconds — a fast index scan rather than a table scan.
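For reference, the query that index serves looks like this in the ORM; because the model's default ordering is -recorded_at, .first() returns the newest row:
latest_temp = (
    SensorReading.objects
    .filter(zone=zone, sensor_type=SensorReading.SENSOR_TEMPERATURE)
    .first()  # newest row first, per Meta.ordering
)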
5. MQTT Subscriber: Django Management Command
Rather than a standalone script, the MQTT subscriber runs as a Django management command. This gives it full access to the ORM, settings, and the channel layer without any import gymnastics.
# greenhouse/management/commands/mqttsubscriber.py
import json
import logging
import signal
import sys
import paho.mqtt.client as mqtt
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.core.management.base import BaseCommand
from greenhouse.models import ClimateZone, SensorReading
logger = logging.getLogger(__name__)
TOPIC_PREFIX = "greenhouse/zone/"
class Command(BaseCommand):
help = "Subscribe to MQTT broker and persist sensor readings"
def handle(self, *args, **kwargs):
client = mqtt.Client(client_id="django-subscriber", clean_session=False)
client.on_connect = self._on_connect
client.on_message = self._on_message
client.on_disconnect = self._on_disconnect
broker = settings.MQTT_BROKER
port = getattr(settings, "MQTT_PORT", 1883)
client.connect(broker, port, keepalive=60)
# Graceful shutdown on SIGTERM (systemd stop, Kubernetes pod eviction, etc.)
def _shutdown(sig, frame):
self.stdout.write("Shutting down MQTT subscriber…")
client.disconnect()
sys.exit(0)
signal.signal(signal.SIGTERM, _shutdown)
signal.signal(signal.SIGINT, _shutdown)
client.loop_forever()
def _on_connect(self, client, userdata, flags, rc):
logger.info("Connected to MQTT broker (rc=%d)", rc)
client.subscribe("greenhouse/#", qos=1)
def _on_disconnect(self, client, userdata, rc):
if rc != 0:
logger.warning("Unexpected MQTT disconnect (rc=%d) — will auto-reconnect", rc)
def _on_message(self, client, userdata, msg):
try:
            # Sensor topic format: greenhouse/zone/{zone_id}/{sensor_type}.
            # Actuator topics (greenhouse/zone/{id}/actuators/{type}) have five
            # segments, so the length check alone filters them out.
            parts = msg.topic.split("/")
            if len(parts) != 4:
                return
_, _, zone_id, sensor_type = parts
payload = json.loads(msg.payload.decode())
value = float(payload["value"])
node_id = payload.get("node", "")
zone = ClimateZone.objects.get(id=zone_id)
reading = SensorReading.objects.create(
zone=zone, sensor_type=sensor_type, value=value, node_id=node_id
)
# Push raw reading to channel layer for live dashboard
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f"greenhouse_zone_{zone_id}",
{
"type": "sensor.reading",
"payload": {
"zone_id": zone_id,
"sensor_type": sensor_type,
"value": value,
"timestamp": reading.recorded_at.isoformat(),
},
},
)
except ClimateZone.DoesNotExist:
logger.warning("No zone with id=%s — ignoring message on %s", zone_id, msg.topic)
except (KeyError, ValueError, json.JSONDecodeError) as exc:
logger.warning("Bad payload on %s: %s", msg.topic, exc)
Run it as a systemd service alongside Django:
# /etc/systemd/system/greenhouse-mqtt.service
[Unit]
Description=Greenhouse MQTT Subscriber
After=network.target postgresql.service redis.service
[Service]
User=www-data
WorkingDirectory=/var/www/greenhouse
EnvironmentFile=/var/www/greenhouse/.env
ExecStart=/var/www/greenhouse/.venv/bin/python manage.py mqttsubscriber
Restart=on-failure
RestartSec=5
[Install]
WantedBy=multi-user.target
6. PID Controller in Python
A PID (Proportional–Integral–Derivative) controller computes a corrective output from three terms:
- P — proportional to the current error (how far off we are right now)
- I — proportional to the accumulated error over time (corrects persistent offsets)
- D — proportional to the rate of error change (dampens overshoot)
For greenhouse climate control the derivative term is usually kept small or disabled — sensor noise amplifies it into jittery actuator commands. The implementation below clamps the output to [0, 1], applies anti-windup on the integral to prevent runaway when an actuator is saturated, and takes a reverse_acting flag for cooling loops like the vent, where the output must rise as the measurement climbs above the setpoint.
# greenhouse/pid.py
import time
from dataclasses import dataclass, field
@dataclass
class PIDController:
kp: float # Proportional gain
ki: float # Integral gain
kd: float = 0.0 # Derivative gain (often 0 for climate)
setpoint: float = 0.0
output_min: float = 0.0
output_max: float = 1.0
    integral_limit: float = 10.0  # anti-windup clamp
    reverse_acting: bool = False  # cooling loop (e.g. vent): output rises as the measurement exceeds the setpoint
_integral: float = field(default=0.0, init=False, repr=False)
_last_error: float = field(default=0.0, init=False, repr=False)
_last_time: float = field(default_factory=time.monotonic, init=False, repr=False)
def compute(self, measured: float) -> float:
now = time.monotonic()
dt = now - self._last_time
if dt <= 0:
dt = 1e-6
        error = self.setpoint - measured
        if self.reverse_acting:
            error = -error
self._integral = max(
-self.integral_limit,
min(self.integral_limit, self._integral + error * dt)
)
derivative = (error - self._last_error) / dt
output = (self.kp * error) + (self.ki * self._integral) + (self.kd * derivative)
output = max(self.output_min, min(self.output_max, output))
self._last_error = error
self._last_time = now
return round(output, 3)
def reset(self):
self._integral = 0.0
self._last_error = 0.0
self._last_time = time.monotonic()
Tuning the gains (kp, ki) is empirical — start with
kp=0.1, ki=0.01 for temperature, observe the response on the dashboard,
and increase kp until the system reacts promptly without overshooting.
For a 10-second control loop interval the integral term accumulates slowly, so
ki can be very small.
A practical tip: store PID gains in the ClimateZone model (add
temp_kp, temp_ki fields) so they're adjustable from the
Django admin without a code deploy.
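A minimal sketch of that, with hypothetical temp_kp / temp_ki fields (one pair per controlled variable in practice):
# greenhouse/models.py — hypothetical per-zone gain fields on ClimateZone
temp_kp = models.FloatField(default=0.15)
temp_ki = models.FloatField(default=0.02)

# greenhouse/tasks.py — _get_pid can then read them instead of hard-coded constants:
#     "temperature": (zone.temp_setpoint, zone.temp_kp, zone.temp_ki),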
7. Celery Beat: The Climate Control Loop
The control loop runs every 10 seconds via Celery Beat. It reads the latest sensor
value per zone per variable, runs the PID, publishes an actuator command back to MQTT,
saves the ActuatorState, and pushes a dashboard update.
# settings.py additions
MQTT_BROKER = env("MQTT_BROKER", default="localhost")
MQTT_PORT = env.int("MQTT_PORT", default=1883)
CELERY_BEAT_SCHEDULE = {
"greenhouse-control-loop": {
"task": "greenhouse.tasks.run_climate_control",
"schedule": 10.0, # every 10 seconds
},
}
# greenhouse/tasks.py
import json
import logging
import paho.mqtt.publish as mqtt_publish
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer
from django.conf import settings
from django.utils import timezone
from .models import ClimateZone, SensorReading, ActuatorState
from .pid import PIDController
logger = logging.getLogger(__name__)
# In-memory PID instances keyed by (zone_id, variable)
# These persist across Celery task invocations within the same worker process,
# preserving integral and derivative state between ticks.
_pid_registry: dict = {}
def _get_pid(zone: ClimateZone, variable: str) -> PIDController:
    key = (zone.id, variable)
    # (setpoint, kp, ki) per controlled variable
    setpoints = {
        "temperature": (zone.temp_setpoint, 0.15, 0.02),
        "humidity": (zone.humidity_setpoint, 0.10, 0.01),
        "co2": (zone.co2_setpoint, 0.05, 0.005),
        "soil_moisture": (zone.soil_setpoint, 0.20, 0.02),
    }
    sp, kp, ki = setpoints[variable]
    if key not in _pid_registry:
        _pid_registry[key] = PIDController(
            kp=kp,
            ki=ki,
            setpoint=sp,
            # The vent cools: it must open as temperature rises above the setpoint
            reverse_acting=(variable == "temperature"),
        )
    else:
        # Keep the setpoint in sync with the database so admin edits take effect
        _pid_registry[key].setpoint = sp
    return _pid_registry[key]
def _latest_reading(zone: ClimateZone, sensor_type: str) -> float | None:
reading = (
SensorReading.objects
.filter(zone=zone, sensor_type=sensor_type)
.values("value")
.first()
)
return reading["value"] if reading else None
VARIABLE_TO_ACTUATOR = {
"temperature": ActuatorState.ACTUATOR_VENT,
"humidity": ActuatorState.ACTUATOR_IRRIGATION,
"soil_moisture": ActuatorState.ACTUATOR_IRRIGATION,
# CO₂ is typically passive (ventilation-driven) — no dedicated actuator here
}
@shared_task
def run_climate_control():
channel_layer = get_channel_layer()
messages = [] # batch MQTT publishes
for zone in ClimateZone.objects.select_related("greenhouse").all():
zone_updates = {}
for variable, actuator_type in VARIABLE_TO_ACTUATOR.items():
value = _latest_reading(zone, variable)
if value is None:
logger.warning("No reading for zone %s / %s — skipping", zone.id, variable)
continue
pid = _get_pid(zone, variable)
output = pid.compute(value)
error = pid.setpoint - value
ActuatorState.objects.create(
zone=zone,
actuator_type=actuator_type,
output=output,
pid_error=error,
)
topic = f"greenhouse/zone/{zone.id}/actuators/{actuator_type}"
payload = json.dumps({"output": output, "zone": str(zone.id)})
messages.append({"topic": topic, "payload": payload, "qos": 1})
zone_updates[actuator_type] = {"output": output, "error": round(error, 2)}
        # Danger threshold check: run for every monitored variable, including
        # CO2, which has a configured maximum but no actuator mapping
        for variable in ("temperature", "humidity", "co2", "soil_moisture"):
            value = _latest_reading(zone, variable)
            if value is not None:
                _check_thresholds(zone, variable, value)
# Push actuator state update to dashboard WebSocket
if zone_updates:
async_to_sync(channel_layer.group_send)(
f"greenhouse_zone_{zone.id}",
{
"type": "actuator.update",
"payload": {
"zone_id": str(zone.id),
"actuators": zone_updates,
"timestamp": timezone.now().isoformat(),
},
},
)
if messages:
mqtt_publish.multiple(
messages,
hostname=settings.MQTT_BROKER,
port=settings.MQTT_PORT,
)
def _check_thresholds(zone: ClimateZone, variable: str, value: float):
limits = {
"temperature": (zone.temp_min, zone.temp_max),
"humidity": (None, zone.humidity_max),
"co2": (None, zone.co2_max),
"soil_moisture": (None, None),
}
lo, hi = limits.get(variable, (None, None))
if hi is not None and value > hi:
logger.error("DANGER: %s / %s = %.1f exceeds max %.1f", zone, variable, value, hi)
if lo is not None and value < lo:
logger.error("DANGER: %s / %s = %.1f below min %.1f", zone, variable, value, lo)
The _pid_registry dict lives in the Celery worker's process memory. As long as you run a single worker process (for example celery worker --concurrency=1, which is fine for one greenhouse), integral state persists correctly between ticks. Note that the default prefork pool spawns one child per CPU core, so ticks can land in different processes even on one machine. If you scale out, move PID state to Redis hashes — store integral and last_error per (zone_id, variable) key and fetch/update atomically with a Lua script.
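A minimal sketch of that idea with a plain redis-py client, skipping the Lua script (the key layout is illustrative; the read-modify-write below is not atomic on its own, so wrap it in a Lua script or a WATCH/MULTI transaction if two workers can tick the same zone):
# greenhouse/pid_state.py — sketch: share PID state across workers via a Redis hash
import time

import redis

r = redis.Redis(decode_responses=True)  # reuse the same Redis instance as Celery/Channels

def compute_shared(zone_id: int, variable: str, pid, measured: float) -> float:
    """Load integral/last_error/last_time from Redis, run one PID step, save it back."""
    key = f"pid:{zone_id}:{variable}"
    state = r.hgetall(key)
    pid._integral = float(state.get("integral", 0.0))
    pid._last_error = float(state.get("last_error", 0.0))
    # time.monotonic() is only comparable within one host; switch the PID to
    # time.time() if workers run on different machines
    pid._last_time = float(state.get("last_time", time.monotonic()))
    output = pid.compute(measured)
    r.hset(key, mapping={
        "integral": pid._integral,
        "last_error": pid._last_error,
        "last_time": pid._last_time,
    })
    return output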
8. Django Channels: Live Sensor Dashboard
The management command and the Celery control loop both push to the channel layer. The WebSocket consumer simply funnels those messages to the browser.
# greenhouse/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class GreenhouseConsumer(AsyncWebsocketConsumer):
async def connect(self):
zone_id = self.scope["url_route"]["kwargs"]["zone_id"]
self.group = f"greenhouse_zone_{zone_id}"
await self.channel_layer.group_add(self.group, self.channel_name)
await self.accept()
async def disconnect(self, close_code):
await self.channel_layer.group_discard(self.group, self.channel_name)
async def sensor_reading(self, event):
await self.send(text_data=json.dumps({"type": "reading", **event["payload"]}))
async def actuator_update(self, event):
await self.send(text_data=json.dumps({"type": "actuator", **event["payload"]}))
# greenhouse/routing.py
from django.urls import path
from .consumers import GreenhouseConsumer
websocket_urlpatterns = [
path("ws/greenhouse/zone/<int:zone_id>/", GreenhouseConsumer.as_asgi()),
]
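For completeness, the consumer routing has to be mounted in the ASGI entry point. The standard Channels wiring looks roughly like this, with the project package name (greenhouse_project here) as a placeholder:
# greenhouse_project/asgi.py
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "greenhouse_project.settings")
django_asgi_app = get_asgi_application()  # initialise Django before importing consumers

from greenhouse.routing import websocket_urlpatterns  # noqa: E402

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
})
CHANNEL_LAYERS in settings should point the channels_redis.core.RedisChannelLayer backend at the same Redis instance used for Celery.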
Dashboard JavaScript — a minimal live gauge update loop:
<!-- greenhouse/templates/greenhouse/dashboard.html -->
<div id="gauges"></div>
<div id="actuators"></div>
<script>
const zoneId = {{ zone.id }};
const ws = new WebSocket(
(location.protocol === "https:" ? "wss://" : "ws://") + location.host +
`/ws/greenhouse/zone/${zoneId}/`
);
const gauges = {};
const actuators = {};
ws.onmessage = function (event) {
const msg = JSON.parse(event.data);
if (msg.type === "reading") {
const id = `gauge-${msg.sensor_type}`;
if (!gauges[id]) {
const el = document.createElement("div");
el.id = id;
el.className = "gauge";
document.getElementById("gauges").appendChild(el);
gauges[id] = el;
}
gauges[id].innerHTML = `
<span class="gauge-label">${msg.sensor_type.replace("_", " ")}</span>
<span class="gauge-value">${msg.value}</span>
<time>${new Date(msg.timestamp).toLocaleTimeString()}</time>
`;
}
if (msg.type === "actuator") {
const el = document.getElementById("actuators");
el.innerHTML = Object.entries(msg.actuators)
.map(([name, state]) => `
<div class="actuator ${state.output > 0.5 ? 'on' : 'off'}">
${name} — ${Math.round(state.output * 100)}%
<small>error ${state.error > 0 ? "+" : ""}${state.error}</small>
</div>
`).join("");
}
};
ws.onclose = () => setTimeout(() => location.reload(), 3000);
</script>
9. Actuator Control: GPIO and Relay Boards
The actuator node subscribes to the greenhouse/zone/{id}/actuators/#
topics and drives GPIO pins connected to a relay board. A 4-channel relay module
(SRD-05VDC-SL-C) handles mains-voltage vents and pumps from 3.3V logic.
# actuator_node/controller.py (runs on Raspberry Pi 3B)
import os
import json
import logging
import paho.mqtt.client as mqtt
import RPi.GPIO as GPIO
BROKER = os.environ["MQTT_BROKER"]
ZONE_ID = os.environ["ZONE_ID"]
# Map actuator type → GPIO BCM pin
PIN_MAP = {
"vent": 17,
"irrigation": 27,
"light": 22,
"heater": 23,
}
logger = logging.getLogger(__name__)
GPIO.setmode(GPIO.BCM)
for pin in PIN_MAP.values():
GPIO.setup(pin, GPIO.OUT, initial=GPIO.LOW)
def on_connect(client, userdata, flags, rc):
logger.info("Actuator node connected (rc=%d)", rc)
client.subscribe(f"greenhouse/zone/{ZONE_ID}/actuators/#", qos=1)
def on_message(client, userdata, msg):
# Topic: greenhouse/zone/{zone_id}/actuators/{actuator_type}
actuator_type = msg.topic.split("/")[-1]
pin = PIN_MAP.get(actuator_type)
if pin is None:
return
try:
payload = json.loads(msg.payload.decode())
output = float(payload["output"])
# Binary relay: on if output > 0.5, off otherwise
GPIO.output(pin, GPIO.HIGH if output > 0.5 else GPIO.LOW)
logger.info("%s → %s (output=%.2f)", actuator_type, "ON" if output > 0.5 else "OFF", output)
except (KeyError, ValueError, json.JSONDecodeError) as exc:
logger.warning("Bad actuator payload: %s", exc)
client = mqtt.Client(client_id=f"actuator-node-zone-{ZONE_ID}")
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, 1883, keepalive=60)
client.loop_forever()
For PWM actuators (variable-speed fans, dimmable lights) replace the binary
GPIO.output call with GPIO.PWM and set the duty cycle
to output * 100. The PID output is already scaled 0–1, so the mapping
is direct.
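A sketch of that swap for the grow-light channel (the 100 Hz frequency and the helper name are arbitrary; check what PWM frequency your driver expects):
# actuator_node/controller.py — PWM variant for a dimmable channel
light_pwm = GPIO.PWM(PIN_MAP["light"], 100)  # 100 Hz carrier
light_pwm.start(0)                           # start at 0% duty cycle

def set_light(output: float):
    # PID output is already 0.0–1.0, so it maps straight onto a 0–100% duty cycle
    light_pwm.ChangeDutyCycle(max(0.0, min(100.0, output * 100)))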
10. Production Checklist
MQTT security
If sensor nodes connect over the internet (e.g. multiple greenhouse sites to a central server), enable TLS on port 8883 and require per-device passwords:
# /etc/mosquitto/conf.d/tls.conf
listener 8883
cafile /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile /etc/mosquitto/certs/server.key
allow_anonymous false
password_file /etc/mosquitto/passwd
mosquitto_passwd -c /etc/mosquitto/passwd sensor-node-1
Pass credentials in paho:
client.username_pw_set("sensor-node-1", "secret") and set
client.tls_set(ca_certs="ca.crt").
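On the sensor node that amounts to a couple of lines before connect(), with the port switched to 8883 (the certificate path and password here are placeholders):
# sensor_node/publisher.py — TLS + per-device credentials
client = mqtt.Client(client_id=NODE_ID)
client.username_pw_set("sensor-node-1", "secret")
client.tls_set(ca_certs="/etc/greenhouse/certs/ca.crt")
client.connect(BROKER, 8883, keepalive=60)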
Failsafe logic on the actuator node
If the MQTT broker goes silent for more than 30 seconds, the actuator node should fall back to a safe state — vents driven to a known safe position, irrigation off, grow lights left to their timer. Never leave actuators in their last commanded state indefinitely during a network outage.
# actuator_node/controller.py — add a watchdog
import threading
WATCHDOG_TIMEOUT = 30 # seconds
def _failsafe():
logger.error("Watchdog timeout — entering failsafe state")
GPIO.output(PIN_MAP["vent"], GPIO.LOW)
GPIO.output(PIN_MAP["irrigation"], GPIO.LOW)
# Keep light on last known state or switch to timer logic
watchdog = threading.Timer(WATCHDOG_TIMEOUT, _failsafe)
watchdog.start()
def on_message(client, userdata, msg):
global watchdog
watchdog.cancel()
watchdog = threading.Timer(WATCHDOG_TIMEOUT, _failsafe)
watchdog.start()
# ... rest of handler
Sensor drift and calibration
DHT22 sensors drift over months. Store a calibration_offset per sensor
node in the database and apply it in on_message before writing the
SensorReading. Calibrate quarterly against a reference thermometer. CO₂
sensors (MH-Z19B) have an auto-calibration mode that assumes the lowest reading in
24 hours is 400 ppm — disable this in sealed greenhouses where CO₂ is enriched.
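One way to sketch that: a hypothetical SensorCalibration table keyed by node and sensor type, applied in the subscriber's _on_message before the SensorReading is written.
# greenhouse/models.py — hypothetical per-node calibration offsets
class SensorCalibration(models.Model):
    node_id = models.CharField(max_length=50)
    sensor_type = models.CharField(max_length=20, choices=SensorReading.SENSOR_CHOICES)
    offset = models.FloatField(default=0.0)  # added to every raw reading

    class Meta:
        unique_together = [("node_id", "sensor_type")]

# greenhouse/management/commands/mqttsubscriber.py — inside _on_message, before create():
#     offset = (SensorCalibration.objects
#               .filter(node_id=node_id, sensor_type=sensor_type)
#               .values_list("offset", flat=True)
#               .first()) or 0.0
#     value += offset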
Data retention
At a 10-second interval across 4 sensors and 3 zones you generate ~4,300 rows per hour — about 100k rows per day. Add a Celery Beat task to downsample and purge:
# greenhouse/tasks.py
from datetime import timedelta
from django.utils import timezone
@shared_task
def purge_old_readings():
cutoff = timezone.now() - timedelta(days=90)
deleted, _ = SensorReading.objects.filter(recorded_at__lt=cutoff).delete()
logger.info("Purged %d old sensor readings", deleted)
For long-term trend analysis, downsample to hourly averages before purging the raw rows, or consider TimescaleDB (a PostgreSQL extension) which handles time-series partitioning and continuous aggregates natively.
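A sketch of that downsampling step, assuming a hypothetical HourlyReading model with zone, sensor_type, hour, and avg_value fields:
# greenhouse/tasks.py — roll raw readings older than 7 days into hourly averages
from datetime import timedelta

from django.db.models import Avg
from django.db.models.functions import TruncHour
from django.utils import timezone

@shared_task
def downsample_old_readings():
    cutoff = timezone.now() - timedelta(days=7)
    hourly = (
        SensorReading.objects
        .filter(recorded_at__lt=cutoff)
        .annotate(hour=TruncHour("recorded_at"))
        .values("zone_id", "sensor_type", "hour")
        .annotate(avg_value=Avg("value"))
        .order_by()  # clear default ordering so grouping is only by zone/sensor/hour
    )
    HourlyReading.objects.bulk_create(
        [
            HourlyReading(
                zone_id=row["zone_id"],
                sensor_type=row["sensor_type"],
                hour=row["hour"],
                avg_value=row["avg_value"],
            )
            for row in hourly
        ],
        ignore_conflicts=True,  # safe to re-run
    )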
Alerting
The _check_thresholds function in the Celery task currently just logs.
In production, hook it into an alerting channel — send an email via Django's
mail_admins(), push a notification via
ntfy.sh (self-hosted,
free), or POST to a Slack webhook. Debounce alerts with a Redis key so you don't
send 360 emails per hour for a stuck sensor.
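A sketch of the email path with a Redis debounce (the one-hour window, key naming, and helper name are arbitrary):
# greenhouse/tasks.py — replace the logger.error calls with something like this
import redis
from django.core.mail import mail_admins

_alert_redis = redis.Redis(decode_responses=True)  # reuse the existing Redis instance

def _send_alert(zone, variable, value, limit, kind):
    # SET NX EX: only the first alert per zone/variable/direction per hour sends an email
    key = f"alert:{zone.id}:{variable}:{kind}"
    if _alert_redis.set(key, "1", nx=True, ex=3600):
        mail_admins(
            subject=f"Greenhouse alert: {zone} {variable} {kind}",
            message=f"{variable} = {value:.1f} (limit {limit:.1f}) in {zone}",
        )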
PID gains per crop stage
Tomatoes in the seedling stage need different temperature setpoints from fruiting
plants. Add a GrowthStage model linked to ClimateZone with
start/end dates and overriding setpoints. The control loop checks the current active
stage before fetching the setpoint — no code change needed for seasonal transitions.
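A minimal sketch of that model and the lookup the control loop would do (field names are illustrative, and only the temperature setpoint is overridden here):
# greenhouse/models.py — hypothetical per-stage setpoint overrides
class GrowthStage(models.Model):
    zone = models.ForeignKey(ClimateZone, on_delete=models.CASCADE, related_name="stages")
    name = models.CharField(max_length=50)   # e.g. "seedling", "fruiting"
    starts_on = models.DateField()
    ends_on = models.DateField()
    temp_setpoint = models.FloatField(null=True, blank=True)  # None = fall back to the zone default

    def __str__(self):
        return f"{self.zone} / {self.name}"

# greenhouse/tasks.py — resolve the active setpoint before building the PID
def _active_temp_setpoint(zone):
    today = timezone.now().date()
    stage = zone.stages.filter(starts_on__lte=today, ends_on__gte=today).first()
    if stage and stage.temp_setpoint is not None:
        return stage.temp_setpoint
    return zone.temp_setpoint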