feat: begin work on #7 and #10

This commit is contained in:
nm17 2024-05-31 16:50:58 +04:00
parent dc81c21ea9
commit 3e684d093f
Signed by: nm17
GPG key ID: 3303B70C59145CD4
9 changed files with 130 additions and 34 deletions

86
Cargo.lock generated
View file

@ -289,6 +289,16 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
dependencies = [
"byteorder",
"iovec",
]
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.4.0" version = "1.4.0"
@ -304,7 +314,7 @@ version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e47d3a8076e283f3acd27400535992edb3ba4b5bb72f8891ad8fbe7932a7d4b9" checksum = "e47d3a8076e283f3acd27400535992edb3ba4b5bb72f8891ad8fbe7932a7d4b9"
dependencies = [ dependencies = [
"bytes", "bytes 1.4.0",
"either", "either",
] ]
@ -630,11 +640,11 @@ dependencies = [
"arc-swap", "arc-swap",
"arcstr", "arcstr",
"async-trait", "async-trait",
"bytes", "bytes 1.4.0",
"bytes-utils", "bytes-utils",
"cfg-if", "cfg-if",
"float-cmp", "float-cmp",
"futures", "futures 0.3.28",
"lazy_static", "lazy_static",
"log", "log",
"nom", "nom",
@ -656,6 +666,12 @@ version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c" checksum = "e6d5a32815ae3f33302d95fdcb2ce17862f8c65363dcfd29360480ba1001fc9c"
[[package]]
name = "futures"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.28" version = "0.3.28"
@ -673,9 +689,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-channel" name = "futures-channel"
version = "0.3.28" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78"
dependencies = [ dependencies = [
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -683,9 +699,9 @@ dependencies = [
[[package]] [[package]]
name = "futures-core" name = "futures-core"
version = "0.3.28" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d"
[[package]] [[package]]
name = "futures-executor" name = "futures-executor"
@ -700,15 +716,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-io" name = "futures-io"
version = "0.3.28" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1"
[[package]] [[package]]
name = "futures-macro" name = "futures-macro"
version = "0.3.28" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -717,15 +733,15 @@ dependencies = [
[[package]] [[package]]
name = "futures-sink" name = "futures-sink"
version = "0.3.28" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5"
[[package]] [[package]]
name = "futures-task" name = "futures-task"
version = "0.3.28" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004"
[[package]] [[package]]
name = "futures-timer" name = "futures-timer"
@ -735,9 +751,9 @@ checksum = "f288b0a4f20f9a56b5d1da57e2227c661b7b16168e2f72365f57b63326e29b24"
[[package]] [[package]]
name = "futures-util" name = "futures-util"
version = "0.3.28" version = "0.3.30"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48"
dependencies = [ dependencies = [
"futures-channel", "futures-channel",
"futures-core", "futures-core",
@ -749,6 +765,7 @@ dependencies = [
"pin-project-lite", "pin-project-lite",
"pin-utils", "pin-utils",
"slab", "slab",
"tokio-io",
] ]
[[package]] [[package]]
@ -886,7 +903,7 @@ version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea" checksum = "b32afd38673a8016f7c9ae69e5af41a58f81b1d31689040f2f1959594ce194ea"
dependencies = [ dependencies = [
"bytes", "bytes 1.4.0",
"fnv", "fnv",
"itoa", "itoa",
] ]
@ -990,12 +1007,13 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"bstr", "bstr",
"bytes", "bytes 1.4.0",
"chrono", "chrono",
"clap", "clap",
"derive_more", "derive_more",
"dotenvy", "dotenvy",
"fred", "fred",
"futures-util",
"heapless", "heapless",
"hex", "hex",
"hifitime", "hifitime",
@ -1015,6 +1033,15 @@ dependencies = [
"ufmt", "ufmt",
] ]
[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "is-terminal" name = "is-terminal"
version = "0.4.7" version = "0.4.7"
@ -1247,7 +1274,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcb17fec7b1e5f1b504a7f2f02a05ccfe7ad9ecb47fa039a0d7f5a55ff940dfa" checksum = "dcb17fec7b1e5f1b504a7f2f02a05ccfe7ad9ecb47fa039a0d7f5a55ff940dfa"
dependencies = [ dependencies = [
"bitflags 2.4.2", "bitflags 2.4.2",
"bytes", "bytes 1.4.0",
"futures-core", "futures-core",
"serde", "serde",
] ]
@ -1688,7 +1715,7 @@ version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c31deddf734dc0a39d3112e73490e88b61a05e83e074d211f348404cee4d2c6" checksum = "9c31deddf734dc0a39d3112e73490e88b61a05e83e074d211f348404cee4d2c6"
dependencies = [ dependencies = [
"bytes", "bytes 1.4.0",
"bytes-utils", "bytes-utils",
"cookie-factory", "cookie-factory",
"crc16", "crc16",
@ -1796,7 +1823,7 @@ dependencies = [
"borsh", "borsh",
"bytecheck", "bytecheck",
"byteorder", "byteorder",
"bytes", "bytes 1.4.0",
"num-traits", "num-traits",
"rand", "rand",
"rkyv", "rkyv",
@ -2201,7 +2228,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2" checksum = "94d7b1cfd2aa4011f2de74c2c4c63665e27a71006b0a192dcd2710272e73dfa2"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes", "bytes 1.4.0",
"libc", "libc",
"mio", "mio",
"num_cpus", "num_cpus",
@ -2213,6 +2240,17 @@ dependencies = [
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
[[package]]
name = "tokio-io"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674"
dependencies = [
"bytes 0.4.12",
"futures 0.1.31",
"log",
]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.1.0" version = "2.1.0"
@ -2241,7 +2279,7 @@ version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d"
dependencies = [ dependencies = [
"bytes", "bytes 1.4.0",
"futures-core", "futures-core",
"futures-sink", "futures-sink",
"pin-project-lite", "pin-project-lite",

View file

@ -31,3 +31,4 @@ smallstr = { version = "0.3.0", features = ["std", "union"] }
thiserror = "1.0.40" thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["full"] } tokio = { version = "1.28.2", features = ["full"] }
ufmt = { version = "0.2.0", features = ["std"] } ufmt = { version = "0.2.0", features = ["std"] }
futures-util = { version = "0.3.30", features = ["tokio-io"] }

View file

@ -13,3 +13,5 @@
- Поля - Поля
- `exists`: bool - `exists`: bool
- `unit`: str - `unit`: str
!!!! Убедитесь что в переменных ключей нет `_` !!!!

View file

@ -15,7 +15,7 @@ use rust_decimal::Decimal;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_with::serde_as; use serde_with::serde_as;
use std::collections::HashSet; use std::collections::{BTreeMap, HashMap, HashSet};
use std::hash::{Hash, Hasher}; use std::hash::{Hash, Hasher};
/// Данные с одного датчика. /// Данные с одного датчика.
@ -75,11 +75,24 @@ pub struct NMDeviceDataPacket {
pub lon: Option<Decimal>, pub lon: Option<Decimal>,
pub alt: Option<Decimal>, pub alt: Option<Decimal>,
/// Команды для управления устройством.
///
/// Текст после декодировки.
///
/// ## Отчёт совместимости
/// Функционал изначально narodmon-овский.
/// Про то, как кодировать при наличии символов `#;=` или поддержку UTF-8 не сказано.
/// Рассчитываю на то, что все значения нужно URL кодировать.
pub commands: Option<HashMap<String, String>>,
// HTTP GET/POST url-encode specific parameters // HTTP GET/POST url-encode specific parameters
/// TODO: Желательное поведение в будущем: /// TODO: Желательное поведение в будущем:
/// - Если в values есть хотябы один times и этот time не None => Игнорируем этот time /// - Если в values есть хотябы один times и этот time не None => Игнорируем этот time
/// - Если time нет у values => используем этот time (если он None, то соотв. считаем что информации о времени нет) /// - Если time нет у values => используем этот time (если он None, то соотв. считаем
/// TODO: В базе всё должно храниться как секунды по TAI спустя J1900 для возможности сортировки по времени позже. /// что информации о времени нет)
/// TODO: В базе всё должно храниться как секунды по TAI спустя J1900 для возможности
/// сортировки по времени позже.
pub time: Option<Epoch>, pub time: Option<Epoch>,
} }

View file

@ -0,0 +1,12 @@
// let body = "OK";
//
// let mut stream = app_state.redis_client.scan("devices_{}", None, None);
//
// while let Some(cmd) = stream.next().await {
// let mut cmd = cmd?;
// let redis_cmd_keys = cmd.take_results().unwrap();
//
// for redis_cmd_key in redis_cmd_keys {
// redis_cmd_key.as_str().unwrap().split("_"); // Продолжи
// }
// }

View file

@ -6,6 +6,8 @@ extern crate core;
mod utils; mod utils;
mod ingest_protocol; mod ingest_protocol;
mod web_server; mod web_server;
mod ingest_socket_server;
use crate::web_server::server_main; use crate::web_server::server_main;
struct Params {} struct Params {}

View file

@ -2,11 +2,12 @@
pub mod qs_parser; pub mod qs_parser;
use fred::bytes_utils::Str;
use crate::ingest_protocol::{NMDeviceDataPacket, NMJsonPacket}; use crate::ingest_protocol::{NMDeviceDataPacket, NMJsonPacket};
use crate::web_server::app_error::AppError; use crate::web_server::app_error::AppError;
use fred::types::RedisMap; use fred::types::{RedisMap, Scanner};
use ntex::http::{HttpMessage, StatusCode}; use ntex::http::{HttpMessage, StatusCode};
use ntex::util::{Bytes, HashMap}; use ntex::util::{Bytes, HashMap};
use ntex::{http, web}; use ntex::{http, web};
@ -18,6 +19,8 @@ use ufmt::uwrite;
use crate::web_server::NMAppState; use crate::web_server::NMAppState;
use crate::web_server::old_device_sensor_api::qs_parser::QSParserError; use crate::web_server::old_device_sensor_api::qs_parser::QSParserError;
use futures_util::stream::StreamExt;
#[derive(Error, Debug)] #[derive(Error, Debug)]
pub enum Error { pub enum Error {
#[error("Device not found")] #[error("Device not found")]
@ -76,9 +79,15 @@ pub async fn device_handler<'a>(
} }
} }
let mut devices_touched_mac = vec![];
if let Some(real_body) = real_body { if let Some(real_body) = real_body {
for device in real_body.devices { for device in real_body.devices {
let mut device_key_str = String::new(); // TODO: Переместить это всё в отдельную функцию, а лучше две
let device_mac_enc = hex::encode(device.mac);
let mut key = String::new();
let now = Epoch::now().unwrap(); let now = Epoch::now().unwrap();
let mut device_time = device.time let mut device_time = device.time
@ -94,9 +103,9 @@ pub async fn device_handler<'a>(
.to_duration_since_j1900() .to_duration_since_j1900()
.to_seconds(); .to_seconds();
uwrite!(device_key_str, "devices_{}", hex::encode(device.mac)); uwrite!(&mut key, "devices_{}", device_mac_enc).unwrap();
let device_exists: bool = app_state.redis_client.hget(device_key_str.as_str(), "exists").await?; let device_exists: bool = app_state.redis_client.hget(key.as_str(), "exists").await?;
if !device_exists { if !device_exists {
return Err(AppError::DeviceNotFound(hex::encode(&device.mac))); return Err(AppError::DeviceNotFound(hex::encode(&device.mac)));
@ -104,13 +113,26 @@ pub async fn device_handler<'a>(
// devices_{device_id}_{tai_timestamp}_{sensor_id} // devices_{device_id}_{tai_timestamp}_{sensor_id}
for sensor in device.values { for sensor in device.values {
let mut device_report_key_str = String::new(); let mut key = String::new();
uwrite!(&mut device_report_key_str, "devices_{}_{}_{}", hex::encode(device.mac), device_tai_timestamp.to_string(), sensor.mac); uwrite!(&mut key, "devices_{}_{}_{}", device_mac_enc, device_tai_timestamp.to_string(), sensor.mac).unwrap();
app_state.redis_client.set( app_state.redis_client.set(
device_key_str.as_str(), sensor.value.to_string(), None, None, false, key.as_str(), sensor.value.to_string(), None, None, false,
).await?; ).await?;
} }
if let Some(commands) = device.commands {
for (cmd_key, cmd_value) in commands {
let mut key = String::new();
uwrite!(&mut key, "devices_{}_cmds_{}", device_mac_enc, cmd_key).unwrap();
app_state.redis_client.set(
key.as_str(), cmd_value, None, None, false,
).await?;
}
}
devices_touched_mac.push(device_mac_enc);
} }
} else { } else {
return Err(AppError::UnknownBody { return Err(AppError::UnknownBody {

View file

@ -111,6 +111,10 @@ pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSPar
lon: parse_decimal_if_exists(&mut parsed, "lon")?, lon: parse_decimal_if_exists(&mut parsed, "lon")?,
alt: parse_decimal_if_exists(&mut parsed, "alt")?, alt: parse_decimal_if_exists(&mut parsed, "alt")?,
time: parse_epoch_if_exists(&mut parsed, "time")?, time: parse_epoch_if_exists(&mut parsed, "time")?,
// TODO: Выяснить можно ли передавать команды по QS и можно ли их отличить от
// маков и значений сенсоров.
commands: None,
values: qs_rest_to_values(parsed)?, values: qs_rest_to_values(parsed)?,
}; };

View file

@ -5,6 +5,7 @@ use fred::prelude::*;
use heapless::String as HeaplessString; use heapless::String as HeaplessString;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use serde::{Deserialize, Serialize};
use ufmt::uwrite; use ufmt::uwrite;
lazy_static! { lazy_static! {
@ -13,6 +14,7 @@ lazy_static! {
} }
/// Описание полей в KV DB у `apikey_{}`. /// Описание полей в KV DB у `apikey_{}`.
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct ApiKeyDescription { pub struct ApiKeyDescription {
/// ID владельца API ключа. /// ID владельца API ключа.
apikey_owner: i64, apikey_owner: i64,