TractorOS — Deel 3: Enterprise Use Case

🖋️ bert

# Van Kubota Boordcomputer naar Fleet Management Platform

Vereisten: Deel 1 (basisarchitectuur, patterns, RAII) en Deel 2 (templates, FreeRTOS, Event Bus, OTA) afgerond. Wat dit deel is: Geen abstracte cursus meer, maar een concrete, end-to-end uitgewerkte use case. We bouwen het complete systeem: de ESP32-firmware, het Java Spring Boot backend, de bedrijfsregellaag, en de integratiepunten daartussen. Perspectief: Enterprise software architect die een IoT-platform bouwt met de ESP32 als "edge device" in een groter ecosysteem.


# De Use Case: Vandaag Oogsten, Morgen Factureren

Het bedrijf: Loonbedrijf Van der Berg — vijf Kubota-tractoren die werken voor derden.

Het probleem:

  • Chauffeurs schrijven handmatig uren en brandstofverbruik op — fouten, fraude-risico, papierwerk.
  • Klanten willen real-time inzicht: "Is mijn akker al klaar?"
  • De accountant wil automatische factuurregels op basis van motoruren en hectares.
  • De monteur wil voorspellend onderhoud: "Wissel filter voordat het kapotgaat."

De oplossing: Elke tractor krijgt een ESP32-boordcomputer. De firmware stuurt telemetrie naar een Java Spring Boot backend. Het backend verwerkt bedrijfsregels, slaat op in PostgreSQL/TimescaleDB, en exposeert REST + WebSocket APIs voor een React dashboard en een boekhoudkoppeling.


# Systeemoverzicht

+--------------------------------------------------------------+
|                  KUBOTA TRACTOR (Edge)                       |
|                                                              |
|  GPS (NEO-M8)  DS18B20 Temp  INA226 Stroom  Rotary RPM      |
|      |              |              |              |          |
|      +--------------------------------------------+          |
|                          |                                   |
|               ESP32 TractorOS (onze firmware)                |
|                          |                                   |
+--------------------------|-----------------------------------+
                           | 4G/LTE (primair) / LoRa (fallback)
                    +------+-------+
                    | MQTT Broker  |
                    | (Mosquitto)  |
                    +------+-------+
                           |
+--------------------------|-----------------------------------+
|              BACKEND (Java Spring Boot)                      |
|                                                              |
|   MQTT Consumer  -->  TelemetryService  -->  RuleEngine      |
|                               |                              |
|            PostgreSQL + TimescaleDB                          |
|    (tractors / sessions / telemetry / alerts / invoices)     |
|                                                              |
|   REST API    WebSocket (live)    Webhook (boekhouding)      |
+----+-------------------+--------------------+----------------+
     |                   |                    |
  React             Mobiele app          Exact Online /
  Dashboard         Klanten-portaal      Twinfield

# Inhoudsopgave

  1. Het communicatieprotocol: MQTT over TLS
  2. ESP32 firmware: de productie-upgrade
  3. Het binaire protocol: Protobuf Lite met Nanopb
  4. Java Spring Boot: de backend architectuur
  5. De Business Rule Engine
  6. TimescaleDB: tijdreeksdata op schaal
  7. Real-time dashboard via WebSocket
  8. Voorspellend onderhoud: anomaly detection
  9. De factuurkoppeling: Exact Online
  10. Deployment: Docker Compose naar Kubernetes
  11. Security: end-to-end beveiliging
  12. De complete data flow: van sensor naar factuur

# 1. Het Communicatieprotocol: MQTT over TLS

# Waarom MQTT en niet REST?

Als Java-architect is je eerste impuls misschien een REST API: de ESP32 doet een POST /api/telemetry. Dat werkt, maar heeft kritieke nadelen voor een tractor op een akker:

REST (HTTP) MQTT
Verbindingsoverhead Elke call = TCP handshake Een persistent connectie
Pakketgrootte HTTP headers: ~200-500 bytes MQTT header: 2 bytes
Geen verbinding Request faalt hard QoS 1: automatische retry na herverbinding
Bidirectioneel Polling vereist Ingebouwd: server pusht config updates
4G datakosten Hoog Laag — essentieel bij datalimiet

MQTT is het standaardprotocol voor IoT om precies deze redenen. Qua patronen vergelijkbaar met een message broker (Kafka/RabbitMQ), maar geoptimaliseerd voor constrained devices.

# 1.1 MQTT Concepten voor Java-architecten

MQTT Concept          <-->  Bekende equivalent
Topic                 <-->  Kafka topic / JMS destination
Publish               <-->  producer.send()
Subscribe             <-->  consumer.subscribe()
QoS 0                 <-->  fire-and-forget (UDP-achtig)
QoS 1                 <-->  at-least-once (met ACK)
Retained message      <-->  Kafka log compaction: laatste waarde bewaard
Last Will & Testament <-->  onCloseListener die altijd vuurt, ook bij crash

# 1.2 Topic-structuur ontwerp

Een goed topic-schema is het equivalent van je REST API URL-structuur. Eenmaal in productie moeilijk te wijzigen:

tractoros/
+-- {tractorId}/
|   +-- telemetry          <- Sensordata (hoge frequentie, QoS 0)
|   +-- telemetry/gps      <- GPS positie apart (hoge frequentie)
|   +-- alerts             <- Kritieke meldingen (QoS 1)
|   +-- status             <- Online/offline, firmware versie (retained)
|   +-- config/request     <- Backend vraagt config update
|   +-- config/response    <- ESP32 bevestigt config ontvangst
|
+-- fleet/
    +-- broadcast          <- Bericht aan ALLE tractoren
    +-- ota/notify         <- OTA update beschikbaar

# 1.3 MQTT Transport voor de ESP32

We voegen MqttTransport toe als vierde ITransport implementatie. De interface uit deel 1 blijft ongewijzigd:

// src/transport/MqttTransport.h
#pragma once
#include "interfaces/ITransport.h"
#include <WiFiClientSecure.h>
#include <PubSubClient.h>
#include <functional>
#include "diagnostics/Logger.h"

namespace TractorOS {

class MqttTransport : public ITransport {
public:
    struct Config {
        const char* brokerHost;
        uint16_t    brokerPort  = 8883;   // 8883 = MQTT over TLS
        const char* clientId;             // Uniek per tractor: "TRACTOR_001"
        const char* username;
        const char* password;
        const char* caCert;               // Root CA voor TLS verificatie
        const char* clientCert;           // Client certificate voor mTLS
        const char* clientKey;            // Private key
        uint16_t    keepAliveSeconds = 30;
    };

    using MessageCallback = std::function<void(const char* topic,
                                               const uint8_t* payload,
                                               size_t length)>;

    explicit MqttTransport(const Config& cfg)
        : _cfg(cfg), _wifiClient(), _client(_wifiClient) {

        _client.setServer(cfg.brokerHost, cfg.brokerPort);
        _client.setCallback([this](char* topic, uint8_t* payload, unsigned int len) {
            if (_messageCallback) _messageCallback(topic, payload, len);
        });
        _client.setKeepAlive(cfg.keepAliveSeconds);
        _client.setBufferSize(1024);
    }

    bool connect() override {
        // TLS configureren (mutual TLS: client verifieert server EN omgekeerd)
        _wifiClient.setCACert(_cfg.caCert);
        _wifiClient.setCertificate(_cfg.clientCert);
        _wifiClient.setPrivateKey(_cfg.clientKey);

        // Last Will Testament: broker publiceert dit als verbinding wegvalt
        const char* willTopic = _buildTopic("status");
        const char* willMsg   = "{\"online\":false,\"reason\":\"unexpected_disconnect\"}";

        bool ok = _client.connect(
            _cfg.clientId, _cfg.username, _cfg.password,
            willTopic, 1, true, willMsg
        );

        if (ok) {
            LOG_I("MQTT", "Verbonden met %s", _cfg.brokerHost);
            _publishStatus(true);
            // Abonneer op inkomende commando's en config updates
            _client.subscribe(_buildTopic("config/request"), 1);
            _client.subscribe("fleet/broadcast", 1);
            _client.subscribe("fleet/ota/notify", 1);
        } else {
            LOG_E("MQTT", "Verbinding mislukt, state: %d", _client.state());
        }
        return ok;
    }

