Python Django IoT MQTT

Automated Greenhouse Control with Python: MQTT Sensors, PID Climate Control, and a Django Dashboard

Temperature drifts two degrees above the setpoint. Within seconds a vent motor opens, a misting valve pulses, and the live dashboard turns amber — all driven by a Python PID controller reading MQTT sensor data, with Django storing every reading and Django Channels streaming it to the browser. No cloud service, no proprietary hub. Just Python, a Raspberry Pi, and an MQTT broker on your own server.

1. What We're Building

A commercial greenhouse needs temperature, humidity, CO₂, and light levels held within tight bands — a 5°C temperature swing can cut tomato yield by 15%. Manual monitoring around the clock is impractical. Industrial SCADA systems exist but cost tens of thousands of pounds. This post builds the same thing for the price of a few Raspberry Pis and a VPS.

The finished system does the following automatically:

  • Reads temperature, humidity, CO₂, and soil moisture from sensor nodes every 10 seconds
  • Runs a PID controller in Python that calculates how far each value is from its setpoint and outputs a corrective signal
  • Sends actuator commands — open vent, run irrigation pump, switch grow light — to Raspberry Pi GPIO pins via MQTT
  • Persists every reading to Django's database for historical analysis and audit trails
  • Streams live readings and actuator states to a browser dashboard via WebSocket
  • Fires an alert if any value exceeds a danger threshold or if a sensor goes silent

2. Architecture Overview

Sensor nodes (Raspberry Pi Zero 2W)
  │  DHT22 temp/humidity, MH-Z19 CO₂, capacitive soil sensor
  │  Publish every 10s to MQTT topics:
  │    greenhouse/zone/1/temperature
  │    greenhouse/zone/1/humidity
  │    greenhouse/zone/1/co2
  │    greenhouse/zone/1/soil_moisture
  ▼
Mosquitto MQTT Broker (same VPS as Django)
  ▼
Django management command: mqttsubscriber
  │  Subscribes to greenhouse/#
  │  Writes SensorReading rows to PostgreSQL
  │  Pushes raw payload to Redis pub/sub
  ▼
Redis
  ├── Celery broker (control loop task queue)
  └── Channel layer (WebSocket fanout)
  ▼
Celery Beat (every 10s)
  │  Reads latest SensorReading per zone
  │  Runs PID.compute() for each controlled variable
  │  Publishes actuator command back to MQTT:
  │    greenhouse/zone/1/actuators/vent
  │    greenhouse/zone/1/actuators/irrigation
  │    greenhouse/zone/1/actuators/light
  │  Saves ActuatorState to DB
  │  Pushes to channel layer → dashboard WebSocket
  ▼
Actuator node (Raspberry Pi 3B)
  │  Subscribes to greenhouse/zone/1/actuators/#
  │  Drives GPIO pins → relay board → motors/valves/lights
  ▼
Browser dashboard
     Receives live readings and actuator states via WebSocket

The MQTT broker acts as the nervous system. Sensor nodes and actuator nodes are completely decoupled — they only speak MQTT. Django is the brain: it stores history, runs the control logic via Celery, and serves the dashboard.

3. MQTT Broker and Sensor Nodes

Install Mosquitto on the server

sudo apt install mosquitto mosquitto-clients
sudo systemctl enable --now mosquitto

For a local network greenhouse you can run unauthenticated on port 1883. For anything internet-facing, enable TLS and password authentication — see the Mosquitto docs for the full setup. The production checklist at the end covers the TLS configuration.

Sensor node Python script

Each sensor node is a Raspberry Pi Zero 2W with a DHT22 (temp/humidity), MH-Z19B CO₂ sensor, and a capacitive soil moisture sensor on an ADC (MCP3008). The node publishes a JSON payload to each topic every 10 seconds.

# sensor_node/publisher.py
import os
import json
import time
import logging
import board
import adafruit_dht
import paho.mqtt.client as mqtt

# MH-Z19B via serial  (pip install mh-z19)
import mh_z19

# MCP3008 ADC for soil moisture  (pip install adafruit-circuitpython-mcp3xxx)
import busio
import digitalio
import adafruit_mcp3xxx.mcp3008 as MCP
from adafruit_mcp3xxx.analog_in import AnalogIn

