diff --git a/Cargo.lock b/Cargo.lock index 3e852f6..28be32c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -289,6 +289,16 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + [[package]] name = "bytes" version = "1.4.0" @@ -304,7 +314,7 @@ version = "0.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e47d3a8076e283f3acd27400535992edb3ba4b5bb72f8891ad8fbe7932a7d4b9" dependencies = [ - "bytes", + "bytes 1.4.0", "either", ] @@ -630,11 +640,11 @@ dependencies = [ "arc-swap", "arcstr", "async-trait", - "bytes", + "bytes 1.4.0", "bytes-utils", "cfg-if", "float-cmp", - "futures", + "futures 0.3.28", "lazy_static", "log", "nom", @@ -656,6 +666,12 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "futures" version = "0.3.28" @@ -673,9 +689,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" dependencies = [ "futures-core", "futures-sink", @@ -683,9 +699,9 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" [[package]] name = "futures-executor" @@ -700,15 +716,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" [[package]] name = "futures-macro" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", @@ -717,15 +733,15 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" +checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" [[package]] name = "futures-task" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" +checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" [[package]] name = "futures-timer" @@ -735,9 +751,9 @@ checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24" [[package]] name = "futures-util" -version = "0.3.28" +version = "0.3.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" dependencies = [ "futures-channel", "futures-core", @@ -749,6 +765,7 @@ dependencies = [ "pin-project-lite", "pin-utils", "slab", + "tokio-io", ] [[package]] @@ -886,7 +903,7 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" dependencies = [ - "bytes", + "bytes 1.4.0", "fnv", "itoa", ] @@ -990,12 +1007,13 @@ version = "0.1.0" dependencies = [ "anyhow", "bstr", - "bytes", + "bytes 1.4.0", "chrono", "clap", "derive_more", "dotenvy", "fred", + "futures-util", "heapless", "hex", "hifitime", @@ -1015,6 +1033,15 @@ dependencies = [ "ufmt", ] +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + [[package]] name = "is-terminal" version = "0.4.7" @@ -1247,7 +1274,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dcb17fec7b1e5f1b504a7f2f02a05ccfe7ad9ecb47fa039a0d7f5a55ff940dfa" dependencies = [ "bitflags 2.4.2", - "bytes", + "bytes 1.4.0", "futures-core", "serde", ] @@ -1688,7 +1715,7 @@ version = "4.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c31deddf734dc0a39d3112e73490e88b61a05e83e074d211f348404cee4d2c6" dependencies = [ - "bytes", + "bytes 1.4.0", "bytes-utils", "cookie-factory", "crc16", @@ -1796,7 +1823,7 @@ dependencies = [ "borsh", "bytecheck", "byteorder", - "bytes", + "bytes 1.4.0", "num-traits", "rand", "rkyv", @@ -2201,7 +2228,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" dependencies = [ "autocfg", - "bytes", + "bytes 1.4.0", "libc", "mio", "num_cpus", @@ -2213,6 +2240,17 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "tokio-io" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "log", +] + [[package]] name = "tokio-macros" version = "2.1.0" @@ -2241,7 +2279,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" dependencies = [ - "bytes", + "bytes 1.4.0", "futures-core", "futures-sink", "pin-project-lite", diff --git a/Cargo.toml b/Cargo.toml index e76204a..12fcd1c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,3 +31,4 @@ smallstr = { version = "0.3.0", features = ["std", "union"] } thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["full"] } ufmt = { version = "0.2.0", features = ["std"] } +futures-util = { version = "0.3.30", features = ["tokio-io"] } diff --git a/docs/kv_db_arch.md b/docs/kv_db_arch.md index 74a3cff..a51ae56 100644 --- a/docs/kv_db_arch.md +++ b/docs/kv_db_arch.md @@ -13,3 +13,5 @@ - Поля - `exists`: bool - `unit`: str + +!!!! Убедитесь что в переменных ключей нет `_` !!!! diff --git a/src/ingest_protocol/packet_types.rs b/src/ingest_protocol/packet_types.rs index e1c07b5..d25f552 100644 --- a/src/ingest_protocol/packet_types.rs +++ b/src/ingest_protocol/packet_types.rs @@ -15,7 +15,7 @@ use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use serde_with::serde_as; -use std::collections::HashSet; +use std::collections::{BTreeMap, HashMap, HashSet}; use std::hash::{Hash, Hasher}; /// Данные с одного датчика. @@ -75,11 +75,24 @@ pub struct NMDeviceDataPacket { pub lon: Option, pub alt: Option, + /// Команды для управления устройством. + /// + /// Текст после декодировки. + /// + /// ## Отчёт совместимости + /// Функционал изначально narodmon-овский. + /// Про то, как кодировать при наличии символов `#;=` или поддержку UTF-8 не сказано. + /// Рассчитываю на то, что все значения нужно URL кодировать. + pub commands: Option>, + // HTTP GET/POST url-encode specific parameters + /// TODO: Желательное поведение в будущем: /// - Если в values есть хотябы один times и этот time не None => Игнорируем этот time - /// - Если time нет у values => используем этот time (если он None, то соотв. считаем что информации о времени нет) - /// TODO: В базе всё должно храниться как секунды по TAI спустя J1900 для возможности сортировки по времени позже. + /// - Если time нет у values => используем этот time (если он None, то соотв. считаем + /// что информации о времени нет) + /// TODO: В базе всё должно храниться как секунды по TAI спустя J1900 для возможности + /// сортировки по времени позже. pub time: Option, } diff --git a/src/ingest_socket_server/mod.rs b/src/ingest_socket_server/mod.rs new file mode 100644 index 0000000..cdfb594 --- /dev/null +++ b/src/ingest_socket_server/mod.rs @@ -0,0 +1,12 @@ +// let body = "OK"; +// +// let mut stream = app_state.redis_client.scan("devices_{}", None, None); +// +// while let Some(cmd) = stream.next().await { +// let mut cmd = cmd?; +// let redis_cmd_keys = cmd.take_results().unwrap(); +// +// for redis_cmd_key in redis_cmd_keys { +// redis_cmd_key.as_str().unwrap().split("_"); // Продолжи +// } +// } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 5539a55..8f5014c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,6 +6,8 @@ extern crate core; mod utils; mod ingest_protocol; mod web_server; +mod ingest_socket_server; + use crate::web_server::server_main; struct Params {} diff --git a/src/web_server/old_device_sensor_api/mod.rs b/src/web_server/old_device_sensor_api/mod.rs index 9a140df..bf377fe 100644 --- a/src/web_server/old_device_sensor_api/mod.rs +++ b/src/web_server/old_device_sensor_api/mod.rs @@ -2,11 +2,12 @@ pub mod qs_parser; +use fred::bytes_utils::Str; use crate::ingest_protocol::{NMDeviceDataPacket, NMJsonPacket}; use crate::web_server::app_error::AppError; -use fred::types::RedisMap; +use fred::types::{RedisMap, Scanner}; use ntex::http::{HttpMessage, StatusCode}; use ntex::util::{Bytes, HashMap}; use ntex::{http, web}; @@ -18,6 +19,8 @@ use ufmt::uwrite; use crate::web_server::NMAppState; use crate::web_server::old_device_sensor_api::qs_parser::QSParserError; +use futures_util::stream::StreamExt; + #[derive(Error, Debug)] pub enum Error { #[error("Device not found")] @@ -76,9 +79,15 @@ pub async fn device_handler<'a>( } } + let mut devices_touched_mac = vec![]; + if let Some(real_body) = real_body { for device in real_body.devices { - let mut device_key_str = String::new(); + // TODO: Переместить это всё в отдельную функцию, а лучше две + + let device_mac_enc = hex::encode(device.mac); + + let mut key = String::new(); let now = Epoch::now().unwrap(); let mut device_time = device.time @@ -94,9 +103,9 @@ pub async fn device_handler<'a>( .to_duration_since_j1900() .to_seconds(); - uwrite!(device_key_str, "devices_{}", hex::encode(device.mac)); + uwrite!(&mut key, "devices_{}", device_mac_enc).unwrap(); - let device_exists: bool = app_state.redis_client.hget(device_key_str.as_str(), "exists").await?; + let device_exists: bool = app_state.redis_client.hget(key.as_str(), "exists").await?; if !device_exists { return Err(AppError::DeviceNotFound(hex::encode(&device.mac))); @@ -104,13 +113,26 @@ pub async fn device_handler<'a>( // devices_{device_id}_{tai_timestamp}_{sensor_id} for sensor in device.values { - let mut device_report_key_str = String::new(); - uwrite!(&mut device_report_key_str, "devices_{}_{}_{}", hex::encode(device.mac), device_tai_timestamp.to_string(), sensor.mac); + let mut key = String::new(); + uwrite!(&mut key, "devices_{}_{}_{}", device_mac_enc, device_tai_timestamp.to_string(), sensor.mac).unwrap(); app_state.redis_client.set( - device_key_str.as_str(), sensor.value.to_string(), None, None, false, + key.as_str(), sensor.value.to_string(), None, None, false, ).await?; } + + if let Some(commands) = device.commands { + for (cmd_key, cmd_value) in commands { + let mut key = String::new(); + uwrite!(&mut key, "devices_{}_cmds_{}", device_mac_enc, cmd_key).unwrap(); + + app_state.redis_client.set( + key.as_str(), cmd_value, None, None, false, + ).await?; + } + } + + devices_touched_mac.push(device_mac_enc); } } else { return Err(AppError::UnknownBody { diff --git a/src/web_server/old_device_sensor_api/qs_parser.rs b/src/web_server/old_device_sensor_api/qs_parser.rs index 1bf1e3e..3f23aec 100644 --- a/src/web_server/old_device_sensor_api/qs_parser.rs +++ b/src/web_server/old_device_sensor_api/qs_parser.rs @@ -111,6 +111,10 @@ pub async fn parse_nm_qs_format(input: &str) -> Result