dev-nm17-2 #25

Open
nm17 wants to merge 6 commits from dev-nm17-2 into master
20 changed files with 588 additions and 182 deletions

73
Cargo.lock generated
View file

@ -23,7 +23,7 @@ version = "0.7.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9" checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9"
dependencies = [ dependencies = [
"getrandom", "getrandom 0.2.15",
"once_cell", "once_cell",
"version_check", "version_check",
] ]
@ -785,7 +785,19 @@ checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"libc", "libc",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
]
[[package]]
name = "getrandom"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8"
dependencies = [
"cfg-if",
"libc",
"wasi 0.13.3+wasi-0.2.2",
"windows-targets",
] ]
[[package]] [[package]]
@ -1093,11 +1105,14 @@ dependencies = [
"hex", "hex",
"hifitime", "hifitime",
"lazy_static", "lazy_static",
"log",
"nom", "nom",
"ntex", "ntex",
"phf", "phf",
"ppp",
"regex", "regex",
"rust_decimal", "rust_decimal",
"rust_decimal_macros",
"serde", "serde",
"serde_json", "serde_json",
"serde_qs", "serde_qs",
@ -1107,6 +1122,7 @@ dependencies = [
"thiserror", "thiserror",
"tokio", "tokio",
"ufmt", "ufmt",
"uuid",
] ]
[[package]] [[package]]
@ -1223,9 +1239,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.22" version = "0.4.26"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
[[package]] [[package]]
name = "memchr" name = "memchr"
@ -1261,7 +1277,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd" checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
dependencies = [ dependencies = [
"libc", "libc",
"wasi", "wasi 0.11.0+wasi-snapshot-preview1",
"windows-sys 0.52.0", "windows-sys 0.52.0",
] ]
@ -1685,6 +1701,15 @@ version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391" checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
[[package]]
name = "ppp"
version = "2.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a7a2049cd2570bd67bf0228e86bf850f8ceb5190a345c471d03a909da6049e0"
dependencies = [
"thiserror",
]
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.20" version = "0.2.20"
@ -1774,7 +1799,7 @@ version = "0.6.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
dependencies = [ dependencies = [
"getrandom", "getrandom 0.2.15",
] ]
[[package]] [[package]]
@ -1883,6 +1908,16 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "rust_decimal_macros"
version = "1.36.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "da991f231869f34268415a49724c6578e740ad697ba0999199d6f22b3949332c"
dependencies = [
"quote",
"rust_decimal",
]
[[package]] [[package]]
name = "rustc-demangle" name = "rustc-demangle"
version = "0.1.24" version = "0.1.24"
@ -2422,9 +2457,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.11.0" version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" checksum = "93d59ca99a559661b96bf898d8fce28ed87935fd2bea9f05983c1464dd6c71b1"
dependencies = [
"getrandom 0.3.1",
"serde",
]
[[package]] [[package]]
name = "version_check" name = "version_check"
@ -2438,6 +2477,15 @@ version = "0.11.0+wasi-snapshot-preview1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
[[package]]
name = "wasi"
version = "0.13.3+wasi-0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2"
dependencies = [
"wit-bindgen-rt",
]
[[package]] [[package]]
name = "wasm-bindgen" name = "wasm-bindgen"
version = "0.2.99" version = "0.2.99"
@ -2612,6 +2660,15 @@ dependencies = [
"memchr", "memchr",
] ]
[[package]]
name = "wit-bindgen-rt"
version = "0.33.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "write16" name = "write16"
version = "1.0.0" version = "1.0.0"

View file

@ -32,3 +32,7 @@ ufmt = { version = "0.2.0", features = ["std"] }
futures-util = { version = "0.3.30", features = ["tokio-io"] } futures-util = { version = "0.3.30", features = ["tokio-io"] }
snafu = "0.8.5" snafu = "0.8.5"
clap-verbosity-flag = "3.0.2" clap-verbosity-flag = "3.0.2"
ppp = "2.3.0"
log = "0.4.26"
uuid = { version = "1.14.0", features = ["serde", "v4"] }
rust_decimal_macros = "1.36.0"

View file

@ -1,17 +1,29 @@
# Архитектура KV DB (Dragonfly) # Архитектура KV DB (Dragonfly)
- `apikey_{apikey}` - `apikey_{apikey}`
- API ключ для приложений
- Поля - Поля
- `owner` - `owner`
- Имеет время окончания - Имеет TTL
- `devices_{device_id}` - `devices_{device_uuid}`
- Поля
- Вся информация о девайсе
- `devices_{device_id}_{tai_timestamp}_{sensor_id}`
- Только значение
- `devices_{device_id}`
- Поля - Поля
- `exists`: bool - `exists`: bool
- `tls_only`: bool
- `mtls_only`: bool
- `lat`: decimal
- `long`: decimal
- `alt`: decimal
- `devices_{device_uuid}_sensor{sensor_mac}_{tai_timestamp}`
- Только значение
- `devices_{device_uuid}_{sensor_mac}`
- Поля
- `unit`: str - `unit`: str
- `devices_{device_uuid}_commands`
- `devices_mac{device_mac}`
- Маппинг до device_uuid
- `users_{user_uuid}`
- username: string
- password_hash: string
!!!! Убедитесь что в переменных ключей нет `_` !!!! !!!! Убедитесь что в переменных ключей нет `_` !!!!