BROKER   = os.environ["MQTT_BROKER"]      # e.g. 192.168.1.10
ZONE_ID  = os.environ["ZONE_ID"]          # e.g. "1"
NODE_ID  = os.environ["NODE_ID"]          # e.g. "node-a"
INTERVAL = int(os.environ.get("INTERVAL", "10"))

logger = logging.getLogger(__name__)

def setup_soil_sensor():
    spi  = busio.SPI(clock=board.SCK, MISO=board.MISO, MOSI=board.MOSI)
    cs   = digitalio.DigitalInOut(board.D5)
    mcp  = MCP.MCP3008(spi, cs)
    return AnalogIn(mcp, MCP.P0)

def read_soil_pct(chan) -> float:
    # MCP3008 returns 0–65535; dry ~52000, wet ~15000 (calibrate per sensor)
    raw = chan.value
    pct = max(0.0, min(100.0, (52000 - raw) / 370))
    return round(pct, 1)

def run():
    dht    = adafruit_dht.DHT22(board.D4)
    soil   = setup_soil_sensor()
    client = mqtt.Client(client_id=NODE_ID)
    client.connect(BROKER, 1883, keepalive=60)
    client.loop_start()

    base_topic = f"greenhouse/zone/{ZONE_ID}"

    while True:
        try:
            temp     = round(dht.temperature, 1)
            humidity = round(dht.humidity, 1)
            co2_data = mh_z19.read_all()
            co2      = co2_data.get("co2", 0) if co2_data else 0
            soil_pct = read_soil_pct(soil)

            for subtopic, value in [
                ("temperature",   temp),
                ("humidity",      humidity),
                ("co2",           co2),
                ("soil_moisture", soil_pct),
            ]:
                payload = json.dumps({"value": value, "node": NODE_ID})
                client.publish(f"{base_topic}/{subtopic}", payload, qos=1)

            logger.info("Published: temp=%.1f hum=%.1f co2=%d soil=%.1f", temp, humidity, co2, soil_pct)

        except RuntimeError as exc:
            # DHT22 occasionally misreads — log and skip
            logger.warning("Sensor read error: %s", exc)

        time.sleep(INTERVAL)

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    run()

4. Django Models: Greenhouse, Zone, SensorReading, ActuatorState

# greenhouse/models.py
from django.db import models


class Greenhouse(models.Model):
    name     = models.CharField(max_length=100)
    location = models.CharField(max_length=200, blank=True)

    def __str__(self):
        return self.name


class ClimateZone(models.Model):
    greenhouse = models.ForeignKey(Greenhouse, on_delete=models.CASCADE, related_name="zones")
    name       = models.CharField(max_length=100)
    crop       = models.CharField(max_length=100, blank=True)

    # Setpoints
    temp_setpoint     = models.FloatField(default=22.0)   # °C
    humidity_setpoint = models.FloatField(default=70.0)   # %
    co2_setpoint      = models.FloatField(default=800.0)  # ppm
    soil_setpoint     = models.FloatField(default=60.0)   # % moisture

    # Danger thresholds — triggers alert regardless of PID
    temp_max     = models.FloatField(default=35.0)
    temp_min     = models.FloatField(default=10.0)
    humidity_max = models.FloatField(default=90.0)
    co2_max      = models.FloatField(default=1500.0)

    def __str__(self):
        return f"{self.greenhouse.name} / {self.name}"


class SensorReading(models.Model):
    SENSOR_TEMPERATURE   = "temperature"
    SENSOR_HUMIDITY      = "humidity"
    SENSOR_CO2           = "co2"
    SENSOR_SOIL_MOISTURE = "soil_moisture"
    SENSOR_CHOICES = [
        (SENSOR_TEMPERATURE,   "Temperature (°C)"),
        (SENSOR_HUMIDITY,      "Humidity (%)"),
        (SENSOR_CO2,           "CO₂ (ppm)"),
        (SENSOR_SOIL_MOISTURE, "Soil Moisture (%)"),
    ]

    zone        = models.ForeignKey(ClimateZone, on_delete=models.CASCADE, related_name="readings")
    sensor_type = models.CharField(max_length=20, choices=SENSOR_CHOICES, db_index=True)
    value       = models.FloatField()
    node_id     = models.CharField(max_length=50, blank=True)
    recorded_at = models.DateTimeField(auto_now_add=True, db_index=True)

    class Meta:
        ordering = ["-recorded_at"]
        indexes  = [models.Index(fields=["zone", "sensor_type", "-recorded_at"])]

    def __str__(self):
        return f"{self.zone} {self.sensor_type}={self.value} @ {self.recorded_at:%H:%M:%S}"