    void disconnect() override {
        _publishStatus(false);
        _client.disconnect();
    }

    // Stuur naar het default telemetry topic (QoS 0)
    bool send(const uint8_t* data, size_t length) override {
        return sendToTopic("telemetry", data, length, 0, false);
    }

    bool sendToTopic(const char* subtopic, const uint8_t* data, size_t length,
                     uint8_t qos = 0, bool retain = false) {
        if (!isConnected()) return false;
        return _client.publish(_buildTopic(subtopic), data, length, retain);
    }

    // MQTT is callback-gebaseerd; receive() doet de keepalive loop
    bool receive(uint8_t* buffer, size_t& bytesRead, size_t maxLen) override {
        _client.loop();
        bytesRead = 0;
        return false;
    }

    bool isConnected() const override { return _client.connected(); }
    const char* getName() const override { return "MQTT/TLS"; }

    void onMessage(MessageCallback cb) { _messageCallback = cb; }

    // Aanroepen in de main loop voor MQTT keepalive en herverbinding
    void loop() {
        if (!isConnected()) {
            LOG_W("MQTT", "Verbinding verloren, herverbinden...");
            connect();
            return;
        }
        _client.loop();
    }

private:
    void _publishStatus(bool online) {
        char payload[128];
        snprintf(payload, sizeof(payload),
            "{\"online\":%s,\"ip\":\"%s\",\"rssi\":%d}",
            online ? "true" : "false",
            WiFi.localIP().toString().c_str(),
            WiFi.RSSI());
        sendToTopic("status",
                    reinterpret_cast<const uint8_t*>(payload),
                    strlen(payload), 1, true);
    }

    const char* _buildTopic(const char* subtopic) {
        snprintf(_topicBuffer, sizeof(_topicBuffer),
                 "tractoros/%s/%s", _cfg.clientId, subtopic);
        return _topicBuffer;
    }

    Config           _cfg;
    WiFiClientSecure _wifiClient;
    PubSubClient     _client;
    MessageCallback  _messageCallback;
    char             _topicBuffer[128];
};

} // namespace TractorOS

# 2. ESP32 Firmware: de Productie-Upgrade

# 2.1 De CommandHandler: bidirectionele communicatie

De backend kan nu commando's sturen naar de tractor. Dit is fundamenteel anders dan een REST-model: de server initieert de communicatie.

// src/core/CommandHandler.h
#pragma once
#include <ArduinoJson.h>
#include "transport/MqttTransport.h"
#include "config/ConfigManager.h"
#include "ota/OtaManager.h"
#include "events/EventBus.h"

namespace TractorOS {

class CommandHandler {
public:
    CommandHandler(MqttTransport& transport,
                   ConfigManager& configManager,
                   OtaManager& otaManager)
        : _transport(transport)
        , _configManager(configManager)
        , _otaManager(otaManager) {

        transport.onMessage([this](const char* topic,
                                   const uint8_t* payload, size_t length) {
            _handleMessage(topic, payload, length);
        });
    }

private:
    void _handleMessage(const char* topic, const uint8_t* payload, size_t length) {
        StaticJsonDocument<512> doc;
        if (deserializeJson(doc, payload, length) != DeserializationError::Ok) return;

        const char* command = doc["command"];
        if (!command) return;

        LOG_I("CMD", "Commando ontvangen: %s", command);

        if      (strcmp(command, "update_config") == 0)
            _handleConfigUpdate(doc["payload"]);
        else if (strcmp(command, "ota_update") == 0)
            _handleOtaCommand(doc["payload"]);
        else if (strcmp(command, "request_diagnostics") == 0)
            _handleDiagnosticsRequest();
        else if (strcmp(command, "set_telemetry_interval") == 0)
            _handleIntervalChange(doc["payload"]["interval_ms"].as<uint32_t>());
        else
            LOG_W("CMD", "Onbekend commando: %s", command);
    }

    void _handleConfigUpdate(const JsonVariantConst& payload) {
        auto config = _configManager.load();
        if (payload.containsKey("max_engine_temp"))
            config.maxEngineTemp = payload["max_engine_temp"].as<float>();
        if (payload.containsKey("min_fuel_level"))
            config.minFuelLevel = payload["min_fuel_level"].as<float>();

        auto result = _configManager.save(config);
        _sendAck(result.isOk() ? "config_updated" : "config_failed", result.isOk());
    }

    void _handleOtaCommand(const JsonVariantConst& payload) {
        const char* url = payload["url"];
        if (!url) { _sendAck("ota_rejected", false); return; }
        _sendAck("ota_started", true);
        OtaManager::Config cfg;
        cfg.updateUrl  = url;
        cfg.serverCert = payload["cert"] | "";
        _otaManager.checkAndUpdate();
    }

    void _handleDiagnosticsRequest() {
        char report[256];
        snprintf(report, sizeof(report),
            "{\"heap_free\":%u,\"heap_min\":%u,\"uptime_ms\":%u,\"rssi\":%d}",
            ESP.getFreeHeap(), ESP.getMinFreeHeap(), millis(), WiFi.RSSI());
        _transport.sendToTopic("diagnostics",
                               reinterpret_cast<const uint8_t*>(report),
                               strlen(report), 1);
    }

    void _handleIntervalChange(uint32_t intervalMs) {
        if (intervalMs < 100 || intervalMs > 60000) {
            _sendAck("interval_rejected", false); return;
        }
        EventBus::instance().publish(
            ConfigChangedEvent{"telemetry_interval", String(intervalMs).c_str()});
        _sendAck("interval_updated", true);
    }

    void _sendAck(const char* status, bool success) {
        char payload[128];
        snprintf(payload, sizeof(payload),
            "{\"status\":\"%s\",\"success\":%s,\"ts\":%u}",
            status, success ? "true" : "false", millis());
        _transport.sendToTopic("config/response",
                               reinterpret_cast<const uint8_t*>(payload),
                               strlen(payload), 1);
    }

    MqttTransport& _transport;
    ConfigManager& _configManager;
    OtaManager&    _otaManager;
};

} // namespace TractorOS

# 2.2 GPS & Werksessie-detectie op de edge

De tractor rijdt over een akker. We detecteren automatisch of de tractor aan het werk is (PTO actief, lage snelheid) of aan het rijden. Dit is de basis voor automatische urenregistratie — bedrijfslogica op de edge.

// src/core/WorkSessionDetector.h
#pragma once
#include "events/EventBus.h"
#include "containers/CircularBuffer.h"
#include <cmath>

namespace TractorOS {

struct WorkSessionStartedEvent {
    uint32_t startTimestamp;
    float    startLat;
    float    startLon;
};

struct WorkSessionEndedEvent {
    uint32_t startTimestamp;
    uint32_t endTimestamp;
    uint32_t durationSeconds;
    float    estimatedHectares;
    float    fuelUsedLiters;
};

class WorkSessionDetector {
public:
    void update(float speedKmh, bool ptoActive, float rpm,
                float lat, float lon) {

        bool shouldWork = (ptoActive && speedKmh < 15.0f && rpm > 800.0f);

        if (_state == State::IDLE && shouldWork) {
            _transitionToWorking(lat, lon);
        } else if (_state == State::WORKING && !shouldWork) {
            _transitionToIdle();
        } else if (_state == State::WORKING) {
            _maybeAddTrackPoint(lat, lon);
        }
    }

private:
    enum class State { IDLE, WORKING };

    void _transitionToWorking(float lat, float lon) {
        _state = State::WORKING;
        _sessionStart = millis();
        _trackPoints.push({lat, lon});
        EventBus::instance().publish(
            WorkSessionStartedEvent{_sessionStart / 1000, lat, lon});
        LOG_I("WorkDetector", "Sessie gestart op %.6f, %.6f", lat, lon);
    }