View file

@ -0,0 +1,161 @@
use std::str::FromStr;
use fred::prelude::{Client as RedisClient, HashesInterface, KeysInterface};
use hifitime::Epoch;
use rust_decimal::Decimal;
use snafu::ResultExt;
use uuid::{self, Uuid};
use crate::{
ingest_protocol::NMDeviceDataPacket,
uformat,
web_server::{
app_error::{self, AppError, DecimalSnafu, ServerRedisSnafu, UuidSnafu},
old_device_sensor_api::qs_parser::DecimalParseSnafu,
},
};
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)]
pub struct Device {
pub device_id: Uuid,
/// ID владельца
pub owner: Uuid,
pub lat: Option<Decimal>,
pub lon: Option<Decimal>,
pub alt: Option<Decimal>,
/// Данные датчика могут быть переданы только по TLS
pub tls_only: bool,
/// Данные датчика могут быть переданы только по Mutual TLS
pub mtls_only: bool,
}
impl Device {
pub fn new(id: Uuid, owner: Uuid) -> Device {
Device {
device_id: id,
owner,
lat: None,
lon: None,
alt: None,
tls_only: false,
mtls_only: false,
}
}
pub async fn get_by_id(redis: &RedisClient, id: Uuid) -> Result<Option<Device>, AppError> {
let key = uformat!("devices_{}", id.to_string());
if !redis.exists(&[&key]).await.context(ServerRedisSnafu)? {
return Ok(None);
}
return Ok(Some(Device {
device_id: id,
owner: redis
.hget::<Option<String>, _, _>(&key, "lat")
.await
.context(ServerRedisSnafu)?
.map(|el| Uuid::from_str(&el).context(UuidSnafu))
.transpose()?
.unwrap(),
lat: redis
.hget::<Option<String>, _, _>(&key, "lat")
.await
.context(ServerRedisSnafu)?
.map(|el| el.parse().context(DecimalSnafu))
.transpose()?,
lon: redis
.hget::<Option<String>, _, _>(&key, "lon")
.await
.context(ServerRedisSnafu)?
.map(|el| el.parse().context(DecimalSnafu))
.transpose()?,
alt: redis
.hget::<Option<String>, _, _>(&key, "alt")
.await
.context(ServerRedisSnafu)?
.map(|el| el.parse().context(DecimalSnafu))
.transpose()?,
tls_only: redis
.hget::<Option<bool>, _, _>(&key, "tls_only")
.await
.context(ServerRedisSnafu)?
.unwrap(),
mtls_only: redis
.hget::<Option<bool>, _, _>(&key, "mtls_only")
.await
.context(ServerRedisSnafu)?
.unwrap(),
}));
}
pub async fn save_to_db(
packet: &NMDeviceDataPacket,
redis: &RedisClient,
) -> Result<(), AppError> {
let device_mac_enc = hex::encode(packet.mac);
let device_id_key = uformat!("devices_mac{}", &device_mac_enc);
let device_id: Option<String> = redis.get(device_id_key).await.context(ServerRedisSnafu)?;
let device_id = device_id.ok_or_else(|| AppError::DeviceNotFound {
mac: device_mac_enc.clone(),
})?;
let now = Epoch::now().unwrap().into();
let mut device_time = packet.time.unwrap_or(now);
// TODO: Добавить гистерезис
// Отчёт совместимости: отсутствует
if device_time > now {
device_time = now;
}
let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds();
let key = uformat!("devices_{}", device_id);
let device_exists: Option<bool> = redis
.hget(key.as_str(), "exists")
.await
.context(ServerRedisSnafu)?;
if !device_exists.is_some_and(|v| v) {
return Err(AppError::DeviceNotFound {
mac: hex::encode(packet.mac),
});
}
// devices_{device_id}_{tai_timestamp}_{sensor_id}
for sensor in &packet.values {
let key = uformat!(
"devices_{}_sensor{}_{}",
device_id,
sensor.mac,
device_tai_timestamp.to_string(),
);
() = redis
.set(key.as_str(), sensor.value.to_string(), None, None, false)
.await
.context(ServerRedisSnafu)?;
}
if let Some(commands) = &packet.commands {
for (cmd_key, cmd_value) in commands {
let key = uformat!("devices_{}_cmds_{}", device_id, cmd_key);
() = redis
.set(key.as_str(), cmd_value, None, None, false)
.await
.context(ServerRedisSnafu)?;
}
}
Ok(())
}
}

37
src/business_logic/mod.rs Normal file
View file