class ActuatorState(models.Model):
    ACTUATOR_VENT       = "vent"
    ACTUATOR_IRRIGATION = "irrigation"
    ACTUATOR_LIGHT      = "light"
    ACTUATOR_HEATER     = "heater"
    ACTUATOR_CHOICES = [
        (ACTUATOR_VENT,       "Vent"),
        (ACTUATOR_IRRIGATION, "Irrigation"),
        (ACTUATOR_LIGHT,      "Grow Light"),
        (ACTUATOR_HEATER,     "Heater"),
    ]

    zone          = models.ForeignKey(ClimateZone, on_delete=models.CASCADE, related_name="actuator_states")
    actuator_type = models.CharField(max_length=20, choices=ACTUATOR_CHOICES, db_index=True)
    output        = models.FloatField()       # 0.0 (off) – 1.0 (full on); relays use 0 or 1
    pid_error     = models.FloatField()       # raw error fed to PID (for dashboarding)
    commanded_at  = models.DateTimeField(auto_now_add=True)

    class Meta:
        ordering = ["-commanded_at"]
        get_latest_by = "commanded_at"

The compound index on (zone, sensor_type, -recorded_at) makes the "latest reading per zone per sensor type" query — which the control loop runs every 10 seconds — a fast index scan rather than a table scan.

5. MQTT Subscriber: Django Management Command

Rather than a standalone script, the MQTT subscriber runs as a Django management command. This gives it full access to the ORM, settings, and the channel layer without any import gymnastics.

# greenhouse/management/commands/mqttsubscriber.py
import json
import logging
import signal
import sys

import paho.mqtt.client as mqtt
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from django.conf import settings
from django.core.management.base import BaseCommand

from greenhouse.models import ClimateZone, SensorReading

logger = logging.getLogger(__name__)

TOPIC_PREFIX = "greenhouse/zone/"


class Command(BaseCommand):
    help = "Subscribe to MQTT broker and persist sensor readings"

    def handle(self, *args, **kwargs):
        client = mqtt.Client(client_id="django-subscriber", clean_session=False)
        client.on_connect    = self._on_connect
        client.on_message    = self._on_message
        client.on_disconnect = self._on_disconnect

        broker = settings.MQTT_BROKER
        port   = getattr(settings, "MQTT_PORT", 1883)
        client.connect(broker, port, keepalive=60)

        # Graceful shutdown on SIGTERM (systemd stop, Kubernetes pod eviction, etc.)
        def _shutdown(sig, frame):
            self.stdout.write("Shutting down MQTT subscriber…")
            client.disconnect()
            sys.exit(0)

        signal.signal(signal.SIGTERM, _shutdown)
        signal.signal(signal.SIGINT,  _shutdown)

        client.loop_forever()

    def _on_connect(self, client, userdata, flags, rc):
        logger.info("Connected to MQTT broker (rc=%d)", rc)
        client.subscribe("greenhouse/#", qos=1)

    def _on_disconnect(self, client, userdata, rc):
        if rc != 0:
            logger.warning("Unexpected MQTT disconnect (rc=%d) — will auto-reconnect", rc)

    def _on_message(self, client, userdata, msg):
        try:
            # Topic format: greenhouse/zone/{zone_id}/{sensor_type}
            parts = msg.topic.split("/")
            if len(parts) != 4 or parts[2] == "actuators":
                return

            _, _, zone_id, sensor_type = parts
            payload = json.loads(msg.payload.decode())
            value   = float(payload["value"])
            node_id = payload.get("node", "")

            zone = ClimateZone.objects.get(id=zone_id)
            reading = SensorReading.objects.create(
                zone=zone, sensor_type=sensor_type, value=value, node_id=node_id
            )

            # Push raw reading to channel layer for live dashboard
            channel_layer = get_channel_layer()
            async_to_sync(channel_layer.group_send)(
                f"greenhouse_zone_{zone_id}",
                {
                    "type": "sensor.reading",
                    "payload": {
                        "zone_id":     zone_id,
                        "sensor_type": sensor_type,
                        "value":       value,
                        "timestamp":   reading.recorded_at.isoformat(),
                    },
                },
            )

        except ClimateZone.DoesNotExist:
            logger.warning("No zone with id=%s — ignoring message on %s", zone_id, msg.topic)
        except (KeyError, ValueError, json.JSONDecodeError) as exc:
            logger.warning("Bad payload on %s: %s", msg.topic, exc)

