feat: begin work on #7 and #10

This commit is contained in:
nm17 2024-05-31 16:50:58 +04:00
parent dc81c21ea9
commit 956905e6e8
Signed by: nm17
GPG key ID: 3303B70C59145CD4
9 changed files with 845 additions and 677 deletions

1352
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -31,3 +31,4 @@ smallstr = { version = "0.3.0", features = ["std", "union"] }
thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["full"] }
ufmt = { version = "0.2.0", features = ["std"] }
futures-util = { version = "0.3.30", features = ["tokio-io"] }

View file

@ -13,3 +13,5 @@
- Поля
- `exists`: bool
- `unit`: str
!!!! Убедитесь что в переменных ключей нет `_` !!!!

View file

@ -6,16 +6,20 @@
// #[serde(rename = "...")]
// #[serde(alias = "...")]
use crate::utils::SupportedUnit;
use crate::ingest_protocol::error::Error;
use crate::ingest_protocol::parser::parse_mac_address;
use crate::utils::SupportedUnit;
use crate::web_server::app_error::AppError;
use fred::clients::RedisClient;
use fred::interfaces::{HashesInterface, KeysInterface};
use hifitime::Epoch;
use rust_decimal::Decimal;
use serde::{Deserialize, Serialize};
use serde_with::serde_as;
use ufmt::uwrite;
use std::collections::HashSet;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{Hash, Hasher};
/// Данные с одного датчика.
@ -75,15 +79,94 @@ pub struct NMDeviceDataPacket {
pub lon: Option<Decimal>,
pub alt: Option<Decimal>,
/// Команды для управления устройством.
///
/// Текст после декодировки.
///
/// ## Отчёт совместимости
/// Функционал изначально narodmon-овский.
/// Про то, как кодировать при наличии символов `#;=` или поддержку UTF-8 не сказано.
/// Рассчитываю на то, что все значения нужно URL кодировать.
pub commands: Option<HashMap<String, String>>,
// HTTP GET/POST url-encode specific parameters
/// TODO: Желательное поведение в будущем:
/// - Если в values есть хотябы один times и этот time не None => Игнорируем этот time
/// - Если time нет у values => используем этот time (если он None, то соотв. считаем что информации о времени нет)
/// TODO: В базе всё должно храниться как секунды по TAI спустя J1900 для возможности сортировки по времени позже.
/// - Если time нет у values => используем этот time (если он None, то соотв. считаем
/// что информации о времени нет)
/// TODO: В базе всё должно храниться как секунды по TAI спустя J1900 для возможности
/// сортировки по времени позже.
pub time: Option<Epoch>,
}
impl NMDeviceDataPacket {
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
let device_mac_enc = hex::encode(self.mac);
let mut key = String::new();
let now = Epoch::now().unwrap();
let mut device_time = self.time.unwrap_or(now);
// TODO: Добавить гистерезис
// Отчёт совместимости: отсутствует
if device_time > now {
device_time = now;
}
let device_tai_timestamp = device_time.to_duration_since_j1900().to_seconds();
uwrite!(&mut key, "devices_{}", device_mac_enc).unwrap();
let device_exists: Option<bool> = redis.hget(key.as_str(), "exists").await?;
if !device_exists.is_some_and(|v| v == true) {
return Err(AppError::DeviceNotFound(hex::encode(&self.mac)));
}
// devices_{device_id}_{tai_timestamp}_{sensor_id}
for sensor in &self.values {
let mut key = String::new();
uwrite!(
&mut key,
"devices_{}_{}_{}",
device_mac_enc,
device_tai_timestamp.to_string(),
sensor.mac
)
.unwrap();
redis
.set(key.as_str(), sensor.value.to_string(), None, None, false)
.await?;
}
if let Some(commands) = &self.commands {
for (cmd_key, cmd_value) in commands {
let mut key = String::new();
uwrite!(&mut key, "devices_{}_cmds_{}", device_mac_enc, cmd_key).unwrap();
redis
.set(key.as_str(), cmd_value, None, None, false)
.await?;
}
}
return Ok(());
}
}
#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
pub struct NMJsonPacket {
pub devices: Vec<NMDeviceDataPacket>,
}
impl NMJsonPacket {
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
for device in &self.devices {
device.save_to_db(redis).await?;
}
return Ok(());
}
}

View file