@ -0,0 +1,37 @@
use bytes::Bytes;
use device::Device;
use fred::prelude::Client;
use ntex::web::guard::Header;
use ppp::v2::Header;
use snafu::ResultExt;
pub mod device;
use crate::{
ingest_protocol::NMDeviceDataPacket,
web_server::app_error::{AppError, ServerRedisSnafu},
};
pub async fn process_packet(
r_client: Client,
packet: NMDeviceDataPacket,
proxy_header: Option<Bytes>,
) -> String {
let proxy_ref = proxy_header.as_ref().map(|el| el.as_ref());
let proxy_header_parsed = proxy_ref.map(|el| Header::try_from(el).unwrap());
// proxy_header_parsed.unwrap().tlvs()
let asd: Result<String, AppError> = (|| async {
// TODO: Auth code here. Check if tls only is enabled.
Device::save_to_db(&packet, &r_client).await?;
return Ok("".to_string());
})()
.await;
return match asd {
Ok(resp) => resp,
Err(err) => format!("ERROR: {}", err),
};
}

View file

View file

@ -1,3 +1,5 @@
use std::net::IpAddr;
use clap::{Args, Parser, Subcommand}; use clap::{Args, Parser, Subcommand};
#[derive(Parser, Clone)] #[derive(Parser, Clone)]
@ -32,9 +34,12 @@ pub struct WebServerArgs {
#[derive(Args, Clone)] #[derive(Args, Clone)]
pub struct SocketServerArgs { pub struct SocketServerArgs {
#[arg(short, long, default_value = "localhost")] #[arg(short, long)]
pub addr: String, pub addr: IpAddr,
#[arg(short = 'p', default_value_t = 8283)] #[arg(short = 'p', default_value_t = 8283)]
pub port: u16, pub port: u16,
#[arg(long, default_value_t = false)]
pub proxy_protocol: bool,
} }

View file

@ -6,6 +6,7 @@
// #[serde(rename = "...")] // #[serde(rename = "...")]
// #[serde(alias = "...")] // #[serde(alias = "...")]
use crate::business_logic::device::Device;
use crate::ingest_protocol::error::Error; use crate::ingest_protocol::error::Error;
use crate::ingest_protocol::parser::parse_mac_address; use crate::ingest_protocol::parser::parse_mac_address;
use crate::uformat; use crate::uformat;
@ -98,64 +99,6 @@ pub struct NMDeviceDataPacket {
pub time: Option<EpochUTC>, pub time: Option<EpochUTC>,
} }
impl NMDeviceDataPacket {
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
let device_mac_enc = hex::encode(self.mac);
let now = Epoch::now().unwrap().into();
let mut device_time = self.time.unwrap_or(now);
// TODO: Добавить гистерезис
// Отчёт совместимости: отсутствует
if device_time > now {
device_time = now;
}
let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds();
let key = uformat!("devices_{}", device_mac_enc);
let device_exists: Option<bool> = redis
.hget(key.as_str(), "exists")
.await
.context(ServerRedisSnafu)?;
if !device_exists.is_some_and(|v| v) {
return Err(AppError::DeviceNotFound {
mac: hex::encode(self.mac),
});
}
// devices_{device_id}_{tai_timestamp}_{sensor_id}
for sensor in &self.values {
let key = uformat!(
"devices_{}_{}_{}",
device_mac_enc,
device_tai_timestamp.to_string(),
sensor.mac
);
redis
.set(key.as_str(), sensor.value.to_string(), None, None, false)
.await
.context(ServerRedisSnafu)?;
}
if let Some(commands) = &self.commands {
for (cmd_key, cmd_value) in commands {
let key = uformat!("devices_{}_cmds_{}", device_mac_enc, cmd_key);
redis
.set(key.as_str(), cmd_value, None, None, false)
.await
.context(ServerRedisSnafu)?;
}
}
Ok(())
}
}
#[derive(Debug, Clone, Default, PartialEq, Deserialize)] #[derive(Debug, Clone, Default, PartialEq, Deserialize)]
pub struct NMJsonPacket { pub struct NMJsonPacket {
pub devices: Vec<NMDeviceDataPacket>, pub devices: Vec<NMDeviceDataPacket>,
@ -164,7 +107,7 @@ pub struct NMJsonPacket {
impl NMJsonPacket { impl NMJsonPacket {
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> { pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
for device in &self.devices { for device in &self.devices {
device.save_to_db(redis).await?; Device::save_to_db(&device, redis).await?;
} }
Ok(()) Ok(())

View file

@ -11,10 +11,161 @@
// } // }
// } // }
use std::{
fmt::Display,
io::Read,
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
ops::ControlFlow,
};
use bytes::{buf, Buf, Bytes, BytesMut};
use fred::prelude::Client; use fred::prelude::Client;
use log::*;
use ppp::v2::{self, Addresses, Command, Header, ParseError};
use snafu::{whatever, ResultExt, Whatever};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt, ReadBuf},
net::{TcpSocket, TcpStream},
};
use crate::cli::{Cli, SocketServerArgs}; use crate::{
business_logic::process_packet,
cli::{Cli, SocketServerArgs},
ingest_protocol::{self, error::Error},
web_server::app_error::{AppError, StdIOSnafu},
};
pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) { pub async fn socketserv_main(
todo!() args: Cli,
specific_args: SocketServerArgs,
client: Client,
) -> Result<(), AppError> {
return Ok(());
} }
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct AddressesUnpacked {
pub source: SocketAddr,
pub dest: SocketAddr,
}
pub fn get_sockaddr(addresses: Addresses) -> Option<SocketAddr> {
match addresses {
Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new(
addr.source_address,
addr.source_port,
))),
Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new(
addr.source_address,
addr.source_port,
0,
0,
))),
_ => None,
}
}
pub async fn tcp_proxy_handler(
mut buffer: BytesMut,
stream: &mut TcpStream,
actual_origin: &mut SocketAddr,
proxy_data: &mut Option<Bytes>,
) -> Result<(), AppError> {
'proxy_loop: loop {
let len = stream.read(&mut buffer).await.context(StdIOSnafu)?;
match v2::Header::try_from(buffer.as_ref()) {
Ok(header) => {
match header.command {
Command::Proxy => {
if let Some(new_socket_addr) = get_sockaddr(header.addresses) {
*actual_origin = new_socket_addr;
*proxy_data = Some(Bytes::copy_from_slice(&header.header));
break 'proxy_loop;
}
}
_ => {
// Продолажем работать как обычно (health check от прокси или т.п)
}
}
}
Err(err) => {
match err {
ParseError::Incomplete(_) => {
// Продолжаем заполнять буфер
continue 'proxy_loop;
}
_ => {
warn!("Получили неисправимую ошибку при парсинге proxy протокола. Убедитесь что никто не пытается подключиться к сервису напрямую без прокси. {error}", error = err);
whatever!("No proxy headers detected");
}
}
}
}
}
buffer = buffer.split_off(proxy_data.as_ref().unwrap().len());
return Ok(());
}
pub async fn socketserv_tcp(
args: Cli,
specific_args: SocketServerArgs,
client: Client,
) -> Result<(), AppError> {
// TODO: errors should not break the server
let mut buffer = BytesMut::with_capacity(16 * 1024);
let socket = TcpSocket::new_v4().context(StdIOSnafu)?;
socket
.bind(SocketAddr::new(specific_args.addr, specific_args.port))
.context(StdIOSnafu)?;
let listener = socket.listen(256).context(StdIOSnafu)?;
'tcp_loop: while let Ok((mut stream, socketaddr)) = listener.accept().await {
// Нужно для правильного применения рейт лимита
let mut actual_origin = socketaddr;
let mut proxy_data = None;
if specific_args.proxy_protocol {
tcp_proxy_handler(
buffer.clone(),
&mut stream,
&mut actual_origin,
&mut proxy_data,
)
.await?;
}
'protocol_loop: loop {
let len = stream.read(&mut buffer).await.context(StdIOSnafu)?;
match ingest_protocol::parser::parse_packet(&buffer) {
Ok(packet) => {
let resp = process_packet(client.clone(), packet, proxy_data.clone()).await;
let _ = stream.write(resp.as_bytes()).await;
let _ = stream.shutdown().await;
}
Err(err) if err == Error::Incomplete => {
continue 'protocol_loop;
}
Err(err) => {
stream
.write(format!("ERROR: {}", err).as_bytes())
.await
.context(StdIOSnafu)?;
continue 'tcp_loop;
}
}
}
}
return Ok(());
}
pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) {}