    void _transitionToIdle() {
        _state = State::IDLE;
        uint32_t durationMs = millis() - _sessionStart;
        float hectares = _calculateHectares();
        EventBus::instance().publish(WorkSessionEndedEvent{
            _sessionStart / 1000, millis() / 1000,
            durationMs / 1000, hectares, 0.0f
        });
        LOG_I("WorkDetector", "Sessie beeindigd: %.2f ha, %us",
              hectares, durationMs / 1000);
    }

    void _maybeAddTrackPoint(float lat, float lon) {
        if (!_trackPoints.isEmpty()) {
            const auto* last = _trackPoints.peek();
            if (_haversineMeters(last->lat, last->lon, lat, lon) < 10.0f) return;
        }
        if (_trackPoints.isFull()) {
            GpsPoint dummy; _trackPoints.pop(dummy);
        }
        _trackPoints.push({lat, lon});
    }

    // Shoelace-algoritme voor oppervlak berekening
    float _calculateHectares() const {
        if (_trackPoints.size() < 3) return 0.0f;
        double area = 0.0;
        float prevLat = 0, prevLon = 0;
        bool first = true;
        for (const auto& pt : _trackPoints) {
            if (!first)
                area += (double)(prevLon + pt.lon) * (double)(prevLat - pt.lat);
            prevLat = pt.lat; prevLon = pt.lon; first = false;
        }
        float areaM2 = std::abs((float)area) * 0.5f * 111320.0f * 111320.0f;
        return areaM2 / 10000.0f;
    }

    float _haversineMeters(float lat1, float lon1, float lat2, float lon2) const {
        constexpr float R = 6371000.0f;
        float dLat = (lat2 - lat1) * M_PI / 180.0f;
        float dLon = (lon2 - lon1) * M_PI / 180.0f;
        float a = sinf(dLat/2)*sinf(dLat/2) +
                  cosf(lat1*M_PI/180.0f)*cosf(lat2*M_PI/180.0f)*
                  sinf(dLon/2)*sinf(dLon/2);
        return R * 2.0f * atan2f(sqrtf(a), sqrtf(1.0f-a));
    }

    struct GpsPoint { float lat; float lon; };

    State    _state = State::IDLE;
    uint32_t _sessionStart = 0;
    CircularBuffer<GpsPoint, 512> _trackPoints; // 5km track bij 10m sampling
};

} // namespace TractorOS

# 3. Het Binaire Protocol: Protobuf Lite met Nanopb

JSON over MQTT is leesbaar maar duur: elke float kost ~10 bytes als tekst versus 4 bytes binair. Bij een 4G-datalimiet telt dit op over een vloot van vijf tractoren.

We gebruiken Nanopb — een C implementatie van Protocol Buffers geschikt voor embedded.

# 3.1 De gedeelde .proto definitie

// proto/tractoros.proto
// Dit bestand is de "shared schema" tussen embedded en backend.
// Vergelijk: een gedeelde DTO-bibliotheek in een microservices-architectuur.

syntax = "proto3";
package tractoros;

// Hoog-frequentie telemetrie (elke seconde)
message TelemetryFrame {
    uint32 timestamp_unix    = 1;
    uint32 sequence_number   = 2;

    // Positie — integers voorkomen float-precisieverlies
    sint32 lat_e7            = 3;  // Latitude  * 1e7
    sint32 lon_e7            = 4;  // Longitude * 1e7
    uint32 speed_cm_s        = 5;

    // Motor
    uint32 engine_temp_c10   = 6;  // Celsius * 10 (1 decimaal)
    uint32 rpm               = 7;
    bool   pto_active        = 8;

    // Brandstof
    uint32 fuel_level_pct10  = 9;  // Percentage * 10 (0-1000)
    uint32 fuel_rate_ml_h    = 10;

    // Systeem
    int32  rssi_dbm          = 11;
    uint32 free_heap_bytes   = 12;
    uint32 uptime_seconds    = 13;
}

// Wrapper voor meerdere frames in een MQTT-packet (batching)
message TelemetryBatch {
    string  tractor_id               = 1;
    repeated TelemetryFrame frames   = 2;
}

Het slimste architectuurbesluit: dit bestand is gedeeld. Nanopb genereert C-headers voor de ESP32; protoc genereert Java-klassen voor Spring Boot. Als je een veld toevoegt, compileert de firmware NIET totdat je het ook in de encoder verwerkt. Het type-systeem vangt protocol-incompatibiliteiten voor deployment.

# 3.2 Nanopb encoder in de firmware

// src/protocol/TelemetryEncoder.h
#pragma once
#include <pb_encode.h>
#include "tractoros.pb.h"   // Gegenereerd door nanopb
#include "diagnostics/Logger.h"

namespace TractorOS {

class TelemetryEncoder {
public:
    static constexpr size_t MAX_FRAME_SIZE = 64;

    static size_t encodeFrame(const tractoros_TelemetryFrame& frame,
                               uint8_t* buffer, size_t bufferSize) {
        pb_ostream_t stream = pb_ostream_from_buffer(buffer, bufferSize);
        if (!pb_encode(&stream, tractoros_TelemetryFrame_fields, &frame)) {
            LOG_E("Encoder", "Protobuf encode fout: %s", PB_GET_ERROR(&stream));
            return 0;
        }
        return stream.bytes_written;
        // Typische framegrootte: 30-50 bytes vs ~160 bytes JSON
    }

    static tractoros_TelemetryFrame buildFrame(
        uint32_t seq, float lat, float lon,
        float speedKmh, float engineTempC,
        uint16_t rpm, bool ptoActive, float fuelLevelPct)
    {
        tractoros_TelemetryFrame frame = tractoros_TelemetryFrame_init_zero;
        frame.timestamp_unix    = time(nullptr);
        frame.sequence_number   = seq;
        frame.lat_e7            = static_cast<int32_t>(lat  * 1e7f);
        frame.lon_e7            = static_cast<int32_t>(lon  * 1e7f);
        frame.speed_cm_s        = static_cast<uint32_t>(speedKmh * 100.0f / 3.6f);
        frame.engine_temp_c10   = static_cast<uint32_t>(engineTempC  * 10.0f);
        frame.rpm               = rpm;
        frame.pto_active        = ptoActive;
        frame.fuel_level_pct10  = static_cast<uint32_t>(fuelLevelPct * 10.0f);
        frame.rssi_dbm          = WiFi.RSSI();
        frame.free_heap_bytes   = ESP.getFreeHeap();
        frame.uptime_seconds    = millis() / 1000;
        return frame;
    }

private:
    TelemetryEncoder() = delete;  // Puur statische klasse
};

} // namespace TractorOS

# 4. Java Spring Boot: de Backend Architectuur

# 4.1 Projectstructuur

tractor-platform/
+-- proto/
|   +-- tractoros.proto          <- Gedeeld met firmware
+-- backend/
|   +-- build.gradle
|   +-- src/main/java/nl/vandenberg/tractorplatform/
|       +-- TractorPlatformApplication.java
|       +-- domain/
|       |   +-- Tractor.java
|       |   +-- WorkSession.java
|       |   +-- TelemetryFrame.java
|       +-- mqtt/
|       |   +-- MqttConsumerConfig.java
|       |   +-- TelemetryMqttHandler.java
|       |   +-- CommandPublisher.java
|       +-- telemetry/
|       |   +-- TelemetryService.java
|       |   +-- TelemetryRepository.java
|       +-- business/
|       |   +-- WorkSessionService.java
|       |   +-- BusinessRuleEngine.java
|       |   +-- rules/
|       |       +-- BusinessRule.java
|       |       +-- EngineTemperatureRule.java
|       |       +-- MaintenanceIntervalRule.java
|       |       +-- GeofenceRule.java
|       +-- invoicing/
|       |   +-- InvoiceService.java
|       |   +-- ExactOnlineClient.java
|       +-- api/
|           +-- TractorController.java
|           +-- DashboardWebSocketHandler.java
+-- frontend/
    +-- React dashboard

