mqtt
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Paul Zinselmeyer 2023-08-22 14:07:45 +02:00
parent 80691285b2
commit 42f5458235
8 changed files with 777 additions and 651 deletions

39
.drone.yml Normal file
View File

@ -0,0 +1,39 @@
kind: pipeline
ype: docker
name: stromsensor
trigger:
event:
- push
steps:
- name: create_image
image: nixos/nix
commands:
- nix build --extra-experimental-features nix-command --extra-experimental-features flakes --cores 0 --max-jobs auto .#dockerImage
- cp result stromsensor.tar.gz
- name: upload_image
image: docker:dind
environment:
REGISTRY_PASSWD:
from_secret: REGISTRY_PASSWD
volumes:
- name: dockersock
path: /var/run
commands:
- docker login --username pfz4 --password $REGISTRY_PASSWD git2.zettoit.eu
- docker load < stromsensor.tar.gz
- docker tag stromsensor:latest git2.zettoit.eu/pfz4/stromsensor:latest
- docker push git2.zettoit.eu/pfz4/stromsensor:latest
services:
- name: docker
image: docker:dind
privileged: true
volumes:
- name: dockersock
path: /var/run
volumes:
- name: dockersock
temp: {}

View File

@ -1,3 +1,6 @@
DATABASE_URL=mysql://user:pass@example.com/database
USB_PORT=/dev/ttyUSB0
INSERT_EVERY_NTH=10
MQTT_HOST=example.com
MQTT_PORT=1883
MQTT_USER=username
MQTT_PASS=password

1
.gitignore vendored
View File

@ -1,2 +1,3 @@
/target
.env
result

1040
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -8,6 +8,12 @@ edition = "2021"
[dependencies]
dotenvy = "0.15"
serial = "0.4"
sqlx = { version = "0.6", features = ["runtime-tokio-rustls", "mysql"], default-features=false }
tokio = { version = "1", features = ["full"]}
anyhow = "1.0.66"
rumqttc = "0.22.0"
serde = "^1.0"
serde_json = "^1.0"
log = "0.4.20"
env_logger = "0.10.0"

129
flake.lock Normal file
View File

@ -0,0 +1,129 @@
{
"nodes": {
"crane": {
"inputs": {
"flake-compat": "flake-compat",
"flake-utils": [
"flake-utils"
],
"nixpkgs": [
"nixpkgs"
],
"rust-overlay": [
"rust-overlay"
]
},
"locked": {
"lastModified": 1691803597,
"narHash": "sha256-khWW1Owzselq5o816Lb7x624d6QGnv+kpronK3ndkr4=",
"owner": "ipetkov",
"repo": "crane",
"rev": "7809d369710abb17767b624f9e72b500373580bc",
"type": "github"
},
"original": {
"owner": "ipetkov",
"repo": "crane",
"type": "github"
}
},
"flake-compat": {
"flake": false,
"locked": {
"lastModified": 1673956053,
"narHash": "sha256-4gtG9iQuiKITOjNQQeQIpoIB6b16fm+504Ch3sNKLd8=",
"owner": "edolstra",
"repo": "flake-compat",
"rev": "35bb57c0c8d8b62bbfd284272c928ceb64ddbde9",
"type": "github"
},
"original": {
"owner": "edolstra",
"repo": "flake-compat",
"type": "github"
}
},
"flake-utils": {
"inputs": {
"systems": "systems"
},
"locked": {
"lastModified": 1689068808,
"narHash": "sha256-6ixXo3wt24N/melDWjq70UuHQLxGV8jZvooRanIHXw0=",
"owner": "numtide",
"repo": "flake-utils",
"rev": "919d646de7be200f3bf08cb76ae1f09402b6f9b4",
"type": "github"
},
"original": {
"owner": "numtide",
"repo": "flake-utils",
"type": "github"
}
},
"nixpkgs": {
"locked": {
"lastModified": 1692447944,
"narHash": "sha256-fkJGNjEmTPvqBs215EQU4r9ivecV5Qge5cF/QDLVn3U=",
"owner": "NixOS",
"repo": "nixpkgs",
"rev": "d680ded26da5cf104dd2735a51e88d2d8f487b4d",
"type": "github"
},
"original": {
"owner": "NixOS",
"ref": "nixos-unstable",
"repo": "nixpkgs",
"type": "github"
}
},
"root": {
"inputs": {
"crane": "crane",
"flake-utils": "flake-utils",
"nixpkgs": "nixpkgs",
"rust-overlay": "rust-overlay"
}
},
"rust-overlay": {
"inputs": {
"flake-utils": [
"flake-utils"
],
"nixpkgs": [
"nixpkgs"
]
},
"locked": {
"lastModified": 1692670201,
"narHash": "sha256-WbCKJRfh1Zb7N7g8Fzq7/Hg6i6yCbvaa0OAi4cSHk1w=",
"owner": "oxalica",
"repo": "rust-overlay",
"rev": "bf5196c27545735374376d96d41f209bae3643e1",
"type": "github"
},
"original": {
"owner": "oxalica",
"repo": "rust-overlay",
"type": "github"
}
},
"systems": {
"locked": {
"lastModified": 1681028828,
"narHash": "sha256-Vy1rq5AaRuLzOxct8nz4T6wlgyUR7zLU309k9mBC768=",
"owner": "nix-systems",
"repo": "default",
"rev": "da67096a3b9bf56a91d16901293e51ba5b49a27e",
"type": "github"
},
"original": {
"owner": "nix-systems",
"repo": "default",
"type": "github"
}
}
},
"root": "root",
"version": 7
}