View file

@ -1,6 +1,9 @@
#![doc = include_str!("../README.md")] #![doc = include_str!("../README.md")]
#![feature(try_blocks)]
extern crate core; extern crate core;
mod business_logic;
mod cli; mod cli;
mod ingest_protocol; mod ingest_protocol;
mod ingest_socket_server; mod ingest_socket_server;
@ -14,11 +17,13 @@ use fred::prelude::{
ReconnectPolicy, Server, ServerConfig, ReconnectPolicy, Server, ServerConfig,
}; };
use ingest_socket_server::socketserv_main; use ingest_socket_server::socketserv_main;
use web_server::app_error::AppError;
use crate::web_server::server_main; use crate::web_server::server_main;
#[snafu::report]
#[ntex::main] #[ntex::main]
async fn main() { async fn main() -> Result<(), AppError> {
let result = Cli::parse(); let result = Cli::parse();
let mut config = RedisConfig::default(); let mut config = RedisConfig::default();
@ -39,7 +44,9 @@ async fn main() {
server_main(result.clone(), specific_args.clone(), redis).await; server_main(result.clone(), specific_args.clone(), redis).await;
} }
MyCommand::SocketServer(specific_args) => { MyCommand::SocketServer(specific_args) => {
socketserv_main(result.clone(), specific_args.clone(), redis).await; socketserv_main(result.clone(), specific_args.clone(), redis).await?;
} }
}; };
return Ok(());
} }

View file

@ -65,9 +65,7 @@ impl<'de> Deserialize<'de> for EpochUTC {
) as f64) ) as f64)
.into()); .into());
} }
Ok( Ok(Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into())
Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into(),
)
} }
} }

View file