# 4.2 MQTT Consumer met Spring Integration

// mqtt/TelemetryMqttHandler.java
@Slf4j
@Component
@RequiredArgsConstructor
public class TelemetryMqttHandler {

    private final TelemetryService telemetryService;

    /**
     * Spring Integration roept dit aan voor elk bericht op de geabonneerde topics.
     * Topic patroon: tractoros/{tractorId}/telemetry
     *
     * De architectuur spiegelt de ESP32 CommandHandler maar omgekeerd:
     * die verwerkte inkomende commando's, dit verwerkt inkomende telemetrie.
     */
    @ServiceActivator(inputChannel = "mqttInputChannel")
    public void handleMessage(Message<byte[]> message) {
        String topic   = (String) message.getHeaders().get("mqtt_receivedTopic");
        byte[] payload = message.getPayload();

        log.debug("MQTT ontvangen: {} ({} bytes)", topic, payload.length);

        try {
            String tractorId    = extractTractorId(topic);
            String messageType  = extractMessageType(topic);

            switch (messageType) {
                case "telemetry" -> handleTelemetry(tractorId, payload);
                case "alerts"    -> telemetryService.processAlert(tractorId, new String(payload));
                case "status"    -> telemetryService.updateTractorStatus(tractorId, new String(payload));
                default          -> log.warn("Onbekend message type: {}", messageType);
            }
        } catch (Exception e) {
            // Niet opnieuw gooien: Spring Integration zou het bericht anders
            // in een dead letter queue plaatsen en de channel blokkeren
            log.error("Fout bij verwerken van MQTT bericht op {}: {}", topic, e.getMessage());
        }
    }

    private void handleTelemetry(String tractorId, byte[] payload)
            throws InvalidProtocolBufferException {
        TelemetryBatch batch = TelemetryBatch.parseFrom(payload);
        log.debug("Tractor {}: {} frames ontvangen", tractorId, batch.getFramesCount());
        telemetryService.processBatch(tractorId, batch);
    }

    private String extractTractorId(String topic) {
        String[] parts = topic.split("/");
        return parts.length >= 2 ? parts[1] : "unknown";
    }

    private String extractMessageType(String topic) {
        String[] parts = topic.split("/");
        return parts.length >= 3 ? parts[2] : "unknown";
    }
}

# 4.3 TelemetryService: de verwerkingskern

// telemetry/TelemetryService.java
@Slf4j
@Service
@RequiredArgsConstructor
public class TelemetryService {

    private final TelemetryRepository        telemetryRepository;
    private final BusinessRuleEngine         businessRuleEngine;
    private final WorkSessionService         workSessionService;
    private final DashboardWebSocketHandler  dashboardHandler;

    @Transactional
    public void processBatch(String tractorId, TelemetryBatch batch) {

        // 1. Converteer protobuf naar domain objecten
        List<TelemetryFrame> frames = batch.getFramesList().stream()
            .map(f -> protoToDomain(tractorId, f))
            .toList();

        // 2. Opslaan in TimescaleDB (batch insert)
        telemetryRepository.saveAll(frames);

        // 3. Bedrijfsregels evalueren op de meest recente frame
        frames.stream()
            .max(Comparator.comparing(TelemetryFrame::getTimestamp))
            .ifPresent(latest -> businessRuleEngine.evaluate(tractorId, latest));

        // 4. Werksessie-state updaten
        frames.forEach(f -> workSessionService.update(tractorId, f));

        // 5. Live update naar dashboard pushen (asynchroon)
        frames.stream()
            .max(Comparator.comparing(TelemetryFrame::getTimestamp))
            .ifPresent(latest -> dashboardHandler.pushUpdate(tractorId, latest));
    }

    private TelemetryFrame protoToDomain(String tractorId,
                                          Tractoros.TelemetryFrame proto) {
        return TelemetryFrame.builder()
            .tractorId(tractorId)
            .timestamp(Instant.ofEpochSecond(proto.getTimestampUnix()))
            .latitude(proto.getLatE7()  / 1e7)
            .longitude(proto.getLonE7() / 1e7)
            .speedKmh(proto.getSpeedCmS() * 3.6 / 100.0)
            .engineTempC(proto.getEngineTempC10() / 10.0)
            .rpm(proto.getRpm())
            .ptoActive(proto.getPtoActive())
            .fuelLevelPct(proto.getFuelLevelPct10() / 10.0)
            .rssiDbm(proto.getRssiDbm())
            .freeHeapBytes(proto.getFreeHeapBytes())
            .build();
    }
}

# 5. De Business Rule Engine

Dit is de kern van de enterprise-waarde. De hardware stuurt data; de bedrijfsregels bepalen wat het betekent.

# 5.1 De Rule Interface

// business/rules/BusinessRule.java
public interface BusinessRule {

    String getName();

    /**
     * Evalueer de regel.
     * @return Een RuleViolation als de regel geschonden is, anders empty.
     *
     * Vergelijk: de IMiddleware interface van de ESP32, maar dan op Java-niveau.
     * Zelfde principe: open/closed — nieuwe regels toevoegen zonder bestaande
     * aan te passen. Spring injecteert automatisch alle implementaties.
     */
    Optional<RuleViolation> evaluate(String tractorId, TelemetryFrame frame);

    default boolean isEnabled() { return true; }
}

// business/rules/RuleViolation.java
@Value
@Builder
public class RuleViolation {

    public enum Severity { INFO, WARNING, CRITICAL }

    public enum Action {
        LOG_ONLY,
        NOTIFY_DRIVER,
        NOTIFY_MANAGER,
        REDUCE_INTERVAL,   // Verhoog telemetrie frequentie voor meer detail
        SEND_COMMAND       // Stuur commando naar de tractor
    }

    String   ruleName;
    String   tractorId;
    Severity severity;
    String   message;
    Instant  timestamp;
    Object   context;
    Action   action;
}

# 5.2 Concrete Bedrijfsregels

// business/rules/EngineTemperatureRule.java
@Component
public class EngineTemperatureRule implements BusinessRule {

    private static final double WARN_THRESHOLD     = 88.0;
    private static final double CRITICAL_THRESHOLD = 95.0;

    @Override
    public String getName() { return "ENGINE_TEMPERATURE"; }

    @Override
    public Optional<RuleViolation> evaluate(String tractorId, TelemetryFrame frame) {
        double temp = frame.getEngineTempC();

        if (temp >= CRITICAL_THRESHOLD) {
            return Optional.of(RuleViolation.builder()
                .ruleName(getName())
                .tractorId(tractorId)
                .severity(RuleViolation.Severity.CRITICAL)
                .message("KRITIEK: Motortemperatuur %.1f°C — motor direct stoppen!".formatted(temp))
                .timestamp(frame.getTimestamp())
                .action(RuleViolation.Action.NOTIFY_DRIVER)
                .context(temp)
                .build());
        }

        if (temp >= WARN_THRESHOLD) {
            return Optional.of(RuleViolation.builder()
                .ruleName(getName())
                .tractorId(tractorId)
                .severity(RuleViolation.Severity.WARNING)
                .message("Motortemperatuur %.1f°C — controleer koelvloeistof".formatted(temp))
                .timestamp(frame.getTimestamp())
                .action(RuleViolation.Action.NOTIFY_MANAGER)
                .context(temp)
                .build());
        }

        return Optional.empty();
    }
}
// business/rules/MaintenanceIntervalRule.java
@Component
@RequiredArgsConstructor
public class MaintenanceIntervalRule implements BusinessRule {

    private static final long OIL_CHANGE_HOURS    = 250;
    private static final long WARNING_AHEAD_HOURS = 20;

    private final MaintenanceRepository maintenanceRepository;

    @Override
    public String getName() { return "MAINTENANCE_INTERVAL"; }

