diff --git a/Cargo.lock b/Cargo.lock index 808e3fc..edf33ac 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -292,6 +292,16 @@ version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + [[package]] name = "bytes" version = "1.6.0" @@ -307,7 +317,7 @@ version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7dafe3a8757b027e2be6e4e5601ed563c55989fcf1546e933c66c8eb3a058d35" dependencies = [ - "bytes", + "bytes 1.6.0", "either", ] @@ -428,7 +438,7 @@ version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9885fa71e26b8ab7855e2ec7cae6e9b380edff76cd052e07c683a0319d51b3a2" dependencies = [ - "futures", + "futures 0.3.30", ] [[package]] @@ -643,11 +653,11 @@ dependencies = [ "arc-swap", "arcstr", "async-trait", - "bytes", + "bytes 1.6.0", "bytes-utils", "cfg-if", "float-cmp", - "futures", + "futures 0.3.30", "lazy_static", "log", "nom", @@ -668,6 +678,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "futures" version = "0.3.30" @@ -761,6 +777,7 @@ dependencies = [ "pin-project-lite", "pin-utils", "slab", + "tokio-io", ] [[package]] @@ -890,7 +907,7 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "21b9ddb458710bc376481b842f5da65cdf31522de232c1ca8146abce2a358258" dependencies = [ - "bytes", + "bytes 1.6.0", "fnv", "itoa", ] @@ -974,12 +991,13 @@ version = "0.1.0" dependencies = [ "anyhow", "bstr", - "bytes", + "bytes 1.6.0", "chrono", "clap", "derive_more", "dotenvy", "fred", + "futures-util", "heapless", "hex", "hifitime", @@ -999,6 +1017,15 @@ dependencies = [ "ufmt", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + [[package]] name = "is_terminal_polyfill" version = "1.70.0" @@ -1236,7 +1263,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ffd6ac357a3fd885753ddeb4130ec92474e79d013362532eba4778854466981" dependencies = [ "bitflags", - "bytes", + "bytes 1.6.0", "futures-core", "serde", ] @@ -1726,7 +1753,7 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c31deddf734dc0a39d3112e73490e88b61a05e83e074d211f348404cee4d2c6" dependencies = [ - "bytes", + "bytes 1.6.0", "bytes-utils", "cookie-factory", "crc16", @@ -1804,7 +1831,7 @@ checksum = "5cba464629b3394fc4dbc6f940ff8f5b4ff5c7aef40f29166fd4ad12acbc99c0" dependencies = [ "bitvec", "bytecheck", - "bytes", + "bytes 1.6.0", "hashbrown 0.12.3", "ptr_meta", "rend", @@ -1833,7 +1860,7 @@ checksum = "1790d1c4c0ca81211399e0e0af16333276f375209e71a37b67698a373db5b47a" dependencies = [ "arrayvec", "borsh", - "bytes", + "bytes 1.6.0", "num-traits", "rand", "rkyv", @@ -2225,7 +2252,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ba4f4a02a7a80d6f274636f0aa95c7e383b912d41fe721a31f29e29698585a4a" dependencies = [ "backtrace", - "bytes", + "bytes 1.6.0", "libc", "mio", "num_cpus", @@ -2237,6 +2264,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "log", +] + [[package]] name = "tokio-macros" version = "2.3.0" @@ -2265,7 +2303,7 @@ version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" dependencies = [ - "bytes", + "bytes 1.6.0", "futures-core", "futures-sink", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index e76204a..12fcd1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,4 @@ smallstr = { version = "0.3.0", features = ["std", "union"] } thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["full"] } ufmt = { version = "0.2.0", features = ["std"] } +futures-util = { version = "0.3.30", features = ["tokio-io"] } diff --git a/docs/kv_db_arch.md b/docs/kv_db_arch.md index 74a3cff..a51ae56 100644 --- a/docs/kv_db_arch.md +++ b/docs/kv_db_arch.md @@ -13,3 +13,5 @@ - Поля - `exists`: bool - `unit`: str + +!!!! Убедитесь что в переменных ключей нет `_` !!!! diff --git a/src/ingest_protocol/packet_types.rs b/src/ingest_protocol/packet_types.rs index 3e3ad5d..48e4d25 100644 --- a/src/ingest_protocol/packet_types.rs +++ b/src/ingest_protocol/packet_types.rs @@ -6,15 +6,20 @@ // #[serde(rename = "...")] // #[serde(alias = "...")] -use crate::utils::{EpochUTC, SupportedUnit}; use crate::ingest_protocol::error::Error; use crate::ingest_protocol::parser::parse_mac_address; +use crate::utils::{EpochUTC, SupportedUnit}; +use crate::web_server::app_error::AppError; +use fred::clients::RedisClient; +use fred::interfaces::{HashesInterface, KeysInterface}; +use hifitime::Epoch; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use serde_with::serde_as; +use ufmt::uwrite; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::hash::{Hash, Hasher}; /// Данные с одного датчика. @@ -74,6 +79,16 @@ pub struct NMDeviceDataPacket { pub lon: Option, pub alt: Option, + /// Команды для управления устройством. + /// + /// Текст после декодировки. + /// + /// ## Отчёт совместимости + /// Функционал изначально narodmon-овский. + /// Про то, как кодировать при наличии символов `#;=` или поддержку UTF-8 не сказано. + /// Рассчитываю на то, что все значения нужно URL кодировать. + pub commands: Option>, + // HTTP GET/POST url-encode specific parameters /// TODO: Желательное поведение в будущем: /// - Если в values есть хотябы один times и этот time не None => Игнорируем этот time @@ -82,7 +97,74 @@ pub struct NMDeviceDataPacket { pub time: Option, } +impl NMDeviceDataPacket { + pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> { + let device_mac_enc = hex::encode(self.mac); + + let mut key = String::new(); + + let now = Epoch::now().unwrap().into(); + let mut device_time = self.time.unwrap_or(now); + + // TODO: Добавить гистерезис + // Отчёт совместимости: отсутствует + if device_time > now { + device_time = now; + } + + let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds(); + + uwrite!(&mut key, "devices_{}", device_mac_enc).unwrap(); + + let device_exists: Option = redis.hget(key.as_str(), "exists").await?; + + if !device_exists.is_some_and(|v| v == true) { + return Err(AppError::DeviceNotFound(hex::encode(&self.mac))); + } + + // devices_{device_id}_{tai_timestamp}_{sensor_id} + for sensor in &self.values { + let mut key = String::new(); + uwrite!( + &mut key, + "devices_{}_{}_{}", + device_mac_enc, + device_tai_timestamp.to_string(), + sensor.mac + ) + .unwrap(); + + redis + .set(key.as_str(), sensor.value.to_string(), None, None, false) + .await?; + } + + if let Some(commands) = &self.commands { + for (cmd_key, cmd_value) in commands { + let mut key = String::new(); + uwrite!(&mut key, "devices_{}_cmds_{}", device_mac_enc, cmd_key).unwrap(); + + redis + .set(key.as_str(), cmd_value, None, None, false) + .await?; + } + } + + return Ok(()); + } +} + #[derive(Debug, Clone, Default, PartialEq, Deserialize)] pub struct NMJsonPacket { pub devices: Vec, } + +impl NMJsonPacket { + pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> { + for device in &self.devices { + device.save_to_db(redis).await?; + } + + return Ok(()); + } +} diff --git a/src/ingest_socket_server/mod.rs b/src/ingest_socket_server/mod.rs new file mode 100644 index 0000000..cdfb594 --- /dev/null +++ b/src/ingest_socket_server/mod.rs @@ -0,0 +1,12 @@ +// let body = "OK"; +// +// let mut stream = app_state.redis_client.scan("devices_{}", None, None); +// +// while let Some(cmd) = stream.next().await { +// let mut cmd = cmd?; +// let redis_cmd_keys = cmd.take_results().unwrap(); +// +// for redis_cmd_key in redis_cmd_keys { +// redis_cmd_key.as_str().unwrap().split("_"); // Продолжи +// } +// } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 2b5fb61..7808067 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,12 +1,11 @@ #![doc = include_str!("../README.md")] - #![feature(try_blocks)] extern crate core; mod ingest_protocol; -mod web_server; +mod ingest_socket_server; mod utils; - +mod web_server; use crate::web_server::server_main; struct Params {} diff --git a/src/web_server/old_device_sensor_api/mod.rs b/src/web_server/old_device_sensor_api/mod.rs index 6eddb27..f591fb7 100644 --- a/src/web_server/old_device_sensor_api/mod.rs +++ b/src/web_server/old_device_sensor_api/mod.rs @@ -4,18 +4,21 @@ pub mod qs_parser; use crate::ingest_protocol::{NMDeviceDataPacket, NMJsonPacket}; use crate::web_server::app_error::AppError; +use fred::bytes_utils::Str; - -use ntex::http::{HttpMessage, StatusCode}; -use ntex::util::Bytes; -use ntex::{http, web}; use fred::prelude::*; use hifitime::Epoch; +use ntex::http::{HttpMessage, StatusCode}; +use ntex::util::Bytes; use ntex::web::types::State; +use ntex::{http, web}; +use qs_parser::QSParserError; use thiserror::Error; use ufmt::uwrite; -use crate::web_server::NMAppState; -use crate::web_server::old_device_sensor_api::qs_parser::QSParserError; + +use futures_util::stream::StreamExt; + +use super::NMAppState; #[derive(Error, Debug)] pub enum Error { @@ -27,7 +30,6 @@ pub enum Error { QSParserError(#[from] QSParserError), } - /// Обработчик данных датчиков с устройств. /// /// Слушает /post и /get. @@ -76,41 +78,7 @@ pub async fn device_handler<'a>( } if let Some(real_body) = real_body { - for device in real_body.devices { - let mut device_key_str = String::new(); - - let now = Epoch::now().unwrap().into(); - let mut device_time = device.time - .unwrap_or(now); - - // TODO: Добавить гистерезис - // Отчёт совместимости: отсутствует - if device_time > now { - device_time = now; - } - - let device_tai_timestamp = device_time.0 - .to_duration_since_j1900() - .to_seconds(); - - uwrite!(device_key_str, "devices_{}", hex::encode(device.mac)); - - let device_exists: bool = app_state.redis_client.hget(device_key_str.as_str(), "exists").await?; - - if !device_exists { - return Err(AppError::DeviceNotFound(hex::encode(&device.mac))); - } - - // devices_{device_id}_{tai_timestamp}_{sensor_id} - for sensor in device.values { - let mut device_report_key_str = String::new(); - uwrite!(&mut device_report_key_str, "devices_{}_{}_{}", hex::encode(device.mac), device_tai_timestamp.to_string(), sensor.mac); - - app_state.redis_client.set( - device_key_str.as_str(), sensor.value.to_string(), None, None, false, - ).await?; - } - } + real_body.save_to_db(&app_state.redis_client).await?; } else { return Err(AppError::UnknownBody { json_err: json_error, @@ -118,6 +86,5 @@ pub async fn device_handler<'a>( }); } - Ok(web::HttpResponseBuilder::new(StatusCode::OK) - .finish()) + Ok(web::HttpResponseBuilder::new(StatusCode::OK).finish()) } diff --git a/src/web_server/old_device_sensor_api/qs_parser.rs b/src/web_server/old_device_sensor_api/qs_parser.rs index e0524d5..193bee3 100644 --- a/src/web_server/old_device_sensor_api/qs_parser.rs +++ b/src/web_server/old_device_sensor_api/qs_parser.rs @@ -1,14 +1,14 @@ +use crate::ingest_protocol::error::Error; +use crate::ingest_protocol::parser::parse_mac_address; +use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue}; +use hifitime::Epoch; +use ntex::util::HashMap; +use rust_decimal::Decimal; use std::collections::HashSet; use std::num::ParseFloatError; use std::str::FromStr; use std::sync::Arc; -use hifitime::Epoch; -use ntex::util::HashMap; -use rust_decimal::Decimal; use thiserror::Error; -use crate::ingest_protocol::error::Error; -use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue}; -use crate::ingest_protocol::parser::parse_mac_address; /// В иделае было бы хорошо сделать всё как у [serde_json::Error], но это слишком большая морока #[derive(Error, Clone, Debug)] @@ -45,7 +45,9 @@ impl From for QSParserError { /// /// Формат: `=`. /// Других данных на подобии названия и времени нет. -pub fn qs_rest_to_values(parsed: HashMap) -> Result, QSParserError> { +pub fn qs_rest_to_values( + parsed: HashMap, +) -> Result, QSParserError> { let mut hashset = HashSet::new(); for (key, value) in parsed { @@ -62,24 +64,26 @@ pub fn qs_rest_to_values(parsed: HashMap) -> Result, key: &str) -> Result, QSParserError> { +pub fn parse_decimal_if_exists( + parsed: &mut HashMap, + key: &str, +) -> Result, QSParserError> { return if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some( - Decimal::from_str(unwrapped_value.as_str())? - )) + Ok(Some(Decimal::from_str(unwrapped_value.as_str())?)) } else { Ok(None) - } + }; } -pub fn parse_epoch_if_exists(parsed: &mut HashMap, key: &str) -> Result, QSParserError> { +pub fn parse_epoch_if_exists( + parsed: &mut HashMap, + key: &str, +) -> Result, QSParserError> { return if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some( - Epoch::from_unix_seconds(unwrapped_value.parse()?) - )) + Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse()?))) } else { Ok(None) - } + }; } pub async fn parse_nm_qs_format(input: &str) -> Result { @@ -99,9 +103,12 @@ pub async fn parse_nm_qs_format(input: &str) -> Result