Merge branch 'device_managment'

This commit is contained in:
nm17 2024-06-02 00:24:09 +04:00
commit 04a21c79f6
Signed by: nm17
GPG key ID: 3303B70C59145CD4
9 changed files with 189 additions and 79 deletions

62
Cargo.lock generated
View file

@ -292,6 +292,16 @@ version = "1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]]
name = "bytes"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
dependencies = [
"byteorder",
"iovec",
]
[[package]]
name = "bytes"
version = "1.6.0"
@ -307,7 +317,7 @@ version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35"
dependencies = [
"bytes",
"bytes 1.6.0",
"either",
]
@ -428,7 +438,7 @@ version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9885fa71e26b8ab7855e2ec7cae6e9b380edff76cd052e07c683a0319d51b3a2"
dependencies = [
"futures",
"futures 0.3.30",
]
[[package]]
@ -643,11 +653,11 @@ dependencies = [
"arc-swap",
"arcstr",
"async-trait",
"bytes",
"bytes 1.6.0",
"bytes-utils",
"cfg-if",
"float-cmp",
"futures",
"futures 0.3.30",
"lazy_static",
"log",
"nom",
@ -668,6 +678,12 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
[[package]]
name = "futures"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678"
[[package]]
name = "futures"
version = "0.3.30"
@ -761,6 +777,7 @@ dependencies = [
"pin-project-lite",
"pin-utils",
"slab",
"tokio-io",
]
[[package]]
@ -890,7 +907,7 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258"
dependencies = [
"bytes",
"bytes 1.6.0",
"fnv",
"itoa",
]
@ -974,12 +991,13 @@ version = "0.1.0"
dependencies = [
"anyhow",
"bstr",
"bytes",
"bytes 1.6.0",
"chrono",
"clap",
"derive_more",
"dotenvy",
"fred",
"futures-util",
"heapless",
"hex",
"hifitime",
@ -999,6 +1017,15 @@ dependencies = [
"ufmt",
]
[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
dependencies = [
"libc",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.0"
@ -1236,7 +1263,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ffd6ac357a3fd885753ddeb4130ec92474e79d013362532eba4778854466981"
dependencies = [
"bitflags",
"bytes",
"bytes 1.6.0",
"futures-core",
"serde",
]
@ -1726,7 +1753,7 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c31deddf734dc0a39d3112e73490e88b61a05e83e074d211f348404cee4d2c6"
dependencies = [
"bytes",
"bytes 1.6.0",
"bytes-utils",
"cookie-factory",
"crc16",
@ -1804,7 +1831,7 @@ checksum = "5cba464629b3394fc4dbc6f940ff8f5b4ff5c7aef40f29166fd4ad12acbc99c0"
dependencies = [
"bitvec",
"bytecheck",
"bytes",
"bytes 1.6.0",
"hashbrown 0.12.3",
"ptr_meta",
"rend",
@ -1833,7 +1860,7 @@ checksum = "1790d1c4c0ca81211399e0e0af16333276f375209e71a37b67698a373db5b47a"
dependencies = [
"arrayvec",
"borsh",
"bytes",
"bytes 1.6.0",
"num-traits",
"rand",
"rkyv",
@ -2225,7 +2252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a"
dependencies = [
"backtrace",
"bytes",
"bytes 1.6.0",
"libc",
"mio",
"num_cpus",
@ -2237,6 +2264,17 @@ dependencies = [
"windows-sys 0.48.0",
]
[[package]]
name = "tokio-io"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674"
dependencies = [
"bytes 0.4.12",
"futures 0.1.31",
"log",
]
[[package]]
name = "tokio-macros"
version = "2.3.0"
@ -2265,7 +2303,7 @@ version = "0.7.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
dependencies = [
"bytes",
"bytes 1.6.0",
"futures-core",
"futures-sink",
"pin-project-lite",

View file

@ -31,3 +31,4 @@ smallstr = { version = "0.3.0", features = ["std", "union"] }
thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["full"] }
ufmt = { version = "0.2.0", features = ["std"] }
futures-util = { version = "0.3.30", features = ["tokio-io"] }

View file

@ -13,3 +13,5 @@
- Поля
- `exists`: bool
- `unit`: str
!!!! Убедитесь что в переменных ключей нет `_` !!!!

View file

@ -6,15 +6,20 @@
// #[serde(rename = "...")]
// #[serde(alias = "...")]
use crate::utils::{EpochUTC, SupportedUnit};
use crate::ingest_protocol::error::Error;
use crate::ingest_protocol::parser::parse_mac_address;
use crate::utils::{EpochUTC, SupportedUnit};
use crate::web_server::app_error::AppError;
use fred::clients::RedisClient;
use fred::interfaces::{HashesInterface, KeysInterface};
use hifitime::Epoch;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use ufmt::uwrite;
use std::collections::HashSet;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{Hash, Hasher};
/// Данные с одного датчика.
@ -74,6 +79,16 @@ pub struct NMDeviceDataPacket {
pub lon: Option<Decimal>,
pub alt: Option<Decimal>,
/// Команды для управления устройством.
///
/// Текст после декодировки.
///
/// ## Отчёт совместимости
/// Функционал изначально narodmon-овский.
/// Про то, как кодировать при наличии символов `#;=` или поддержку UTF-8 не сказано.
/// Рассчитываю на то, что все значения нужно URL кодировать.
pub commands: Option<HashMap<String, String>>,
// HTTP GET/POST url-encode specific parameters
/// TODO: Желательное поведение в будущем:
/// - Если в values есть хотябы один times и этот time не None => Игнорируем этот time
@ -82,7 +97,74 @@ pub struct NMDeviceDataPacket {
pub time: Option<EpochUTC>,
}
impl NMDeviceDataPacket {
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
let device_mac_enc = hex::encode(self.mac);
let mut key = String::new();
let now = Epoch::now().unwrap().into();
let mut device_time = self.time.unwrap_or(now);
// TODO: Добавить гистерезис
// Отчёт совместимости: отсутствует
if device_time > now {
device_time = now;
}
let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds();
uwrite!(&mut key, "devices_{}", device_mac_enc).unwrap();
let device_exists: Option<bool> = redis.hget(key.as_str(), "exists").await?;
if !device_exists.is_some_and(|v| v == true) {
return Err(AppError::DeviceNotFound(hex::encode(&self.mac)));
}
// devices_{device_id}_{tai_timestamp}_{sensor_id}
for sensor in &self.values {
let mut key = String::new();
uwrite!(
&mut key,
"devices_{}_{}_{}",
device_mac_enc,
device_tai_timestamp.to_string(),
sensor.mac
)
.unwrap();
redis
.set(key.as_str(), sensor.value.to_string(), None, None, false)
.await?;
}
if let Some(commands) = &self.commands {
for (cmd_key, cmd_value) in commands {
let mut key = String::new();
uwrite!(&mut key, "devices_{}_cmds_{}", device_mac_enc, cmd_key).unwrap();
redis
.set(key.as_str(), cmd_value, None, None, false)
.await?;
}
}
return Ok(());
}
}
#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
pub struct NMJsonPacket {
pub devices: Vec<NMDeviceDataPacket>,
}
impl NMJsonPacket {
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
for device in &self.devices {
device.save_to_db(redis).await?;
}
return Ok(());
}
}

View file

@ -0,0 +1,12 @@
// let body = "OK";
//
// let mut stream = app_state.redis_client.scan("devices_{}", None, None);
//
// while let Some(cmd) = stream.next().await {
// let mut cmd = cmd?;
// let redis_cmd_keys = cmd.take_results().unwrap();
//
// for redis_cmd_key in redis_cmd_keys {
// redis_cmd_key.as_str().unwrap().split("_"); // Продолжи
// }
// }

View file

@ -1,12 +1,11 @@
#![doc = include_str!("../README.md")]
#![feature(try_blocks)]
extern crate core;
mod ingest_protocol;
mod web_server;
mod ingest_socket_server;
mod utils;
mod web_server;
use crate::web_server::server_main;
struct Params {}

View file

@ -4,18 +4,21 @@ pub mod qs_parser;
use crate::ingest_protocol::{NMDeviceDataPacket, NMJsonPacket};
use crate::web_server::app_error::AppError;
use fred::bytes_utils::Str;
use ntex::http::{HttpMessage, StatusCode};
use ntex::util::Bytes;
use ntex::{http, web};
use fred::prelude::*;
use hifitime::Epoch;
use ntex::http::{HttpMessage, StatusCode};
use ntex::util::Bytes;
use ntex::web::types::State;
use ntex::{http, web};
use qs_parser::QSParserError;
use thiserror::Error;
use ufmt::uwrite;
use crate::web_server::NMAppState;
use crate::web_server::old_device_sensor_api::qs_parser::QSParserError;
use futures_util::stream::StreamExt;
use super::NMAppState;
#[derive(Error, Debug)]
pub enum Error {
@ -27,7 +30,6 @@ pub enum Error {
QSParserError(#[from] QSParserError),
}
/// Обработчик данных датчиков с устройств.
///
/// Слушает /post и /get.
@ -76,41 +78,7 @@ pub async fn device_handler<'a>(
}
if let Some(real_body) = real_body {
for device in real_body.devices {
let mut device_key_str = String::new();
let now = Epoch::now().unwrap().into();
let mut device_time = device.time
.unwrap_or(now);
// TODO: Добавить гистерезис
// Отчёт совместимости: отсутствует
if device_time > now {
device_time = now;
}
let device_tai_timestamp = device_time.0
.to_duration_since_j1900()
.to_seconds();
uwrite!(device_key_str, "devices_{}", hex::encode(device.mac));
let device_exists: bool = app_state.redis_client.hget(device_key_str.as_str(), "exists").await?;
if !device_exists {
return Err(AppError::DeviceNotFound(hex::encode(&device.mac)));
}
// devices_{device_id}_{tai_timestamp}_{sensor_id}
for sensor in device.values {
let mut device_report_key_str = String::new();
uwrite!(&mut device_report_key_str, "devices_{}_{}_{}", hex::encode(device.mac), device_tai_timestamp.to_string(), sensor.mac);
app_state.redis_client.set(
device_key_str.as_str(), sensor.value.to_string(), None, None, false,
).await?;
}
}
real_body.save_to_db(&app_state.redis_client).await?;
} else {
return Err(AppError::UnknownBody {
json_err: json_error,
@ -118,6 +86,5 @@ pub async fn device_handler<'a>(
});
}
Ok(web::HttpResponseBuilder::new(StatusCode::OK)
.finish())
Ok(web::HttpResponseBuilder::new(StatusCode::OK).finish())
}

View file

@ -1,14 +1,14 @@
use crate::ingest_protocol::error::Error;
use crate::ingest_protocol::parser::parse_mac_address;
use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue};
use hifitime::Epoch;
use ntex::util::HashMap;
use rust_decimal::Decimal;
use std::collections::HashSet;
use std::num::ParseFloatError;
use std::str::FromStr;
use std::sync::Arc;
use hifitime::Epoch;
use ntex::util::HashMap;
use rust_decimal::Decimal;
use thiserror::Error;
use crate::ingest_protocol::error::Error;
use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue};
use crate::ingest_protocol::parser::parse_mac_address;
/// В иделае было бы хорошо сделать всё как у [serde_json::Error], но это слишком большая морока
#[derive(Error, Clone, Debug)]
@ -45,7 +45,9 @@ impl From<serde_qs::Error> for QSParserError {
///
/// Формат: `<SENSOR_MAC>=<SENSOR_VALUE>`.
/// Других данных на подобии названия и времени нет.
pub fn qs_rest_to_values(parsed: HashMap<String, String>) -> Result<HashSet<SensorValue>, QSParserError> {
pub fn qs_rest_to_values(
parsed: HashMap<String, String>,
) -> Result<HashSet<SensorValue>, QSParserError> {
let mut hashset = HashSet::new();
for (key, value) in parsed {
@ -62,24 +64,26 @@ pub fn qs_rest_to_values(parsed: HashMap<String, String>) -> Result<HashSet<Sens
Ok(hashset)
}
pub fn parse_decimal_if_exists(parsed: &mut HashMap<String, String>, key: &str) -> Result<Option<Decimal>, QSParserError> {
pub fn parse_decimal_if_exists(
parsed: &mut HashMap<String, String>,
key: &str,
) -> Result<Option<Decimal>, QSParserError> {
return if let Some(unwrapped_value) = parsed.remove(key) {
Ok(Some(
Decimal::from_str(unwrapped_value.as_str())?
))
Ok(Some(Decimal::from_str(unwrapped_value.as_str())?))
} else {
Ok(None)
}
};
}
pub fn parse_epoch_if_exists(parsed: &mut HashMap<String, String>, key: &str) -> Result<Option<Epoch>, QSParserError> {
pub fn parse_epoch_if_exists(
parsed: &mut HashMap<String, String>,
key: &str,
) -> Result<Option<Epoch>, QSParserError> {
return if let Some(unwrapped_value) = parsed.remove(key) {
Ok(Some(
Epoch::from_unix_seconds(unwrapped_value.parse()?)
))
Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse()?)))
} else {
Ok(None)
}
};
}
pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSParserError> {
@ -99,9 +103,12 @@ pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSPar
lon: parse_decimal_if_exists(&mut parsed, "lon")?,
alt: parse_decimal_if_exists(&mut parsed, "alt")?,
time: parse_epoch_if_exists(&mut parsed, "time")?.map(|v| v.into()),
// TODO: Выяснить можно ли передавать команды по QS и можно ли их отличить от
// маков и значений сенсоров.
commands: None,
values: qs_rest_to_values(parsed)?,
};
Ok(device_data)
}
}

View file

@ -5,6 +5,7 @@ use fred::prelude::*;
use heapless::String as HeaplessString;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use ufmt::uwrite;
lazy_static! {
@ -13,6 +14,7 @@ lazy_static! {
}
/// Описание полей в KV DB у `apikey_{}`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ApiKeyDescription {
/// ID владельца API ключа.
apikey_owner: i64,