    @Override
    public Optional<RuleViolation> evaluate(String tractorId, TelemetryFrame frame) {
        long hoursSinceLastOilChange = maintenanceRepository
            .getHoursSinceLastMaintenance(tractorId, "OIL_CHANGE");

        long remainingHours = OIL_CHANGE_HOURS - hoursSinceLastOilChange;

        if (remainingHours <= 0) {
            return Optional.of(RuleViolation.builder()
                .ruleName(getName())
                .tractorId(tractorId)
                .severity(RuleViolation.Severity.WARNING)
                .message("Olieverversing OVERSCHREDEN: %d uren geleden gepland"
                    .formatted(Math.abs(remainingHours)))
                .timestamp(frame.getTimestamp())
                .action(RuleViolation.Action.NOTIFY_MANAGER)
                .build());
        }

        if (remainingHours <= WARNING_AHEAD_HOURS) {
            return Optional.of(RuleViolation.builder()
                .ruleName(getName())
                .tractorId(tractorId)
                .severity(RuleViolation.Severity.INFO)
                .message("Olieverversing over %d motoruren".formatted(remainingHours))
                .timestamp(frame.getTimestamp())
                .action(RuleViolation.Action.LOG_ONLY)
                .build());
        }

        return Optional.empty();
    }
}
// business/rules/GeofenceRule.java
@Component
@RequiredArgsConstructor
public class GeofenceRule implements BusinessRule {

    private final GeofenceRepository geofenceRepository;

    @Override
    public String getName() { return "GEOFENCE"; }

    @Override
    public Optional<RuleViolation> evaluate(String tractorId, TelemetryFrame frame) {
        var assignedParcels = geofenceRepository.getActiveAssignments(
            tractorId, frame.getTimestamp());

        if (assignedParcels.isEmpty()) return Optional.empty();

        double lat = frame.getLatitude();
        double lon = frame.getLongitude();

        boolean inAnyParcel = assignedParcels.stream()
            .anyMatch(parcel -> isPointInPolygon(lat, lon, parcel));

        if (!inAnyParcel && frame.getSpeedKmh() > 2.0) {
            return Optional.of(RuleViolation.builder()
                .ruleName(getName())
                .tractorId(tractorId)
                .severity(RuleViolation.Severity.WARNING)
                .message("Tractor buiten toegewezen perceel op %.6f, %.6f".formatted(lat, lon))
                .timestamp(frame.getTimestamp())
                .action(RuleViolation.Action.NOTIFY_MANAGER)
                .context(Map.of("lat", lat, "lon", lon))
                .build());
        }

        return Optional.empty();
    }

    // Ray casting algoritme voor point-in-polygon
    private boolean isPointInPolygon(double lat, double lon, Geofence fence) {
        var vertices = fence.getVertices();
        int n = vertices.size();
        boolean inside = false;

        for (int i = 0, j = n - 1; i < n; j = i++) {
            double xi = vertices.get(i).getLat(), yi = vertices.get(i).getLon();
            double xj = vertices.get(j).getLat(), yj = vertices.get(j).getLon();

            if (((yi > lon) != (yj > lon)) &&
                (lat < (xj - xi) * (lon - yi) / (yj - yi) + xi)) {
                inside = !inside;
            }
        }
        return inside;
    }
}

# 5.3 De Rule Engine Orchestrator

// business/BusinessRuleEngine.java
@Slf4j
@Service
@RequiredArgsConstructor
public class BusinessRuleEngine {

    // Spring injecteert automatisch ALLE BusinessRule @Component implementaties.
    // Nieuwe regel toevoegen = nieuwe @Component klasse = klaar. Nul andere wijzigingen.
    private final List<BusinessRule>  rules;
    private final ViolationRepository violationRepository;
    private final NotificationService notificationService;
    private final CommandPublisher    commandPublisher;

    public void evaluate(String tractorId, TelemetryFrame frame) {
        rules.stream()
            .filter(BusinessRule::isEnabled)
            .map(rule -> {
                try {
                    return rule.evaluate(tractorId, frame);
                } catch (Exception e) {
                    log.error("Fout in regel {}: {}", rule.getName(), e.getMessage());
                    return Optional.<RuleViolation>empty();
                }
            })
            .filter(Optional::isPresent)
            .map(Optional::get)
            .forEach(this::handleViolation);
    }

    private void handleViolation(RuleViolation violation) {
        log.info("Regel geschonden: {} [{}] - {}",
            violation.getRuleName(), violation.getSeverity(), violation.getMessage());

        violationRepository.save(violation);

        switch (violation.getAction()) {
            case NOTIFY_DRIVER  -> notificationService.notifyDriver(
                violation.getTractorId(), violation.getMessage());
            case NOTIFY_MANAGER -> notificationService.notifyManager(
                violation.getMessage(), violation.getSeverity());
            case REDUCE_INTERVAL -> commandPublisher.sendCommand(
                violation.getTractorId(), "set_telemetry_interval",
                Map.of("interval_ms", 200));
            case SEND_COMMAND -> {
                if ("ENGINE_TEMPERATURE".equals(violation.getRuleName()) &&
                    violation.getSeverity() == RuleViolation.Severity.CRITICAL) {
                    commandPublisher.sendCommand(
                        violation.getTractorId(), "request_diagnostics", Map.of());
                }
            }
            case LOG_ONLY -> { /* Al opgeslagen */ }
        }
    }
}

# 6. TimescaleDB: Tijdreeksdata op Schaal

Vijf tractoren, een frame per seconde: 432.000 frames per dag, 157 miljoen per jaar. Standaard PostgreSQL kan dit, maar tijdreeks-queries worden traag zonder de juiste structuur.

TimescaleDB is een PostgreSQL-extensie die data automatisch partitioneert in wekelijkse "chunks". Queries als "geef alle data van gisteren" zijn dan O(1) in plaats van O(n).

# 6.1 Schema en Flyway migraties

-- db/migration/V1__initial_schema.sql

CREATE EXTENSION IF NOT EXISTS timescaledb;

CREATE TABLE tractors (
    id               VARCHAR(16)  PRIMARY KEY,
    name             VARCHAR(64)  NOT NULL,
    kubota_model     VARCHAR(32),
    year             INTEGER,
    is_online        BOOLEAN      DEFAULT FALSE,
    last_seen_at     TIMESTAMPTZ,
    firmware_version VARCHAR(16),
    config_json      JSONB
);

-- De telemetrie hypertable: TimescaleDB partitioneert automatisch per week
CREATE TABLE telemetry (
    time          TIMESTAMPTZ    NOT NULL,
    tractor_id    VARCHAR(16)    NOT NULL REFERENCES tractors(id),
    lat           DOUBLE PRECISION,
    lon           DOUBLE PRECISION,
    speed_kmh     REAL,
    engine_temp_c REAL,
    rpm           INTEGER,
    pto_active    BOOLEAN,
    fuel_level_pct REAL,
    fuel_rate_ml_h REAL,
    rssi_dbm      INTEGER
);

SELECT create_hypertable('telemetry', 'time',
    chunk_time_interval => INTERVAL '1 week');

CREATE INDEX ON telemetry (tractor_id, time DESC);

-- Automatische compressie na 7 dagen (TimescaleDB comprimeert ~10x)
ALTER TABLE telemetry SET (
    timescaledb.compress,
    timescaledb.compress_orderby   = 'time DESC',
    timescaledb.compress_segmentby = 'tractor_id'
);
SELECT add_compression_policy('telemetry', INTERVAL '7 days');

-- Verwijder data ouder dan 2 jaar automatisch
SELECT add_retention_policy('telemetry', INTERVAL '2 years');
-- db/migration/V2__work_sessions.sql

