5.3. Umsetzung

5.3.1. Rust - Anfang

Cargo.toml:

[package]
name = "rusty-rpi"
version = "0.1.0"
edition = "2021"

[dependencies]
dht-mmap-rust = "0.1.0"

Für den Anfang benötigen wir lediglich eine Abhängigkeit (In Rust „crates“ genannt), nämlich dht-mmap-rust. Diese ist leicht hinzuzufügen durch:

cargo add dht-mmap-rust

Source Code:

use dht_mmap_rust::{Dht, DhtType};

fn main() {
    // The sensor is a DHT22 connected on pin 23
    let mut dht = Dht::new(DhtType::Dht22, 23).unwrap();
    let mut i = 0;
    let tries = 5;

    loop {
        let result = dht.read();

        match result {
            Ok(data) => {
                i = 0;
                println!(
                    "Temperature {} °C, Humidity {}%RH",
                    data.temperature(),
                    data.humidity()
                );
            }
            Err(_) => {
                i = i + 1;
                if i >= tries {
                    eprintln!("Exceed retry amount, exiting.");
                    break;
                }
                eprintln!("Error reading sensor, retrying.");
            }
        }
    }
}

Der anfängliche Source Code is sehr überschaubar.

Wir setzen unseren DHT22 sensor auf Pin 23 fest und versuchen in einer endlosen Schleife wiederholt Daten auszulesen, bis wir auf 5 Fehlversuche treffen.

Systemd Service:

Da das Lesen des Speichers, welcher mit den GPIO verbunden ist, öfter schief gegen kann, haben wir uns beschlossen unsere Rust Anwendung in einem Systemd Service zu packen.

Der Vorteil von einem Systemd Service ist, dass wir unsere Rust Anwendung automatisch neustarten lassen können sobald diese abstürzt.

[Unit]
Description=Rusty RPi - Sensor Data
After=network.target

[Service]
Type=simple
Restart=on-failure
RestartSec=3
ExecStart=/usr/bin/env "/home/mert/56/Versuche/Versuch 4 - Rusty Raspberry Pi/rusty-rpi/rusty-rpi"

[Install]
WantedBy=multi-user.target

Durch obige Systemd Unit Datei legen wir fest dass unsere Anwendung nach 3 Sekunden wieder gestartet werden soll, sobald diese abstürzt.

5.3.2. Graphite

Die installation von Graphite kann direkt von source über die Github-Repositories, per pip install, in einem virtualenv oder als Docker-container erfolgen. Da das Grafana setup ebenfalls in einer docker-compose.yaml aufgesetzt wurde, haben wir uns wieder für die Installation als container entschiede.

Graphite docker-compose konfiguration

Die docker compose sieht wie folgt aus:

networks:
  grafana:
    external: true

services:
  graphite:
    image: graphiteapp/graphite-statsd
    container_name: graphite
    restart: unless-stopped
    networks:
        - grafana
    ports:
        - '8070:8080'
       # - '8088:80' # nginx
        - '2003-2004:2003-2004'
        - '2023-2024:2023-2024'
        - "8125:8125/udp"
        - "8126:8126"
    volumes:
        - graphite-data:/opt/graphite
    labels:
        - traefik.enable=true
        - "traefik.http.routers.graphite.rule=Host(`graphite.dva.mahart.ma`)"
        - "traefik.http.routers.graphite.entrypoints=websecure"
        - "traefik.http.routers.graphite.tls.certresolver=myresolver"
        - "traefik.http.routers.graphite.service=graphite-graphite"                                                                                                                                                            t=80"
        - "traefik.http.services.graphite-graphite.loadbalancer.server.port=8080                                                                                                                                                             "
        - "traefik.http.routers.graphite.middlewares=dev-auth" # enable basic au                                                                                                                                                             th
        - "com.centurylinklabs.watchtower.enable=true"

volumes:
  graphite-data: {}