Run it as a systemd service alongside Django:

# /etc/systemd/system/greenhouse-mqtt.service
[Unit]
Description=Greenhouse MQTT Subscriber
After=network.target postgresql.service redis.service

[Service]
User=www-data
WorkingDirectory=/var/www/greenhouse
EnvironmentFile=/var/www/greenhouse/.env
ExecStart=/var/www/greenhouse/.venv/bin/python manage.py mqttsubscriber
Restart=on-failure
RestartSec=5

[Install]
WantedBy=multi-user.target

6. PID Controller in Python

A PID (Proportional–Integral–Derivative) controller computes a corrective output from three terms:

  • P — proportional to the current error (how far off we are right now)
  • I — proportional to the accumulated error over time (corrects persistent offsets)
  • D — proportional to the rate of error change (dampens overshoot)

For greenhouse climate control the derivative term is usually kept small or disabled — sensor noise amplifies it into jittery actuator commands. The implementation below clamps the output to [0, 1] and applies anti-windup on the integral to prevent runaway when an actuator is saturated.

# greenhouse/pid.py
import time
from dataclasses import dataclass, field


@dataclass
class PIDController:
    kp: float               # Proportional gain
    ki: float               # Integral gain
    kd: float = 0.0         # Derivative gain (often 0 for climate)
    setpoint: float = 0.0
    output_min: float = 0.0
    output_max: float = 1.0
    integral_limit: float = 10.0  # anti-windup clamp

    _integral:  float = field(default=0.0, init=False, repr=False)
    _last_error: float = field(default=0.0, init=False, repr=False)
    _last_time:  float = field(default_factory=time.monotonic, init=False, repr=False)

    def compute(self, measured: float) -> float:
        now  = time.monotonic()
        dt   = now - self._last_time
        if dt <= 0:
            dt = 1e-6

        error      = self.setpoint - measured
        self._integral = max(
            -self.integral_limit,
            min(self.integral_limit, self._integral + error * dt)
        )
        derivative = (error - self._last_error) / dt

        output = (self.kp * error) + (self.ki * self._integral) + (self.kd * derivative)
        output = max(self.output_min, min(self.output_max, output))

        self._last_error = error
        self._last_time  = now
        return round(output, 3)

    def reset(self):
        self._integral   = 0.0
        self._last_error = 0.0
        self._last_time  = time.monotonic()

Tuning the gains (kp, ki) is empirical — start with kp=0.1, ki=0.01 for temperature, observe the response on the dashboard, and increase kp until the system reacts promptly without overshooting. For a 10-second control loop interval the integral term accumulates slowly, so ki can be very small.

A practical tip: store PID gains in the ClimateZone model (add temp_kp, temp_ki fields) so they're adjustable from the Django admin without a code deploy.

7. Celery Beat: The Climate Control Loop

The control loop runs every 10 seconds via Celery Beat. It reads the latest sensor value per zone per variable, runs the PID, publishes an actuator command back to MQTT, saves the ActuatorState, and pushes a dashboard update.

# settings.py additions
from celery.schedules import crontab

MQTT_BROKER = env("MQTT_BROKER", default="localhost")
MQTT_PORT   = env.int("MQTT_PORT", default=1883)