CREATE TABLE work_sessions (
    id           BIGSERIAL    PRIMARY KEY,
    tractor_id   VARCHAR(16)  NOT NULL REFERENCES tractors(id),
    customer_id  BIGINT       REFERENCES customers(id),
    parcel_id    BIGINT       REFERENCES parcels(id),
    started_at   TIMESTAMPTZ  NOT NULL,
    ended_at     TIMESTAMPTZ,
    -- Gegenereerde kolom: automatisch berekend door PostgreSQL
    duration_s   INTEGER GENERATED ALWAYS AS
                     (EXTRACT(EPOCH FROM (ended_at - started_at))::INTEGER) STORED,
    engine_hours REAL,
    area_m2      INTEGER,
    fuel_used_ml INTEGER,
    start_lat    DOUBLE PRECISION,
    start_lon    DOUBLE PRECISION,
    gps_track    JSONB,        -- Array van GPS-punten voor perceelkaart op dashboard
    status       VARCHAR(16)  DEFAULT 'ACTIVE',
    invoice_id   BIGINT
);

-- Materialized view voor motoruren-rapportage (dagelijks ververst)
-- Equivalent van een voorberekende aggregate voor rapportage-snelheid
CREATE MATERIALIZED VIEW daily_engine_hours AS
SELECT
    tractor_id,
    DATE(started_at)       AS work_date,
    SUM(engine_hours)      AS total_hours,
    SUM(area_m2) / 10000.0 AS total_hectares,
    SUM(fuel_used_ml)/1000.0 AS total_fuel_liters,
    COUNT(*)               AS session_count
FROM work_sessions
WHERE status = 'COMPLETED'
GROUP BY tractor_id, DATE(started_at);

# 6.2 Efficiënte tijdreeks queries

// In TelemetryRepository:

/**
 * Gemiddelde motortemperatuur per uur, afgelopen 24 uur.
 * TimescaleDB's time_bucket() is SQL GROUP BY + date_trunc,
 * maar geoptimaliseerd voor hypertables.
 */
@Query(value = """
    SELECT
        time_bucket('1 hour', time) AS bucket,
        AVG(engine_temp_c)          AS avg_temp,
        MAX(engine_temp_c)          AS max_temp,
        AVG(rpm)                    AS avg_rpm
    FROM telemetry
    WHERE tractor_id = :tractorId
      AND time > NOW() - INTERVAL '24 hours'
    GROUP BY bucket
    ORDER BY bucket DESC
    """, nativeQuery = true)
List<Object[]> getHourlyAggregates(@Param("tractorId") String tractorId);

/**
 * Detecteer frames waar de motortemperatuur meer dan
 * 2 standaarddeviaties boven het daggemiddelde ligt.
 */
@Query(value = """
    WITH daily_stats AS (
        SELECT AVG(engine_temp_c) AS mean,
               STDDEV(engine_temp_c) AS stddev
        FROM telemetry
        WHERE tractor_id = :tractorId
          AND time > NOW() - INTERVAL '24 hours'
    )
    SELECT t.time, t.engine_temp_c, t.rpm, t.pto_active
    FROM telemetry t, daily_stats
    WHERE t.tractor_id = :tractorId
      AND t.time > NOW() - INTERVAL '24 hours'
      AND t.engine_temp_c > daily_stats.mean + (2 * daily_stats.stddev)
    ORDER BY t.time DESC
    LIMIT 100
    """, nativeQuery = true)
List<Object[]> findTemperatureAnomalies(@Param("tractorId") String tractorId);

# 7. Real-time Dashboard via WebSocket

Het React-dashboard wil live updates zonder polling. We gebruiken Spring WebSocket met STOMP.

# 7.1 Backend push handler

// api/DashboardWebSocketHandler.java
@Component
@RequiredArgsConstructor
public class DashboardWebSocketHandler {

    private final SimpMessagingTemplate messagingTemplate;

    /**
     * Kanaalstructuur:
     *   /topic/fleet           <- Alle tractoren (voor fleet overview map)
     *   /topic/tractor/{id}    <- Specifieke tractor (voor detail view)
     *   /topic/alerts          <- Kritieke meldingen
     */
    public void pushUpdate(String tractorId, TelemetryFrame frame) {
        var dto = LiveTelemetryDto.from(frame);
        messagingTemplate.convertAndSend("/topic/fleet", dto);
        messagingTemplate.convertAndSend("/topic/tractor/" + tractorId, dto);
    }

    public void pushAlert(RuleViolation violation) {
        messagingTemplate.convertAndSend("/topic/alerts", AlertDto.from(violation));
    }
}

# 7.2 React TypeScript hook

// frontend/src/hooks/useTractorTelemetry.ts
import { useEffect, useState } from 'react';
import { Client } from '@stomp/stompjs';

interface LiveTelemetry {
  tractorId:     string;
  timestamp:     string;
  latitude:      number;
  longitude:     number;
  engineTempC:   number;
  speedKmh:      number;
  rpm:           number;
  ptoActive:     boolean;
  fuelLevelPct:  number;
}

/**
 * De frontend-architectuur spiegelt de backend:
 * subscribe op een topic (net als MQTT op de ESP32),
 * ontvang updates asynchroon via callback,
 * geen polling.
 */
export function useTractorTelemetry(tractorId: string) {
  const [telemetry, setTelemetry] = useState<LiveTelemetry | null>(null);
  const [connected,  setConnected]  = useState(false);

  useEffect(() => {
    const client = new Client({
      brokerURL: 'ws://localhost:8080/ws',
      onConnect: () => {
        setConnected(true);
        client.subscribe(`/topic/tractor/${tractorId}`, (message) => {
          setTelemetry(JSON.parse(message.body));
        });
      },
      onDisconnect: () => setConnected(false),
    });

    client.activate();
    return () => { client.deactivate(); };
  }, [tractorId]);

  return { telemetry, connected };
}

// Gebruik:
// const { telemetry } = useTractorTelemetry('TRACTOR_001');
// <EngineGauge value={telemetry?.engineTempC} warning={88} critical={95} />

# 8. Voorspellend Onderhoud: Anomaly Detection

De meest waardevolle enterprise-functie: een probleem detecteren voordat de tractor kapotgaat op de akker.

// maintenance/AnomalyDetector.java
@Service
@RequiredArgsConstructor
public class AnomalyDetector {

    private final TelemetryRepository telemetryRepository;

    /**
     * Detecteer afwijkend gedrag via Exponential Moving Average.
     *
     * EMA geeft meer gewicht aan recente waarden.
     * Z-score: hoeveel standaarddeviaties wijkt de meting af van de EMA-baseline?
     * Z > 3.0: statistisch zeer ongebruikelijk (kans < 0.3%).
     */
    public Optional<MaintenanceAlert> detectTemperatureAnomaly(
            String tractorId, TelemetryFrame current) {

        var baseline = telemetryRepository.getTemperatureBaseline(tractorId);
        if (baseline.isEmpty()) return Optional.empty();

        double ema      = baseline.get().getEmaTemp();
        double stddev   = Math.sqrt(baseline.get().getVariance());
        double zScore   = (current.getEngineTempC() - ema) / stddev;

        if (zScore > 3.0 && current.getEngineTempC() > 75.0) {
            return Optional.of(MaintenanceAlert.builder()
                .tractorId(tractorId)
                .type("TEMPERATURE_ANOMALY")
                .severity("WARNING")
                .message("Ongebruikelijke temperatuurstijging: %.1f°C (%.1f sigma boven normaal). Controleer koelsysteem."
                    .formatted(current.getEngineTempC(), zScore))
                .detectedAt(current.getTimestamp())
                .recommendation("Inspecteer koelsysteem: koelvloeistofpeil, thermostaat, radiateur")
                .build());
        }

        // Update EMA voor volgende detectie (alpha = 0.1: trage filter)
        double alpha      = 0.1;
        double newEma     = alpha * current.getEngineTempC() + (1 - alpha) * ema;
        double newVariance = (1 - alpha) * (baseline.get().getVariance() +
            alpha * Math.pow(current.getEngineTempC() - ema, 2));

        telemetryRepository.updateTemperatureBaseline(tractorId, newEma, newVariance);

        return Optional.empty();
    }