@ -70,7 +70,7 @@ impl<'de> Deserialize<'de> for SupportedUnit {
} }
/// Таблица преобразования текстового представления единиц в значения [SupportedUnit]. /// Таблица преобразования текстового представления единиц в значения [SupportedUnit].
static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { pub static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! {
"C" => SupportedUnit::Celsius, "C" => SupportedUnit::Celsius,
"%" => SupportedUnit::Percentage, "%" => SupportedUnit::Percentage,
"mmHg" => SupportedUnit::MillimeterHg, "mmHg" => SupportedUnit::MillimeterHg,
@ -85,6 +85,12 @@ static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! {
"KWh" => SupportedUnit::KWh, "KWh" => SupportedUnit::KWh,
}; };
pub fn convert_to_arc<T: std::error::Error>(error: T) -> Arc<T> { pub fn convert_to_arc<T: core::error::Error>(error: T) -> Arc<T> {
Arc::new(error) Arc::new(error)
} }
pub fn convert_to_arced_dynerror(
error: Option<Box<dyn core::error::Error>>,
) -> Option<Arc<dyn core::error::Error>> {
error.map(|el| Arc::from(el))
}

View file

@ -8,42 +8,36 @@ use ntex::web::{HttpRequest, HttpResponse};
use snafu::Snafu; use snafu::Snafu;
use crate::insert_header; use crate::insert_header;
use crate::utils::convert_to_arc; use crate::utils::{convert_to_arc, convert_to_arced_dynerror};
use crate::web_server::old_device_sensor_api::qs_parser::QSParserError; use crate::web_server::old_device_sensor_api::qs_parser::QSParserError;
use rust_decimal::Decimal; use rust_decimal::Decimal;
use serde_json::json; use serde_json::json;
use super::old_device_sensor_api::qs_parser; use super::old_device_sensor_api::qs_parser;
/// Главный объект ошибки [std::error::Error] для всего Web API. /// Главный объект ошибки [std::error::Error] для всего Web API.
/// ///
/// В целом, все Result у Web сервера должны использовать этот Error. /// В целом, все Result у Web сервера должны использовать этот Error.
#[derive(Debug, Snafu, Clone)] #[derive(Debug, Snafu)]
#[snafu(visibility(pub))] #[snafu(visibility(pub))]
pub enum AppError { pub enum AppError {
#[snafu(display("Could not read file"))] #[snafu(display("Could not read file"))]
JsonError { JsonError {
#[snafu(source(from(serde_json::Error, convert_to_arc::<serde_json::Error>)))] #[snafu(source(from(serde_json::Error, convert_to_arc::<serde_json::Error>)))]
source: Arc<serde_json::Error> source: Arc<serde_json::Error>,
}, },
#[snafu(display("Could not read file"))] #[snafu(display("Could not read file"))]
QSError { QSError { source: QSParserError },
source: QSParserError
},
#[snafu(display("Could not read file"))] #[snafu(display("Could not read file"))]
ServerRedisError { ServerRedisError { source: fred::error::Error },
source: fred::error::Error
}, #[snafu(display("Could not parse decimal"))]
Decimal { source: rust_decimal::Error },
#[snafu(display("Could not read file"))] #[snafu(display("Could not read file"))]
UnknownMethod { UnknownMethod { method: String },
method: String
},
#[snafu(display("Could not read file"))] #[snafu(display("Could not read file"))]
RequestTooLarge, RequestTooLarge,
@ -58,9 +52,10 @@ pub enum AppError {
}, },
#[snafu(display("UTF-8 Error"))] #[snafu(display("UTF-8 Error"))]
Utf8Error { Utf8Error { source: std::str::Utf8Error },
source: std::str::Utf8Error
}, #[snafu(display("String cannot be parced into a UUID"))]
Uuid { source: uuid::Error },
#[snafu(display("Could not read file"))] #[snafu(display("Could not read file"))]
UnknownBody { UnknownBody {
@ -69,12 +64,24 @@ pub enum AppError {
}, },
#[snafu(display("Could not read file"))] #[snafu(display("Could not read file"))]
DeviceNotFound { DeviceNotFound { mac: String },
mac: String
#[snafu(display("Std IO error"))]
StdIO {
#[snafu(source(from(std::io::Error, convert_to_arc::<std::io::Error>)))]
source: Arc<std::io::Error>,
}, },
#[snafu(display("Could not read file"))] #[snafu(display("Could not read file"))]
TimeIsLongBehindNow TimeIsLongBehindNow,
#[snafu(display("Could not read file"))]
#[snafu(whatever)]
Whatever {
#[snafu(source(from(Box<dyn std::error::Error>, Some)))]
source: Option<Box<dyn core::error::Error>>,
message: String,
},
} }
impl web::error::WebResponseError for AppError { impl web::error::WebResponseError for AppError {
@ -90,7 +97,12 @@ impl web::error::WebResponseError for AppError {
AppError::UnknownBody { .. } => StatusCode::BAD_REQUEST, AppError::UnknownBody { .. } => StatusCode::BAD_REQUEST,
AppError::QSError { .. } => StatusCode::BAD_REQUEST, AppError::QSError { .. } => StatusCode::BAD_REQUEST,
AppError::DeviceNotFound { .. } => StatusCode::BAD_REQUEST, AppError::DeviceNotFound { .. } => StatusCode::BAD_REQUEST,
AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST,
// Не знаю что лучше тут использовать
AppError::Whatever { .. } => StatusCode::INTERNAL_SERVER_ERROR,
_ => StatusCode::INTERNAL_SERVER_ERROR,
} }
} }
@ -108,7 +120,8 @@ impl web::error::WebResponseError for AppError {
} }
AppError::QSError { .. } => "UrlEncoded body or query params are incorrect", AppError::QSError { .. } => "UrlEncoded body or query params are incorrect",
AppError::DeviceNotFound { .. } => "Device not found", AppError::DeviceNotFound { .. } => "Device not found",
AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be" AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be",
_ => "Internal server error. Please make sure to tell the devs they are stupid :p",
}; };
let status_code = self.status_code(); let status_code = self.status_code();
@ -121,7 +134,6 @@ impl web::error::WebResponseError for AppError {
let mut resp = HttpResponse::build(status_code).json(&body); let mut resp = HttpResponse::build(status_code).json(&body);
let headers = resp.headers_mut(); let headers = resp.headers_mut();
match self { match self {
AppError::JsonError { source } => { AppError::JsonError { source } => {
insert_header!(headers, "X-Error-Line", source.line()); insert_header!(headers, "X-Error-Line", source.line());
@ -132,7 +144,7 @@ impl web::error::WebResponseError for AppError {
source.to_string().escape_default().collect::<String>() source.to_string().escape_default().collect::<String>()
); );
} }
AppError::UnknownMethod {method} => { AppError::UnknownMethod { method } => {
insert_header!( insert_header!(
headers, headers,
"X-Unknown-Cmd", "X-Unknown-Cmd",

View file

@ -2,25 +2,23 @@ use crate::web_server::app_error::{self, AppError};
use crate::web_server::old_app_api::types::AppInitRequest; use crate::web_server::old_app_api::types::AppInitRequest;
use crate::web_server::NMAppState; use crate::web_server::NMAppState;
use serde_json::{json}; use serde_json::json;
use snafu::ResultExt; use snafu::ResultExt;
use crate::insert_header; use crate::insert_header;
use fred::interfaces::KeysInterface; use fred::interfaces::KeysInterface;
use ntex::http::StatusCode; use ntex::http::StatusCode;
use ntex::web; use ntex::web;
pub async fn app_init( pub async fn app_init(
_body: AppInitRequest<'_>, _body: AppInitRequest,
app_state: &NMAppState, app_state: &NMAppState,
) -> Result<web::HttpResponse, AppError> { ) -> Result<web::HttpResponse, AppError> {
let _: () = app_state let _: () = app_state
.redis_client .redis_client
.set("test", 123, None, None, true) .set("test", 123, None, None, true)
.await.context(app_error::ServerRedisSnafu)?; .await
.context(app_error::ServerRedisSnafu)?;
Ok(web::HttpResponse::build(StatusCode::OK).body("Hello world!")) Ok(web::HttpResponse::build(StatusCode::OK).body("Hello world!"))
} }

View file

@ -3,50 +3,66 @@
mod handlers; mod handlers;
mod types; mod types;
use ntex::web::types::State;
use ntex::util::Bytes;
use ntex::web;
use nom::AsBytes;
use snafu::ResultExt;
use crate::web_server::app_error::AppError; use crate::web_server::app_error::AppError;
use crate::web_server::NMAppState;
use crate::web_server::old_app_api::handlers::{app_init, version}; use crate::web_server::old_app_api::handlers::{app_init, version};
use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams}; use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams};
use crate::web_server::utils::redis::is_api_key_valid; use crate::web_server::utils::redis::is_api_key_valid;
use crate::web_server::NMAppState;
use ntex::util::Bytes;
use ntex::web;
use ntex::web::types::State;
use ntex::web::HttpRequest;
use snafu::{whatever, ResultExt};
use super::app_error; use super::app_error;
/// Обработчик запросов от приложений. /// Обработчик запросов от приложений.
/// ///
/// Отвечает за разделение на функции по `cmd`. /// Отвечает за разделение на функции по `cmd`.
/// ///
/// Вызывается напрямую из ntex приложения. /// Вызывается напрямую из ntex приложения.
pub async fn old_api_handler( pub async fn old_api_handler(
request: HttpRequest,
app_state: State<NMAppState>, app_state: State<NMAppState>,
body_bytes: Bytes, body_bytes: Bytes,
) -> Result<impl web::Responder, AppError> { ) -> Result<impl web::Responder, AppError> {
let headers = request.headers();
if body_bytes.len() > 10 * 1024 { if body_bytes.len() > 10 * 1024 {
// 10 KiB // 10 KiB
return Err(AppError::RequestTooLarge); return Err(AppError::RequestTooLarge);
} }
let body_bytes = body_bytes.as_bytes(); let mandatory_params: MandatoryParams =
serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON
let mandatory_params: MandatoryParams<'_> = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON // Тут все cmd которые могут быть вызваны без api ключа
if mandatory_params.cmd == "version" {
return version((), &app_state).await;
}
// Ignore clippy singlematch let api_key: String;
if mandatory_params.cmd.as_ref() == "version" { return version((), &app_state).await }
is_api_key_valid(&app_state.redis_client, mandatory_params.api_key.as_ref()).await?; if let Some(key) = mandatory_params.api_key {
api_key = key;
} else if let Some(key) = headers.get("Narodmon-Api-Key") {
api_key = key.to_str().with_whatever_context(|_| "asd")?.to_string();
} else {
whatever!("No API key found")
}
match mandatory_params.cmd.as_ref() { is_api_key_valid(&app_state.redis_client, api_key).await?;
match mandatory_params.cmd.as_str() {
"appInit" => { "appInit" => {
let body: AppInitRequest = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; let body: AppInitRequest =
serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?;
app_init(body, &app_state).await app_init(body, &app_state).await
} }
_ => Err(AppError::UnknownMethod { method: mandatory_params.cmd.to_string() }), _ => Err(AppError::UnknownMethod {
method: mandatory_params.cmd.to_string(),
}),
} }
//Ok("fuck") //Ok("fuck")

View file

@ -1,18 +1,16 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::borrow::Cow; use std::borrow::Cow;
use uuid::Uuid;
// fn<'de, D>(D) -> Result<T, D::Error> where D: Deserializer<'de> // fn<'de, D>(D) -> Result<T, D::Error> where D: Deserializer<'de>
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AppInitRequest<'a> { pub struct AppInitRequest {
#[serde(borrow)] pub version: String,
pub version: Cow<'a, str>,
#[serde(borrow)] pub platform: String,
pub platform: Cow<'a, str>,
#[serde(borrow)] pub model: String,
pub model: Cow<'a, str>,
pub width: u64, pub width: u64,
} }
@ -29,21 +27,20 @@ pub struct AddLikeRequest {
/// получить [MandatoryParams], другой в зависимости от `cmd`. Это позволяет не добвалять поля из /// получить [MandatoryParams], другой в зависимости от `cmd`. Это позволяет не добвалять поля из
/// этой структуры в каждом специфичном типе. /// этой структуры в каждом специфичном типе.
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize)]
pub struct MandatoryParams<'a> { pub struct MandatoryParams {
#[serde(borrow)] pub cmd: String,
pub cmd: Cow<'a, str>,
#[serde(borrow)] pub lang: String,
pub lang: Cow<'a, str>,
/// Уникальный ID клиента. /// Уникальный ID клиента.
/// ///
/// Используется на подобии как куки PHPSESSID в php. /// Используется на подобии как куки PHPSESSID в php.
/// ///
/// См. также: <https://www.php.net/manual/en/book.session.php> /// См. также: <https://www.php.net/manual/en/book.session.php>
#[serde(borrow)] pub uuid: Uuid,
pub uuid: Cow<'a, str>,
#[serde(borrow)] /// API ключ приложения
pub api_key: Cow<'a, str>, ///
/// Может быть указан в теле запроса, а может быть и заголовке Narodmon-Api-Key.
pub api_key: Option<String>,
} }

View file

@ -20,17 +20,13 @@ use super::NMAppState;
#[snafu(visibility(pub))] #[snafu(visibility(pub))]
pub enum Error { pub enum Error {
#[snafu(display("Device not found"))] #[snafu(display("Device not found"))]
DeviceNotFound { DeviceNotFound { mac: String },
mac: String
},
#[snafu(display("Time sent with the device is way too behind now"))] #[snafu(display("Time sent with the device is way too behind now"))]
TimeIsLongBehindNow, TimeIsLongBehindNow,
#[snafu(display("{source}"))] #[snafu(display("{source}"))]
QSParser { QSParser { source: QSParserError },
source: QSParserError
},
} }
/// Обработчик данных датчиков с устройств. /// Обработчик данных датчиков с устройств.

View file

@ -19,23 +19,19 @@ pub enum QSParserError {
#[snafu(display("asd"))] #[snafu(display("asd"))]
SerdeQS { SerdeQS {
#[snafu(source(from(serde_qs::Error, convert_to_arc::<serde_qs::Error>)))] #[snafu(source(from(serde_qs::Error, convert_to_arc::<serde_qs::Error>)))]
source: Arc<serde_qs::Error> source: Arc<serde_qs::Error>,
}, },
#[snafu(display("asd"))] #[snafu(display("asd"))]
Parsing { Parsing { context: String },
context: String
},
#[snafu(display("asd"))] #[snafu(display("asd"))]
FloatP { FloatP {
#[snafu(source)] #[snafu(source)]
source: ParseFloatError source: ParseFloatError,
}, },
#[snafu(display("failed to parse into decimal"))] #[snafu(display("failed to parse into decimal"))]
DecimalParse { DecimalParse { source: rust_decimal::Error },
source: rust_decimal::Error
},
#[snafu(display("asd"))] #[snafu(display("asd"))]
NoMAC, NoMAC,
@ -43,7 +39,9 @@ pub enum QSParserError {
impl From<Error<&str>> for QSParserError { impl From<Error<&str>> for QSParserError {
fn from(value: Error<&str>) -> Self { fn from(value: Error<&str>) -> Self {
QSParserError::Parsing { context: format!("{:?}", value)} QSParserError::Parsing {
context: format!("{:?}", value),
}
} }
} }
@ -66,7 +64,7 @@ pub fn qs_rest_to_values(
for (key, value) in parsed { for (key, value) in parsed {
hashset.insert(SensorValue { hashset.insert(SensorValue {
mac: key, mac: key,
value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu{})?, value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu {})?,
time: None, time: None,
unit: None, unit: None,
@ -82,7 +80,9 @@ pub fn parse_decimal_if_exists(
key: &str, key: &str,
) -> Result<Option<Decimal>, QSParserError> { ) -> Result<Option<Decimal>, QSParserError> {
if let Some(unwrapped_value) = parsed.remove(key) { if let Some(unwrapped_value) = parsed.remove(key) {
Ok(Some(Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu{})?)) Ok(Some(
Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu {})?,
))
} else { } else {
Ok(None) Ok(None)
} }
@ -93,14 +93,16 @@ pub fn parse_epoch_if_exists(
key: &str, key: &str,
) -> Result<Option<Epoch>, QSParserError> { ) -> Result<Option<Epoch>, QSParserError> {
if let Some(unwrapped_value) = parsed.remove(key) { if let Some(unwrapped_value) = parsed.remove(key) {
Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse().context(FloatPSnafu{})?))) Ok(Some(Epoch::from_unix_seconds(
unwrapped_value.parse().context(FloatPSnafu {})?,
)))
} else { } else {
Ok(None) Ok(None)
} }
} }
pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSParserError> { pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSParserError> {
let mut parsed: HashMap<String, String> = serde_qs::from_str(input).context(SerdeQSSnafu{})?; let mut parsed: HashMap<String, String> = serde_qs::from_str(input).context(SerdeQSSnafu {})?;
let (_, device_mac) = if let Some(id) = parsed.get("ID") { let (_, device_mac) = if let Some(id) = parsed.get("ID") {
parse_mac_address(id)? parse_mac_address(id)?

View file

@ -1,18 +1,22 @@
//! Сборник утилит для работы с Redis. //! Сборник утилит для работы с Redis.
use crate::web_server::app_error::{AppError, ServerRedisSnafu}; use crate::{
use fred::prelude::*; uformat,
web_server::app_error::{AppError, ServerRedisSnafu},
};
use fred::clients::Client as RedisClient; use fred::clients::Client as RedisClient;
use fred::prelude::*;
use heapless::String as HeaplessString; use heapless::String as HeaplessString;
use lazy_static::lazy_static; use lazy_static::lazy_static;
use regex::Regex; use regex::Regex;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use snafu::ResultExt; use snafu::ResultExt;
use ufmt::uwrite; use ufmt::uwrite;
use uuid::Uuid;
lazy_static! { lazy_static! {
/// Разрешённые знаки для API ключа. /// Разрешённые знаки для API ключа.
static ref ALLOWED_API_KEY_CHARACTERS: Regex = Regex::new("[a-zA-Z0-9]{13}").unwrap(); static ref ALLOWED_API_KEY_CHARACTERS: Regex = Regex::new("[a-zA-Z0-9\\-]{13,36}").unwrap();
} }
/// Описание полей в KV DB у `apikey_{}`. /// Описание полей в KV DB у `apikey_{}`.
@ -25,18 +29,18 @@ pub struct ApiKeyDescription {
/// Проверка API ключа на валидность. /// Проверка API ключа на валидность.
pub async fn is_api_key_valid( pub async fn is_api_key_valid(
client: &RedisClient, client: &RedisClient,
api_key: &str, api_key: String,
) -> Result<ApiKeyDescription, AppError> { ) -> Result<ApiKeyDescription, AppError> {
if !ALLOWED_API_KEY_CHARACTERS.is_match(api_key) { if !ALLOWED_API_KEY_CHARACTERS.is_match(&api_key) {
return Err(AppError::ApiKeyInvalid { return Err(AppError::ApiKeyInvalid {
reason: "Invalid characters present in the API key.", reason: "Invalid characters present in the API key.",
}); });
} }
let mut key_buffer = HeaplessString::<{ 7 + 13 }>::new(); let valid: Option<i64> = client
uwrite!(key_buffer, "apikey_{}", api_key).expect("TODO"); // TODO: Error handling .hget(uformat!("apikey_{}", api_key), "owner")
.await
let valid: Option<i64> = client.hget(key_buffer.as_str(), "owner").await.context(ServerRedisSnafu)?; .context(ServerRedisSnafu)?;
valid valid
.map(|uid| ApiKeyDescription { apikey_owner: uid }) .map(|uid| ApiKeyDescription { apikey_owner: uid })