In dieser Konfiguration wird wieder das grafana-network verwendet um eine nahtlose Kommunikation zwischen Grafana und Graphite zu ermöglichen. Ebenfalls auffällig ist die Port konfiguration. Graphite läuft standartmäßig auf port 8080. In diesem Image läuft intern allerdings noch ein nginx als reverse-proxy, welcher mit Traefik in Konflikt geraten kann. Wir haben uns dazu entschieden den nginx direkt außen vor zu lassen und den Netzwerkverkehr Traefik zu überlassen. Wir exposen also den inneren Port 8080 über 8070 nach außen, weil 8080 bereits für den cAdvisor aus dem vergangenen Projekt in Verwendung ist. Alle weiteren Ports die in dieser Konfiguration auftauchen dienen als Endpunkte zum füttern von Daten, für verschiedene Formate und Backends

Graphite einrichten

Nachdem die Container gestartet wurden, können wir Graphite über Ihren Webbrowser aufrufen, indem wir graphite.dva.mahart.ma eingeben. Das Graphite-Webinterface ermöglicht es, Metriken zu erfassen und zu visualisieren. Wir nutzen Graphite allerdings nicht zu Veranschaulichung, sondern lediglich als Datenquelle für Grafana.

Grafana einrichten

Wir verwenden Grafana, um die in Graphite erfassten Metriken zu visualisieren. Wir fügen die data source hinzu indem wir unter „connections“ die data source für Graphite auswählen. Für die URL tragen wir http://78.47.10.198:8070 ein.

5.3.3. Daten zu Graphite senden

Um die erfassten Sensordaten an einen Graphite-Server zu senden, erweitern wir unser Projekt um zusätzliche Abhängigkeiten und passen den Code entsprechend an.

Source Code:

use dht_mmap_rust::{Dht, DhtType};
use std::process::{Command, Stdio};
use std::thread;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

const HOST: &str = "78.47.10.198";
const PORT: &str = "2003";
const REFRESH_INTERVAL: u8 = 5; // Seconds

struct Data {
    name: String,
    value: f32,
    timestamp: u64,
}

fn main() {
    // The sensor is a DHT22 connected on pin 23
    let mut dht = Dht::new(DhtType::Dht22, 23).unwrap();
    let mut i: u8 = 0;
    let tries: u8 = 5;
    let mut history: Vec<(f32, f32)> = Vec::new();

    loop {
        let result = dht.read();

        match result {
            Ok(data) => {
                i = 0;

                history.push((data.temperature(), data.humidity()));
            }
            Err(_) => {
                i = i + 1;

                if i >= tries {
                    eprintln!("Exceeded retry amount, exiting.");
                    break;
                }

                eprintln!("Error reading sensor, retrying.");
            }
        }

        if history.len() as u8 >= REFRESH_INTERVAL {
            let timestamp = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .expect("Negative time.")
                .as_secs();
            let average = calculate_average(&history);
            let data_temp = Data {
                name: "sensor.temperature".to_string(),
                value: average.0,
                timestamp: timestamp,
            };
            let data_humd = Data {
                name: "sensor.humidity".to_string(),
                value: average.1,
                timestamp: timestamp,
            };

            if let Err(_) = send_data(data_temp) {
                eprintln!("Error sending temperature data.");
            }

            if let Err(_) = send_data(data_humd) {
                eprintln!("Error sending humidity data.");
            }

            history.clear();
        }

        thread::sleep(Duration::from_secs(1));
    }
}

fn calculate_average(history: &Vec<(f32, f32)>) -> (f32, f32) {
    let mut average = (0.0f32, 0.0f32);

    for (temp, humd) in history.iter() {
        average.0 = average.0 + temp;
        average.1 = average.1 + humd;
    }

    average.0 = average.0 / history.len() as f32;
    average.1 = average.1 / history.len() as f32;

    average
}

fn send_data(data: Data) -> Result<(), ()> {
    let body = format!("{} {} {}", data.name, data.value, data.timestamp);
    let echo = Command::new("echo")
        .arg(body)
        .stdout(Stdio::piped())
        .spawn()
        .expect("Failed to run echo.");
    let netcat = Command::new("nc")
        .arg("-q0")
        .arg(HOST)
        .arg(PORT)
        .stdin(Stdio::from(echo.stdout.unwrap()))
        .status()
        .expect("Failed to run netcat.");

    if netcat.code().unwrap() == 0 {
        Ok(())
    } else {
        Err(())
    }
}