71
flake.nix Normal file
View File

@ -0,0 +1,71 @@
{
description = "stromsensor";
inputs = {
nixpkgs.url = "github:NixOS/nixpkgs/nixos-unstable";
flake-utils.url = "github:numtide/flake-utils";
rust-overlay = {
url = "github:oxalica/rust-overlay";
inputs = {
nixpkgs.follows = "nixpkgs";
flake-utils.follows = "flake-utils";
};
};
crane = {
url = "github:ipetkov/crane";
inputs = {
nixpkgs.follows = "nixpkgs";
flake-utils.follows = "flake-utils";
rust-overlay.follows = "rust-overlay";
};
};
};
outputs = { self, nixpkgs, flake-utils, rust-overlay, crane}:
flake-utils.lib.eachDefaultSystem
(system:
let
overlays = [ (import rust-overlay) ];
pkgs = import nixpkgs {
inherit system overlays;
};
rustToolchain = pkgs.rust-bin.stable.latest.default;
craneLib = (crane.mkLib pkgs).overrideToolchain rustToolchain;
src = craneLib.cleanCargoSource (craneLib.path ./.);
nativeBuildInputs = with pkgs; [ rustToolchain pkg-config ];
buildInputs = with pkgs; [ protobuf ];
commonArgs = {
inherit src buildInputs nativeBuildInputs;
};
cargoArtifacts = craneLib.buildDepsOnly commonArgs;
bin = craneLib.buildPackage (commonArgs // {
inherit cargoArtifacts;
});
dockerImage = pkgs.dockerTools.buildImage {
name = "stromsensor";
tag = "latest";
config = {
Cmd = [ "${bin}/bin/stromsensor" ];
};
};
in
with pkgs;
{
packages = {
inherit bin dockerImage;
default = bin;
};
devShells.default = mkShell {
inputsFrom = [ bin ];
};
}
);
}

View File