    /**
     * RPM-instabiliteit detectie.
     * Een gezonde motor heeft stabiele RPM bij constante PTO-belasting.
     * Variatiecoefficient > 5%: ongebruikelijk, waarschijnlijk brandstoffilter.
     */
    public Optional<MaintenanceAlert> detectRpmInstability(
            String tractorId, List<TelemetryFrame> recentFrames) {

        if (recentFrames.size() < 10) return Optional.empty();

        var ptoFrames = recentFrames.stream()
            .filter(TelemetryFrame::isPtoActive).toList();
        if (ptoFrames.size() < 5) return Optional.empty();

        double mean = ptoFrames.stream()
            .mapToInt(TelemetryFrame::getRpm).average().orElse(0);
        double stddev = Math.sqrt(ptoFrames.stream()
            .mapToDouble(f -> Math.pow(f.getRpm() - mean, 2)).average().orElse(0));
        double cv = stddev / mean;

        if (cv > 0.05) {
            return Optional.of(MaintenanceAlert.builder()
                .tractorId(tractorId)
                .type("RPM_INSTABILITY")
                .severity("INFO")
                .message("RPM-instabiliteit: %.1f%% variatie tijdens PTO-gebruik".formatted(cv * 100))
                .detectedAt(recentFrames.getLast().getTimestamp())
                .recommendation("Brandstoffilter vervangen (part #KU-BF-1247)")
                .build());
        }

        return Optional.empty();
    }
}

# 9. De Factuurkoppeling: Exact Online

Het loonbedrijf wil dat afgeronde werksessies automatisch factuurregels genereren in Exact Online.

// invoicing/InvoiceService.java
@Service
@RequiredArgsConstructor
@Slf4j
public class InvoiceService {

    private final WorkSessionRepository workSessionRepository;
    private final ExactOnlineClient     exactClient;
    private final PriceRepository       priceRepository;

    /**
     * Verwerk een afgeronde werksessie naar een factuurregel.
     * Event-driven: aangeroepen vanuit WorkSessionService zodra een
     * sessie de status COMPLETED krijgt.
     */
    @Transactional
    public void processCompletedSession(WorkSession session) {
        log.info("Factuurverwerking voor sessie {}", session.getId());

        var customer     = session.getCustomer();
        var billableItems = calculateBillableItems(session);

        for (var item : billableItems) {
            var invoiceLine = ExactInvoiceLine.builder()
                .accountCode(customer.getExactAccountCode())
                .itemCode(item.itemCode())
                .description(item.description())
                .quantity(item.quantity())
                .unitPrice(item.unitPrice())
                .vatCode("21")
                .build();

            exactClient.createInvoiceLine(invoiceLine).ifPresentOrElse(
                invoiceId -> {
                    session.setInvoiceId(invoiceId);
                    workSessionRepository.save(session);
                    log.info("Factuurregel aangemaakt: {}", invoiceId);
                },
                () -> log.error("Factuurlijn aanmaken mislukt voor sessie {}", session.getId())
            );
        }
    }

    private List<BillableItem> calculateBillableItems(WorkSession session) {
        var prices = priceRepository.getPricesForCustomer(
            session.getCustomerId(), session.getStartedAt().toLocalDate());
        var items  = new ArrayList<BillableItem>();

        // Uurprijs voor inzet (afgerond op kwartier)
        double hours = session.getDurationS() / 3600.0;
        items.add(new BillableItem(
            "TRACTOR_HOUR",
            "Tractor inzet %s — %s".formatted(
                session.getTractor().getName(),
                session.getStartedAt().toLocalDate()),
            roundToQuarter(hours),
            prices.getHourlyRate()
        ));

        // Hectare-toeslag indien van toepassing
        if (session.getAreaM2() > 0 && prices.getAreaRate() > 0) {
            double hectares = session.getAreaM2() / 10000.0;
            items.add(new BillableItem(
                "HECTARE_SURCHARGE",
                "Bewerkt oppervlak: %.2f ha".formatted(hectares),
                Math.round(hectares * 100) / 100.0,
                prices.getAreaRate()
            ));
        }

        return items;
    }

    private double roundToQuarter(double hours) {
        return Math.round(hours * 4) / 4.0;
    }

    record BillableItem(String itemCode, String description,
                        double quantity, double unitPrice) {}
}

# 10. Deployment: Docker Compose naar Kubernetes

# 10.1 Docker Compose voor development en staging

# docker-compose.yml
version: '3.8'

services:

  db:
    image: timescale/timescaledb:latest-pg15
    environment:
      POSTGRES_DB:       tractorplatform
      POSTGRES_USER:     tractor
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - pgdata:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U tractor -d tractorplatform"]
      interval: 10s
      retries: 5

  mqtt:
    image: eclipse-mosquitto:2
    volumes:
      - ./config/mosquitto:/mosquitto/config
      - ./certs:/mosquitto/certs:ro
    ports:
      - "8883:8883"    # MQTT over TLS
      - "9001:9001"    # WebSocket voor debugging

  backend:
    build: ./backend
    depends_on:
      db:
        condition: service_healthy
    environment:
      SPRING_DATASOURCE_URL:      jdbc:postgresql://db:5432/tractorplatform
      SPRING_DATASOURCE_USERNAME: tractor
      SPRING_DATASOURCE_PASSWORD: ${DB_PASSWORD}
      MQTT_BROKER_URL:            ssl://mqtt:8883
      EXACT_CLIENT_ID:            ${EXACT_CLIENT_ID}
      EXACT_CLIENT_SECRET:        ${EXACT_CLIENT_SECRET}
    ports:
      - "8080:8080"

  dashboard:
    build: ./frontend
    depends_on:
      - backend
    ports:
      - "3000:80"

volumes:
  pgdata:

# 10.2 Kubernetes productie excerpts

# k8s/backend-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: tractor-backend
spec:
  replicas: 2            # Horizontaal schaalbaar
  selector:
    matchLabels:
      app: tractor-backend
  template:
    spec:
      containers:
      - name: backend
        image: vandenberg/tractor-backend:1.2.0
        resources:
          requests: { memory: "512Mi", cpu: "250m" }
          limits:   { memory: "1Gi",   cpu: "1000m" }
        env:
        - name: DB_PASSWORD
          valueFrom:
            secretKeyRef: { name: db-secret, key: password }
        livenessProbe:
          httpGet: { path: /actuator/health/liveness,  port: 8080 }
          initialDelaySeconds: 30
        readinessProbe:
          httpGet: { path: /actuator/health/readiness, port: 8080 }

# 11. Security: End-to-End Beveiliging

Een tractor die gespoofde GPS-data ontvangt, of waarvan de firmware vervangen kan worden door een aanvaller, is een reeel veiligheidsrisico.

# 11.1 mTLS: mutual TLS voor MQTT

Standaard TLS valideert alleen de server. Mutual TLS (mTLS) valideert ook de client via een certificaat. Elke tractor krijgt een uniek client-certificaat:

# Eenmalig: maak CA aan
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt \
    -subj "/CN=TractorOS CA/O=Van der Berg Loonbedrijf"

# Per tractor: uniek client certificaat
TRACTOR_ID="TRACTOR_001"
openssl genrsa -out ${TRACTOR_ID}.key 2048
openssl req -new -key ${TRACTOR_ID}.key -out ${TRACTOR_ID}.csr \
    -subj "/CN=${TRACTOR_ID}/O=Van der Berg Loonbedrijf"
openssl x509 -req -days 365 -in ${TRACTOR_ID}.csr \
    -CA ca.crt -CAkey ca.key -out ${TRACTOR_ID}.crt

# Flash naar ESP32 NVS via provisioning script
python3 scripts/provision_tractor.py \
    --tractor-id ${TRACTOR_ID} \
    --ca-cert ca.crt \
    --client-cert ${TRACTOR_ID}.crt \
    --client-key ${TRACTOR_ID}.key \
    --port /dev/ttyUSB0

# 11.2 Firmware signing voor OTA

// src/ota/FirmwareVerifier.h
#pragma once
#include <mbedtls/sha256.h>
#include <mbedtls/rsa.h>
#include "diagnostics/Logger.h"