Dieser erweiterte Source Code liest die Sensordaten vom DHT22 Sensor, berechnet Durchschnittswerte und sendet die Daten alle 5 Sekunden an einen Graphite-Server. Die Daten werden über das echo- und netcat-Kommando gesendet.

Warum netcat?

Wir haben am Anfang versucht, die Daten per POST-Request mit Hilfe des reqwest-Pakets zu senden, aber unsere Requests wurden von Graphite Carbon ignoriert.

async fn send_data(client: &Client, datapoint: DataPoint) -> Result<(), reqwest::Error> {
    let body = format!(
        "{} {} {}",
        datapoint.name, datapoint.value, datapoint.timestamp
    );

    let response = client
        .post(GRAPHITE_URL)
        .header("Content-Type", "application/json")
        .body(body)
        .send()
        .await?
        .error_for_status()?;

    match response.status() {
        reqwest::StatusCode::OK => info!("Data submitted to Graphite successfully!"),
        reqwest::StatusCode::FORBIDDEN => error!("Unauthorized! Check the token."),
        reqwest::StatusCode::BAD_REQUEST => error!("Bad request!"),
        _ => error!("Uncaught error writing data"),
    };
    Ok(())
}

Unsere Anfragen kamen Zeile für Zeile in Carbon an, weshalb er sie nicht verarbeiten konnte. Wie man in den Logs sieht:

15/05/2024 21:13:07 :: [listener] invalid line received from client 141.82.146.66:41872, ignoring [POST / HTTP/1.1]
15/05/2024 21:13:07 :: [listener] invalid line received from client 141.82.146.66:41872, ignoring [Host: 78.47.10.198:2003]
15/05/2024 21:13:07 :: [listener] invalid line received from client 141.82.146.66:41872, ignoring [User-Agent: curl/7.88.1]
15/05/2024 21:13:07 :: [listener] invalid line received from client 141.82.146.66:41872, ignoring [Accept: */*]
15/05/2024 21:13:07 :: [listener] invalid line received from client 141.82.146.66:41872, ignoring [Content-Type: application/octet-stream]
15/05/2024 21:13:07 :: [listener] invalid line received from client 141.82.146.66:41872, ignoring [Content-Length: 27]
15/05/2024 21:13:07 :: [listener] invalid line received from client 141.82.146.66:41872, ignoring []

POST-Requests mit Hilfe von curl wurden ebenfalls ignoriert. Leider konnten wir dieses Verhalten nicht erklären.

curl -X POST -d "sensor.temperature 25.0 1649995987" $HOST:$PORT

Letztendlich haben wir uns entschieden, netcat zu verwenden, um die Daten an den Graphite-Server zu senden. netcat ermöglicht es uns, die Daten im richtigen Format direkt an Carbon zu übermitteln, ohne dass die Anfragen zeilenweise empfangen und ignoriert werden.

5.3.4. Grafana

Unsere Grafana Instanz ist weiterhin über grafana.dva.mahart.ma erreichbar.

Erstellung des Dashboards

Ist die Datenquelle erfolgreich eingerichtet, getestet und mit ersten Daten bespeist, kann das Dashboard erstellt und mit ersten Graphen gebaut werden.

Die Queries bestehen aus den Referenzen der Datenpunkte im Verbund mit Berechnungs-/Analyse-Funktionen von Grafana.

Als Beispiel verwenden wir die Referenz sensor.temperature und können diese mit einem summarize 1min max Verbund aggregieren. Die vollständige Funktion lautet dann alias(summarize(sensor.temperature, '1min', 'max'), 'Aktuelle Temperatur') und bildet einen Zeitbasierten Verlauf der aktuellen Temperatur. Die Funktion bündelt immer das Maximum in einem 1-Minütigen Intervall als Datenpunkt und der Alias dient als Bezeichnung des Graphen und macht den Kontext für den Nutzer verständlich.

Wir haben weitere Panels mit 1Std, 24Std, sowie einer 7 Tage Analyse angelegt.