@ -3,8 +3,10 @@ use std::{
time::{Duration, Instant},
};
use log::{debug, info, warn};
use rumqttc::{AsyncClient, ClientError, MqttOptions, QoS};
use serde_json::json;
use serial::{CharSize, Parity, SerialPort, StopBits};
use sqlx::{Connection, MySqlConnection};
const PATTERN_COUNT: usize = 1;
const PATTERN: [[u8; 19]; PATTERN_COUNT] = [[
@ -15,23 +17,41 @@ const PATTERN: [[u8; 19]; PATTERN_COUNT] = [[
#[tokio::main]
async fn main() {
dotenvy::dotenv().ok();
env_logger::init();
loop {
println!("Connecting to DB and SerialPort");
info!("Connecting to MQTT");
let mut mqtt_options = MqttOptions::new(
"stromsensor",
env::var("MQTT_HOST").expect("MQTT_HOST env var"),
env::var("MQTT_PORT")
.expect("MQTT_PORT env var")
.parse()
.expect("valid MQTT_PORT"),
);
mqtt_options.set_credentials(
env::var("MQTT_USER").expect("MQTT_USER env var"),
env::var("MQTT_PASS").expect("MQTT_PASS env var"),
);
mqtt_options.set_keep_alive(Duration::from_secs(5));
let (mut mqtt, mut eventloop) = AsyncClient::new(mqtt_options, 10);
tokio::spawn(async move {
loop {
eventloop.poll().await.unwrap();
}
});
advertise(&mut mqtt).await.unwrap();
let mut port =
serial::open(&env::var("USB_PORT").expect("USB_PORT environment variable")).unwrap();
let mut connection = MySqlConnection::connect(
&env::var("DATABASE_URL").expect("DATABASE_URL environment variable"),
)
.await
.unwrap();
interact(&mut port, &mut connection).await.unwrap();
interact(&mut port, &mut mqtt).await.unwrap();
}
}
async fn interact<T: SerialPort>(
port: &mut T,
connection: &mut MySqlConnection,
) -> anyhow::Result<()> {
async fn interact<T: SerialPort>(port: &mut T, mqtt: &mut AsyncClient) -> anyhow::Result<()> {
port.reconfigure(&|settings| {
settings.set_baud_rate(serial::Baud9600).unwrap();
settings.set_parity(Parity::ParityNone);
@ -57,9 +77,6 @@ async fn interact<T: SerialPort>(
let mut empty_messages = 0;
loop {
if let Err(err) = connection.ping().await {
return Err(err.into());
}
let read_result = port.read(&mut read_buf);
if let Ok(queue_length) = read_result {
empty_messages = 0;
@ -77,25 +94,17 @@ async fn interact<T: SerialPort>(
let val =
((result[0] - prev_value) * 3600) as f64 / (prev_time.elapsed().as_secs_f64());
if !(0.0..=200000.0).contains(&val) || result[0] > 1000000000 {
println!("received invalid data, dropping it");
warn!("received invalid data, dropping it");
continue;
}
println!(
info!(
"Zählerstand: {} Wh, Aktueller Verbrauch: {} W",
result[0] / 10,
val / 10.0
);
sqlx::query("INSERT INTO counter_value (timestamp, value) VALUES (NOW(), ?)")
.bind(result[0] / 10)
.execute(&mut *connection)
.await
.unwrap();
sqlx::query("INSERT INTO current_power (timestamp, value) VALUES (NOW(), ?)")
.bind(val as u32 / 10)
.execute(&mut *connection)
.await
.unwrap();
publish(result[0] / 10, val / 10.0, mqtt).await?;
}
prev_value = result[0];
@ -135,3 +144,75 @@ fn extract_values(queue: &mut Vec<u8>) -> [i32; PATTERN_COUNT] {
queue.clear();
value
}
async fn publish(energy: i32, power: f64, client: &mut AsyncClient) -> Result<(), ClientError> {
debug!("publishing sensor state to mqtt");
client
.publish(
"pfz4/stromsensor",
QoS::AtLeastOnce,
false,
json!({
"power": power,
"energy": energy
})
.to_string(),
)
.await?;
Ok(())
}
async fn advertise(client: &mut AsyncClient) -> Result<(), ClientError> {
debug!("advertising device on mqtt");
let device = json!({
"name":"Stromsensor",
"manufacturer": "pfz4",
"model": "stromsensor",
"identifiers": [
"stromsensor"
]
});
client
.publish(
"homeassistant/sensor/stromsensor/power/config",
QoS::AtLeastOnce,
true,
json! ({
"device": device,
"name": "Aktuell",
"unit_of_measurement": "W",
"state_class": "measurement",
"device_class": "power",
"state_topic": "pfz4/stromsensor",
"suggested_display_precision":2,
"unique_id": "stromsensor_power",
"object_id": "stromsensor_power",
"value_template": "{{ value_json.power }}"
})
.to_string(),
)
.await?;
client
.publish(
"homeassistant/sensor/stromsensor/energy/config",
QoS::AtLeastOnce,
true,
json! ({
"device": device,
"name": "Zaehlerstand",
"unit_of_measurement": "Wh",
"state_class": "total_increasing",
"device_class": "energy",
"state_topic": "pfz4/stromsensor",
"unique_id": "stromsensor_energy",
"object_id": "stromsensor_energy",
"value_template": "{{ value_json.energy }}"
})
.to_string(),
)
.await?;
Ok(())
}