CELERY_BEAT_SCHEDULE = {
    "greenhouse-control-loop": {
        "task":     "greenhouse.tasks.run_climate_control",
        "schedule": 10.0,   # every 10 seconds
    },
}
# greenhouse/tasks.py
import json
import logging

import paho.mqtt.publish as mqtt_publish
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer
from django.conf import settings
from django.utils import timezone

from .models import ClimateZone, SensorReading, ActuatorState
from .pid import PIDController

logger = logging.getLogger(__name__)

# In-memory PID instances keyed by (zone_id, variable)
# These persist across Celery task invocations within the same worker process,
# preserving integral and derivative state between ticks.
_pid_registry: dict = {}

def _get_pid(zone: ClimateZone, variable: str) -> PIDController:
    key = (zone.id, variable)
    if key not in _pid_registry:
        setpoints = {
            "temperature":   (zone.temp_setpoint,     0.15, 0.02),
            "humidity":      (zone.humidity_setpoint,  0.10, 0.01),
            "co2":           (zone.co2_setpoint,       0.05, 0.005),
            "soil_moisture": (zone.soil_setpoint,      0.20, 0.02),
        }
        sp, kp, ki = setpoints[variable]
        _pid_registry[key] = PIDController(kp=kp, ki=ki, setpoint=sp)
    return _pid_registry[key]


def _latest_reading(zone: ClimateZone, sensor_type: str) -> float | None:
    reading = (
        SensorReading.objects
        .filter(zone=zone, sensor_type=sensor_type)
        .values("value")
        .first()
    )
    return reading["value"] if reading else None


VARIABLE_TO_ACTUATOR = {
    "temperature":   ActuatorState.ACTUATOR_VENT,
    "humidity":      ActuatorState.ACTUATOR_IRRIGATION,
    "soil_moisture": ActuatorState.ACTUATOR_IRRIGATION,
    # CO₂ is typically passive (ventilation-driven) — no dedicated actuator here
}


@shared_task
def run_climate_control():
    channel_layer = get_channel_layer()
    messages = []  # batch MQTT publishes

    for zone in ClimateZone.objects.select_related("greenhouse").all():
        zone_updates = {}

        for variable, actuator_type in VARIABLE_TO_ACTUATOR.items():
            value = _latest_reading(zone, variable)
            if value is None:
                logger.warning("No reading for zone %s / %s — skipping", zone.id, variable)
                continue

            pid    = _get_pid(zone, variable)
            output = pid.compute(value)
            error  = pid.setpoint - value

            ActuatorState.objects.create(
                zone=zone,
                actuator_type=actuator_type,
                output=output,
                pid_error=error,
            )

            topic   = f"greenhouse/zone/{zone.id}/actuators/{actuator_type}"
            payload = json.dumps({"output": output, "zone": str(zone.id)})
            messages.append({"topic": topic, "payload": payload, "qos": 1})

            zone_updates[actuator_type] = {"output": output, "error": round(error, 2)}

            # Danger threshold check
            _check_thresholds(zone, variable, value)

        # Push actuator state update to dashboard WebSocket
        if zone_updates:
            async_to_sync(channel_layer.group_send)(
                f"greenhouse_zone_{zone.id}",
                {
                    "type": "actuator.update",
                    "payload": {
                        "zone_id":  str(zone.id),
                        "actuators": zone_updates,
                        "timestamp": timezone.now().isoformat(),
                    },
                },
            )

    if messages:
        mqtt_publish.multiple(
            messages,
            hostname=settings.MQTT_BROKER,
            port=settings.MQTT_PORT,
        )


def _check_thresholds(zone: ClimateZone, variable: str, value: float):
    limits = {
        "temperature":   (zone.temp_min,  zone.temp_max),
        "humidity":      (None,           zone.humidity_max),
        "co2":           (None,           zone.co2_max),
        "soil_moisture": (None,           None),
    }
    lo, hi = limits.get(variable, (None, None))
    if hi is not None and value > hi:
        logger.error("DANGER: %s / %s = %.1f exceeds max %.1f", zone, variable, value, hi)
    if lo is not None and value < lo:
        logger.error("DANGER: %s / %s = %.1f below min %.1f", zone, variable, value, lo)