namespace TractorOS {

/**
 * Verifieert de digitale handtekening van een firmware-image voor installatie.
 *
 * De CI/CD pipeline ondertekent elk image met de private key van het devteam.
 * De ESP32 verifieert met de bijbehorende public key die in flash is ingebrand.
 * Zonder geldige handtekening: update categorisch geweigerd.
 *
 * Dit voorkomt dat een aanvaller via MQTT een kwaadaardige firmware pusht.
 */
class FirmwareVerifier {
public:
    static bool verify(const uint8_t* firmwareData, size_t firmwareLen,
                       const uint8_t* signature, size_t sigLen) {
        // 1. Bereken SHA-256 hash van het firmware-image
        uint8_t hash[32];
        mbedtls_sha256_context sha_ctx;
        mbedtls_sha256_init(&sha_ctx);
        mbedtls_sha256_starts(&sha_ctx, 0);
        mbedtls_sha256_update(&sha_ctx, firmwareData, firmwareLen);
        mbedtls_sha256_finish(&sha_ctx, hash);
        mbedtls_sha256_free(&sha_ctx);

        // 2. Verifieer RSA-2048 handtekening met ingebakken public key
        mbedtls_rsa_context rsa;
        mbedtls_rsa_init(&rsa);

        bool valid = false;
        if (loadPublicKey(rsa)) {
            int ret = mbedtls_rsa_pkcs1_verify(
                &rsa, MBEDTLS_MD_SHA256, 32, hash, signature);
            valid = (ret == 0);
        }
        mbedtls_rsa_free(&rsa);

        if (!valid) LOG_E("Verifier", "HANDTEKENING ONGELDIG — update geweigerd!");
        return valid;
    }

private:
    static bool loadPublicKey(mbedtls_rsa_context& ctx);
    FirmwareVerifier() = delete;
};

} // namespace TractorOS

# 12. De Complete Data Flow: Van Sensor naar Factuur

Laten we de volledige reis van een meting door het hele systeem volgen.

T+0ms      DS18B20 sensor geeft spanning af: 87.4 graden Celsius

T+1ms      SensorTask leest waarde, bouwt SensorReading (stack-gealloceerd).
           Publiceert SensorReadingEvent op de EventBus.

T+2ms      WorkSessionDetector ontvangt event.
           State: WORKING — snelheid 8.3 km/h, PTO actief.
           Voegt GPS-punt toe aan CircularBuffer (track, elke 10 meter).

T+3ms      TelemetryEncoder bouwt protobuf TelemetryFrame:
             engine_temp_c10 = 874   (87.4 * 10, integer)
             lat_e7 = 520347812      (52.0347812 * 1e7)
           Framegrootte: 34 bytes  vs.  ~160 bytes JSON.

T+4ms      MiddlewarePipeline:
             RateLimitMiddleware  -> OK (100ms verstreken)
             ChecksumMiddleware   -> XOR checksum toegevoegd
             CompressionMiddleware -> 34 bytes < drempel, geen compressie

T+5ms      MqttTransport publiceert op "tractoros/TRACTOR_001/telemetry".
           QoS 0 — fire and forget voor hoge-frequentie telemetrie.

T+15ms     Spring Boot MQTT Consumer ontvangt het bericht.
           TelemetryMqttHandler.handleMessage() wordt aangeroepen.

T+16ms     Protobuf deserialisatie:
             engine_temp_c = 874 / 10.0 = 87.4  (terug naar double)
             latitude = 520347812 / 1e7 = 52.0347812

T+17ms     TelemetryService.processBatch():
           1. JDBC batch insert naar TimescaleDB (5 frames per INSERT).

T+18ms     BusinessRuleEngine evalueert alle regels:
             EngineTemperatureRule:   87.4 < 88.0 (warning drempel) -> OK
             MaintenanceIntervalRule: 63 uur tot olieverversing     -> OK
             GeofenceRule:            binnen perceel "Akker Noord"  -> OK
             AnomalyDetector:         z-score = 0.8 (normaal)       -> OK

T+19ms     DashboardWebSocketHandler pusht naar "/topic/tractor/TRACTOR_001".
           React dashboard ontvangt update, gauge-display refresht.

------- 3 uur later --------------------------------------------------------

T+3u       Tractor stopt: PTO uitgeschakeld, motor stationair.
           WorkSessionDetector: transitie WORKING -> IDLE.
           Publiceert WorkSessionEndedEvent:
             duration:  10.847 seconden
             area:      3.2 ha (shoelace-algoritme over 847 GPS-punten)
             fuel:      42.3 liter

T+3u+50ms  WorkSessionService ontvangt event via EventBus.
           Sessie opgeslagen als COMPLETED in work_sessions tabel.

T+3u+51ms  InvoiceService.processCompletedSession():
           Klant: Maatschap Hendriks, perceel: Akker Noord
           Berekening:
             3.0 uur (afgerond op kwartier) x EUR 145/uur = EUR 435,00
             3.20 ha x EUR 12,00/ha                       = EUR  38,40
             Totaal excl. BTW:                              EUR 473,40

T+3u+52ms  ExactOnlineClient:
           POST https://start.exactonline.nl/api/v1/{div}/salesentry/SalesEntryLines
           Response: InvoiceID "INV-2024-1847"

T+3u+52ms  WorkSession.invoiceId = "INV-2024-1847", opgeslagen.
           Dashboard toont: "Sessie gefactureerd"

Totale latency sensor -> database:    ~17ms
Totale latency sensor -> dashboard:   ~19ms
Totale latency sessie -> factuur:     ~2 seconden (inclusief Exact Online API)

# Bijlage A: De Gedeelde Proto Library

Het slimste architectuurbesluit: het .proto bestand wordt gedeeld door firmware en backend.

proto/tractoros.proto
       |
       +-- nanopb_generator.py --> src/protocol/tractoros.pb.h  (ESP32/C++)
       |                           src/protocol/tractoros.pb.c
       |
       +-- protoc + grpc-java  --> TelemetryFrame.java          (Spring Boot/Java)
                                   TelemetryBatch.java

Als je een veld toevoegt aan het .proto bestand, compileert de ESP32 firmware niet totdat je het ook verwerkt in de C++ encoder. De Java backend geeft een compileerfout als de decoder het veld niet afhandelt. Het type-systeem vangt protocol-incompatibiliteiten voor deployment — hetzelfde principe als het Result type uit deel 2, maar nu over de taalgrens heen.


# Bijlage B: Wat Verder Uitgebouwd Kan Worden

Dit systeem is productie-klaar voor een loonbedrijf met vijf tractoren. Voor grotere schaal of hogere eisen:

Schaalbaarheid: Bij 50+ tractoren wordt Mosquitto een bottleneck. Vervang door geclusterde HiveMQ/VerneMQ, of gebruik AWS IoT Core als managed service.

Event sourcing: De work_sessions tabel slaat alleen het eindresultaat op. Met event sourcing (elke state-transitie als onveranderlijk event) kun je facturen herberekenen bij tariefwijzigingen zonder historische data te verliezen.

CQRS: De BusinessRuleEngine en queries draaien nu op dezelfde database. Bij hogere load: write-database voor live data, read replica (TimescaleDB) voor analytische queries.

Edge ML: De AnomalyDetector gebruikt een eenvoudige z-score. Met genoeg historische data train je een TensorFlow Lite model dat als binary op de ESP32 draait — anomaliedetectie zonder netwerk, nul latency.

Multi-tenancy: Voor een SaaS-platform met meerdere loonbedrijven: voeg tenant_id toe aan alle tabellen, gebruik PostgreSQL Row-Level Security, en OAuth2/OIDC per tenant.


Cursusreeks volledig. Van de byte-level Protobuf encoder op de ESP32, via de event-driven middleware pipeline en de Business Rule Engine, tot de automatisch gegenereerde factuurregels in Exact Online — met de architectuurdiscipline van een ervaren software-architect door de hele stack heen.

Reacties (0 )

Geen reacties beschikbaar.