diff --git a/Cargo.lock b/Cargo.lock index eb37b9d..5a3a05d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -23,7 +23,7 @@ version = "0.7.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" dependencies = [ - "getrandom", + "getrandom 0.2.15", "once_cell", "version_check", ] @@ -785,7 +785,19 @@ checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" dependencies = [ "cfg-if", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", +] + +[[package]] +name = "getrandom" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.13.3+wasi-0.2.2", + "windows-targets", ] [[package]] @@ -1093,11 +1105,14 @@ dependencies = [ "hex", "hifitime", "lazy_static", + "log", "nom", "ntex", "phf", + "ppp", "regex", "rust_decimal", + "rust_decimal_macros", "serde", "serde_json", "serde_qs", @@ -1107,6 +1122,7 @@ dependencies = [ "thiserror", "tokio", "ufmt", + "uuid", ] [[package]] @@ -1223,9 +1239,9 @@ dependencies = [ [[package]] name = "log" -version = "0.4.22" +version = "0.4.26" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" +checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e" [[package]] name = "memchr" @@ -1261,7 +1277,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" dependencies = [ "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.52.0", ] @@ -1685,6 +1701,15 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" +[[package]] +name = "ppp" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a7a2049cd2570bd67bf0228e86bf850f8ceb5190a345c471d03a909da6049e0" +dependencies = [ + "thiserror", +] + [[package]] name = "ppv-lite86" version = "0.2.20" @@ -1774,7 +1799,7 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.15", ] [[package]] @@ -1883,6 +1908,16 @@ dependencies = [ "serde_json", ] +[[package]] +name = "rust_decimal_macros" +version = "1.36.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da991f231869f34268415a49724c6578e740ad697ba0999199d6f22b3949332c" +dependencies = [ + "quote", + "rust_decimal", +] + [[package]] name = "rustc-demangle" version = "0.1.24" @@ -2422,9 +2457,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" [[package]] name = "uuid" -version = "1.11.0" +version = "1.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" +checksum = "93d59ca99a559661b96bf898d8fce28ed87935fd2bea9f05983c1464dd6c71b1" +dependencies = [ + "getrandom 0.3.1", + "serde", +] [[package]] name = "version_check" @@ -2438,6 +2477,15 @@ version = "0.11.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" +[[package]] +name = "wasi" +version = "0.13.3+wasi-0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2" +dependencies = [ + "wit-bindgen-rt", +] + [[package]] name = "wasm-bindgen" version = "0.2.99" @@ -2612,6 +2660,15 @@ dependencies = [ "memchr", ] +[[package]] +name = "wit-bindgen-rt" +version = "0.33.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c" +dependencies = [ + "bitflags", +] + [[package]] name = "write16" version = "1.0.0" diff --git a/Cargo.toml b/Cargo.toml index d571bc0..d074cd5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -32,3 +32,7 @@ ufmt = { version = "0.2.0", features = ["std"] } futures-util = { version = "0.3.30", features = ["tokio-io"] } snafu = "0.8.5" clap-verbosity-flag = "3.0.2" +ppp = "2.3.0" +log = "0.4.26" +uuid = { version = "1.14.0", features = ["serde", "v4"] } +rust_decimal_macros = "1.36.0" diff --git a/docs/kv_db_arch.md b/docs/kv_db_arch.md index a51ae56..2b7aa1f 100644 --- a/docs/kv_db_arch.md +++ b/docs/kv_db_arch.md @@ -1,17 +1,29 @@ # Архитектура KV DB (Dragonfly) - `apikey_{apikey}` + - API ключ для приложений - Поля - `owner` - - Имеет время окончания - - `devices_{device_id}` - - Поля - - Вся информация о девайсе - - `devices_{device_id}_{tai_timestamp}_{sensor_id}` - - Только значение - - `devices_{device_id}` + - Имеет TTL + - `devices_{device_uuid}` - Поля - `exists`: bool + - `tls_only`: bool + - `mtls_only`: bool + - `lat`: decimal + - `long`: decimal + - `alt`: decimal + - `devices_{device_uuid}_sensor{sensor_mac}_{tai_timestamp}` + - Только значение + - `devices_{device_uuid}_{sensor_mac}` + - Поля - `unit`: str + - `devices_{device_uuid}_commands` + - `devices_mac{device_mac}` + - Маппинг до device_uuid + - `users_{user_uuid}` + - username: string + - password_hash: string + !!!! Убедитесь что в переменных ключей нет `_` !!!! diff --git a/src/business_logic/device.rs b/src/business_logic/device.rs new file mode 100644 index 0000000..1f5c43a --- /dev/null +++ b/src/business_logic/device.rs @@ -0,0 +1,161 @@ +use std::str::FromStr; + +use fred::prelude::{Client as RedisClient, HashesInterface, KeysInterface}; +use hifitime::Epoch; +use rust_decimal::Decimal; +use snafu::ResultExt; +use uuid::{self, Uuid}; + +use crate::{ + ingest_protocol::NMDeviceDataPacket, + uformat, + web_server::{ + app_error::{self, AppError, DecimalSnafu, ServerRedisSnafu, UuidSnafu}, + old_device_sensor_api::qs_parser::DecimalParseSnafu, + }, +}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)] +pub struct Device { + pub device_id: Uuid, + + /// ID владельца + pub owner: Uuid, + + pub lat: Option, + pub lon: Option, + pub alt: Option, + + /// Данные датчика могут быть переданы только по TLS + pub tls_only: bool, + + /// Данные датчика могут быть переданы только по Mutual TLS + pub mtls_only: bool, +} + +impl Device { + pub fn new(id: Uuid, owner: Uuid) -> Device { + Device { + device_id: id, + owner, + lat: None, + lon: None, + alt: None, + tls_only: false, + mtls_only: false, + } + } + + pub async fn get_by_id(redis: &RedisClient, id: Uuid) -> Result, AppError> { + let key = uformat!("devices_{}", id.to_string()); + + if !redis.exists(&[&key]).await.context(ServerRedisSnafu)? { + return Ok(None); + } + + return Ok(Some(Device { + device_id: id, + owner: redis + .hget::, _, _>(&key, "lat") + .await + .context(ServerRedisSnafu)? + .map(|el| Uuid::from_str(&el).context(UuidSnafu)) + .transpose()? + .unwrap(), + lat: redis + .hget::, _, _>(&key, "lat") + .await + .context(ServerRedisSnafu)? + .map(|el| el.parse().context(DecimalSnafu)) + .transpose()?, + lon: redis + .hget::, _, _>(&key, "lon") + .await + .context(ServerRedisSnafu)? + .map(|el| el.parse().context(DecimalSnafu)) + .transpose()?, + alt: redis + .hget::, _, _>(&key, "alt") + .await + .context(ServerRedisSnafu)? + .map(|el| el.parse().context(DecimalSnafu)) + .transpose()?, + tls_only: redis + .hget::, _, _>(&key, "tls_only") + .await + .context(ServerRedisSnafu)? + .unwrap(), + mtls_only: redis + .hget::, _, _>(&key, "mtls_only") + .await + .context(ServerRedisSnafu)? + .unwrap(), + })); + } + + pub async fn save_to_db( + packet: &NMDeviceDataPacket, + redis: &RedisClient, + ) -> Result<(), AppError> { + let device_mac_enc = hex::encode(packet.mac); + let device_id_key = uformat!("devices_mac{}", &device_mac_enc); + + let device_id: Option = redis.get(device_id_key).await.context(ServerRedisSnafu)?; + + let device_id = device_id.ok_or_else(|| AppError::DeviceNotFound { + mac: device_mac_enc.clone(), + })?; + + let now = Epoch::now().unwrap().into(); + let mut device_time = packet.time.unwrap_or(now); + + // TODO: Добавить гистерезис + // Отчёт совместимости: отсутствует + if device_time > now { + device_time = now; + } + + let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds(); + + let key = uformat!("devices_{}", device_id); + + let device_exists: Option = redis + .hget(key.as_str(), "exists") + .await + .context(ServerRedisSnafu)?; + + if !device_exists.is_some_and(|v| v) { + return Err(AppError::DeviceNotFound { + mac: hex::encode(packet.mac), + }); + } + + // devices_{device_id}_{tai_timestamp}_{sensor_id} + for sensor in &packet.values { + let key = uformat!( + "devices_{}_sensor{}_{}", + device_id, + sensor.mac, + device_tai_timestamp.to_string(), + ); + + () = redis + .set(key.as_str(), sensor.value.to_string(), None, None, false) + .await + .context(ServerRedisSnafu)?; + } + + if let Some(commands) = &packet.commands { + for (cmd_key, cmd_value) in commands { + let key = uformat!("devices_{}_cmds_{}", device_id, cmd_key); + + () = redis + .set(key.as_str(), cmd_value, None, None, false) + .await + .context(ServerRedisSnafu)?; + } + } + + Ok(()) + } +} diff --git a/src/business_logic/mod.rs b/src/business_logic/mod.rs new file mode 100644 index 0000000..4a2a358 --- /dev/null +++ b/src/business_logic/mod.rs @@ -0,0 +1,37 @@ +use bytes::Bytes; +use device::Device; +use fred::prelude::Client; +use ntex::web::guard::Header; +use ppp::v2::Header; +use snafu::ResultExt; + +pub mod device; + +use crate::{ + ingest_protocol::NMDeviceDataPacket, + web_server::app_error::{AppError, ServerRedisSnafu}, +}; + +pub async fn process_packet( + r_client: Client, + packet: NMDeviceDataPacket, + proxy_header: Option, +) -> String { + let proxy_ref = proxy_header.as_ref().map(|el| el.as_ref()); + let proxy_header_parsed = proxy_ref.map(|el| Header::try_from(el).unwrap()); + // proxy_header_parsed.unwrap().tlvs() + + let asd: Result = (|| async { + // TODO: Auth code here. Check if tls only is enabled. + + Device::save_to_db(&packet, &r_client).await?; + + return Ok("".to_string()); + })() + .await; + + return match asd { + Ok(resp) => resp, + Err(err) => format!("ERROR: {}", err), + }; +} diff --git a/src/business_logic/user.rs b/src/business_logic/user.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/cli.rs b/src/cli.rs index e26834b..335f621 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,3 +1,5 @@ +use std::net::IpAddr; + use clap::{Args, Parser, Subcommand}; #[derive(Parser, Clone)] @@ -32,9 +34,12 @@ pub struct WebServerArgs { #[derive(Args, Clone)] pub struct SocketServerArgs { - #[arg(short, long, default_value = "localhost")] - pub addr: String, + #[arg(short, long)] + pub addr: IpAddr, #[arg(short = 'p', default_value_t = 8283)] pub port: u16, -} \ No newline at end of file + + #[arg(long, default_value_t = false)] + pub proxy_protocol: bool, +} diff --git a/src/ingest_protocol/error.rs b/src/ingest_protocol/error.rs index 53e7e8f..8a61cd6 100644 --- a/src/ingest_protocol/error.rs +++ b/src/ingest_protocol/error.rs @@ -3,15 +3,15 @@ use std::fmt::Debug; use std::num::ParseFloatError; use thiserror::Error as ThisError; -/// +/// /// Тип ошибки Ingest протокола -/// +/// /// К сожалению, не может быть переделан на Snafu, так как /// Snafu не поддерживает generic типы как source ошибки, /// не приделывая 'static лайфтайм. -/// -/// См. https://github.com/shepmaster/snafu/issues/99 -/// +/// +/// См. https://github.com/shepmaster/snafu/issues/99 +/// #[allow(dead_code)] #[derive(Debug, ThisError)] pub enum Error { diff --git a/src/ingest_protocol/packet_types.rs b/src/ingest_protocol/packet_types.rs index de8e5c8..723aeeb 100644 --- a/src/ingest_protocol/packet_types.rs +++ b/src/ingest_protocol/packet_types.rs @@ -6,6 +6,7 @@ // #[serde(rename = "...")] // #[serde(alias = "...")] +use crate::business_logic::device::Device; use crate::ingest_protocol::error::Error; use crate::ingest_protocol::parser::parse_mac_address; use crate::uformat; @@ -98,64 +99,6 @@ pub struct NMDeviceDataPacket { pub time: Option, } -impl NMDeviceDataPacket { - pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> { - let device_mac_enc = hex::encode(self.mac); - - let now = Epoch::now().unwrap().into(); - let mut device_time = self.time.unwrap_or(now); - - // TODO: Добавить гистерезис - // Отчёт совместимости: отсутствует - if device_time > now { - device_time = now; - } - - let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds(); - - let key = uformat!("devices_{}", device_mac_enc); - - let device_exists: Option = redis - .hget(key.as_str(), "exists") - .await - .context(ServerRedisSnafu)?; - - if !device_exists.is_some_and(|v| v) { - return Err(AppError::DeviceNotFound { - mac: hex::encode(self.mac), - }); - } - - // devices_{device_id}_{tai_timestamp}_{sensor_id} - for sensor in &self.values { - let key = uformat!( - "devices_{}_{}_{}", - device_mac_enc, - device_tai_timestamp.to_string(), - sensor.mac - ); - - redis - .set(key.as_str(), sensor.value.to_string(), None, None, false) - .await - .context(ServerRedisSnafu)?; - } - - if let Some(commands) = &self.commands { - for (cmd_key, cmd_value) in commands { - let key = uformat!("devices_{}_cmds_{}", device_mac_enc, cmd_key); - - redis - .set(key.as_str(), cmd_value, None, None, false) - .await - .context(ServerRedisSnafu)?; - } - } - - Ok(()) - } -} - #[derive(Debug, Clone, Default, PartialEq, Deserialize)] pub struct NMJsonPacket { pub devices: Vec, @@ -164,7 +107,7 @@ pub struct NMJsonPacket { impl NMJsonPacket { pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> { for device in &self.devices { - device.save_to_db(redis).await?; + Device::save_to_db(&device, redis).await?; } Ok(()) diff --git a/src/ingest_socket_server/mod.rs b/src/ingest_socket_server/mod.rs index b93a9c8..f60bb34 100644 --- a/src/ingest_socket_server/mod.rs +++ b/src/ingest_socket_server/mod.rs @@ -11,10 +11,161 @@ // } // } +use std::{ + fmt::Display, + io::Read, + net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, + ops::ControlFlow, +}; + +use bytes::{buf, Buf, Bytes, BytesMut}; use fred::prelude::Client; +use log::*; +use ppp::v2::{self, Addresses, Command, Header, ParseError}; +use snafu::{whatever, ResultExt, Whatever}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, ReadBuf}, + net::{TcpSocket, TcpStream}, +}; -use crate::cli::{Cli, SocketServerArgs}; +use crate::{ + business_logic::process_packet, + cli::{Cli, SocketServerArgs}, + ingest_protocol::{self, error::Error}, + web_server::app_error::{AppError, StdIOSnafu}, +}; -pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) { - todo!() +pub async fn socketserv_main( + args: Cli, + specific_args: SocketServerArgs, + client: Client, +) -> Result<(), AppError> { + return Ok(()); } + +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct AddressesUnpacked { + pub source: SocketAddr, + pub dest: SocketAddr, +} + +pub fn get_sockaddr(addresses: Addresses) -> Option { + match addresses { + Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new( + addr.source_address, + addr.source_port, + ))), + Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new( + addr.source_address, + addr.source_port, + 0, + 0, + ))), + _ => None, + } +} + +pub async fn tcp_proxy_handler( + mut buffer: BytesMut, + stream: &mut TcpStream, + actual_origin: &mut SocketAddr, + proxy_data: &mut Option, +) -> Result<(), AppError> { + 'proxy_loop: loop { + let len = stream.read(&mut buffer).await.context(StdIOSnafu)?; + + match v2::Header::try_from(buffer.as_ref()) { + Ok(header) => { + match header.command { + Command::Proxy => { + if let Some(new_socket_addr) = get_sockaddr(header.addresses) { + *actual_origin = new_socket_addr; + *proxy_data = Some(Bytes::copy_from_slice(&header.header)); + break 'proxy_loop; + } + } + _ => { + // Продолажем работать как обычно (health check от прокси или т.п) + } + } + } + Err(err) => { + match err { + ParseError::Incomplete(_) => { + // Продолжаем заполнять буфер + + continue 'proxy_loop; + } + _ => { + warn!("Получили неисправимую ошибку при парсинге proxy протокола. Убедитесь что никто не пытается подключиться к сервису напрямую без прокси. {error}", error = err); + + whatever!("No proxy headers detected"); + } + } + } + } + } + + buffer = buffer.split_off(proxy_data.as_ref().unwrap().len()); + + return Ok(()); +} + +pub async fn socketserv_tcp( + args: Cli, + specific_args: SocketServerArgs, + client: Client, +) -> Result<(), AppError> { + // TODO: errors should not break the server + + let mut buffer = BytesMut::with_capacity(16 * 1024); + let socket = TcpSocket::new_v4().context(StdIOSnafu)?; + + socket + .bind(SocketAddr::new(specific_args.addr, specific_args.port)) + .context(StdIOSnafu)?; + + let listener = socket.listen(256).context(StdIOSnafu)?; + + 'tcp_loop: while let Ok((mut stream, socketaddr)) = listener.accept().await { + // Нужно для правильного применения рейт лимита + let mut actual_origin = socketaddr; + let mut proxy_data = None; + + if specific_args.proxy_protocol { + tcp_proxy_handler( + buffer.clone(), + &mut stream, + &mut actual_origin, + &mut proxy_data, + ) + .await?; + } + + 'protocol_loop: loop { + let len = stream.read(&mut buffer).await.context(StdIOSnafu)?; + + match ingest_protocol::parser::parse_packet(&buffer) { + Ok(packet) => { + let resp = process_packet(client.clone(), packet, proxy_data.clone()).await; + let _ = stream.write(resp.as_bytes()).await; + let _ = stream.shutdown().await; + } + Err(err) if err == Error::Incomplete => { + continue 'protocol_loop; + } + Err(err) => { + stream + .write(format!("ERROR: {}", err).as_bytes()) + .await + .context(StdIOSnafu)?; + continue 'tcp_loop; + } + } + } + } + + return Ok(()); +} + +pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) {} diff --git a/src/main.rs b/src/main.rs index 26a4243..a31ed14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,9 @@ #![doc = include_str!("../README.md")] +#![feature(try_blocks)] + extern crate core; +mod business_logic; mod cli; mod ingest_protocol; mod ingest_socket_server; @@ -14,11 +17,13 @@ use fred::prelude::{ ReconnectPolicy, Server, ServerConfig, }; use ingest_socket_server::socketserv_main; +use web_server::app_error::AppError; use crate::web_server::server_main; +#[snafu::report] #[ntex::main] -async fn main() { +async fn main() -> Result<(), AppError> { let result = Cli::parse(); let mut config = RedisConfig::default(); @@ -39,7 +44,9 @@ async fn main() { server_main(result.clone(), specific_args.clone(), redis).await; } MyCommand::SocketServer(specific_args) => { - socketserv_main(result.clone(), specific_args.clone(), redis).await; + socketserv_main(result.clone(), specific_args.clone(), redis).await?; } }; + + return Ok(()); } diff --git a/src/utils/hifitime_serde.rs b/src/utils/hifitime_serde.rs index 16bcffc..4e605f8 100644 --- a/src/utils/hifitime_serde.rs +++ b/src/utils/hifitime_serde.rs @@ -65,9 +65,7 @@ impl<'de> Deserialize<'de> for EpochUTC { ) as f64) .into()); } - Ok( - Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into(), - ) + Ok(Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into()) } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index b7193a0..890ccb6 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -70,7 +70,7 @@ impl<'de> Deserialize<'de> for SupportedUnit { } /// Таблица преобразования текстового представления единиц в значения [SupportedUnit]. -static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { +pub static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { "C" => SupportedUnit::Celsius, "%" => SupportedUnit::Percentage, "mmHg" => SupportedUnit::MillimeterHg, @@ -85,6 +85,12 @@ static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { "KWh" => SupportedUnit::KWh, }; -pub fn convert_to_arc(error: T) -> Arc { +pub fn convert_to_arc(error: T) -> Arc { Arc::new(error) } + +pub fn convert_to_arced_dynerror( + error: Option>, +) -> Option> { + error.map(|el| Arc::from(el)) +} diff --git a/src/web_server/app_error.rs b/src/web_server/app_error.rs index d13243c..78b39ed 100644 --- a/src/web_server/app_error.rs +++ b/src/web_server/app_error.rs @@ -8,42 +8,36 @@ use ntex::web::{HttpRequest, HttpResponse}; use snafu::Snafu; use crate::insert_header; -use crate::utils::convert_to_arc; +use crate::utils::{convert_to_arc, convert_to_arced_dynerror}; use crate::web_server::old_device_sensor_api::qs_parser::QSParserError; use rust_decimal::Decimal; use serde_json::json; use super::old_device_sensor_api::qs_parser; - - - /// Главный объект ошибки [std::error::Error] для всего Web API. /// /// В целом, все Result у Web сервера должны использовать этот Error. -#[derive(Debug, Snafu, Clone)] +#[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum AppError { #[snafu(display("Could not read file"))] JsonError { #[snafu(source(from(serde_json::Error, convert_to_arc::)))] - source: Arc + source: Arc, }, #[snafu(display("Could not read file"))] - QSError { - source: QSParserError - }, + QSError { source: QSParserError }, #[snafu(display("Could not read file"))] - ServerRedisError { - source: fred::error::Error - }, + ServerRedisError { source: fred::error::Error }, + + #[snafu(display("Could not parse decimal"))] + Decimal { source: rust_decimal::Error }, #[snafu(display("Could not read file"))] - UnknownMethod { - method: String - }, + UnknownMethod { method: String }, #[snafu(display("Could not read file"))] RequestTooLarge, @@ -58,9 +52,10 @@ pub enum AppError { }, #[snafu(display("UTF-8 Error"))] - Utf8Error { - source: std::str::Utf8Error - }, + Utf8Error { source: std::str::Utf8Error }, + + #[snafu(display("String cannot be parced into a UUID"))] + Uuid { source: uuid::Error }, #[snafu(display("Could not read file"))] UnknownBody { @@ -69,12 +64,24 @@ pub enum AppError { }, #[snafu(display("Could not read file"))] - DeviceNotFound { - mac: String + DeviceNotFound { mac: String }, + + #[snafu(display("Std IO error"))] + StdIO { + #[snafu(source(from(std::io::Error, convert_to_arc::)))] + source: Arc, }, #[snafu(display("Could not read file"))] - TimeIsLongBehindNow + TimeIsLongBehindNow, + + #[snafu(display("Could not read file"))] + #[snafu(whatever)] + Whatever { + #[snafu(source(from(Box, Some)))] + source: Option>, + message: String, + }, } impl web::error::WebResponseError for AppError { @@ -90,7 +97,12 @@ impl web::error::WebResponseError for AppError { AppError::UnknownBody { .. } => StatusCode::BAD_REQUEST, AppError::QSError { .. } => StatusCode::BAD_REQUEST, AppError::DeviceNotFound { .. } => StatusCode::BAD_REQUEST, - AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST + AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST, + + // Не знаю что лучше тут использовать + AppError::Whatever { .. } => StatusCode::INTERNAL_SERVER_ERROR, + + _ => StatusCode::INTERNAL_SERVER_ERROR, } } @@ -108,7 +120,8 @@ impl web::error::WebResponseError for AppError { } AppError::QSError { .. } => "UrlEncoded body or query params are incorrect", AppError::DeviceNotFound { .. } => "Device not found", - AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be" + AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be", + _ => "Internal server error. Please make sure to tell the devs they are stupid :p", }; let status_code = self.status_code(); @@ -121,7 +134,6 @@ impl web::error::WebResponseError for AppError { let mut resp = HttpResponse::build(status_code).json(&body); let headers = resp.headers_mut(); - match self { AppError::JsonError { source } => { insert_header!(headers, "X-Error-Line", source.line()); @@ -132,7 +144,7 @@ impl web::error::WebResponseError for AppError { source.to_string().escape_default().collect::() ); } - AppError::UnknownMethod {method} => { + AppError::UnknownMethod { method } => { insert_header!( headers, "X-Unknown-Cmd", diff --git a/src/web_server/old_app_api/handlers/mod.rs b/src/web_server/old_app_api/handlers/mod.rs index 3c4c593..66b67bf 100644 --- a/src/web_server/old_app_api/handlers/mod.rs +++ b/src/web_server/old_app_api/handlers/mod.rs @@ -2,25 +2,23 @@ use crate::web_server::app_error::{self, AppError}; use crate::web_server::old_app_api::types::AppInitRequest; use crate::web_server::NMAppState; -use serde_json::{json}; +use serde_json::json; use snafu::ResultExt; - use crate::insert_header; use fred::interfaces::KeysInterface; use ntex::http::StatusCode; use ntex::web; - - pub async fn app_init( - _body: AppInitRequest<'_>, + _body: AppInitRequest, app_state: &NMAppState, ) -> Result { let _: () = app_state .redis_client .set("test", 123, None, None, true) - .await.context(app_error::ServerRedisSnafu)?; + .await + .context(app_error::ServerRedisSnafu)?; Ok(web::HttpResponse::build(StatusCode::OK).body("Hello world!")) } diff --git a/src/web_server/old_app_api/mod.rs b/src/web_server/old_app_api/mod.rs index 3430f7c..b940ddb 100644 --- a/src/web_server/old_app_api/mod.rs +++ b/src/web_server/old_app_api/mod.rs @@ -3,50 +3,66 @@ mod handlers; mod types; -use ntex::web::types::State; -use ntex::util::Bytes; -use ntex::web; -use nom::AsBytes; -use snafu::ResultExt; use crate::web_server::app_error::AppError; -use crate::web_server::NMAppState; use crate::web_server::old_app_api::handlers::{app_init, version}; use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams}; use crate::web_server::utils::redis::is_api_key_valid; +use crate::web_server::NMAppState; +use ntex::util::Bytes; +use ntex::web; +use ntex::web::types::State; +use ntex::web::HttpRequest; +use snafu::{whatever, ResultExt}; use super::app_error; - /// Обработчик запросов от приложений. /// /// Отвечает за разделение на функции по `cmd`. /// /// Вызывается напрямую из ntex приложения. pub async fn old_api_handler( + request: HttpRequest, app_state: State, body_bytes: Bytes, ) -> Result { + let headers = request.headers(); + if body_bytes.len() > 10 * 1024 { // 10 KiB return Err(AppError::RequestTooLarge); } - let body_bytes = body_bytes.as_bytes(); + let mandatory_params: MandatoryParams = + serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON - let mandatory_params: MandatoryParams<'_> = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON + // Тут все cmd которые могут быть вызваны без api ключа + if mandatory_params.cmd == "version" { + return version((), &app_state).await; + } - // Ignore clippy singlematch - if mandatory_params.cmd.as_ref() == "version" { return version((), &app_state).await } + let api_key: String; - is_api_key_valid(&app_state.redis_client, mandatory_params.api_key.as_ref()).await?; + if let Some(key) = mandatory_params.api_key { + api_key = key; + } else if let Some(key) = headers.get("Narodmon-Api-Key") { + api_key = key.to_str().with_whatever_context(|_| "asd")?.to_string(); + } else { + whatever!("No API key found") + } - match mandatory_params.cmd.as_ref() { + is_api_key_valid(&app_state.redis_client, api_key).await?; + + match mandatory_params.cmd.as_str() { "appInit" => { - let body: AppInitRequest = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; + let body: AppInitRequest = + serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?; app_init(body, &app_state).await } - _ => Err(AppError::UnknownMethod { method: mandatory_params.cmd.to_string() }), + _ => Err(AppError::UnknownMethod { + method: mandatory_params.cmd.to_string(), + }), } //Ok("fuck") diff --git a/src/web_server/old_app_api/types/mod.rs b/src/web_server/old_app_api/types/mod.rs index de9b9bb..7c03320 100644 --- a/src/web_server/old_app_api/types/mod.rs +++ b/src/web_server/old_app_api/types/mod.rs @@ -1,18 +1,16 @@ use serde::{Deserialize, Serialize}; use std::borrow::Cow; +use uuid::Uuid; // fn<'de, D>(D) -> Result where D: Deserializer<'de> #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct AppInitRequest<'a> { - #[serde(borrow)] - pub version: Cow<'a, str>, +pub struct AppInitRequest { + pub version: String, - #[serde(borrow)] - pub platform: Cow<'a, str>, + pub platform: String, - #[serde(borrow)] - pub model: Cow<'a, str>, + pub model: String, pub width: u64, } @@ -29,21 +27,20 @@ pub struct AddLikeRequest { /// получить [MandatoryParams], другой в зависимости от `cmd`. Это позволяет не добвалять поля из /// этой структуры в каждом специфичном типе. #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct MandatoryParams<'a> { - #[serde(borrow)] - pub cmd: Cow<'a, str>, +pub struct MandatoryParams { + pub cmd: String, - #[serde(borrow)] - pub lang: Cow<'a, str>, + pub lang: String, /// Уникальный ID клиента. /// /// Используется на подобии как куки PHPSESSID в php. /// /// См. также: - #[serde(borrow)] - pub uuid: Cow<'a, str>, + pub uuid: Uuid, - #[serde(borrow)] - pub api_key: Cow<'a, str>, + /// API ключ приложения + /// + /// Может быть указан в теле запроса, а может быть и заголовке Narodmon-Api-Key. + pub api_key: Option, } diff --git a/src/web_server/old_device_sensor_api/mod.rs b/src/web_server/old_device_sensor_api/mod.rs index 9f2053b..bcf1c65 100644 --- a/src/web_server/old_device_sensor_api/mod.rs +++ b/src/web_server/old_device_sensor_api/mod.rs @@ -20,17 +20,13 @@ use super::NMAppState; #[snafu(visibility(pub))] pub enum Error { #[snafu(display("Device not found"))] - DeviceNotFound { - mac: String - }, + DeviceNotFound { mac: String }, #[snafu(display("Time sent with the device is way too behind now"))] TimeIsLongBehindNow, #[snafu(display("{source}"))] - QSParser { - source: QSParserError - }, + QSParser { source: QSParserError }, } /// Обработчик данных датчиков с устройств. diff --git a/src/web_server/old_device_sensor_api/qs_parser.rs b/src/web_server/old_device_sensor_api/qs_parser.rs index 0b4ca09..65e440d 100644 --- a/src/web_server/old_device_sensor_api/qs_parser.rs +++ b/src/web_server/old_device_sensor_api/qs_parser.rs @@ -19,23 +19,19 @@ pub enum QSParserError { #[snafu(display("asd"))] SerdeQS { #[snafu(source(from(serde_qs::Error, convert_to_arc::)))] - source: Arc + source: Arc, }, #[snafu(display("asd"))] - Parsing { - context: String - }, + Parsing { context: String }, #[snafu(display("asd"))] FloatP { #[snafu(source)] - source: ParseFloatError + source: ParseFloatError, }, #[snafu(display("failed to parse into decimal"))] - DecimalParse { - source: rust_decimal::Error - }, + DecimalParse { source: rust_decimal::Error }, #[snafu(display("asd"))] NoMAC, @@ -43,7 +39,9 @@ pub enum QSParserError { impl From> for QSParserError { fn from(value: Error<&str>) -> Self { - QSParserError::Parsing { context: format!("{:?}", value)} + QSParserError::Parsing { + context: format!("{:?}", value), + } } } @@ -66,7 +64,7 @@ pub fn qs_rest_to_values( for (key, value) in parsed { hashset.insert(SensorValue { mac: key, - value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu{})?, + value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu {})?, time: None, unit: None, @@ -82,7 +80,9 @@ pub fn parse_decimal_if_exists( key: &str, ) -> Result, QSParserError> { if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some(Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu{})?)) + Ok(Some( + Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu {})?, + )) } else { Ok(None) } @@ -93,14 +93,16 @@ pub fn parse_epoch_if_exists( key: &str, ) -> Result, QSParserError> { if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse().context(FloatPSnafu{})?))) + Ok(Some(Epoch::from_unix_seconds( + unwrapped_value.parse().context(FloatPSnafu {})?, + ))) } else { Ok(None) } } pub async fn parse_nm_qs_format(input: &str) -> Result { - let mut parsed: HashMap = serde_qs::from_str(input).context(SerdeQSSnafu{})?; + let mut parsed: HashMap = serde_qs::from_str(input).context(SerdeQSSnafu {})?; let (_, device_mac) = if let Some(id) = parsed.get("ID") { parse_mac_address(id)? diff --git a/src/web_server/utils/redis.rs b/src/web_server/utils/redis.rs index 124e3cd..bf3f19d 100644 --- a/src/web_server/utils/redis.rs +++ b/src/web_server/utils/redis.rs @@ -1,18 +1,22 @@ //! Сборник утилит для работы с Redis. -use crate::web_server::app_error::{AppError, ServerRedisSnafu}; -use fred::prelude::*; +use crate::{ + uformat, + web_server::app_error::{AppError, ServerRedisSnafu}, +}; use fred::clients::Client as RedisClient; +use fred::prelude::*; use heapless::String as HeaplessString; use lazy_static::lazy_static; use regex::Regex; use serde::{Deserialize, Serialize}; use snafu::ResultExt; use ufmt::uwrite; +use uuid::Uuid; lazy_static! { /// Разрешённые знаки для API ключа. - static ref ALLOWED_API_KEY_CHARACTERS: Regex = Regex::new("[a-zA-Z0-9]{13}").unwrap(); + static ref ALLOWED_API_KEY_CHARACTERS: Regex = Regex::new("[a-zA-Z0-9\\-]{13,36}").unwrap(); } /// Описание полей в KV DB у `apikey_{}`. @@ -25,18 +29,18 @@ pub struct ApiKeyDescription { /// Проверка API ключа на валидность. pub async fn is_api_key_valid( client: &RedisClient, - api_key: &str, + api_key: String, ) -> Result { - if !ALLOWED_API_KEY_CHARACTERS.is_match(api_key) { + if !ALLOWED_API_KEY_CHARACTERS.is_match(&api_key) { return Err(AppError::ApiKeyInvalid { reason: "Invalid characters present in the API key.", }); } - let mut key_buffer = HeaplessString::<{ 7 + 13 }>::new(); - uwrite!(key_buffer, "apikey_{}", api_key).expect("TODO"); // TODO: Error handling - - let valid: Option = client.hget(key_buffer.as_str(), "owner").await.context(ServerRedisSnafu)?; + let valid: Option = client + .hget(uformat!("apikey_{}", api_key), "owner") + .await + .context(ServerRedisSnafu)?; valid .map(|uid| ApiKeyDescription { apikey_owner: uid })