@ -0,0 +1,12 @@
// let body = "OK";
//
// let mut stream = app_state.redis_client.scan("devices_{}", None, None);
//
// while let Some(cmd) = stream.next().await {
// let mut cmd = cmd?;
// let redis_cmd_keys = cmd.take_results().unwrap();
//
// for redis_cmd_key in redis_cmd_keys {
// redis_cmd_key.as_str().unwrap().split("_"); // Продолжи
// }
// }

View file

@ -6,6 +6,8 @@ extern crate core;
mod utils;
mod ingest_protocol;
mod web_server;
mod ingest_socket_server;
use crate::web_server::server_main;
struct Params {}

View file

@ -4,19 +4,21 @@ pub mod qs_parser;
use crate::ingest_protocol::{NMDeviceDataPacket, NMJsonPacket};
use crate::web_server::app_error::AppError;
use fred::bytes_utils::Str;
use fred::types::RedisMap;
use crate::web_server::old_device_sensor_api::qs_parser::QSParserError;
use crate::web_server::NMAppState;
use fred::prelude::*;
use fred::types::{RedisMap, Scanner};
use hifitime::Epoch;
use ntex::http::{HttpMessage, StatusCode};
use ntex::util::{Bytes, HashMap};
use ntex::{http, web};
use fred::prelude::*;
use hifitime::Epoch;
use ntex::web::types::State;
use ntex::{http, web};
use thiserror::Error;
use ufmt::uwrite;
use crate::web_server::NMAppState;
use crate::web_server::old_device_sensor_api::qs_parser::QSParserError;
use futures_util::stream::StreamExt;
#[derive(Error, Debug)]
pub enum Error {
@ -28,7 +30,6 @@ pub enum Error {
QSParserError(#[from] QSParserError),
}
/// Обработчик данных датчиков с устройств.
///
/// Слушает /post и /get.
@ -77,41 +78,7 @@ pub async fn device_handler<'a>(
}
if let Some(real_body) = real_body {
for device in real_body.devices {
let mut device_key_str = String::new();
let now = Epoch::now().unwrap();
let mut device_time = device.time
.unwrap_or(now);
// TODO: Добавить гистерезис
// Отчёт совместимости: отсутствует
if device_time > now {
device_time = now;
}
let device_tai_timestamp = device_time
.to_duration_since_j1900()
.to_seconds();
uwrite!(device_key_str, "devices_{}", hex::encode(device.mac));
let device_exists: bool = app_state.redis_client.hget(device_key_str.as_str(), "exists").await?;
if !device_exists {
return Err(AppError::DeviceNotFound(hex::encode(&device.mac)));
}
// devices_{device_id}_{tai_timestamp}_{sensor_id}
for sensor in device.values {
let mut device_report_key_str = String::new();
uwrite!(&mut device_report_key_str, "devices_{}_{}_{}", hex::encode(device.mac), device_tai_timestamp.to_string(), sensor.mac);
app_state.redis_client.set(
device_key_str.as_str(), sensor.value.to_string(), None, None, false,
).await?;
}
}
real_body.save_to_db(&app_state.redis_client).await?;
} else {
return Err(AppError::UnknownBody {
json_err: json_error,
@ -119,6 +86,5 @@ pub async fn device_handler<'a>(
});
}
Ok(web::HttpResponseBuilder::new(StatusCode::OK)
.finish())
Ok(web::HttpResponseBuilder::new(StatusCode::OK).finish())
}

View file

@ -111,6 +111,10 @@ pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSPar
lon: parse_decimal_if_exists(&mut parsed, "lon")?,
alt: parse_decimal_if_exists(&mut parsed, "alt")?,
time: parse_epoch_if_exists(&mut parsed, "time")?,
// TODO: Выяснить можно ли передавать команды по QS и можно ли их отличить от
// маков и значений сенсоров.
commands: None,
values: qs_rest_to_values(parsed)?,
};

View file

@ -5,6 +5,7 @@ use fred::prelude::*;
use heapless::String as HeaplessString;
use lazy_static::lazy_static;
use regex::Regex;
use serde::{Deserialize, Serialize};
use ufmt::uwrite;
lazy_static! {
@ -13,6 +14,7 @@ lazy_static! {
}
/// Описание полей в KV DB у `apikey_{}`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ApiKeyDescription {
/// ID владельца API ключа.
apikey_owner: i64,