The _pid_registry dict lives in the Celery worker process memory. As long as you run a single worker (fine for one greenhouse), integral state persists correctly between ticks. If you scale to multiple workers, move PID state to Redis using Redis hashes — store integral and last_error per (zone_id, variable) key and fetch/update atomically with a Lua script.

8. Django Channels: Live Sensor Dashboard

The management command and the Celery control loop both push to the channel layer. The WebSocket consumer simply funnels those messages to the browser.

# greenhouse/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class GreenhouseConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        zone_id   = self.scope["url_route"]["kwargs"]["zone_id"]
        self.group = f"greenhouse_zone_{zone_id}"
        await self.channel_layer.group_add(self.group, self.channel_name)
        await self.accept()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(self.group, self.channel_name)

    async def sensor_reading(self, event):
        await self.send(text_data=json.dumps({"type": "reading", **event["payload"]}))

    async def actuator_update(self, event):
        await self.send(text_data=json.dumps({"type": "actuator", **event["payload"]}))
# greenhouse/routing.py
from django.urls import path
from .consumers import GreenhouseConsumer

websocket_urlpatterns = [
    path("ws/greenhouse/zone/<int:zone_id>/", GreenhouseConsumer.as_asgi()),
]

Dashboard JavaScript — a minimal live gauge update loop:

<!-- greenhouse/templates/greenhouse/dashboard.html -->
<div id="gauges"></div>
<div id="actuators"></div>

<script>
const zoneId = {{ zone.id }};
const ws = new WebSocket(
  (location.protocol === "https:" ? "wss://" : "ws://") + location.host +
  `/ws/greenhouse/zone/${zoneId}/`
);

const gauges    = {};
const actuators = {};

ws.onmessage = function (event) {
  const msg = JSON.parse(event.data);

  if (msg.type === "reading") {
    const id = `gauge-${msg.sensor_type}`;
    if (!gauges[id]) {
      const el = document.createElement("div");
      el.id        = id;
      el.className = "gauge";
      document.getElementById("gauges").appendChild(el);
      gauges[id] = el;
    }
    gauges[id].innerHTML = `
      <span class="gauge-label">${msg.sensor_type.replace("_", " ")}</span>
      <span class="gauge-value">${msg.value}</span>
      <time>${new Date(msg.timestamp).toLocaleTimeString()}</time>
    `;
  }

  if (msg.type === "actuator") {
    const el = document.getElementById("actuators");
    el.innerHTML = Object.entries(msg.actuators)
      .map(([name, state]) => `
        <div class="actuator ${state.output > 0.5 ? 'on' : 'off'}">
          ${name} — ${Math.round(state.output * 100)}%
          <small>error ${state.error > 0 ? "+" : ""}${state.error}</small>
        </div>
      `).join("");
  }
};

ws.onclose = () => setTimeout(() => location.reload(), 3000);
</script>

9. Actuator Control: GPIO and Relay Boards

The actuator node subscribes to the greenhouse/zone/{id}/actuators/# topics and drives GPIO pins connected to a relay board. A 4-channel relay module (SRD-05VDC-SL-C) handles mains-voltage vents and pumps from 3.3V logic.

# actuator_node/controller.py  (runs on Raspberry Pi 3B)
import os
import json
import logging
import paho.mqtt.client as mqtt
import RPi.GPIO as GPIO

BROKER  = os.environ["MQTT_BROKER"]
ZONE_ID = os.environ["ZONE_ID"]

# Map actuator type → GPIO BCM pin
PIN_MAP = {
    "vent":       17,
    "irrigation": 27,
    "light":      22,
    "heater":     23,
}

logger = logging.getLogger(__name__)

GPIO.setmode(GPIO.BCM)
for pin in PIN_MAP.values():
    GPIO.setup(pin, GPIO.OUT, initial=GPIO.LOW)


def on_connect(client, userdata, flags, rc):
    logger.info("Actuator node connected (rc=%d)", rc)
    client.subscribe(f"greenhouse/zone/{ZONE_ID}/actuators/#", qos=1)


