From 9c8987f5b1952c57ec1883418c5882f99a45cae0 Mon Sep 17 00:00:00 2001 From: nm17 Date: Mon, 24 Feb 2025 19:59:42 +0400 Subject: [PATCH 1/6] docs: initial prototype of the db schema --- docs/kv_db_arch.md | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/docs/kv_db_arch.md b/docs/kv_db_arch.md index a51ae56..2b7aa1f 100644 --- a/docs/kv_db_arch.md +++ b/docs/kv_db_arch.md @@ -1,17 +1,29 @@ # Архитектура KV DB (Dragonfly) - `apikey_{apikey}` + - API ключ для приложений - Поля - `owner` - - Имеет время окончания - - `devices_{device_id}` - - Поля - - Вся информация о девайсе - - `devices_{device_id}_{tai_timestamp}_{sensor_id}` - - Только значение - - `devices_{device_id}` + - Имеет TTL + - `devices_{device_uuid}` - Поля - `exists`: bool + - `tls_only`: bool + - `mtls_only`: bool + - `lat`: decimal + - `long`: decimal + - `alt`: decimal + - `devices_{device_uuid}_sensor{sensor_mac}_{tai_timestamp}` + - Только значение + - `devices_{device_uuid}_{sensor_mac}` + - Поля - `unit`: str + - `devices_{device_uuid}_commands` + - `devices_mac{device_mac}` + - Маппинг до device_uuid + - `users_{user_uuid}` + - username: string + - password_hash: string + !!!! Убедитесь что в переменных ключей нет `_` !!!! -- 2.48.1 From 1f827114ebdd8b0c4dd9e71a561b90355e613e79 Mon Sep 17 00:00:00 2001 From: nm17 Date: Mon, 24 Feb 2025 20:04:50 +0400 Subject: [PATCH 2/6] feat(old_app_api): make all borrowed fields owned --- Cargo.lock | 73 +++++++++++++++++++--- Cargo.toml | 4 ++ src/web_server/old_app_api/handlers/mod.rs | 10 ++- src/web_server/old_app_api/mod.rs | 19 +++--- src/web_server/old_app_api/types/mod.rs | 29 ++++----- 5 files changed, 98 insertions(+), 37 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index eb37b9d..5a3a05d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" dependencies = [ - "getrandom", + "getrandom 0.2.15", "once_cell", "version_check", ] @@ -785,7 +785,19 @@ checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.13.3+wasi-0.2.2", + "windows-targets", ] [[package]] @@ -1093,11 +1105,14 @@ dependencies = [ "hex", "hifitime", "lazy_static", + "log", "nom", "ntex", "phf", + "ppp", "regex", "rust_decimal", + "rust_decimal_macros", "serde", "serde_json", "serde_qs", @@ -1107,6 +1122,7 @@ dependencies = [ "thiserror", "tokio", "ufmt", + "uuid", ] [[package]] @@ -1223,9 +1239,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.22" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" [[package]] name = "memchr" @@ -1261,7 +1277,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -1685,6 +1701,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "ppp" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a7a2049cd2570bd67bf0228e86bf850f8ceb5190a345c471d03a909da6049e0" +dependencies = [ + "thiserror", +] + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -1774,7 +1799,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.15", ] [[package]] @@ -1883,6 +1908,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rust_decimal_macros" +version = "1.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da991f231869f34268415a49724c6578e740ad697ba0999199d6f22b3949332c" +dependencies = [ + "quote", + "rust_decimal", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -2422,9 +2457,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.11.0" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +checksum = "93d59ca99a559661b96bf898d8fce28ed87935fd2bea9f05983c1464dd6c71b1" +dependencies = [ + "getrandom 0.3.1", + "serde", +] [[package]] name = "version_check" @@ -2438,6 +2477,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.13.3+wasi-0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasm-bindgen" version = "0.2.99" @@ -2612,6 +2660,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" +dependencies = [ + "bitflags", +] + [[package]] name = "write16" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index d571bc0..d074cd5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,3 +32,7 @@ ufmt = { version = "0.2.0", features = ["std"] } futures-util = { version = "0.3.30", features = ["tokio-io"] } snafu = "0.8.5" clap-verbosity-flag = "3.0.2" +ppp = "2.3.0" +log = "0.4.26" +uuid = { version = "1.14.0", features = ["serde", "v4"] } +rust_decimal_macros = "1.36.0" diff --git a/src/web_server/old_app_api/handlers/mod.rs b/src/web_server/old_app_api/handlers/mod.rs index 3c4c593..66b67bf 100644 --- a/src/web_server/old_app_api/handlers/mod.rs +++ b/src/web_server/old_app_api/handlers/mod.rs @@ -2,25 +2,23 @@ use crate::web_server::app_error::{self, AppError}; use crate::web_server::old_app_api::types::AppInitRequest; use crate::web_server::NMAppState; -use serde_json::{json}; +use serde_json::json; use snafu::ResultExt; - use crate::insert_header; use fred::interfaces::KeysInterface; use ntex::http::StatusCode; use ntex::web; - - pub async fn app_init( - _body: AppInitRequest<'_>, + _body: AppInitRequest, app_state: &NMAppState, ) -> Result { let _: () = app_state .redis_client .set("test", 123, None, None, true) - .await.context(app_error::ServerRedisSnafu)?; + .await + .context(app_error::ServerRedisSnafu)?; Ok(web::HttpResponse::build(StatusCode::OK).body("Hello world!")) } diff --git a/src/web_server/old_app_api/mod.rs b/src/web_server/old_app_api/mod.rs index 3430f7c..113d7dd 100644 --- a/src/web_server/old_app_api/mod.rs +++ b/src/web_server/old_app_api/mod.rs @@ -13,9 +13,14 @@ use crate::web_server::NMAppState; use crate::web_server::old_app_api::handlers::{app_init, version}; use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams}; use crate::web_server::utils::redis::is_api_key_valid; +use crate::web_server::NMAppState; +use ntex::http::HeaderMap; +use ntex::util::Bytes; +use ntex::web::types::State; +use ntex::web::{self, HttpRequest}; +use snafu::{whatever, ResultExt}; -use super::app_error; - +use super::app_error::{self, Utf8Snafu}; /// Обработчик запросов от приложений. /// @@ -31,18 +36,18 @@ pub async fn old_api_handler( return Err(AppError::RequestTooLarge); } - let body_bytes = body_bytes.as_bytes(); - - let mandatory_params: MandatoryParams<'_> = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON + let mandatory_params: MandatoryParams = + serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON // Ignore clippy singlematch if mandatory_params.cmd.as_ref() == "version" { return version((), &app_state).await } is_api_key_valid(&app_state.redis_client, mandatory_params.api_key.as_ref()).await?; - match mandatory_params.cmd.as_ref() { + match mandatory_params.cmd.as_str() { "appInit" => { - let body: AppInitRequest = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; + let body: AppInitRequest = + serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?; app_init(body, &app_state).await } diff --git a/src/web_server/old_app_api/types/mod.rs b/src/web_server/old_app_api/types/mod.rs index de9b9bb..7c03320 100644 --- a/src/web_server/old_app_api/types/mod.rs +++ b/src/web_server/old_app_api/types/mod.rs @@ -1,18 +1,16 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; +use uuid::Uuid; // fn<'de, D>(D) -> Result where D: Deserializer<'de> #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct AppInitRequest<'a> { - #[serde(borrow)] - pub version: Cow<'a, str>, +pub struct AppInitRequest { + pub version: String, - #[serde(borrow)] - pub platform: Cow<'a, str>, + pub platform: String, - #[serde(borrow)] - pub model: Cow<'a, str>, + pub model: String, pub width: u64, } @@ -29,21 +27,20 @@ pub struct AddLikeRequest { /// получить [MandatoryParams], другой в зависимости от `cmd`. Это позволяет не добвалять поля из /// этой структуры в каждом специфичном типе. #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct MandatoryParams<'a> { - #[serde(borrow)] - pub cmd: Cow<'a, str>, +pub struct MandatoryParams { + pub cmd: String, - #[serde(borrow)] - pub lang: Cow<'a, str>, + pub lang: String, /// Уникальный ID клиента. /// /// Используется на подобии как куки PHPSESSID в php. /// /// См. также: - #[serde(borrow)] - pub uuid: Cow<'a, str>, + pub uuid: Uuid, - #[serde(borrow)] - pub api_key: Cow<'a, str>, + /// API ключ приложения + /// + /// Может быть указан в теле запроса, а может быть и заголовке Narodmon-Api-Key. + pub api_key: Option, } -- 2.48.1 From c5eed34237ba52ca807a40cb60b887b9c78eca3c Mon Sep 17 00:00:00 2001 From: nm17 Date: Mon, 24 Feb 2025 20:06:54 +0400 Subject: [PATCH 3/6] feat(old_app_api): proper api key handling --- src/web_server/old_app_api/mod.rs | 31 ++++++++++++++++++++----------- src/web_server/utils/redis.rs | 22 +++++++++++++--------- 2 files changed, 33 insertions(+), 20 deletions(-) diff --git a/src/web_server/old_app_api/mod.rs b/src/web_server/old_app_api/mod.rs index 113d7dd..55136a8 100644 --- a/src/web_server/old_app_api/mod.rs +++ b/src/web_server/old_app_api/mod.rs @@ -6,21 +6,15 @@ mod types; use ntex::web::types::State; use ntex::util::Bytes; use ntex::web; -use nom::AsBytes; -use snafu::ResultExt; use crate::web_server::app_error::AppError; use crate::web_server::NMAppState; use crate::web_server::old_app_api::handlers::{app_init, version}; use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams}; use crate::web_server::utils::redis::is_api_key_valid; -use crate::web_server::NMAppState; -use ntex::http::HeaderMap; -use ntex::util::Bytes; -use ntex::web::types::State; -use ntex::web::{self, HttpRequest}; +use ntex::web::HttpRequest; use snafu::{whatever, ResultExt}; -use super::app_error::{self, Utf8Snafu}; +use super::app_error; /// Обработчик запросов от приложений. /// @@ -28,9 +22,12 @@ use super::app_error::{self, Utf8Snafu}; /// /// Вызывается напрямую из ntex приложения. pub async fn old_api_handler( + request: HttpRequest, app_state: State, body_bytes: Bytes, ) -> Result { + let headers = request.headers(); + if body_bytes.len() > 10 * 1024 { // 10 KiB return Err(AppError::RequestTooLarge); @@ -39,10 +36,22 @@ pub async fn old_api_handler( let mandatory_params: MandatoryParams = serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON - // Ignore clippy singlematch - if mandatory_params.cmd.as_ref() == "version" { return version((), &app_state).await } + // Тут все cmd которые могут быть вызваны без api ключа + if mandatory_params.cmd == "version" { + return version((), &app_state).await; + } - is_api_key_valid(&app_state.redis_client, mandatory_params.api_key.as_ref()).await?; + let api_key: String; + + if let Some(key) = mandatory_params.api_key { + api_key = key; + } else if let Some(key) = headers.get("Narodmon-Api-Key") { + api_key = key.to_str().with_whatever_context(|_| "asd")?.to_string(); + } else { + whatever!("No API key found") + } + + is_api_key_valid(&app_state.redis_client, api_key).await?; match mandatory_params.cmd.as_str() { "appInit" => { diff --git a/src/web_server/utils/redis.rs b/src/web_server/utils/redis.rs index 124e3cd..bf3f19d 100644 --- a/src/web_server/utils/redis.rs +++ b/src/web_server/utils/redis.rs @@ -1,18 +1,22 @@ //! Сборник утилит для работы с Redis. -use crate::web_server::app_error::{AppError, ServerRedisSnafu}; -use fred::prelude::*; +use crate::{ + uformat, + web_server::app_error::{AppError, ServerRedisSnafu}, +}; use fred::clients::Client as RedisClient; +use fred::prelude::*; use heapless::String as HeaplessString; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use ufmt::uwrite; +use uuid::Uuid; lazy_static! { /// Разрешённые знаки для API ключа. - static ref ALLOWED_API_KEY_CHARACTERS: Regex = Regex::new("[a-zA-Z0-9]{13}").unwrap(); + static ref ALLOWED_API_KEY_CHARACTERS: Regex = Regex::new("[a-zA-Z0-9\\-]{13,36}").unwrap(); } /// Описание полей в KV DB у `apikey_{}`. @@ -25,18 +29,18 @@ pub struct ApiKeyDescription { /// Проверка API ключа на валидность. pub async fn is_api_key_valid( client: &RedisClient, - api_key: &str, + api_key: String, ) -> Result { - if !ALLOWED_API_KEY_CHARACTERS.is_match(api_key) { + if !ALLOWED_API_KEY_CHARACTERS.is_match(&api_key) { return Err(AppError::ApiKeyInvalid { reason: "Invalid characters present in the API key.", }); } - let mut key_buffer = HeaplessString::<{ 7 + 13 }>::new(); - uwrite!(key_buffer, "apikey_{}", api_key).expect("TODO"); // TODO: Error handling - - let valid: Option = client.hget(key_buffer.as_str(), "owner").await.context(ServerRedisSnafu)?; + let valid: Option = client + .hget(uformat!("apikey_{}", api_key), "owner") + .await + .context(ServerRedisSnafu)?; valid .map(|uid| ApiKeyDescription { apikey_owner: uid }) -- 2.48.1 From 2aaf27f8c0ed743a072be7f5071cc36ee7717bf9 Mon Sep 17 00:00:00 2001 From: nm17 Date: Mon, 24 Feb 2025 20:11:05 +0400 Subject: [PATCH 4/6] feat: initial work on the business_logic module --- src/business_logic/device.rs | 161 ++++++++++++++++++++++++++++ src/business_logic/mod.rs | 37 +++++++ src/business_logic/user.rs | 0 src/ingest_protocol/packet_types.rs | 61 +---------- src/main.rs | 1 + 5 files changed, 201 insertions(+), 59 deletions(-) create mode 100644 src/business_logic/device.rs create mode 100644 src/business_logic/mod.rs create mode 100644 src/business_logic/user.rs diff --git a/src/business_logic/device.rs b/src/business_logic/device.rs new file mode 100644 index 0000000..1f5c43a --- /dev/null +++ b/src/business_logic/device.rs @@ -0,0 +1,161 @@ +use std::str::FromStr; + +use fred::prelude::{Client as RedisClient, HashesInterface, KeysInterface}; +use hifitime::Epoch; +use rust_decimal::Decimal; +use snafu::ResultExt; +use uuid::{self, Uuid}; + +use crate::{ + ingest_protocol::NMDeviceDataPacket, + uformat, + web_server::{ + app_error::{self, AppError, DecimalSnafu, ServerRedisSnafu, UuidSnafu}, + old_device_sensor_api::qs_parser::DecimalParseSnafu, + }, +}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)] +pub struct Device { + pub device_id: Uuid, + + /// ID владельца + pub owner: Uuid, + + pub lat: Option, + pub lon: Option, + pub alt: Option, + + /// Данные датчика могут быть переданы только по TLS + pub tls_only: bool, + + /// Данные датчика могут быть переданы только по Mutual TLS + pub mtls_only: bool, +} + +impl Device { + pub fn new(id: Uuid, owner: Uuid) -> Device { + Device { + device_id: id, + owner, + lat: None, + lon: None, + alt: None, + tls_only: false, + mtls_only: false, + } + } + + pub async fn get_by_id(redis: &RedisClient, id: Uuid) -> Result, AppError> { + let key = uformat!("devices_{}", id.to_string()); + + if !redis.exists(&[&key]).await.context(ServerRedisSnafu)? { + return Ok(None); + } + + return Ok(Some(Device { + device_id: id, + owner: redis + .hget::, _, _>(&key, "lat") + .await + .context(ServerRedisSnafu)? + .map(|el| Uuid::from_str(&el).context(UuidSnafu)) + .transpose()? + .unwrap(), + lat: redis + .hget::, _, _>(&key, "lat") + .await + .context(ServerRedisSnafu)? + .map(|el| el.parse().context(DecimalSnafu)) + .transpose()?, + lon: redis + .hget::, _, _>(&key, "lon") + .await + .context(ServerRedisSnafu)? + .map(|el| el.parse().context(DecimalSnafu)) + .transpose()?, + alt: redis + .hget::, _, _>(&key, "alt") + .await + .context(ServerRedisSnafu)? + .map(|el| el.parse().context(DecimalSnafu)) + .transpose()?, + tls_only: redis + .hget::, _, _>(&key, "tls_only") + .await + .context(ServerRedisSnafu)? + .unwrap(), + mtls_only: redis + .hget::, _, _>(&key, "mtls_only") + .await + .context(ServerRedisSnafu)? + .unwrap(), + })); + } + + pub async fn save_to_db( + packet: &NMDeviceDataPacket, + redis: &RedisClient, + ) -> Result<(), AppError> { + let device_mac_enc = hex::encode(packet.mac); + let device_id_key = uformat!("devices_mac{}", &device_mac_enc); + + let device_id: Option = redis.get(device_id_key).await.context(ServerRedisSnafu)?; + + let device_id = device_id.ok_or_else(|| AppError::DeviceNotFound { + mac: device_mac_enc.clone(), + })?; + + let now = Epoch::now().unwrap().into(); + let mut device_time = packet.time.unwrap_or(now); + + // TODO: Добавить гистерезис + // Отчёт совместимости: отсутствует + if device_time > now { + device_time = now; + } + + let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds(); + + let key = uformat!("devices_{}", device_id); + + let device_exists: Option = redis + .hget(key.as_str(), "exists") + .await + .context(ServerRedisSnafu)?; + + if !device_exists.is_some_and(|v| v) { + return Err(AppError::DeviceNotFound { + mac: hex::encode(packet.mac), + }); + } + + // devices_{device_id}_{tai_timestamp}_{sensor_id} + for sensor in &packet.values { + let key = uformat!( + "devices_{}_sensor{}_{}", + device_id, + sensor.mac, + device_tai_timestamp.to_string(), + ); + + () = redis + .set(key.as_str(), sensor.value.to_string(), None, None, false) + .await + .context(ServerRedisSnafu)?; + } + + if let Some(commands) = &packet.commands { + for (cmd_key, cmd_value) in commands { + let key = uformat!("devices_{}_cmds_{}", device_id, cmd_key); + + () = redis + .set(key.as_str(), cmd_value, None, None, false) + .await + .context(ServerRedisSnafu)?; + } + } + + Ok(()) + } +} diff --git a/src/business_logic/mod.rs b/src/business_logic/mod.rs new file mode 100644 index 0000000..4a2a358 --- /dev/null +++ b/src/business_logic/mod.rs @@ -0,0 +1,37 @@ +use bytes::Bytes; +use device::Device; +use fred::prelude::Client; +use ntex::web::guard::Header; +use ppp::v2::Header; +use snafu::ResultExt; + +pub mod device; + +use crate::{ + ingest_protocol::NMDeviceDataPacket, + web_server::app_error::{AppError, ServerRedisSnafu}, +}; + +pub async fn process_packet( + r_client: Client, + packet: NMDeviceDataPacket, + proxy_header: Option, +) -> String { + let proxy_ref = proxy_header.as_ref().map(|el| el.as_ref()); + let proxy_header_parsed = proxy_ref.map(|el| Header::try_from(el).unwrap()); + // proxy_header_parsed.unwrap().tlvs() + + let asd: Result = (|| async { + // TODO: Auth code here. Check if tls only is enabled. + + Device::save_to_db(&packet, &r_client).await?; + + return Ok("".to_string()); + })() + .await; + + return match asd { + Ok(resp) => resp, + Err(err) => format!("ERROR: {}", err), + }; +} diff --git a/src/business_logic/user.rs b/src/business_logic/user.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/ingest_protocol/packet_types.rs b/src/ingest_protocol/packet_types.rs index de8e5c8..723aeeb 100644 --- a/src/ingest_protocol/packet_types.rs +++ b/src/ingest_protocol/packet_types.rs @@ -6,6 +6,7 @@ // #[serde(rename = "...")] // #[serde(alias = "...")] +use crate::business_logic::device::Device; use crate::ingest_protocol::error::Error; use crate::ingest_protocol::parser::parse_mac_address; use crate::uformat; @@ -98,64 +99,6 @@ pub struct NMDeviceDataPacket { pub time: Option, } -impl NMDeviceDataPacket { - pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> { - let device_mac_enc = hex::encode(self.mac); - - let now = Epoch::now().unwrap().into(); - let mut device_time = self.time.unwrap_or(now); - - // TODO: Добавить гистерезис - // Отчёт совместимости: отсутствует - if device_time > now { - device_time = now; - } - - let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds(); - - let key = uformat!("devices_{}", device_mac_enc); - - let device_exists: Option = redis - .hget(key.as_str(), "exists") - .await - .context(ServerRedisSnafu)?; - - if !device_exists.is_some_and(|v| v) { - return Err(AppError::DeviceNotFound { - mac: hex::encode(self.mac), - }); - } - - // devices_{device_id}_{tai_timestamp}_{sensor_id} - for sensor in &self.values { - let key = uformat!( - "devices_{}_{}_{}", - device_mac_enc, - device_tai_timestamp.to_string(), - sensor.mac - ); - - redis - .set(key.as_str(), sensor.value.to_string(), None, None, false) - .await - .context(ServerRedisSnafu)?; - } - - if let Some(commands) = &self.commands { - for (cmd_key, cmd_value) in commands { - let key = uformat!("devices_{}_cmds_{}", device_mac_enc, cmd_key); - - redis - .set(key.as_str(), cmd_value, None, None, false) - .await - .context(ServerRedisSnafu)?; - } - } - - Ok(()) - } -} - #[derive(Debug, Clone, Default, PartialEq, Deserialize)] pub struct NMJsonPacket { pub devices: Vec, @@ -164,7 +107,7 @@ pub struct NMJsonPacket { impl NMJsonPacket { pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> { for device in &self.devices { - device.save_to_db(redis).await?; + Device::save_to_db(&device, redis).await?; } Ok(()) diff --git a/src/main.rs b/src/main.rs index 26a4243..cc3fa32 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ #![doc = include_str!("../README.md")] extern crate core; +mod business_logic; mod cli; mod ingest_protocol; mod ingest_socket_server; -- 2.48.1 From 812ac99e701cbed2d7e546d672b2b701d428184f Mon Sep 17 00:00:00 2001 From: nm17 Date: Mon, 24 Feb 2025 20:12:19 +0400 Subject: [PATCH 5/6] feat(ingest_socket_server): initial work --- src/cli.rs | 11 +- src/ingest_socket_server/mod.rs | 116 +++++++++++++++++++- src/main.rs | 10 +- src/utils/hifitime_serde.rs | 4 +- src/utils/mod.rs | 10 +- src/web_server/app_error.rs | 62 ++++++----- src/web_server/old_device_sensor_api/mod.rs | 8 +- 7 files changed, 177 insertions(+), 44 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index e26834b..335f621 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,3 +1,5 @@ +use std::net::IpAddr; + use clap::{Args, Parser, Subcommand}; #[derive(Parser, Clone)] @@ -32,9 +34,12 @@ pub struct WebServerArgs { #[derive(Args, Clone)] pub struct SocketServerArgs { - #[arg(short, long, default_value = "localhost")] - pub addr: String, + #[arg(short, long)] + pub addr: IpAddr, #[arg(short = 'p', default_value_t = 8283)] pub port: u16, -} \ No newline at end of file + + #[arg(long, default_value_t = false)] + pub proxy_protocol: bool, +} diff --git a/src/ingest_socket_server/mod.rs b/src/ingest_socket_server/mod.rs index b93a9c8..56e6e1c 100644 --- a/src/ingest_socket_server/mod.rs +++ b/src/ingest_socket_server/mod.rs @@ -11,10 +11,120 @@ // } // } +use std::{fmt::Display, io::Read, net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, ops::ControlFlow}; + +use bytes::{buf, Buf, Bytes, BytesMut}; use fred::prelude::Client; +use ppp::v2::{self, Addresses, Command, Header, ParseError}; +use snafu::{whatever, ResultExt, Whatever}; +use tokio::{io::{AsyncReadExt, AsyncWriteExt, ReadBuf}, net::{TcpSocket, TcpStream}}; +use log::{*}; -use crate::cli::{Cli, SocketServerArgs}; +use crate::{business_logic::process_packet, cli::{Cli, SocketServerArgs}, ingest_protocol::{self, error::Error}, web_server::app_error::{AppError, StdIOSnafu}}; -pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) { - todo!() +pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> { + return Ok(()); +} + +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct AddressesUnpacked { + pub source: SocketAddr, + pub dest: SocketAddr +} + +pub fn get_sockaddr(addresses: Addresses) -> Option { + match addresses { + Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new(addr.source_address, addr.source_port))), + Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new(addr.source_address, addr.source_port, 0, 0))), + _ => None + } +} + +pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, actual_origin: &mut SocketAddr, proxy_data: &mut Option) -> Result<(), AppError> { + 'proxy_loop: loop { + let len = stream.read(&mut buffer).await.context(StdIOSnafu)?; + + match v2::Header::try_from(buffer.as_ref()) { + Ok(header) => { + match header.command { + Command::Proxy => { + if let Some(new_socket_addr) = get_sockaddr(header.addresses) { + *actual_origin = new_socket_addr; + *proxy_data = Some(Bytes::copy_from_slice(&header.header)); + break 'proxy_loop; + } + }, + _ => { + // Продолажем работать как обычно (health check от прокси или т.п) + } + } + }, + Err(err) => { + + match err { + ParseError::Incomplete(_) => { + // Продолжаем заполнять буфер + + continue 'proxy_loop; + }, + _ => { + warn!("Получили неисправимую ошибку при парсинге proxy протокола. Убедитесь что никто не пытается подключиться к сервису напрямую без прокси. {error}", error = err); + + whatever!("No proxy headers detected"); + } + } + } + } + } + + buffer = buffer.split_off(proxy_data.as_ref().unwrap().len()); + + return Ok(()); +} + +pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> { + // TODO: errors should not break the server + + let mut buffer = BytesMut::with_capacity(16 * 1024); + let socket = TcpSocket::new_v4().context(StdIOSnafu)?; + + socket.bind(SocketAddr::new(specific_args.addr, specific_args.port)).context(StdIOSnafu)?; + + let listener = socket.listen(256).context(StdIOSnafu)?; + + 'tcp_loop: while let Ok((mut stream, socketaddr)) = listener.accept().await { + // Нужно для правильного применения рейт лимита + let mut actual_origin = socketaddr; + let mut proxy_data = None; + + if specific_args.proxy_protocol { + tcp_proxy_handler(buffer.clone(), &mut stream, &mut actual_origin, &mut proxy_data).await?; + } + + 'protocol_loop: loop { + let len = stream.read(&mut buffer).await.context(StdIOSnafu)?; + + match ingest_protocol::parser::parse_packet(&buffer) { + Ok(packet) => { + let resp = process_packet(client.clone(), packet, proxy_data.clone()).await; + let _ = stream.write(resp.as_bytes()).await; + let _ = stream.shutdown().await; + }, + Err(err) if err == Error::Incomplete => { + continue 'protocol_loop; + }, + Err(err) => { + stream.write(format!("ERROR: {}", err).as_bytes()).await.context(StdIOSnafu)?; + continue 'tcp_loop; + } + } + } + } + + return Ok(()); + +} + +pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) { + } diff --git a/src/main.rs b/src/main.rs index cc3fa32..a31ed14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,6 @@ #![doc = include_str!("../README.md")] +#![feature(try_blocks)] + extern crate core; mod business_logic; @@ -15,11 +17,13 @@ use fred::prelude::{ ReconnectPolicy, Server, ServerConfig, }; use ingest_socket_server::socketserv_main; +use web_server::app_error::AppError; use crate::web_server::server_main; +#[snafu::report] #[ntex::main] -async fn main() { +async fn main() -> Result<(), AppError> { let result = Cli::parse(); let mut config = RedisConfig::default(); @@ -40,7 +44,9 @@ async fn main() { server_main(result.clone(), specific_args.clone(), redis).await; } MyCommand::SocketServer(specific_args) => { - socketserv_main(result.clone(), specific_args.clone(), redis).await; + socketserv_main(result.clone(), specific_args.clone(), redis).await?; } }; + + return Ok(()); } diff --git a/src/utils/hifitime_serde.rs b/src/utils/hifitime_serde.rs index 16bcffc..4e605f8 100644 --- a/src/utils/hifitime_serde.rs +++ b/src/utils/hifitime_serde.rs @@ -65,9 +65,7 @@ impl<'de> Deserialize<'de> for EpochUTC { ) as f64) .into()); } - Ok( - Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into(), - ) + Ok(Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into()) } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index b7193a0..890ccb6 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -70,7 +70,7 @@ impl<'de> Deserialize<'de> for SupportedUnit { } /// Таблица преобразования текстового представления единиц в значения [SupportedUnit]. -static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { +pub static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { "C" => SupportedUnit::Celsius, "%" => SupportedUnit::Percentage, "mmHg" => SupportedUnit::MillimeterHg, @@ -85,6 +85,12 @@ static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { "KWh" => SupportedUnit::KWh, }; -pub fn convert_to_arc(error: T) -> Arc { +pub fn convert_to_arc(error: T) -> Arc { Arc::new(error) } + +pub fn convert_to_arced_dynerror( + error: Option>, +) -> Option> { + error.map(|el| Arc::from(el)) +} diff --git a/src/web_server/app_error.rs b/src/web_server/app_error.rs index d13243c..78b39ed 100644 --- a/src/web_server/app_error.rs +++ b/src/web_server/app_error.rs @@ -8,42 +8,36 @@ use ntex::web::{HttpRequest, HttpResponse}; use snafu::Snafu; use crate::insert_header; -use crate::utils::convert_to_arc; +use crate::utils::{convert_to_arc, convert_to_arced_dynerror}; use crate::web_server::old_device_sensor_api::qs_parser::QSParserError; use rust_decimal::Decimal; use serde_json::json; use super::old_device_sensor_api::qs_parser; - - - /// Главный объект ошибки [std::error::Error] для всего Web API. /// /// В целом, все Result у Web сервера должны использовать этот Error. -#[derive(Debug, Snafu, Clone)] +#[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum AppError { #[snafu(display("Could not read file"))] JsonError { #[snafu(source(from(serde_json::Error, convert_to_arc::)))] - source: Arc + source: Arc, }, #[snafu(display("Could not read file"))] - QSError { - source: QSParserError - }, + QSError { source: QSParserError }, #[snafu(display("Could not read file"))] - ServerRedisError { - source: fred::error::Error - }, + ServerRedisError { source: fred::error::Error }, + + #[snafu(display("Could not parse decimal"))] + Decimal { source: rust_decimal::Error }, #[snafu(display("Could not read file"))] - UnknownMethod { - method: String - }, + UnknownMethod { method: String }, #[snafu(display("Could not read file"))] RequestTooLarge, @@ -58,9 +52,10 @@ pub enum AppError { }, #[snafu(display("UTF-8 Error"))] - Utf8Error { - source: std::str::Utf8Error - }, + Utf8Error { source: std::str::Utf8Error }, + + #[snafu(display("String cannot be parced into a UUID"))] + Uuid { source: uuid::Error }, #[snafu(display("Could not read file"))] UnknownBody { @@ -69,12 +64,24 @@ pub enum AppError { }, #[snafu(display("Could not read file"))] - DeviceNotFound { - mac: String + DeviceNotFound { mac: String }, + + #[snafu(display("Std IO error"))] + StdIO { + #[snafu(source(from(std::io::Error, convert_to_arc::)))] + source: Arc, }, #[snafu(display("Could not read file"))] - TimeIsLongBehindNow + TimeIsLongBehindNow, + + #[snafu(display("Could not read file"))] + #[snafu(whatever)] + Whatever { + #[snafu(source(from(Box, Some)))] + source: Option>, + message: String, + }, } impl web::error::WebResponseError for AppError { @@ -90,7 +97,12 @@ impl web::error::WebResponseError for AppError { AppError::UnknownBody { .. } => StatusCode::BAD_REQUEST, AppError::QSError { .. } => StatusCode::BAD_REQUEST, AppError::DeviceNotFound { .. } => StatusCode::BAD_REQUEST, - AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST + AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST, + + // Не знаю что лучше тут использовать + AppError::Whatever { .. } => StatusCode::INTERNAL_SERVER_ERROR, + + _ => StatusCode::INTERNAL_SERVER_ERROR, } } @@ -108,7 +120,8 @@ impl web::error::WebResponseError for AppError { } AppError::QSError { .. } => "UrlEncoded body or query params are incorrect", AppError::DeviceNotFound { .. } => "Device not found", - AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be" + AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be", + _ => "Internal server error. Please make sure to tell the devs they are stupid :p", }; let status_code = self.status_code(); @@ -121,7 +134,6 @@ impl web::error::WebResponseError for AppError { let mut resp = HttpResponse::build(status_code).json(&body); let headers = resp.headers_mut(); - match self { AppError::JsonError { source } => { insert_header!(headers, "X-Error-Line", source.line()); @@ -132,7 +144,7 @@ impl web::error::WebResponseError for AppError { source.to_string().escape_default().collect::() ); } - AppError::UnknownMethod {method} => { + AppError::UnknownMethod { method } => { insert_header!( headers, "X-Unknown-Cmd", diff --git a/src/web_server/old_device_sensor_api/mod.rs b/src/web_server/old_device_sensor_api/mod.rs index 9f2053b..bcf1c65 100644 --- a/src/web_server/old_device_sensor_api/mod.rs +++ b/src/web_server/old_device_sensor_api/mod.rs @@ -20,17 +20,13 @@ use super::NMAppState; #[snafu(visibility(pub))] pub enum Error { #[snafu(display("Device not found"))] - DeviceNotFound { - mac: String - }, + DeviceNotFound { mac: String }, #[snafu(display("Time sent with the device is way too behind now"))] TimeIsLongBehindNow, #[snafu(display("{source}"))] - QSParser { - source: QSParserError - }, + QSParser { source: QSParserError }, } /// Обработчик данных датчиков с устройств. -- 2.48.1 From d5488fd2eb368461378728ab6f8da13e4e1f2105 Mon Sep 17 00:00:00 2001 From: nm17 Date: Mon, 24 Feb 2025 20:13:59 +0400 Subject: [PATCH 6/6] style: rustfmt --- src/ingest_protocol/error.rs | 10 +-- src/ingest_socket_server/mod.rs | 89 ++++++++++++++----- src/web_server/old_app_api/mod.rs | 12 +-- .../old_device_sensor_api/qs_parser.rs | 28 +++--- 4 files changed, 92 insertions(+), 47 deletions(-) diff --git a/src/ingest_protocol/error.rs b/src/ingest_protocol/error.rs index 53e7e8f..8a61cd6 100644 --- a/src/ingest_protocol/error.rs +++ b/src/ingest_protocol/error.rs @@ -3,15 +3,15 @@ use std::fmt::Debug; use std::num::ParseFloatError; use thiserror::Error as ThisError; -/// +/// /// Тип ошибки Ingest протокола -/// +/// /// К сожалению, не может быть переделан на Snafu, так как /// Snafu не поддерживает generic типы как source ошибки, /// не приделывая 'static лайфтайм. -/// -/// См. https://github.com/shepmaster/snafu/issues/99 -/// +/// +/// См. https://github.com/shepmaster/snafu/issues/99 +/// #[allow(dead_code)] #[derive(Debug, ThisError)] pub enum Error { diff --git a/src/ingest_socket_server/mod.rs b/src/ingest_socket_server/mod.rs index 56e6e1c..f60bb34 100644 --- a/src/ingest_socket_server/mod.rs +++ b/src/ingest_socket_server/mod.rs @@ -11,36 +11,66 @@ // } // } -use std::{fmt::Display, io::Read, net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, ops::ControlFlow}; +use std::{ + fmt::Display, + io::Read, + net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, + ops::ControlFlow, +}; use bytes::{buf, Buf, Bytes, BytesMut}; use fred::prelude::Client; +use log::*; use ppp::v2::{self, Addresses, Command, Header, ParseError}; use snafu::{whatever, ResultExt, Whatever}; -use tokio::{io::{AsyncReadExt, AsyncWriteExt, ReadBuf}, net::{TcpSocket, TcpStream}}; -use log::{*}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, ReadBuf}, + net::{TcpSocket, TcpStream}, +}; -use crate::{business_logic::process_packet, cli::{Cli, SocketServerArgs}, ingest_protocol::{self, error::Error}, web_server::app_error::{AppError, StdIOSnafu}}; +use crate::{ + business_logic::process_packet, + cli::{Cli, SocketServerArgs}, + ingest_protocol::{self, error::Error}, + web_server::app_error::{AppError, StdIOSnafu}, +}; -pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> { +pub async fn socketserv_main( + args: Cli, + specific_args: SocketServerArgs, + client: Client, +) -> Result<(), AppError> { return Ok(()); } #[derive(Clone, PartialEq, Eq, Debug)] pub struct AddressesUnpacked { pub source: SocketAddr, - pub dest: SocketAddr + pub dest: SocketAddr, } pub fn get_sockaddr(addresses: Addresses) -> Option { match addresses { - Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new(addr.source_address, addr.source_port))), - Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new(addr.source_address, addr.source_port, 0, 0))), - _ => None + Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new( + addr.source_address, + addr.source_port, + ))), + Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new( + addr.source_address, + addr.source_port, + 0, + 0, + ))), + _ => None, } } -pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, actual_origin: &mut SocketAddr, proxy_data: &mut Option) -> Result<(), AppError> { +pub async fn tcp_proxy_handler( + mut buffer: BytesMut, + stream: &mut TcpStream, + actual_origin: &mut SocketAddr, + proxy_data: &mut Option, +) -> Result<(), AppError> { 'proxy_loop: loop { let len = stream.read(&mut buffer).await.context(StdIOSnafu)?; @@ -53,20 +83,19 @@ pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, act *proxy_data = Some(Bytes::copy_from_slice(&header.header)); break 'proxy_loop; } - }, + } _ => { // Продолажем работать как обычно (health check от прокси или т.п) } } - }, + } Err(err) => { - match err { ParseError::Incomplete(_) => { // Продолжаем заполнять буфер continue 'proxy_loop; - }, + } _ => { warn!("Получили неисправимую ошибку при парсинге proxy протокола. Убедитесь что никто не пытается подключиться к сервису напрямую без прокси. {error}", error = err); @@ -82,13 +111,19 @@ pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, act return Ok(()); } -pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> { +pub async fn socketserv_tcp( + args: Cli, + specific_args: SocketServerArgs, + client: Client, +) -> Result<(), AppError> { // TODO: errors should not break the server let mut buffer = BytesMut::with_capacity(16 * 1024); let socket = TcpSocket::new_v4().context(StdIOSnafu)?; - socket.bind(SocketAddr::new(specific_args.addr, specific_args.port)).context(StdIOSnafu)?; + socket + .bind(SocketAddr::new(specific_args.addr, specific_args.port)) + .context(StdIOSnafu)?; let listener = socket.listen(256).context(StdIOSnafu)?; @@ -98,7 +133,13 @@ pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: let mut proxy_data = None; if specific_args.proxy_protocol { - tcp_proxy_handler(buffer.clone(), &mut stream, &mut actual_origin, &mut proxy_data).await?; + tcp_proxy_handler( + buffer.clone(), + &mut stream, + &mut actual_origin, + &mut proxy_data, + ) + .await?; } 'protocol_loop: loop { @@ -109,12 +150,15 @@ pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: let resp = process_packet(client.clone(), packet, proxy_data.clone()).await; let _ = stream.write(resp.as_bytes()).await; let _ = stream.shutdown().await; - }, + } Err(err) if err == Error::Incomplete => { continue 'protocol_loop; - }, + } Err(err) => { - stream.write(format!("ERROR: {}", err).as_bytes()).await.context(StdIOSnafu)?; + stream + .write(format!("ERROR: {}", err).as_bytes()) + .await + .context(StdIOSnafu)?; continue 'tcp_loop; } } @@ -122,9 +166,6 @@ pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: } return Ok(()); - } -pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) { - -} +pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) {} diff --git a/src/web_server/old_app_api/mod.rs b/src/web_server/old_app_api/mod.rs index 55136a8..b940ddb 100644 --- a/src/web_server/old_app_api/mod.rs +++ b/src/web_server/old_app_api/mod.rs @@ -3,14 +3,14 @@ mod handlers; mod types; -use ntex::web::types::State; -use ntex::util::Bytes; -use ntex::web; use crate::web_server::app_error::AppError; -use crate::web_server::NMAppState; use crate::web_server::old_app_api::handlers::{app_init, version}; use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams}; use crate::web_server::utils::redis::is_api_key_valid; +use crate::web_server::NMAppState; +use ntex::util::Bytes; +use ntex::web; +use ntex::web::types::State; use ntex::web::HttpRequest; use snafu::{whatever, ResultExt}; @@ -60,7 +60,9 @@ pub async fn old_api_handler( app_init(body, &app_state).await } - _ => Err(AppError::UnknownMethod { method: mandatory_params.cmd.to_string() }), + _ => Err(AppError::UnknownMethod { + method: mandatory_params.cmd.to_string(), + }), } //Ok("fuck") diff --git a/src/web_server/old_device_sensor_api/qs_parser.rs b/src/web_server/old_device_sensor_api/qs_parser.rs index 0b4ca09..65e440d 100644 --- a/src/web_server/old_device_sensor_api/qs_parser.rs +++ b/src/web_server/old_device_sensor_api/qs_parser.rs @@ -19,23 +19,19 @@ pub enum QSParserError { #[snafu(display("asd"))] SerdeQS { #[snafu(source(from(serde_qs::Error, convert_to_arc::)))] - source: Arc + source: Arc, }, #[snafu(display("asd"))] - Parsing { - context: String - }, + Parsing { context: String }, #[snafu(display("asd"))] FloatP { #[snafu(source)] - source: ParseFloatError + source: ParseFloatError, }, #[snafu(display("failed to parse into decimal"))] - DecimalParse { - source: rust_decimal::Error - }, + DecimalParse { source: rust_decimal::Error }, #[snafu(display("asd"))] NoMAC, @@ -43,7 +39,9 @@ pub enum QSParserError { impl From> for QSParserError { fn from(value: Error<&str>) -> Self { - QSParserError::Parsing { context: format!("{:?}", value)} + QSParserError::Parsing { + context: format!("{:?}", value), + } } } @@ -66,7 +64,7 @@ pub fn qs_rest_to_values( for (key, value) in parsed { hashset.insert(SensorValue { mac: key, - value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu{})?, + value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu {})?, time: None, unit: None, @@ -82,7 +80,9 @@ pub fn parse_decimal_if_exists( key: &str, ) -> Result, QSParserError> { if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some(Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu{})?)) + Ok(Some( + Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu {})?, + )) } else { Ok(None) } @@ -93,14 +93,16 @@ pub fn parse_epoch_if_exists( key: &str, ) -> Result, QSParserError> { if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse().context(FloatPSnafu{})?))) + Ok(Some(Epoch::from_unix_seconds( + unwrapped_value.parse().context(FloatPSnafu {})?, + ))) } else { Ok(None) } } pub async fn parse_nm_qs_format(input: &str) -> Result { - let mut parsed: HashMap = serde_qs::from_str(input).context(SerdeQSSnafu{})?; + let mut parsed: HashMap = serde_qs::from_str(input).context(SerdeQSSnafu {})?; let (_, device_mac) = if let Some(id) = parsed.get("ID") { parse_mac_address(id)? -- 2.48.1