# Van Kubota Boordcomputer naar Fleet Management Platform
Vereisten: Deel 1 (basisarchitectuur, patterns, RAII) en Deel 2 (templates, FreeRTOS, Event Bus, OTA) afgerond. Wat dit deel is: Geen abstracte cursus meer, maar een concrete, end-to-end uitgewerkte use case. We bouwen het complete systeem: de ESP32-firmware, het Java Spring Boot backend, de bedrijfsregellaag, en de integratiepunten daartussen. Perspectief: Enterprise software architect die een IoT-platform bouwt met de ESP32 als "edge device" in een groter ecosysteem.
# De Use Case: Vandaag Oogsten, Morgen Factureren
Het bedrijf: Loonbedrijf Van der Berg — vijf Kubota-tractoren die werken voor derden.
Het probleem:
- Chauffeurs schrijven handmatig uren en brandstofverbruik op — fouten, fraude-risico, papierwerk.
- Klanten willen real-time inzicht: "Is mijn akker al klaar?"
- De accountant wil automatische factuurregels op basis van motoruren en hectares.
- De monteur wil voorspellend onderhoud: "Wissel filter voordat het kapotgaat."
De oplossing: Elke tractor krijgt een ESP32-boordcomputer. De firmware stuurt telemetrie naar een Java Spring Boot backend. Het backend verwerkt bedrijfsregels, slaat op in PostgreSQL/TimescaleDB, en exposeert REST + WebSocket APIs voor een React dashboard en een boekhoudkoppeling.
# Systeemoverzicht
+--------------------------------------------------------------+
| KUBOTA TRACTOR (Edge) |
| |
| GPS (NEO-M8) DS18B20 Temp INA226 Stroom Rotary RPM |
| | | | | |
| +--------------------------------------------+ |
| | |
| ESP32 TractorOS (onze firmware) |
| | |
+--------------------------|-----------------------------------+
| 4G/LTE (primair) / LoRa (fallback)
+------+-------+
| MQTT Broker |
| (Mosquitto) |
+------+-------+
|
+--------------------------|-----------------------------------+
| BACKEND (Java Spring Boot) |
| |
| MQTT Consumer --> TelemetryService --> RuleEngine |
| | |
| PostgreSQL + TimescaleDB |
| (tractors / sessions / telemetry / alerts / invoices) |
| |
| REST API WebSocket (live) Webhook (boekhouding) |
+----+-------------------+--------------------+----------------+
| | |
React Mobiele app Exact Online /
Dashboard Klanten-portaal Twinfield
# Inhoudsopgave
- Het communicatieprotocol: MQTT over TLS
- ESP32 firmware: de productie-upgrade
- Het binaire protocol: Protobuf Lite met Nanopb
- Java Spring Boot: de backend architectuur
- De Business Rule Engine
- TimescaleDB: tijdreeksdata op schaal
- Real-time dashboard via WebSocket
- Voorspellend onderhoud: anomaly detection
- De factuurkoppeling: Exact Online
- Deployment: Docker Compose naar Kubernetes
- Security: end-to-end beveiliging
- De complete data flow: van sensor naar factuur
# 1. Het Communicatieprotocol: MQTT over TLS
# Waarom MQTT en niet REST?
Als Java-architect is je eerste impuls misschien een REST API: de ESP32 doet een POST /api/telemetry. Dat werkt, maar heeft kritieke nadelen voor een tractor op een akker:
| REST (HTTP) | MQTT | |
|---|---|---|
| Verbindingsoverhead | Elke call = TCP handshake | Een persistent connectie |
| Pakketgrootte | HTTP headers: ~200-500 bytes | MQTT header: 2 bytes |
| Geen verbinding | Request faalt hard | QoS 1: automatische retry na herverbinding |
| Bidirectioneel | Polling vereist | Ingebouwd: server pusht config updates |
| 4G datakosten | Hoog | Laag — essentieel bij datalimiet |
MQTT is het standaardprotocol voor IoT om precies deze redenen. Qua patronen vergelijkbaar met een message broker (Kafka/RabbitMQ), maar geoptimaliseerd voor constrained devices.
# 1.1 MQTT Concepten voor Java-architecten
MQTT Concept <--> Bekende equivalent
Topic <--> Kafka topic / JMS destination
Publish <--> producer.send()
Subscribe <--> consumer.subscribe()
QoS 0 <--> fire-and-forget (UDP-achtig)
QoS 1 <--> at-least-once (met ACK)
Retained message <--> Kafka log compaction: laatste waarde bewaard
Last Will & Testament <--> onCloseListener die altijd vuurt, ook bij crash
# 1.2 Topic-structuur ontwerp
Een goed topic-schema is het equivalent van je REST API URL-structuur. Eenmaal in productie moeilijk te wijzigen:
tractoros/
+-- {tractorId}/
| +-- telemetry <- Sensordata (hoge frequentie, QoS 0)
| +-- telemetry/gps <- GPS positie apart (hoge frequentie)
| +-- alerts <- Kritieke meldingen (QoS 1)
| +-- status <- Online/offline, firmware versie (retained)
| +-- config/request <- Backend vraagt config update
| +-- config/response <- ESP32 bevestigt config ontvangst
|
+-- fleet/
+-- broadcast <- Bericht aan ALLE tractoren
+-- ota/notify <- OTA update beschikbaar
# 1.3 MQTT Transport voor de ESP32
We voegen MqttTransport toe als vierde ITransport implementatie. De interface uit deel 1 blijft ongewijzigd:
// src/transport/MqttTransport.h
#pragma once
#include "interfaces/ITransport.h"
#include <WiFiClientSecure.h>
#include <PubSubClient.h>
#include <functional>
#include "diagnostics/Logger.h"
namespace TractorOS {
class MqttTransport : public ITransport {
public:
struct Config {
const char* brokerHost;
uint16_t brokerPort = 8883; // 8883 = MQTT over TLS
const char* clientId; // Uniek per tractor: "TRACTOR_001"
const char* username;
const char* password;
const char* caCert; // Root CA voor TLS verificatie
const char* clientCert; // Client certificate voor mTLS
const char* clientKey; // Private key
uint16_t keepAliveSeconds = 30;
};
using MessageCallback = std::function<void(const char* topic,
const uint8_t* payload,
size_t length)>;
explicit MqttTransport(const Config& cfg)
: _cfg(cfg), _wifiClient(), _client(_wifiClient) {
_client.setServer(cfg.brokerHost, cfg.brokerPort);
_client.setCallback([this](char* topic, uint8_t* payload, unsigned int len) {
if (_messageCallback) _messageCallback(topic, payload, len);
});
_client.setKeepAlive(cfg.keepAliveSeconds);
_client.setBufferSize(1024);
}
bool connect() override {
// TLS configureren (mutual TLS: client verifieert server EN omgekeerd)
_wifiClient.setCACert(_cfg.caCert);
_wifiClient.setCertificate(_cfg.clientCert);
_wifiClient.setPrivateKey(_cfg.clientKey);
// Last Will Testament: broker publiceert dit als verbinding wegvalt
const char* willTopic = _buildTopic("status");
const char* willMsg = "{\"online\":false,\"reason\":\"unexpected_disconnect\"}";
bool ok = _client.connect(
_cfg.clientId, _cfg.username, _cfg.password,
willTopic, 1, true, willMsg
);
if (ok) {
LOG_I("MQTT", "Verbonden met %s", _cfg.brokerHost);
_publishStatus(true);
// Abonneer op inkomende commando's en config updates
_client.subscribe(_buildTopic("config/request"), 1);
_client.subscribe("fleet/broadcast", 1);
_client.subscribe("fleet/ota/notify", 1);
} else {
LOG_E("MQTT", "Verbinding mislukt, state: %d", _client.state());
}
return ok;
}
void disconnect() override {
_publishStatus(false);
_client.disconnect();
}
// Stuur naar het default telemetry topic (QoS 0)
bool send(const uint8_t* data, size_t length) override {
return sendToTopic("telemetry", data, length, 0, false);
}
bool sendToTopic(const char* subtopic, const uint8_t* data, size_t length,
uint8_t qos = 0, bool retain = false) {
if (!isConnected()) return false;
return _client.publish(_buildTopic(subtopic), data, length, retain);
}
// MQTT is callback-gebaseerd; receive() doet de keepalive loop
bool receive(uint8_t* buffer, size_t& bytesRead, size_t maxLen) override {
_client.loop();
bytesRead = 0;
return false;
}
bool isConnected() const override { return _client.connected(); }
const char* getName() const override { return "MQTT/TLS"; }
void onMessage(MessageCallback cb) { _messageCallback = cb; }
// Aanroepen in de main loop voor MQTT keepalive en herverbinding
void loop() {
if (!isConnected()) {
LOG_W("MQTT", "Verbinding verloren, herverbinden...");
connect();
return;
}
_client.loop();
}
private:
void _publishStatus(bool online) {
char payload[128];
snprintf(payload, sizeof(payload),
"{\"online\":%s,\"ip\":\"%s\",\"rssi\":%d}",
online ? "true" : "false",
WiFi.localIP().toString().c_str(),
WiFi.RSSI());
sendToTopic("status",
reinterpret_cast<const uint8_t*>(payload),
strlen(payload), 1, true);
}
const char* _buildTopic(const char* subtopic) {
snprintf(_topicBuffer, sizeof(_topicBuffer),
"tractoros/%s/%s", _cfg.clientId, subtopic);
return _topicBuffer;
}
Config _cfg;
WiFiClientSecure _wifiClient;
PubSubClient _client;
MessageCallback _messageCallback;
char _topicBuffer[128];
};
} // namespace TractorOS
# 2. ESP32 Firmware: de Productie-Upgrade
# 2.1 De CommandHandler: bidirectionele communicatie
De backend kan nu commando's sturen naar de tractor. Dit is fundamenteel anders dan een REST-model: de server initieert de communicatie.
// src/core/CommandHandler.h
#pragma once
#include <ArduinoJson.h>
#include "transport/MqttTransport.h"
#include "config/ConfigManager.h"
#include "ota/OtaManager.h"
#include "events/EventBus.h"
namespace TractorOS {
class CommandHandler {
public:
CommandHandler(MqttTransport& transport,
ConfigManager& configManager,
OtaManager& otaManager)
: _transport(transport)
, _configManager(configManager)
, _otaManager(otaManager) {
transport.onMessage([this](const char* topic,
const uint8_t* payload, size_t length) {
_handleMessage(topic, payload, length);
});
}
private:
void _handleMessage(const char* topic, const uint8_t* payload, size_t length) {
StaticJsonDocument<512> doc;
if (deserializeJson(doc, payload, length) != DeserializationError::Ok) return;
const char* command = doc["command"];
if (!command) return;
LOG_I("CMD", "Commando ontvangen: %s", command);
if (strcmp(command, "update_config") == 0)
_handleConfigUpdate(doc["payload"]);
else if (strcmp(command, "ota_update") == 0)
_handleOtaCommand(doc["payload"]);
else if (strcmp(command, "request_diagnostics") == 0)
_handleDiagnosticsRequest();
else if (strcmp(command, "set_telemetry_interval") == 0)
_handleIntervalChange(doc["payload"]["interval_ms"].as<uint32_t>());
else
LOG_W("CMD", "Onbekend commando: %s", command);
}
void _handleConfigUpdate(const JsonVariantConst& payload) {
auto config = _configManager.load();
if (payload.containsKey("max_engine_temp"))
config.maxEngineTemp = payload["max_engine_temp"].as<float>();
if (payload.containsKey("min_fuel_level"))
config.minFuelLevel = payload["min_fuel_level"].as<float>();
auto result = _configManager.save(config);
_sendAck(result.isOk() ? "config_updated" : "config_failed", result.isOk());
}
void _handleOtaCommand(const JsonVariantConst& payload) {
const char* url = payload["url"];
if (!url) { _sendAck("ota_rejected", false); return; }
_sendAck("ota_started", true);
OtaManager::Config cfg;
cfg.updateUrl = url;
cfg.serverCert = payload["cert"] | "";
_otaManager.checkAndUpdate();
}
void _handleDiagnosticsRequest() {
char report[256];
snprintf(report, sizeof(report),
"{\"heap_free\":%u,\"heap_min\":%u,\"uptime_ms\":%u,\"rssi\":%d}",
ESP.getFreeHeap(), ESP.getMinFreeHeap(), millis(), WiFi.RSSI());
_transport.sendToTopic("diagnostics",
reinterpret_cast<const uint8_t*>(report),
strlen(report), 1);
}
void _handleIntervalChange(uint32_t intervalMs) {
if (intervalMs < 100 || intervalMs > 60000) {
_sendAck("interval_rejected", false); return;
}
EventBus::instance().publish(
ConfigChangedEvent{"telemetry_interval", String(intervalMs).c_str()});
_sendAck("interval_updated", true);
}
void _sendAck(const char* status, bool success) {
char payload[128];
snprintf(payload, sizeof(payload),
"{\"status\":\"%s\",\"success\":%s,\"ts\":%u}",
status, success ? "true" : "false", millis());
_transport.sendToTopic("config/response",
reinterpret_cast<const uint8_t*>(payload),
strlen(payload), 1);
}
MqttTransport& _transport;
ConfigManager& _configManager;
OtaManager& _otaManager;
};
} // namespace TractorOS
# 2.2 GPS & Werksessie-detectie op de edge
De tractor rijdt over een akker. We detecteren automatisch of de tractor aan het werk is (PTO actief, lage snelheid) of aan het rijden. Dit is de basis voor automatische urenregistratie — bedrijfslogica op de edge.
// src/core/WorkSessionDetector.h
#pragma once
#include "events/EventBus.h"
#include "containers/CircularBuffer.h"
#include <cmath>
namespace TractorOS {
struct WorkSessionStartedEvent {
uint32_t startTimestamp;
float startLat;
float startLon;
};
struct WorkSessionEndedEvent {
uint32_t startTimestamp;
uint32_t endTimestamp;
uint32_t durationSeconds;
float estimatedHectares;
float fuelUsedLiters;
};
class WorkSessionDetector {
public:
void update(float speedKmh, bool ptoActive, float rpm,
float lat, float lon) {
bool shouldWork = (ptoActive && speedKmh < 15.0f && rpm > 800.0f);
if (_state == State::IDLE && shouldWork) {
_transitionToWorking(lat, lon);
} else if (_state == State::WORKING && !shouldWork) {
_transitionToIdle();
} else if (_state == State::WORKING) {
_maybeAddTrackPoint(lat, lon);
}
}
private:
enum class State { IDLE, WORKING };
void _transitionToWorking(float lat, float lon) {
_state = State::WORKING;
_sessionStart = millis();
_trackPoints.push({lat, lon});
EventBus::instance().publish(
WorkSessionStartedEvent{_sessionStart / 1000, lat, lon});
LOG_I("WorkDetector", "Sessie gestart op %.6f, %.6f", lat, lon);
}
void _transitionToIdle() {
_state = State::IDLE;
uint32_t durationMs = millis() - _sessionStart;
float hectares = _calculateHectares();
EventBus::instance().publish(WorkSessionEndedEvent{
_sessionStart / 1000, millis() / 1000,
durationMs / 1000, hectares, 0.0f
});
LOG_I("WorkDetector", "Sessie beeindigd: %.2f ha, %us",
hectares, durationMs / 1000);
}
void _maybeAddTrackPoint(float lat, float lon) {
if (!_trackPoints.isEmpty()) {
const auto* last = _trackPoints.peek();
if (_haversineMeters(last->lat, last->lon, lat, lon) < 10.0f) return;
}
if (_trackPoints.isFull()) {
GpsPoint dummy; _trackPoints.pop(dummy);
}
_trackPoints.push({lat, lon});
}
// Shoelace-algoritme voor oppervlak berekening
float _calculateHectares() const {
if (_trackPoints.size() < 3) return 0.0f;
double area = 0.0;
float prevLat = 0, prevLon = 0;
bool first = true;
for (const auto& pt : _trackPoints) {
if (!first)
area += (double)(prevLon + pt.lon) * (double)(prevLat - pt.lat);
prevLat = pt.lat; prevLon = pt.lon; first = false;
}
float areaM2 = std::abs((float)area) * 0.5f * 111320.0f * 111320.0f;
return areaM2 / 10000.0f;
}
float _haversineMeters(float lat1, float lon1, float lat2, float lon2) const {
constexpr float R = 6371000.0f;
float dLat = (lat2 - lat1) * M_PI / 180.0f;
float dLon = (lon2 - lon1) * M_PI / 180.0f;
float a = sinf(dLat/2)*sinf(dLat/2) +
cosf(lat1*M_PI/180.0f)*cosf(lat2*M_PI/180.0f)*
sinf(dLon/2)*sinf(dLon/2);
return R * 2.0f * atan2f(sqrtf(a), sqrtf(1.0f-a));
}
struct GpsPoint { float lat; float lon; };
State _state = State::IDLE;
uint32_t _sessionStart = 0;
CircularBuffer<GpsPoint, 512> _trackPoints; // 5km track bij 10m sampling
};
} // namespace TractorOS
# 3. Het Binaire Protocol: Protobuf Lite met Nanopb
JSON over MQTT is leesbaar maar duur: elke float kost ~10 bytes als tekst versus 4 bytes binair. Bij een 4G-datalimiet telt dit op over een vloot van vijf tractoren.
We gebruiken Nanopb — een C implementatie van Protocol Buffers geschikt voor embedded.
# 3.1 De gedeelde .proto definitie
// proto/tractoros.proto
// Dit bestand is de "shared schema" tussen embedded en backend.
// Vergelijk: een gedeelde DTO-bibliotheek in een microservices-architectuur.
syntax = "proto3";
package tractoros;
// Hoog-frequentie telemetrie (elke seconde)
message TelemetryFrame {
uint32 timestamp_unix = 1;
uint32 sequence_number = 2;
// Positie — integers voorkomen float-precisieverlies
sint32 lat_e7 = 3; // Latitude * 1e7
sint32 lon_e7 = 4; // Longitude * 1e7
uint32 speed_cm_s = 5;
// Motor
uint32 engine_temp_c10 = 6; // Celsius * 10 (1 decimaal)
uint32 rpm = 7;
bool pto_active = 8;
// Brandstof
uint32 fuel_level_pct10 = 9; // Percentage * 10 (0-1000)
uint32 fuel_rate_ml_h = 10;
// Systeem
int32 rssi_dbm = 11;
uint32 free_heap_bytes = 12;
uint32 uptime_seconds = 13;
}
// Wrapper voor meerdere frames in een MQTT-packet (batching)
message TelemetryBatch {
string tractor_id = 1;
repeated TelemetryFrame frames = 2;
}
Het slimste architectuurbesluit: dit bestand is gedeeld. Nanopb genereert C-headers voor de ESP32; protoc genereert Java-klassen voor Spring Boot. Als je een veld toevoegt, compileert de firmware NIET totdat je het ook in de encoder verwerkt. Het type-systeem vangt protocol-incompatibiliteiten voor deployment.
# 3.2 Nanopb encoder in de firmware
// src/protocol/TelemetryEncoder.h
#pragma once
#include <pb_encode.h>
#include "tractoros.pb.h" // Gegenereerd door nanopb
#include "diagnostics/Logger.h"
namespace TractorOS {
class TelemetryEncoder {
public:
static constexpr size_t MAX_FRAME_SIZE = 64;
static size_t encodeFrame(const tractoros_TelemetryFrame& frame,
uint8_t* buffer, size_t bufferSize) {
pb_ostream_t stream = pb_ostream_from_buffer(buffer, bufferSize);
if (!pb_encode(&stream, tractoros_TelemetryFrame_fields, &frame)) {
LOG_E("Encoder", "Protobuf encode fout: %s", PB_GET_ERROR(&stream));
return 0;
}
return stream.bytes_written;
// Typische framegrootte: 30-50 bytes vs ~160 bytes JSON
}
static tractoros_TelemetryFrame buildFrame(
uint32_t seq, float lat, float lon,
float speedKmh, float engineTempC,
uint16_t rpm, bool ptoActive, float fuelLevelPct)
{
tractoros_TelemetryFrame frame = tractoros_TelemetryFrame_init_zero;
frame.timestamp_unix = time(nullptr);
frame.sequence_number = seq;
frame.lat_e7 = static_cast<int32_t>(lat * 1e7f);
frame.lon_e7 = static_cast<int32_t>(lon * 1e7f);
frame.speed_cm_s = static_cast<uint32_t>(speedKmh * 100.0f / 3.6f);
frame.engine_temp_c10 = static_cast<uint32_t>(engineTempC * 10.0f);
frame.rpm = rpm;
frame.pto_active = ptoActive;
frame.fuel_level_pct10 = static_cast<uint32_t>(fuelLevelPct * 10.0f);
frame.rssi_dbm = WiFi.RSSI();
frame.free_heap_bytes = ESP.getFreeHeap();
frame.uptime_seconds = millis() / 1000;
return frame;
}
private:
TelemetryEncoder() = delete; // Puur statische klasse
};
} // namespace TractorOS
# 4. Java Spring Boot: de Backend Architectuur
# 4.1 Projectstructuur
tractor-platform/
+-- proto/
| +-- tractoros.proto <- Gedeeld met firmware
+-- backend/
| +-- build.gradle
| +-- src/main/java/nl/vandenberg/tractorplatform/
| +-- TractorPlatformApplication.java
| +-- domain/
| | +-- Tractor.java
| | +-- WorkSession.java
| | +-- TelemetryFrame.java
| +-- mqtt/
| | +-- MqttConsumerConfig.java
| | +-- TelemetryMqttHandler.java
| | +-- CommandPublisher.java
| +-- telemetry/
| | +-- TelemetryService.java
| | +-- TelemetryRepository.java
| +-- business/
| | +-- WorkSessionService.java
| | +-- BusinessRuleEngine.java
| | +-- rules/
| | +-- BusinessRule.java
| | +-- EngineTemperatureRule.java
| | +-- MaintenanceIntervalRule.java
| | +-- GeofenceRule.java
| +-- invoicing/
| | +-- InvoiceService.java
| | +-- ExactOnlineClient.java
| +-- api/
| +-- TractorController.java
| +-- DashboardWebSocketHandler.java
+-- frontend/
+-- React dashboard
# 4.2 MQTT Consumer met Spring Integration
// mqtt/TelemetryMqttHandler.java
@Slf4j
@Component
@RequiredArgsConstructor
public class TelemetryMqttHandler {
private final TelemetryService telemetryService;
/**
* Spring Integration roept dit aan voor elk bericht op de geabonneerde topics.
* Topic patroon: tractoros/{tractorId}/telemetry
*
* De architectuur spiegelt de ESP32 CommandHandler maar omgekeerd:
* die verwerkte inkomende commando's, dit verwerkt inkomende telemetrie.
*/
@ServiceActivator(inputChannel = "mqttInputChannel")
public void handleMessage(Message<byte[]> message) {
String topic = (String) message.getHeaders().get("mqtt_receivedTopic");
byte[] payload = message.getPayload();
log.debug("MQTT ontvangen: {} ({} bytes)", topic, payload.length);
try {
String tractorId = extractTractorId(topic);
String messageType = extractMessageType(topic);
switch (messageType) {
case "telemetry" -> handleTelemetry(tractorId, payload);
case "alerts" -> telemetryService.processAlert(tractorId, new String(payload));
case "status" -> telemetryService.updateTractorStatus(tractorId, new String(payload));
default -> log.warn("Onbekend message type: {}", messageType);
}
} catch (Exception e) {
// Niet opnieuw gooien: Spring Integration zou het bericht anders
// in een dead letter queue plaatsen en de channel blokkeren
log.error("Fout bij verwerken van MQTT bericht op {}: {}", topic, e.getMessage());
}
}
private void handleTelemetry(String tractorId, byte[] payload)
throws InvalidProtocolBufferException {
TelemetryBatch batch = TelemetryBatch.parseFrom(payload);
log.debug("Tractor {}: {} frames ontvangen", tractorId, batch.getFramesCount());
telemetryService.processBatch(tractorId, batch);
}
private String extractTractorId(String topic) {
String[] parts = topic.split("/");
return parts.length >= 2 ? parts[1] : "unknown";
}
private String extractMessageType(String topic) {
String[] parts = topic.split("/");
return parts.length >= 3 ? parts[2] : "unknown";
}
}
# 4.3 TelemetryService: de verwerkingskern
// telemetry/TelemetryService.java
@Slf4j
@Service
@RequiredArgsConstructor
public class TelemetryService {
private final TelemetryRepository telemetryRepository;
private final BusinessRuleEngine businessRuleEngine;
private final WorkSessionService workSessionService;
private final DashboardWebSocketHandler dashboardHandler;
@Transactional
public void processBatch(String tractorId, TelemetryBatch batch) {
// 1. Converteer protobuf naar domain objecten
List<TelemetryFrame> frames = batch.getFramesList().stream()
.map(f -> protoToDomain(tractorId, f))
.toList();
// 2. Opslaan in TimescaleDB (batch insert)
telemetryRepository.saveAll(frames);
// 3. Bedrijfsregels evalueren op de meest recente frame
frames.stream()
.max(Comparator.comparing(TelemetryFrame::getTimestamp))
.ifPresent(latest -> businessRuleEngine.evaluate(tractorId, latest));
// 4. Werksessie-state updaten
frames.forEach(f -> workSessionService.update(tractorId, f));
// 5. Live update naar dashboard pushen (asynchroon)
frames.stream()
.max(Comparator.comparing(TelemetryFrame::getTimestamp))
.ifPresent(latest -> dashboardHandler.pushUpdate(tractorId, latest));
}
private TelemetryFrame protoToDomain(String tractorId,
Tractoros.TelemetryFrame proto) {
return TelemetryFrame.builder()
.tractorId(tractorId)
.timestamp(Instant.ofEpochSecond(proto.getTimestampUnix()))
.latitude(proto.getLatE7() / 1e7)
.longitude(proto.getLonE7() / 1e7)
.speedKmh(proto.getSpeedCmS() * 3.6 / 100.0)
.engineTempC(proto.getEngineTempC10() / 10.0)
.rpm(proto.getRpm())
.ptoActive(proto.getPtoActive())
.fuelLevelPct(proto.getFuelLevelPct10() / 10.0)
.rssiDbm(proto.getRssiDbm())
.freeHeapBytes(proto.getFreeHeapBytes())
.build();
}
}
# 5. De Business Rule Engine
Dit is de kern van de enterprise-waarde. De hardware stuurt data; de bedrijfsregels bepalen wat het betekent.
# 5.1 De Rule Interface
// business/rules/BusinessRule.java
public interface BusinessRule {
String getName();
/**
* Evalueer de regel.
* @return Een RuleViolation als de regel geschonden is, anders empty.
*
* Vergelijk: de IMiddleware interface van de ESP32, maar dan op Java-niveau.
* Zelfde principe: open/closed — nieuwe regels toevoegen zonder bestaande
* aan te passen. Spring injecteert automatisch alle implementaties.
*/
Optional<RuleViolation> evaluate(String tractorId, TelemetryFrame frame);
default boolean isEnabled() { return true; }
}
// business/rules/RuleViolation.java
@Value
@Builder
public class RuleViolation {
public enum Severity { INFO, WARNING, CRITICAL }
public enum Action {
LOG_ONLY,
NOTIFY_DRIVER,
NOTIFY_MANAGER,
REDUCE_INTERVAL, // Verhoog telemetrie frequentie voor meer detail
SEND_COMMAND // Stuur commando naar de tractor
}
String ruleName;
String tractorId;
Severity severity;
String message;
Instant timestamp;
Object context;
Action action;
}
# 5.2 Concrete Bedrijfsregels
// business/rules/EngineTemperatureRule.java
@Component
public class EngineTemperatureRule implements BusinessRule {
private static final double WARN_THRESHOLD = 88.0;
private static final double CRITICAL_THRESHOLD = 95.0;
@Override
public String getName() { return "ENGINE_TEMPERATURE"; }
@Override
public Optional<RuleViolation> evaluate(String tractorId, TelemetryFrame frame) {
double temp = frame.getEngineTempC();
if (temp >= CRITICAL_THRESHOLD) {
return Optional.of(RuleViolation.builder()
.ruleName(getName())
.tractorId(tractorId)
.severity(RuleViolation.Severity.CRITICAL)
.message("KRITIEK: Motortemperatuur %.1f°C — motor direct stoppen!".formatted(temp))
.timestamp(frame.getTimestamp())
.action(RuleViolation.Action.NOTIFY_DRIVER)
.context(temp)
.build());
}
if (temp >= WARN_THRESHOLD) {
return Optional.of(RuleViolation.builder()
.ruleName(getName())
.tractorId(tractorId)
.severity(RuleViolation.Severity.WARNING)
.message("Motortemperatuur %.1f°C — controleer koelvloeistof".formatted(temp))
.timestamp(frame.getTimestamp())
.action(RuleViolation.Action.NOTIFY_MANAGER)
.context(temp)
.build());
}
return Optional.empty();
}
}
// business/rules/MaintenanceIntervalRule.java
@Component
@RequiredArgsConstructor
public class MaintenanceIntervalRule implements BusinessRule {
private static final long OIL_CHANGE_HOURS = 250;
private static final long WARNING_AHEAD_HOURS = 20;
private final MaintenanceRepository maintenanceRepository;
@Override
public String getName() { return "MAINTENANCE_INTERVAL"; }
@Override
public Optional<RuleViolation> evaluate(String tractorId, TelemetryFrame frame) {
long hoursSinceLastOilChange = maintenanceRepository
.getHoursSinceLastMaintenance(tractorId, "OIL_CHANGE");
long remainingHours = OIL_CHANGE_HOURS - hoursSinceLastOilChange;
if (remainingHours <= 0) {
return Optional.of(RuleViolation.builder()
.ruleName(getName())
.tractorId(tractorId)
.severity(RuleViolation.Severity.WARNING)
.message("Olieverversing OVERSCHREDEN: %d uren geleden gepland"
.formatted(Math.abs(remainingHours)))
.timestamp(frame.getTimestamp())
.action(RuleViolation.Action.NOTIFY_MANAGER)
.build());
}
if (remainingHours <= WARNING_AHEAD_HOURS) {
return Optional.of(RuleViolation.builder()
.ruleName(getName())
.tractorId(tractorId)
.severity(RuleViolation.Severity.INFO)
.message("Olieverversing over %d motoruren".formatted(remainingHours))
.timestamp(frame.getTimestamp())
.action(RuleViolation.Action.LOG_ONLY)
.build());
}
return Optional.empty();
}
}
// business/rules/GeofenceRule.java
@Component
@RequiredArgsConstructor
public class GeofenceRule implements BusinessRule {
private final GeofenceRepository geofenceRepository;
@Override
public String getName() { return "GEOFENCE"; }
@Override
public Optional<RuleViolation> evaluate(String tractorId, TelemetryFrame frame) {
var assignedParcels = geofenceRepository.getActiveAssignments(
tractorId, frame.getTimestamp());
if (assignedParcels.isEmpty()) return Optional.empty();
double lat = frame.getLatitude();
double lon = frame.getLongitude();
boolean inAnyParcel = assignedParcels.stream()
.anyMatch(parcel -> isPointInPolygon(lat, lon, parcel));
if (!inAnyParcel && frame.getSpeedKmh() > 2.0) {
return Optional.of(RuleViolation.builder()
.ruleName(getName())
.tractorId(tractorId)
.severity(RuleViolation.Severity.WARNING)
.message("Tractor buiten toegewezen perceel op %.6f, %.6f".formatted(lat, lon))
.timestamp(frame.getTimestamp())
.action(RuleViolation.Action.NOTIFY_MANAGER)
.context(Map.of("lat", lat, "lon", lon))
.build());
}
return Optional.empty();
}
// Ray casting algoritme voor point-in-polygon
private boolean isPointInPolygon(double lat, double lon, Geofence fence) {
var vertices = fence.getVertices();
int n = vertices.size();
boolean inside = false;
for (int i = 0, j = n - 1; i < n; j = i++) {
double xi = vertices.get(i).getLat(), yi = vertices.get(i).getLon();
double xj = vertices.get(j).getLat(), yj = vertices.get(j).getLon();
if (((yi > lon) != (yj > lon)) &&
(lat < (xj - xi) * (lon - yi) / (yj - yi) + xi)) {
inside = !inside;
}
}
return inside;
}
}
# 5.3 De Rule Engine Orchestrator
// business/BusinessRuleEngine.java
@Slf4j
@Service
@RequiredArgsConstructor
public class BusinessRuleEngine {
// Spring injecteert automatisch ALLE BusinessRule @Component implementaties.
// Nieuwe regel toevoegen = nieuwe @Component klasse = klaar. Nul andere wijzigingen.
private final List<BusinessRule> rules;
private final ViolationRepository violationRepository;
private final NotificationService notificationService;
private final CommandPublisher commandPublisher;
public void evaluate(String tractorId, TelemetryFrame frame) {
rules.stream()
.filter(BusinessRule::isEnabled)
.map(rule -> {
try {
return rule.evaluate(tractorId, frame);
} catch (Exception e) {
log.error("Fout in regel {}: {}", rule.getName(), e.getMessage());
return Optional.<RuleViolation>empty();
}
})
.filter(Optional::isPresent)
.map(Optional::get)
.forEach(this::handleViolation);
}
private void handleViolation(RuleViolation violation) {
log.info("Regel geschonden: {} [{}] - {}",
violation.getRuleName(), violation.getSeverity(), violation.getMessage());
violationRepository.save(violation);
switch (violation.getAction()) {
case NOTIFY_DRIVER -> notificationService.notifyDriver(
violation.getTractorId(), violation.getMessage());
case NOTIFY_MANAGER -> notificationService.notifyManager(
violation.getMessage(), violation.getSeverity());
case REDUCE_INTERVAL -> commandPublisher.sendCommand(
violation.getTractorId(), "set_telemetry_interval",
Map.of("interval_ms", 200));
case SEND_COMMAND -> {
if ("ENGINE_TEMPERATURE".equals(violation.getRuleName()) &&
violation.getSeverity() == RuleViolation.Severity.CRITICAL) {
commandPublisher.sendCommand(
violation.getTractorId(), "request_diagnostics", Map.of());
}
}
case LOG_ONLY -> { /* Al opgeslagen */ }
}
}
}
# 6. TimescaleDB: Tijdreeksdata op Schaal
Vijf tractoren, een frame per seconde: 432.000 frames per dag, 157 miljoen per jaar. Standaard PostgreSQL kan dit, maar tijdreeks-queries worden traag zonder de juiste structuur.
TimescaleDB is een PostgreSQL-extensie die data automatisch partitioneert in wekelijkse "chunks". Queries als "geef alle data van gisteren" zijn dan O(1) in plaats van O(n).
# 6.1 Schema en Flyway migraties
-- db/migration/V1__initial_schema.sql
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE TABLE tractors (
id VARCHAR(16) PRIMARY KEY,
name VARCHAR(64) NOT NULL,
kubota_model VARCHAR(32),
year INTEGER,
is_online BOOLEAN DEFAULT FALSE,
last_seen_at TIMESTAMPTZ,
firmware_version VARCHAR(16),
config_json JSONB
);
-- De telemetrie hypertable: TimescaleDB partitioneert automatisch per week
CREATE TABLE telemetry (
time TIMESTAMPTZ NOT NULL,
tractor_id VARCHAR(16) NOT NULL REFERENCES tractors(id),
lat DOUBLE PRECISION,
lon DOUBLE PRECISION,
speed_kmh REAL,
engine_temp_c REAL,
rpm INTEGER,
pto_active BOOLEAN,
fuel_level_pct REAL,
fuel_rate_ml_h REAL,
rssi_dbm INTEGER
);
SELECT create_hypertable('telemetry', 'time',
chunk_time_interval => INTERVAL '1 week');
CREATE INDEX ON telemetry (tractor_id, time DESC);
-- Automatische compressie na 7 dagen (TimescaleDB comprimeert ~10x)
ALTER TABLE telemetry SET (
timescaledb.compress,
timescaledb.compress_orderby = 'time DESC',
timescaledb.compress_segmentby = 'tractor_id'
);
SELECT add_compression_policy('telemetry', INTERVAL '7 days');
-- Verwijder data ouder dan 2 jaar automatisch
SELECT add_retention_policy('telemetry', INTERVAL '2 years');
-- db/migration/V2__work_sessions.sql
CREATE TABLE work_sessions (
id BIGSERIAL PRIMARY KEY,
tractor_id VARCHAR(16) NOT NULL REFERENCES tractors(id),
customer_id BIGINT REFERENCES customers(id),
parcel_id BIGINT REFERENCES parcels(id),
started_at TIMESTAMPTZ NOT NULL,
ended_at TIMESTAMPTZ,
-- Gegenereerde kolom: automatisch berekend door PostgreSQL
duration_s INTEGER GENERATED ALWAYS AS
(EXTRACT(EPOCH FROM (ended_at - started_at))::INTEGER) STORED,
engine_hours REAL,
area_m2 INTEGER,
fuel_used_ml INTEGER,
start_lat DOUBLE PRECISION,
start_lon DOUBLE PRECISION,
gps_track JSONB, -- Array van GPS-punten voor perceelkaart op dashboard
status VARCHAR(16) DEFAULT 'ACTIVE',
invoice_id BIGINT
);
-- Materialized view voor motoruren-rapportage (dagelijks ververst)
-- Equivalent van een voorberekende aggregate voor rapportage-snelheid
CREATE MATERIALIZED VIEW daily_engine_hours AS
SELECT
tractor_id,
DATE(started_at) AS work_date,
SUM(engine_hours) AS total_hours,
SUM(area_m2) / 10000.0 AS total_hectares,
SUM(fuel_used_ml)/1000.0 AS total_fuel_liters,
COUNT(*) AS session_count
FROM work_sessions
WHERE status = 'COMPLETED'
GROUP BY tractor_id, DATE(started_at);
# 6.2 Efficiënte tijdreeks queries
// In TelemetryRepository:
/**
* Gemiddelde motortemperatuur per uur, afgelopen 24 uur.
* TimescaleDB's time_bucket() is SQL GROUP BY + date_trunc,
* maar geoptimaliseerd voor hypertables.
*/
@Query(value = """
SELECT
time_bucket('1 hour', time) AS bucket,
AVG(engine_temp_c) AS avg_temp,
MAX(engine_temp_c) AS max_temp,
AVG(rpm) AS avg_rpm
FROM telemetry
WHERE tractor_id = :tractorId
AND time > NOW() - INTERVAL '24 hours'
GROUP BY bucket
ORDER BY bucket DESC
""", nativeQuery = true)
List<Object[]> getHourlyAggregates(@Param("tractorId") String tractorId);
/**
* Detecteer frames waar de motortemperatuur meer dan
* 2 standaarddeviaties boven het daggemiddelde ligt.
*/
@Query(value = """
WITH daily_stats AS (
SELECT AVG(engine_temp_c) AS mean,
STDDEV(engine_temp_c) AS stddev
FROM telemetry
WHERE tractor_id = :tractorId
AND time > NOW() - INTERVAL '24 hours'
)
SELECT t.time, t.engine_temp_c, t.rpm, t.pto_active
FROM telemetry t, daily_stats
WHERE t.tractor_id = :tractorId
AND t.time > NOW() - INTERVAL '24 hours'
AND t.engine_temp_c > daily_stats.mean + (2 * daily_stats.stddev)
ORDER BY t.time DESC
LIMIT 100
""", nativeQuery = true)
List<Object[]> findTemperatureAnomalies(@Param("tractorId") String tractorId);
# 7. Real-time Dashboard via WebSocket
Het React-dashboard wil live updates zonder polling. We gebruiken Spring WebSocket met STOMP.
# 7.1 Backend push handler
// api/DashboardWebSocketHandler.java
@Component
@RequiredArgsConstructor
public class DashboardWebSocketHandler {
private final SimpMessagingTemplate messagingTemplate;
/**
* Kanaalstructuur:
* /topic/fleet <- Alle tractoren (voor fleet overview map)
* /topic/tractor/{id} <- Specifieke tractor (voor detail view)
* /topic/alerts <- Kritieke meldingen
*/
public void pushUpdate(String tractorId, TelemetryFrame frame) {
var dto = LiveTelemetryDto.from(frame);
messagingTemplate.convertAndSend("/topic/fleet", dto);
messagingTemplate.convertAndSend("/topic/tractor/" + tractorId, dto);
}
public void pushAlert(RuleViolation violation) {
messagingTemplate.convertAndSend("/topic/alerts", AlertDto.from(violation));
}
}
# 7.2 React TypeScript hook
// frontend/src/hooks/useTractorTelemetry.ts
import { useEffect, useState } from 'react';
import { Client } from '@stomp/stompjs';
interface LiveTelemetry {
tractorId: string;
timestamp: string;
latitude: number;
longitude: number;
engineTempC: number;
speedKmh: number;
rpm: number;
ptoActive: boolean;
fuelLevelPct: number;
}
/**
* De frontend-architectuur spiegelt de backend:
* subscribe op een topic (net als MQTT op de ESP32),
* ontvang updates asynchroon via callback,
* geen polling.
*/
export function useTractorTelemetry(tractorId: string) {
const [telemetry, setTelemetry] = useState<LiveTelemetry | null>(null);
const [connected, setConnected] = useState(false);
useEffect(() => {
const client = new Client({
brokerURL: 'ws://localhost:8080/ws',
onConnect: () => {
setConnected(true);
client.subscribe(`/topic/tractor/${tractorId}`, (message) => {
setTelemetry(JSON.parse(message.body));
});
},
onDisconnect: () => setConnected(false),
});
client.activate();
return () => { client.deactivate(); };
}, [tractorId]);
return { telemetry, connected };
}
// Gebruik:
// const { telemetry } = useTractorTelemetry('TRACTOR_001');
// <EngineGauge value={telemetry?.engineTempC} warning={88} critical={95} />
# 8. Voorspellend Onderhoud: Anomaly Detection
De meest waardevolle enterprise-functie: een probleem detecteren voordat de tractor kapotgaat op de akker.
// maintenance/AnomalyDetector.java
@Service
@RequiredArgsConstructor
public class AnomalyDetector {
private final TelemetryRepository telemetryRepository;
/**
* Detecteer afwijkend gedrag via Exponential Moving Average.
*
* EMA geeft meer gewicht aan recente waarden.
* Z-score: hoeveel standaarddeviaties wijkt de meting af van de EMA-baseline?
* Z > 3.0: statistisch zeer ongebruikelijk (kans < 0.3%).
*/
public Optional<MaintenanceAlert> detectTemperatureAnomaly(
String tractorId, TelemetryFrame current) {
var baseline = telemetryRepository.getTemperatureBaseline(tractorId);
if (baseline.isEmpty()) return Optional.empty();
double ema = baseline.get().getEmaTemp();
double stddev = Math.sqrt(baseline.get().getVariance());
double zScore = (current.getEngineTempC() - ema) / stddev;
if (zScore > 3.0 && current.getEngineTempC() > 75.0) {
return Optional.of(MaintenanceAlert.builder()
.tractorId(tractorId)
.type("TEMPERATURE_ANOMALY")
.severity("WARNING")
.message("Ongebruikelijke temperatuurstijging: %.1f°C (%.1f sigma boven normaal). Controleer koelsysteem."
.formatted(current.getEngineTempC(), zScore))
.detectedAt(current.getTimestamp())
.recommendation("Inspecteer koelsysteem: koelvloeistofpeil, thermostaat, radiateur")
.build());
}
// Update EMA voor volgende detectie (alpha = 0.1: trage filter)
double alpha = 0.1;
double newEma = alpha * current.getEngineTempC() + (1 - alpha) * ema;
double newVariance = (1 - alpha) * (baseline.get().getVariance() +
alpha * Math.pow(current.getEngineTempC() - ema, 2));
telemetryRepository.updateTemperatureBaseline(tractorId, newEma, newVariance);
return Optional.empty();
}
/**
* RPM-instabiliteit detectie.
* Een gezonde motor heeft stabiele RPM bij constante PTO-belasting.
* Variatiecoefficient > 5%: ongebruikelijk, waarschijnlijk brandstoffilter.
*/
public Optional<MaintenanceAlert> detectRpmInstability(
String tractorId, List<TelemetryFrame> recentFrames) {
if (recentFrames.size() < 10) return Optional.empty();
var ptoFrames = recentFrames.stream()
.filter(TelemetryFrame::isPtoActive).toList();
if (ptoFrames.size() < 5) return Optional.empty();
double mean = ptoFrames.stream()
.mapToInt(TelemetryFrame::getRpm).average().orElse(0);
double stddev = Math.sqrt(ptoFrames.stream()
.mapToDouble(f -> Math.pow(f.getRpm() - mean, 2)).average().orElse(0));
double cv = stddev / mean;
if (cv > 0.05) {
return Optional.of(MaintenanceAlert.builder()
.tractorId(tractorId)
.type("RPM_INSTABILITY")
.severity("INFO")
.message("RPM-instabiliteit: %.1f%% variatie tijdens PTO-gebruik".formatted(cv * 100))
.detectedAt(recentFrames.getLast().getTimestamp())
.recommendation("Brandstoffilter vervangen (part #KU-BF-1247)")
.build());
}
return Optional.empty();
}
}
# 9. De Factuurkoppeling: Exact Online
Het loonbedrijf wil dat afgeronde werksessies automatisch factuurregels genereren in Exact Online.
// invoicing/InvoiceService.java
@Service
@RequiredArgsConstructor
@Slf4j
public class InvoiceService {
private final WorkSessionRepository workSessionRepository;
private final ExactOnlineClient exactClient;
private final PriceRepository priceRepository;
/**
* Verwerk een afgeronde werksessie naar een factuurregel.
* Event-driven: aangeroepen vanuit WorkSessionService zodra een
* sessie de status COMPLETED krijgt.
*/
@Transactional
public void processCompletedSession(WorkSession session) {
log.info("Factuurverwerking voor sessie {}", session.getId());
var customer = session.getCustomer();
var billableItems = calculateBillableItems(session);
for (var item : billableItems) {
var invoiceLine = ExactInvoiceLine.builder()
.accountCode(customer.getExactAccountCode())
.itemCode(item.itemCode())
.description(item.description())
.quantity(item.quantity())
.unitPrice(item.unitPrice())
.vatCode("21")
.build();
exactClient.createInvoiceLine(invoiceLine).ifPresentOrElse(
invoiceId -> {
session.setInvoiceId(invoiceId);
workSessionRepository.save(session);
log.info("Factuurregel aangemaakt: {}", invoiceId);
},
() -> log.error("Factuurlijn aanmaken mislukt voor sessie {}", session.getId())
);
}
}
private List<BillableItem> calculateBillableItems(WorkSession session) {
var prices = priceRepository.getPricesForCustomer(
session.getCustomerId(), session.getStartedAt().toLocalDate());
var items = new ArrayList<BillableItem>();
// Uurprijs voor inzet (afgerond op kwartier)
double hours = session.getDurationS() / 3600.0;
items.add(new BillableItem(
"TRACTOR_HOUR",
"Tractor inzet %s — %s".formatted(
session.getTractor().getName(),
session.getStartedAt().toLocalDate()),
roundToQuarter(hours),
prices.getHourlyRate()
));
// Hectare-toeslag indien van toepassing
if (session.getAreaM2() > 0 && prices.getAreaRate() > 0) {
double hectares = session.getAreaM2() / 10000.0;
items.add(new BillableItem(
"HECTARE_SURCHARGE",
"Bewerkt oppervlak: %.2f ha".formatted(hectares),
Math.round(hectares * 100) / 100.0,
prices.getAreaRate()
));
}
return items;
}
private double roundToQuarter(double hours) {
return Math.round(hours * 4) / 4.0;
}
record BillableItem(String itemCode, String description,
double quantity, double unitPrice) {}
}
# 10. Deployment: Docker Compose naar Kubernetes
# 10.1 Docker Compose voor development en staging
# docker-compose.yml
version: '3.8'
services:
db:
image: timescale/timescaledb:latest-pg15
environment:
POSTGRES_DB: tractorplatform
POSTGRES_USER: tractor
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- pgdata:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U tractor -d tractorplatform"]
interval: 10s
retries: 5
mqtt:
image: eclipse-mosquitto:2
volumes:
- ./config/mosquitto:/mosquitto/config
- ./certs:/mosquitto/certs:ro
ports:
- "8883:8883" # MQTT over TLS
- "9001:9001" # WebSocket voor debugging
backend:
build: ./backend
depends_on:
db:
condition: service_healthy
environment:
SPRING_DATASOURCE_URL: jdbc:postgresql://db:5432/tractorplatform
SPRING_DATASOURCE_USERNAME: tractor
SPRING_DATASOURCE_PASSWORD: ${DB_PASSWORD}
MQTT_BROKER_URL: ssl://mqtt:8883
EXACT_CLIENT_ID: ${EXACT_CLIENT_ID}
EXACT_CLIENT_SECRET: ${EXACT_CLIENT_SECRET}
ports:
- "8080:8080"
dashboard:
build: ./frontend
depends_on:
- backend
ports:
- "3000:80"
volumes:
pgdata:
# 10.2 Kubernetes productie excerpts
# k8s/backend-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: tractor-backend
spec:
replicas: 2 # Horizontaal schaalbaar
selector:
matchLabels:
app: tractor-backend
template:
spec:
containers:
- name: backend
image: vandenberg/tractor-backend:1.2.0
resources:
requests: { memory: "512Mi", cpu: "250m" }
limits: { memory: "1Gi", cpu: "1000m" }
env:
- name: DB_PASSWORD
valueFrom:
secretKeyRef: { name: db-secret, key: password }
livenessProbe:
httpGet: { path: /actuator/health/liveness, port: 8080 }
initialDelaySeconds: 30
readinessProbe:
httpGet: { path: /actuator/health/readiness, port: 8080 }
# 11. Security: End-to-End Beveiliging
Een tractor die gespoofde GPS-data ontvangt, of waarvan de firmware vervangen kan worden door een aanvaller, is een reeel veiligheidsrisico.
# 11.1 mTLS: mutual TLS voor MQTT
Standaard TLS valideert alleen de server. Mutual TLS (mTLS) valideert ook de client via een certificaat. Elke tractor krijgt een uniek client-certificaat:
# Eenmalig: maak CA aan
openssl genrsa -out ca.key 4096
openssl req -new -x509 -days 3650 -key ca.key -out ca.crt \
-subj "/CN=TractorOS CA/O=Van der Berg Loonbedrijf"
# Per tractor: uniek client certificaat
TRACTOR_ID="TRACTOR_001"
openssl genrsa -out ${TRACTOR_ID}.key 2048
openssl req -new -key ${TRACTOR_ID}.key -out ${TRACTOR_ID}.csr \
-subj "/CN=${TRACTOR_ID}/O=Van der Berg Loonbedrijf"
openssl x509 -req -days 365 -in ${TRACTOR_ID}.csr \
-CA ca.crt -CAkey ca.key -out ${TRACTOR_ID}.crt
# Flash naar ESP32 NVS via provisioning script
python3 scripts/provision_tractor.py \
--tractor-id ${TRACTOR_ID} \
--ca-cert ca.crt \
--client-cert ${TRACTOR_ID}.crt \
--client-key ${TRACTOR_ID}.key \
--port /dev/ttyUSB0
# 11.2 Firmware signing voor OTA
// src/ota/FirmwareVerifier.h
#pragma once
#include <mbedtls/sha256.h>
#include <mbedtls/rsa.h>
#include "diagnostics/Logger.h"
namespace TractorOS {
/**
* Verifieert de digitale handtekening van een firmware-image voor installatie.
*
* De CI/CD pipeline ondertekent elk image met de private key van het devteam.
* De ESP32 verifieert met de bijbehorende public key die in flash is ingebrand.
* Zonder geldige handtekening: update categorisch geweigerd.
*
* Dit voorkomt dat een aanvaller via MQTT een kwaadaardige firmware pusht.
*/
class FirmwareVerifier {
public:
static bool verify(const uint8_t* firmwareData, size_t firmwareLen,
const uint8_t* signature, size_t sigLen) {
// 1. Bereken SHA-256 hash van het firmware-image
uint8_t hash[32];
mbedtls_sha256_context sha_ctx;
mbedtls_sha256_init(&sha_ctx);
mbedtls_sha256_starts(&sha_ctx, 0);
mbedtls_sha256_update(&sha_ctx, firmwareData, firmwareLen);
mbedtls_sha256_finish(&sha_ctx, hash);
mbedtls_sha256_free(&sha_ctx);
// 2. Verifieer RSA-2048 handtekening met ingebakken public key
mbedtls_rsa_context rsa;
mbedtls_rsa_init(&rsa);
bool valid = false;
if (loadPublicKey(rsa)) {
int ret = mbedtls_rsa_pkcs1_verify(
&rsa, MBEDTLS_MD_SHA256, 32, hash, signature);
valid = (ret == 0);
}
mbedtls_rsa_free(&rsa);
if (!valid) LOG_E("Verifier", "HANDTEKENING ONGELDIG — update geweigerd!");
return valid;
}
private:
static bool loadPublicKey(mbedtls_rsa_context& ctx);
FirmwareVerifier() = delete;
};
} // namespace TractorOS
# 12. De Complete Data Flow: Van Sensor naar Factuur
Laten we de volledige reis van een meting door het hele systeem volgen.
T+0ms DS18B20 sensor geeft spanning af: 87.4 graden Celsius
T+1ms SensorTask leest waarde, bouwt SensorReading (stack-gealloceerd).
Publiceert SensorReadingEvent op de EventBus.
T+2ms WorkSessionDetector ontvangt event.
State: WORKING — snelheid 8.3 km/h, PTO actief.
Voegt GPS-punt toe aan CircularBuffer (track, elke 10 meter).
T+3ms TelemetryEncoder bouwt protobuf TelemetryFrame:
engine_temp_c10 = 874 (87.4 * 10, integer)
lat_e7 = 520347812 (52.0347812 * 1e7)
Framegrootte: 34 bytes vs. ~160 bytes JSON.
T+4ms MiddlewarePipeline:
RateLimitMiddleware -> OK (100ms verstreken)
ChecksumMiddleware -> XOR checksum toegevoegd
CompressionMiddleware -> 34 bytes < drempel, geen compressie
T+5ms MqttTransport publiceert op "tractoros/TRACTOR_001/telemetry".
QoS 0 — fire and forget voor hoge-frequentie telemetrie.
T+15ms Spring Boot MQTT Consumer ontvangt het bericht.
TelemetryMqttHandler.handleMessage() wordt aangeroepen.
T+16ms Protobuf deserialisatie:
engine_temp_c = 874 / 10.0 = 87.4 (terug naar double)
latitude = 520347812 / 1e7 = 52.0347812
T+17ms TelemetryService.processBatch():
1. JDBC batch insert naar TimescaleDB (5 frames per INSERT).
T+18ms BusinessRuleEngine evalueert alle regels:
EngineTemperatureRule: 87.4 < 88.0 (warning drempel) -> OK
MaintenanceIntervalRule: 63 uur tot olieverversing -> OK
GeofenceRule: binnen perceel "Akker Noord" -> OK
AnomalyDetector: z-score = 0.8 (normaal) -> OK
T+19ms DashboardWebSocketHandler pusht naar "/topic/tractor/TRACTOR_001".
React dashboard ontvangt update, gauge-display refresht.
------- 3 uur later --------------------------------------------------------
T+3u Tractor stopt: PTO uitgeschakeld, motor stationair.
WorkSessionDetector: transitie WORKING -> IDLE.
Publiceert WorkSessionEndedEvent:
duration: 10.847 seconden
area: 3.2 ha (shoelace-algoritme over 847 GPS-punten)
fuel: 42.3 liter
T+3u+50ms WorkSessionService ontvangt event via EventBus.
Sessie opgeslagen als COMPLETED in work_sessions tabel.
T+3u+51ms InvoiceService.processCompletedSession():
Klant: Maatschap Hendriks, perceel: Akker Noord
Berekening:
3.0 uur (afgerond op kwartier) x EUR 145/uur = EUR 435,00
3.20 ha x EUR 12,00/ha = EUR 38,40
Totaal excl. BTW: EUR 473,40
T+3u+52ms ExactOnlineClient:
POST https://start.exactonline.nl/api/v1/{div}/salesentry/SalesEntryLines
Response: InvoiceID "INV-2024-1847"
T+3u+52ms WorkSession.invoiceId = "INV-2024-1847", opgeslagen.
Dashboard toont: "Sessie gefactureerd"
Totale latency sensor -> database: ~17ms
Totale latency sensor -> dashboard: ~19ms
Totale latency sessie -> factuur: ~2 seconden (inclusief Exact Online API)
# Bijlage A: De Gedeelde Proto Library
Het slimste architectuurbesluit: het .proto bestand wordt gedeeld door firmware en backend.
proto/tractoros.proto
|
+-- nanopb_generator.py --> src/protocol/tractoros.pb.h (ESP32/C++)
| src/protocol/tractoros.pb.c
|
+-- protoc + grpc-java --> TelemetryFrame.java (Spring Boot/Java)
TelemetryBatch.java
Als je een veld toevoegt aan het .proto bestand, compileert de ESP32 firmware niet totdat je het ook verwerkt in de C++ encoder. De Java backend geeft een compileerfout als de decoder het veld niet afhandelt. Het type-systeem vangt protocol-incompatibiliteiten voor deployment — hetzelfde principe als het Result
# Bijlage B: Wat Verder Uitgebouwd Kan Worden
Dit systeem is productie-klaar voor een loonbedrijf met vijf tractoren. Voor grotere schaal of hogere eisen:
Schaalbaarheid: Bij 50+ tractoren wordt Mosquitto een bottleneck. Vervang door geclusterde HiveMQ/VerneMQ, of gebruik AWS IoT Core als managed service.
Event sourcing: De work_sessions tabel slaat alleen het eindresultaat op. Met event sourcing (elke state-transitie als onveranderlijk event) kun je facturen herberekenen bij tariefwijzigingen zonder historische data te verliezen.
CQRS: De BusinessRuleEngine en queries draaien nu op dezelfde database. Bij hogere load: write-database voor live data, read replica (TimescaleDB) voor analytische queries.
Edge ML: De AnomalyDetector gebruikt een eenvoudige z-score. Met genoeg historische data train je een TensorFlow Lite model dat als binary op de ESP32 draait — anomaliedetectie zonder netwerk, nul latency.
Multi-tenancy: Voor een SaaS-platform met meerdere loonbedrijven: voeg tenant_id toe aan alle tabellen, gebruik PostgreSQL Row-Level Security, en OAuth2/OIDC per tenant.
Cursusreeks volledig. Van de byte-level Protobuf encoder op de ESP32, via de event-driven middleware pipeline en de Business Rule Engine, tot de automatisch gegenereerde factuurregels in Exact Online — met de architectuurdiscipline van een ervaren software-architect door de hele stack heen.
Reacties (0 )
Geen reacties beschikbaar.
Log in om een reactie te plaatsen.