def on_message(client, userdata, msg):
    # Topic: greenhouse/zone/{zone_id}/actuators/{actuator_type}
    actuator_type = msg.topic.split("/")[-1]
    pin = PIN_MAP.get(actuator_type)
    if pin is None:
        return

    try:
        payload = json.loads(msg.payload.decode())
        output  = float(payload["output"])
        # Binary relay: on if output > 0.5, off otherwise
        GPIO.output(pin, GPIO.HIGH if output > 0.5 else GPIO.LOW)
        logger.info("%s → %s (output=%.2f)", actuator_type, "ON" if output > 0.5 else "OFF", output)
    except (KeyError, ValueError, json.JSONDecodeError) as exc:
        logger.warning("Bad actuator payload: %s", exc)


client = mqtt.Client(client_id=f"actuator-node-zone-{ZONE_ID}")
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, 1883, keepalive=60)
client.loop_forever()

For PWM actuators (variable-speed fans, dimmable lights) replace the binary GPIO.output call with GPIO.PWM and set the duty cycle to output * 100. The PID output is already scaled 0–1, so the mapping is direct.

10. Production Checklist

MQTT security

If sensor nodes connect over the internet (e.g. multiple greenhouse sites to a central server), enable TLS on port 8883 and require per-device passwords:

# /etc/mosquitto/conf.d/tls.conf
listener 8883
cafile   /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile  /etc/mosquitto/certs/server.key
require_certificate true
password_file /etc/mosquitto/passwd
mosquitto_passwd -c /etc/mosquitto/passwd sensor-node-1

Pass credentials in paho: client.username_pw_set("sensor-node-1", "secret") and set client.tls_set(ca_certs="ca.crt").

Failsafe logic on the actuator node

If the MQTT broker goes silent for more than 30 seconds, the actuator node should fall back to a safe state — close vents to midpoint, turn irrigation off, leave grow lights on timer. Never leave actuators in their last commanded state indefinitely during a network outage.

# actuator_node/controller.py — add a watchdog
import threading

WATCHDOG_TIMEOUT = 30  # seconds

def _failsafe():
    logger.error("Watchdog timeout — entering failsafe state")
    GPIO.output(PIN_MAP["vent"],       GPIO.LOW)
    GPIO.output(PIN_MAP["irrigation"], GPIO.LOW)
    # Keep light on last known state or switch to timer logic

watchdog = threading.Timer(WATCHDOG_TIMEOUT, _failsafe)
watchdog.start()

def on_message(client, userdata, msg):
    global watchdog
    watchdog.cancel()
    watchdog = threading.Timer(WATCHDOG_TIMEOUT, _failsafe)
    watchdog.start()
    # ... rest of handler

Sensor drift and calibration

DHT22 sensors drift over months. Store a calibration_offset per sensor node in the database and apply it in on_message before writing the SensorReading. Calibrate quarterly against a reference thermometer. CO₂ sensors (MH-Z19B) have an auto-calibration mode that assumes the lowest reading in 24 hours is 400 ppm — disable this in sealed greenhouses where CO₂ is enriched.

Data retention

At a 10-second interval across 4 sensors and 3 zones you generate ~4,300 rows per hour — about 100k rows per day. Add a Celery Beat task to downsample and purge:

# greenhouse/tasks.py
from datetime import timedelta
from django.utils import timezone

@shared_task
def purge_old_readings():
    cutoff = timezone.now() - timedelta(days=90)
    deleted, _ = SensorReading.objects.filter(recorded_at__lt=cutoff).delete()
    logger.info("Purged %d old sensor readings", deleted)

For long-term trend analysis, downsample to hourly averages before purging the raw rows, or consider TimescaleDB (a PostgreSQL extension) which handles time-series partitioning and continuous aggregates natively.

Alerting

The _check_thresholds function in the Celery task currently just logs. In production, hook it into an alerting channel — send an email via Django's mail_admins(), push a notification via ntfy.sh (self-hosted, free), or POST to a Slack webhook. Debounce alerts with a Redis key so you don't send 360 emails per hour for a stuck sensor.

PID gains per crop stage

Tomatoes in the seedling stage need different temperature setpoints from fruiting plants. Add a GrowthStage model linked to ClimateZone with start/end dates and overriding setpoints. The control loop checks the current active stage before fetching the setpoint — no code change needed for seasonal transitions.