dev-nm17-2 #25
20 changed files with 588 additions and 182 deletions
73
Cargo.lock
generated
73
Cargo.lock
generated
|
@ -23,7 +23,7 @@ version = "0.7.8"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9"
|
checksum = "891477e0c6a8957309ee5c45a6368af3ae14bb510732d2684ffa19af310920f9"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"getrandom",
|
"getrandom 0.2.15",
|
||||||
"once_cell",
|
"once_cell",
|
||||||
"version_check",
|
"version_check",
|
||||||
]
|
]
|
||||||
|
@ -785,7 +785,19 @@ checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"libc",
|
"libc",
|
||||||
"wasi",
|
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "getrandom"
|
||||||
|
version = "0.3.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "43a49c392881ce6d5c3b8cb70f98717b7c07aabbdff06687b9030dbfbe2725f8"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if",
|
||||||
|
"libc",
|
||||||
|
"wasi 0.13.3+wasi-0.2.2",
|
||||||
|
"windows-targets",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1093,11 +1105,14 @@ dependencies = [
|
||||||
"hex",
|
"hex",
|
||||||
"hifitime",
|
"hifitime",
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
|
"log",
|
||||||
"nom",
|
"nom",
|
||||||
"ntex",
|
"ntex",
|
||||||
"phf",
|
"phf",
|
||||||
|
"ppp",
|
||||||
"regex",
|
"regex",
|
||||||
"rust_decimal",
|
"rust_decimal",
|
||||||
|
"rust_decimal_macros",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"serde_qs",
|
"serde_qs",
|
||||||
|
@ -1107,6 +1122,7 @@ dependencies = [
|
||||||
"thiserror",
|
"thiserror",
|
||||||
"tokio",
|
"tokio",
|
||||||
"ufmt",
|
"ufmt",
|
||||||
|
"uuid",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1223,9 +1239,9 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "log"
|
name = "log"
|
||||||
version = "0.4.22"
|
version = "0.4.26"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
|
checksum = "30bde2b3dc3671ae49d8e2e9f044c7c005836e7a023ee57cffa25ab82764bb9e"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "memchr"
|
name = "memchr"
|
||||||
|
@ -1261,7 +1277,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
|
checksum = "2886843bf800fba2e3377cff24abf6379b4c4d5c6681eaf9ea5b0d15090450bd"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
"wasi",
|
"wasi 0.11.0+wasi-snapshot-preview1",
|
||||||
"windows-sys 0.52.0",
|
"windows-sys 0.52.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -1685,6 +1701,15 @@ version = "0.2.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
|
checksum = "439ee305def115ba05938db6eb1644ff94165c5ab5e9420d1c1bcedbba909391"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "ppp"
|
||||||
|
version = "2.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1a7a2049cd2570bd67bf0228e86bf850f8ceb5190a345c471d03a909da6049e0"
|
||||||
|
dependencies = [
|
||||||
|
"thiserror",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ppv-lite86"
|
name = "ppv-lite86"
|
||||||
version = "0.2.20"
|
version = "0.2.20"
|
||||||
|
@ -1774,7 +1799,7 @@ version = "0.6.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
|
checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"getrandom",
|
"getrandom 0.2.15",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1883,6 +1908,16 @@ dependencies = [
|
||||||
"serde_json",
|
"serde_json",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "rust_decimal_macros"
|
||||||
|
version = "1.36.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "da991f231869f34268415a49724c6578e740ad697ba0999199d6f22b3949332c"
|
||||||
|
dependencies = [
|
||||||
|
"quote",
|
||||||
|
"rust_decimal",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "rustc-demangle"
|
name = "rustc-demangle"
|
||||||
version = "0.1.24"
|
version = "0.1.24"
|
||||||
|
@ -2422,9 +2457,13 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "uuid"
|
name = "uuid"
|
||||||
version = "1.11.0"
|
version = "1.14.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a"
|
checksum = "93d59ca99a559661b96bf898d8fce28ed87935fd2bea9f05983c1464dd6c71b1"
|
||||||
|
dependencies = [
|
||||||
|
"getrandom 0.3.1",
|
||||||
|
"serde",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "version_check"
|
name = "version_check"
|
||||||
|
@ -2438,6 +2477,15 @@ version = "0.11.0+wasi-snapshot-preview1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wasi"
|
||||||
|
version = "0.13.3+wasi-0.2.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "26816d2e1a4a36a2940b96c5296ce403917633dff8f3440e9b236ed6f6bacad2"
|
||||||
|
dependencies = [
|
||||||
|
"wit-bindgen-rt",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "wasm-bindgen"
|
name = "wasm-bindgen"
|
||||||
version = "0.2.99"
|
version = "0.2.99"
|
||||||
|
@ -2612,6 +2660,15 @@ dependencies = [
|
||||||
"memchr",
|
"memchr",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "wit-bindgen-rt"
|
||||||
|
version = "0.33.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "3268f3d866458b787f390cf61f4bbb563b922d091359f9608842999eaee3943c"
|
||||||
|
dependencies = [
|
||||||
|
"bitflags",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "write16"
|
name = "write16"
|
||||||
version = "1.0.0"
|
version = "1.0.0"
|
||||||
|
|
|
@ -32,3 +32,7 @@ ufmt = { version = "0.2.0", features = ["std"] }
|
||||||
futures-util = { version = "0.3.30", features = ["tokio-io"] }
|
futures-util = { version = "0.3.30", features = ["tokio-io"] }
|
||||||
snafu = "0.8.5"
|
snafu = "0.8.5"
|
||||||
clap-verbosity-flag = "3.0.2"
|
clap-verbosity-flag = "3.0.2"
|
||||||
|
ppp = "2.3.0"
|
||||||
|
log = "0.4.26"
|
||||||
|
uuid = { version = "1.14.0", features = ["serde", "v4"] }
|
||||||
|
rust_decimal_macros = "1.36.0"
|
||||||
|
|
|
@ -1,17 +1,29 @@
|
||||||
# Архитектура KV DB (Dragonfly)
|
# Архитектура KV DB (Dragonfly)
|
||||||
|
|
||||||
- `apikey_{apikey}`
|
- `apikey_{apikey}`
|
||||||
|
- API ключ для приложений
|
||||||
- Поля
|
- Поля
|
||||||
- `owner`
|
- `owner`
|
||||||
- Имеет время окончания
|
- Имеет TTL
|
||||||
- `devices_{device_id}`
|
- `devices_{device_uuid}`
|
||||||
- Поля
|
|
||||||
- Вся информация о девайсе
|
|
||||||
- `devices_{device_id}_{tai_timestamp}_{sensor_id}`
|
|
||||||
- Только значение
|
|
||||||
- `devices_{device_id}`
|
|
||||||
- Поля
|
- Поля
|
||||||
- `exists`: bool
|
- `exists`: bool
|
||||||
|
- `tls_only`: bool
|
||||||
|
- `mtls_only`: bool
|
||||||
|
- `lat`: decimal
|
||||||
|
- `long`: decimal
|
||||||
|
- `alt`: decimal
|
||||||
|
- `devices_{device_uuid}_sensor{sensor_mac}_{tai_timestamp}`
|
||||||
|
- Только значение
|
||||||
|
- `devices_{device_uuid}_{sensor_mac}`
|
||||||
|
- Поля
|
||||||
- `unit`: str
|
- `unit`: str
|
||||||
|
- `devices_{device_uuid}_commands`
|
||||||
|
- `devices_mac{device_mac}`
|
||||||
|
- Маппинг до device_uuid
|
||||||
|
- `users_{user_uuid}`
|
||||||
|
- username: string
|
||||||
|
- password_hash: string
|
||||||
|
|
||||||
|
|
||||||
!!!! Убедитесь что в переменных ключей нет `_` !!!!
|
!!!! Убедитесь что в переменных ключей нет `_` !!!!
|
||||||
|
|
161
src/business_logic/device.rs
Normal file
161
src/business_logic/device.rs
Normal file
|
@ -0,0 +1,161 @@
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use fred::prelude::{Client as RedisClient, HashesInterface, KeysInterface};
|
||||||
|
use hifitime::Epoch;
|
||||||
|
use rust_decimal::Decimal;
|
||||||
|
use snafu::ResultExt;
|
||||||
|
use uuid::{self, Uuid};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
ingest_protocol::NMDeviceDataPacket,
|
||||||
|
uformat,
|
||||||
|
web_server::{
|
||||||
|
app_error::{self, AppError, DecimalSnafu, ServerRedisSnafu, UuidSnafu},
|
||||||
|
old_device_sensor_api::qs_parser::DecimalParseSnafu,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub struct Device {
|
||||||
|
pub device_id: Uuid,
|
||||||
|
|
||||||
|
/// ID владельца
|
||||||
|
pub owner: Uuid,
|
||||||
|
|
||||||
|
pub lat: Option<Decimal>,
|
||||||
|
pub lon: Option<Decimal>,
|
||||||
|
pub alt: Option<Decimal>,
|
||||||
|
|
||||||
|
/// Данные датчика могут быть переданы только по TLS
|
||||||
|
pub tls_only: bool,
|
||||||
|
|
||||||
|
/// Данные датчика могут быть переданы только по Mutual TLS
|
||||||
|
pub mtls_only: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Device {
|
||||||
|
pub fn new(id: Uuid, owner: Uuid) -> Device {
|
||||||
|
Device {
|
||||||
|
device_id: id,
|
||||||
|
owner,
|
||||||
|
lat: None,
|
||||||
|
lon: None,
|
||||||
|
alt: None,
|
||||||
|
tls_only: false,
|
||||||
|
mtls_only: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_by_id(redis: &RedisClient, id: Uuid) -> Result<Option<Device>, AppError> {
|
||||||
|
let key = uformat!("devices_{}", id.to_string());
|
||||||
|
|
||||||
|
if !redis.exists(&[&key]).await.context(ServerRedisSnafu)? {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(Some(Device {
|
||||||
|
device_id: id,
|
||||||
|
owner: redis
|
||||||
|
.hget::<Option<String>, _, _>(&key, "lat")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.map(|el| Uuid::from_str(&el).context(UuidSnafu))
|
||||||
|
.transpose()?
|
||||||
|
.unwrap(),
|
||||||
|
lat: redis
|
||||||
|
.hget::<Option<String>, _, _>(&key, "lat")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.map(|el| el.parse().context(DecimalSnafu))
|
||||||
|
.transpose()?,
|
||||||
|
lon: redis
|
||||||
|
.hget::<Option<String>, _, _>(&key, "lon")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.map(|el| el.parse().context(DecimalSnafu))
|
||||||
|
.transpose()?,
|
||||||
|
alt: redis
|
||||||
|
.hget::<Option<String>, _, _>(&key, "alt")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.map(|el| el.parse().context(DecimalSnafu))
|
||||||
|
.transpose()?,
|
||||||
|
tls_only: redis
|
||||||
|
.hget::<Option<bool>, _, _>(&key, "tls_only")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.unwrap(),
|
||||||
|
mtls_only: redis
|
||||||
|
.hget::<Option<bool>, _, _>(&key, "mtls_only")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.unwrap(),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn save_to_db(
|
||||||
|
packet: &NMDeviceDataPacket,
|
||||||
|
redis: &RedisClient,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
|
let device_mac_enc = hex::encode(packet.mac);
|
||||||
|
let device_id_key = uformat!("devices_mac{}", &device_mac_enc);
|
||||||
|
|
||||||
|
let device_id: Option<String> = redis.get(device_id_key).await.context(ServerRedisSnafu)?;
|
||||||
|
|
||||||
|
let device_id = device_id.ok_or_else(|| AppError::DeviceNotFound {
|
||||||
|
mac: device_mac_enc.clone(),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let now = Epoch::now().unwrap().into();
|
||||||
|
let mut device_time = packet.time.unwrap_or(now);
|
||||||
|
|
||||||
|
// TODO: Добавить гистерезис
|
||||||
|
// Отчёт совместимости: отсутствует
|
||||||
|
if device_time > now {
|
||||||
|
device_time = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds();
|
||||||
|
|
||||||
|
let key = uformat!("devices_{}", device_id);
|
||||||
|
|
||||||
|
let device_exists: Option<bool> = redis
|
||||||
|
.hget(key.as_str(), "exists")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?;
|
||||||
|
|
||||||
|
if !device_exists.is_some_and(|v| v) {
|
||||||
|
return Err(AppError::DeviceNotFound {
|
||||||
|
mac: hex::encode(packet.mac),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// devices_{device_id}_{tai_timestamp}_{sensor_id}
|
||||||
|
for sensor in &packet.values {
|
||||||
|
let key = uformat!(
|
||||||
|
"devices_{}_sensor{}_{}",
|
||||||
|
device_id,
|
||||||
|
sensor.mac,
|
||||||
|
device_tai_timestamp.to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
() = redis
|
||||||
|
.set(key.as_str(), sensor.value.to_string(), None, None, false)
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(commands) = &packet.commands {
|
||||||
|
for (cmd_key, cmd_value) in commands {
|
||||||
|
let key = uformat!("devices_{}_cmds_{}", device_id, cmd_key);
|
||||||
|
|
||||||
|
() = redis
|
||||||
|
.set(key.as_str(), cmd_value, None, None, false)
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
37
src/business_logic/mod.rs
Normal file
37
src/business_logic/mod.rs
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
use bytes::Bytes;
|
||||||
|
use device::Device;
|
||||||
|
use fred::prelude::Client;
|
||||||
|
use ntex::web::guard::Header;
|
||||||
|
use ppp::v2::Header;
|
||||||
|
use snafu::ResultExt;
|
||||||
|
|
||||||
|
pub mod device;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
ingest_protocol::NMDeviceDataPacket,
|
||||||
|
web_server::app_error::{AppError, ServerRedisSnafu},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub async fn process_packet(
|
||||||
|
r_client: Client,
|
||||||
|
packet: NMDeviceDataPacket,
|
||||||
|
proxy_header: Option<Bytes>,
|
||||||
|
) -> String {
|
||||||
|
let proxy_ref = proxy_header.as_ref().map(|el| el.as_ref());
|
||||||
|
let proxy_header_parsed = proxy_ref.map(|el| Header::try_from(el).unwrap());
|
||||||
|
// proxy_header_parsed.unwrap().tlvs()
|
||||||
|
|
||||||
|
let asd: Result<String, AppError> = (|| async {
|
||||||
|
// TODO: Auth code here. Check if tls only is enabled.
|
||||||
|
|
||||||
|
Device::save_to_db(&packet, &r_client).await?;
|
||||||
|
|
||||||
|
return Ok("".to_string());
|
||||||
|
})()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
return match asd {
|
||||||
|
Ok(resp) => resp,
|
||||||
|
Err(err) => format!("ERROR: {}", err),
|
||||||
|
};
|
||||||
|
}
|
0
src/business_logic/user.rs
Normal file
0
src/business_logic/user.rs
Normal file
11
src/cli.rs
11
src/cli.rs
|
@ -1,3 +1,5 @@
|
||||||
|
use std::net::IpAddr;
|
||||||
|
|
||||||
use clap::{Args, Parser, Subcommand};
|
use clap::{Args, Parser, Subcommand};
|
||||||
|
|
||||||
#[derive(Parser, Clone)]
|
#[derive(Parser, Clone)]
|
||||||
|
@ -32,9 +34,12 @@ pub struct WebServerArgs {
|
||||||
|
|
||||||
#[derive(Args, Clone)]
|
#[derive(Args, Clone)]
|
||||||
pub struct SocketServerArgs {
|
pub struct SocketServerArgs {
|
||||||
#[arg(short, long, default_value = "localhost")]
|
#[arg(short, long)]
|
||||||
pub addr: String,
|
pub addr: IpAddr,
|
||||||
|
|
||||||
#[arg(short = 'p', default_value_t = 8283)]
|
#[arg(short = 'p', default_value_t = 8283)]
|
||||||
pub port: u16,
|
pub port: u16,
|
||||||
}
|
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
pub proxy_protocol: bool,
|
||||||
|
}
|
||||||
|
|
|
@ -3,15 +3,15 @@ use std::fmt::Debug;
|
||||||
use std::num::ParseFloatError;
|
use std::num::ParseFloatError;
|
||||||
use thiserror::Error as ThisError;
|
use thiserror::Error as ThisError;
|
||||||
|
|
||||||
///
|
///
|
||||||
/// Тип ошибки Ingest протокола
|
/// Тип ошибки Ingest протокола
|
||||||
///
|
///
|
||||||
/// К сожалению, не может быть переделан на Snafu, так как
|
/// К сожалению, не может быть переделан на Snafu, так как
|
||||||
/// Snafu не поддерживает generic типы как source ошибки,
|
/// Snafu не поддерживает generic типы как source ошибки,
|
||||||
/// не приделывая 'static лайфтайм.
|
/// не приделывая 'static лайфтайм.
|
||||||
///
|
///
|
||||||
/// См. https://github.com/shepmaster/snafu/issues/99
|
/// См. https://github.com/shepmaster/snafu/issues/99
|
||||||
///
|
///
|
||||||
#[allow(dead_code)]
|
#[allow(dead_code)]
|
||||||
#[derive(Debug, ThisError)]
|
#[derive(Debug, ThisError)]
|
||||||
pub enum Error<I: Debug> {
|
pub enum Error<I: Debug> {
|
||||||
|
|
|
@ -6,6 +6,7 @@
|
||||||
// #[serde(rename = "...")]
|
// #[serde(rename = "...")]
|
||||||
// #[serde(alias = "...")]
|
// #[serde(alias = "...")]
|
||||||
|
|
||||||
|
use crate::business_logic::device::Device;
|
||||||
use crate::ingest_protocol::error::Error;
|
use crate::ingest_protocol::error::Error;
|
||||||
use crate::ingest_protocol::parser::parse_mac_address;
|
use crate::ingest_protocol::parser::parse_mac_address;
|
||||||
use crate::uformat;
|
use crate::uformat;
|
||||||
|
@ -98,64 +99,6 @@ pub struct NMDeviceDataPacket {
|
||||||
pub time: Option<EpochUTC>,
|
pub time: Option<EpochUTC>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NMDeviceDataPacket {
|
|
||||||
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
|
|
||||||
let device_mac_enc = hex::encode(self.mac);
|
|
||||||
|
|
||||||
let now = Epoch::now().unwrap().into();
|
|
||||||
let mut device_time = self.time.unwrap_or(now);
|
|
||||||
|
|
||||||
// TODO: Добавить гистерезис
|
|
||||||
// Отчёт совместимости: отсутствует
|
|
||||||
if device_time > now {
|
|
||||||
device_time = now;
|
|
||||||
}
|
|
||||||
|
|
||||||
let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds();
|
|
||||||
|
|
||||||
let key = uformat!("devices_{}", device_mac_enc);
|
|
||||||
|
|
||||||
let device_exists: Option<bool> = redis
|
|
||||||
.hget(key.as_str(), "exists")
|
|
||||||
.await
|
|
||||||
.context(ServerRedisSnafu)?;
|
|
||||||
|
|
||||||
if !device_exists.is_some_and(|v| v) {
|
|
||||||
return Err(AppError::DeviceNotFound {
|
|
||||||
mac: hex::encode(self.mac),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// devices_{device_id}_{tai_timestamp}_{sensor_id}
|
|
||||||
for sensor in &self.values {
|
|
||||||
let key = uformat!(
|
|
||||||
"devices_{}_{}_{}",
|
|
||||||
device_mac_enc,
|
|
||||||
device_tai_timestamp.to_string(),
|
|
||||||
sensor.mac
|
|
||||||
);
|
|
||||||
|
|
||||||
redis
|
|
||||||
.set(key.as_str(), sensor.value.to_string(), None, None, false)
|
|
||||||
.await
|
|
||||||
.context(ServerRedisSnafu)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(commands) = &self.commands {
|
|
||||||
for (cmd_key, cmd_value) in commands {
|
|
||||||
let key = uformat!("devices_{}_cmds_{}", device_mac_enc, cmd_key);
|
|
||||||
|
|
||||||
redis
|
|
||||||
.set(key.as_str(), cmd_value, None, None, false)
|
|
||||||
.await
|
|
||||||
.context(ServerRedisSnafu)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
|
#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
|
||||||
pub struct NMJsonPacket {
|
pub struct NMJsonPacket {
|
||||||
pub devices: Vec<NMDeviceDataPacket>,
|
pub devices: Vec<NMDeviceDataPacket>,
|
||||||
|
@ -164,7 +107,7 @@ pub struct NMJsonPacket {
|
||||||
impl NMJsonPacket {
|
impl NMJsonPacket {
|
||||||
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
|
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
|
||||||
for device in &self.devices {
|
for device in &self.devices {
|
||||||
device.save_to_db(redis).await?;
|
Device::save_to_db(&device, redis).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -11,10 +11,161 @@
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
fmt::Display,
|
||||||
|
io::Read,
|
||||||
|
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
|
||||||
|
ops::ControlFlow,
|
||||||
|
};
|
||||||
|
|
||||||
|
use bytes::{buf, Buf, Bytes, BytesMut};
|
||||||
use fred::prelude::Client;
|
use fred::prelude::Client;
|
||||||
|
use log::*;
|
||||||
|
use ppp::v2::{self, Addresses, Command, Header, ParseError};
|
||||||
|
use snafu::{whatever, ResultExt, Whatever};
|
||||||
|
use tokio::{
|
||||||
|
io::{AsyncReadExt, AsyncWriteExt, ReadBuf},
|
||||||
|
net::{TcpSocket, TcpStream},
|
||||||
|
};
|
||||||
|
|
||||||
use crate::cli::{Cli, SocketServerArgs};
|
use crate::{
|
||||||
|
business_logic::process_packet,
|
||||||
|
cli::{Cli, SocketServerArgs},
|
||||||
|
ingest_protocol::{self, error::Error},
|
||||||
|
web_server::app_error::{AppError, StdIOSnafu},
|
||||||
|
};
|
||||||
|
|
||||||
pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) {
|
pub async fn socketserv_main(
|
||||||
todo!()
|
args: Cli,
|
||||||
|
specific_args: SocketServerArgs,
|
||||||
|
client: Client,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||||
|
pub struct AddressesUnpacked {
|
||||||
|
pub source: SocketAddr,
|
||||||
|
pub dest: SocketAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_sockaddr(addresses: Addresses) -> Option<SocketAddr> {
|
||||||
|
match addresses {
|
||||||
|
Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new(
|
||||||
|
addr.source_address,
|
||||||
|
addr.source_port,
|
||||||
|
))),
|
||||||
|
Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new(
|
||||||
|
addr.source_address,
|
||||||
|
addr.source_port,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
))),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn tcp_proxy_handler(
|
||||||
|
mut buffer: BytesMut,
|
||||||
|
stream: &mut TcpStream,
|
||||||
|
actual_origin: &mut SocketAddr,
|
||||||
|
proxy_data: &mut Option<Bytes>,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
|
'proxy_loop: loop {
|
||||||
|
let len = stream.read(&mut buffer).await.context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
match v2::Header::try_from(buffer.as_ref()) {
|
||||||
|
Ok(header) => {
|
||||||
|
match header.command {
|
||||||
|
Command::Proxy => {
|
||||||
|
if let Some(new_socket_addr) = get_sockaddr(header.addresses) {
|
||||||
|
*actual_origin = new_socket_addr;
|
||||||
|
*proxy_data = Some(Bytes::copy_from_slice(&header.header));
|
||||||
|
break 'proxy_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
// Продолажем работать как обычно (health check от прокси или т.п)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
match err {
|
||||||
|
ParseError::Incomplete(_) => {
|
||||||
|
// Продолжаем заполнять буфер
|
||||||
|
|
||||||
|
continue 'proxy_loop;
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
warn!("Получили неисправимую ошибку при парсинге proxy протокола. Убедитесь что никто не пытается подключиться к сервису напрямую без прокси. {error}", error = err);
|
||||||
|
|
||||||
|
whatever!("No proxy headers detected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer = buffer.split_off(proxy_data.as_ref().unwrap().len());
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn socketserv_tcp(
|
||||||
|
args: Cli,
|
||||||
|
specific_args: SocketServerArgs,
|
||||||
|
client: Client,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
|
// TODO: errors should not break the server
|
||||||
|
|
||||||
|
let mut buffer = BytesMut::with_capacity(16 * 1024);
|
||||||
|
let socket = TcpSocket::new_v4().context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
socket
|
||||||
|
.bind(SocketAddr::new(specific_args.addr, specific_args.port))
|
||||||
|
.context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
let listener = socket.listen(256).context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
'tcp_loop: while let Ok((mut stream, socketaddr)) = listener.accept().await {
|
||||||
|
// Нужно для правильного применения рейт лимита
|
||||||
|
let mut actual_origin = socketaddr;
|
||||||
|
let mut proxy_data = None;
|
||||||
|
|
||||||
|
if specific_args.proxy_protocol {
|
||||||
|
tcp_proxy_handler(
|
||||||
|
buffer.clone(),
|
||||||
|
&mut stream,
|
||||||
|
&mut actual_origin,
|
||||||
|
&mut proxy_data,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
'protocol_loop: loop {
|
||||||
|
let len = stream.read(&mut buffer).await.context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
match ingest_protocol::parser::parse_packet(&buffer) {
|
||||||
|
Ok(packet) => {
|
||||||
|
let resp = process_packet(client.clone(), packet, proxy_data.clone()).await;
|
||||||
|
let _ = stream.write(resp.as_bytes()).await;
|
||||||
|
let _ = stream.shutdown().await;
|
||||||
|
}
|
||||||
|
Err(err) if err == Error::Incomplete => {
|
||||||
|
continue 'protocol_loop;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
stream
|
||||||
|
.write(format!("ERROR: {}", err).as_bytes())
|
||||||
|
.await
|
||||||
|
.context(StdIOSnafu)?;
|
||||||
|
continue 'tcp_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) {}
|
||||||
|
|
11
src/main.rs
11
src/main.rs
|
@ -1,6 +1,9 @@
|
||||||
#![doc = include_str!("../README.md")]
|
#![doc = include_str!("../README.md")]
|
||||||
|
#![feature(try_blocks)]
|
||||||
|
|
||||||
extern crate core;
|
extern crate core;
|
||||||
|
|
||||||
|
mod business_logic;
|
||||||
mod cli;
|
mod cli;
|
||||||
mod ingest_protocol;
|
mod ingest_protocol;
|
||||||
mod ingest_socket_server;
|
mod ingest_socket_server;
|
||||||
|
@ -14,11 +17,13 @@ use fred::prelude::{
|
||||||
ReconnectPolicy, Server, ServerConfig,
|
ReconnectPolicy, Server, ServerConfig,
|
||||||
};
|
};
|
||||||
use ingest_socket_server::socketserv_main;
|
use ingest_socket_server::socketserv_main;
|
||||||
|
use web_server::app_error::AppError;
|
||||||
|
|
||||||
use crate::web_server::server_main;
|
use crate::web_server::server_main;
|
||||||
|
|
||||||
|
#[snafu::report]
|
||||||
#[ntex::main]
|
#[ntex::main]
|
||||||
async fn main() {
|
async fn main() -> Result<(), AppError> {
|
||||||
let result = Cli::parse();
|
let result = Cli::parse();
|
||||||
|
|
||||||
let mut config = RedisConfig::default();
|
let mut config = RedisConfig::default();
|
||||||
|
@ -39,7 +44,9 @@ async fn main() {
|
||||||
server_main(result.clone(), specific_args.clone(), redis).await;
|
server_main(result.clone(), specific_args.clone(), redis).await;
|
||||||
}
|
}
|
||||||
MyCommand::SocketServer(specific_args) => {
|
MyCommand::SocketServer(specific_args) => {
|
||||||
socketserv_main(result.clone(), specific_args.clone(), redis).await;
|
socketserv_main(result.clone(), specific_args.clone(), redis).await?;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,9 +65,7 @@ impl<'de> Deserialize<'de> for EpochUTC {
|
||||||
) as f64)
|
) as f64)
|
||||||
.into());
|
.into());
|
||||||
}
|
}
|
||||||
Ok(
|
Ok(Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into())
|
||||||
Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into(),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,7 @@ impl<'de> Deserialize<'de> for SupportedUnit {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Таблица преобразования текстового представления единиц в значения [SupportedUnit].
|
/// Таблица преобразования текстового представления единиц в значения [SupportedUnit].
|
||||||
static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! {
|
pub static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! {
|
||||||
"C" => SupportedUnit::Celsius,
|
"C" => SupportedUnit::Celsius,
|
||||||
"%" => SupportedUnit::Percentage,
|
"%" => SupportedUnit::Percentage,
|
||||||
"mmHg" => SupportedUnit::MillimeterHg,
|
"mmHg" => SupportedUnit::MillimeterHg,
|
||||||
|
@ -85,6 +85,12 @@ static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! {
|
||||||
"KWh" => SupportedUnit::KWh,
|
"KWh" => SupportedUnit::KWh,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn convert_to_arc<T: std::error::Error>(error: T) -> Arc<T> {
|
pub fn convert_to_arc<T: core::error::Error>(error: T) -> Arc<T> {
|
||||||
Arc::new(error)
|
Arc::new(error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn convert_to_arced_dynerror(
|
||||||
|
error: Option<Box<dyn core::error::Error>>,
|
||||||
|
) -> Option<Arc<dyn core::error::Error>> {
|
||||||
|
error.map(|el| Arc::from(el))
|
||||||
|
}
|
||||||
|
|
|
@ -8,42 +8,36 @@ use ntex::web::{HttpRequest, HttpResponse};
|
||||||
use snafu::Snafu;
|
use snafu::Snafu;
|
||||||
|
|
||||||
use crate::insert_header;
|
use crate::insert_header;
|
||||||
use crate::utils::convert_to_arc;
|
use crate::utils::{convert_to_arc, convert_to_arced_dynerror};
|
||||||
use crate::web_server::old_device_sensor_api::qs_parser::QSParserError;
|
use crate::web_server::old_device_sensor_api::qs_parser::QSParserError;
|
||||||
use rust_decimal::Decimal;
|
use rust_decimal::Decimal;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
use super::old_device_sensor_api::qs_parser;
|
use super::old_device_sensor_api::qs_parser;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/// Главный объект ошибки [std::error::Error] для всего Web API.
|
/// Главный объект ошибки [std::error::Error] для всего Web API.
|
||||||
///
|
///
|
||||||
/// В целом, все Result у Web сервера должны использовать этот Error.
|
/// В целом, все Result у Web сервера должны использовать этот Error.
|
||||||
#[derive(Debug, Snafu, Clone)]
|
#[derive(Debug, Snafu)]
|
||||||
#[snafu(visibility(pub))]
|
#[snafu(visibility(pub))]
|
||||||
pub enum AppError {
|
pub enum AppError {
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
JsonError {
|
JsonError {
|
||||||
#[snafu(source(from(serde_json::Error, convert_to_arc::<serde_json::Error>)))]
|
#[snafu(source(from(serde_json::Error, convert_to_arc::<serde_json::Error>)))]
|
||||||
source: Arc<serde_json::Error>
|
source: Arc<serde_json::Error>,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
QSError {
|
QSError { source: QSParserError },
|
||||||
source: QSParserError
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
ServerRedisError {
|
ServerRedisError { source: fred::error::Error },
|
||||||
source: fred::error::Error
|
|
||||||
},
|
#[snafu(display("Could not parse decimal"))]
|
||||||
|
Decimal { source: rust_decimal::Error },
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
UnknownMethod {
|
UnknownMethod { method: String },
|
||||||
method: String
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
RequestTooLarge,
|
RequestTooLarge,
|
||||||
|
@ -58,9 +52,10 @@ pub enum AppError {
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("UTF-8 Error"))]
|
#[snafu(display("UTF-8 Error"))]
|
||||||
Utf8Error {
|
Utf8Error { source: std::str::Utf8Error },
|
||||||
source: std::str::Utf8Error
|
|
||||||
},
|
#[snafu(display("String cannot be parced into a UUID"))]
|
||||||
|
Uuid { source: uuid::Error },
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
UnknownBody {
|
UnknownBody {
|
||||||
|
@ -69,12 +64,24 @@ pub enum AppError {
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
DeviceNotFound {
|
DeviceNotFound { mac: String },
|
||||||
mac: String
|
|
||||||
|
#[snafu(display("Std IO error"))]
|
||||||
|
StdIO {
|
||||||
|
#[snafu(source(from(std::io::Error, convert_to_arc::<std::io::Error>)))]
|
||||||
|
source: Arc<std::io::Error>,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
TimeIsLongBehindNow
|
TimeIsLongBehindNow,
|
||||||
|
|
||||||
|
#[snafu(display("Could not read file"))]
|
||||||
|
#[snafu(whatever)]
|
||||||
|
Whatever {
|
||||||
|
#[snafu(source(from(Box<dyn std::error::Error>, Some)))]
|
||||||
|
source: Option<Box<dyn core::error::Error>>,
|
||||||
|
message: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl web::error::WebResponseError for AppError {
|
impl web::error::WebResponseError for AppError {
|
||||||
|
@ -90,7 +97,12 @@ impl web::error::WebResponseError for AppError {
|
||||||
AppError::UnknownBody { .. } => StatusCode::BAD_REQUEST,
|
AppError::UnknownBody { .. } => StatusCode::BAD_REQUEST,
|
||||||
AppError::QSError { .. } => StatusCode::BAD_REQUEST,
|
AppError::QSError { .. } => StatusCode::BAD_REQUEST,
|
||||||
AppError::DeviceNotFound { .. } => StatusCode::BAD_REQUEST,
|
AppError::DeviceNotFound { .. } => StatusCode::BAD_REQUEST,
|
||||||
AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST
|
AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST,
|
||||||
|
|
||||||
|
// Не знаю что лучше тут использовать
|
||||||
|
AppError::Whatever { .. } => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
|
||||||
|
_ => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +120,8 @@ impl web::error::WebResponseError for AppError {
|
||||||
}
|
}
|
||||||
AppError::QSError { .. } => "UrlEncoded body or query params are incorrect",
|
AppError::QSError { .. } => "UrlEncoded body or query params are incorrect",
|
||||||
AppError::DeviceNotFound { .. } => "Device not found",
|
AppError::DeviceNotFound { .. } => "Device not found",
|
||||||
AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be"
|
AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be",
|
||||||
|
_ => "Internal server error. Please make sure to tell the devs they are stupid :p",
|
||||||
};
|
};
|
||||||
|
|
||||||
let status_code = self.status_code();
|
let status_code = self.status_code();
|
||||||
|
@ -121,7 +134,6 @@ impl web::error::WebResponseError for AppError {
|
||||||
let mut resp = HttpResponse::build(status_code).json(&body);
|
let mut resp = HttpResponse::build(status_code).json(&body);
|
||||||
let headers = resp.headers_mut();
|
let headers = resp.headers_mut();
|
||||||
|
|
||||||
|
|
||||||
match self {
|
match self {
|
||||||
AppError::JsonError { source } => {
|
AppError::JsonError { source } => {
|
||||||
insert_header!(headers, "X-Error-Line", source.line());
|
insert_header!(headers, "X-Error-Line", source.line());
|
||||||
|
@ -132,7 +144,7 @@ impl web::error::WebResponseError for AppError {
|
||||||
source.to_string().escape_default().collect::<String>()
|
source.to_string().escape_default().collect::<String>()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
AppError::UnknownMethod {method} => {
|
AppError::UnknownMethod { method } => {
|
||||||
insert_header!(
|
insert_header!(
|
||||||
headers,
|
headers,
|
||||||
"X-Unknown-Cmd",
|
"X-Unknown-Cmd",
|
||||||
|
|
|
@ -2,25 +2,23 @@ use crate::web_server::app_error::{self, AppError};
|
||||||
use crate::web_server::old_app_api::types::AppInitRequest;
|
use crate::web_server::old_app_api::types::AppInitRequest;
|
||||||
use crate::web_server::NMAppState;
|
use crate::web_server::NMAppState;
|
||||||
|
|
||||||
use serde_json::{json};
|
use serde_json::json;
|
||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
|
|
||||||
|
|
||||||
use crate::insert_header;
|
use crate::insert_header;
|
||||||
use fred::interfaces::KeysInterface;
|
use fred::interfaces::KeysInterface;
|
||||||
use ntex::http::StatusCode;
|
use ntex::http::StatusCode;
|
||||||
use ntex::web;
|
use ntex::web;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
pub async fn app_init(
|
pub async fn app_init(
|
||||||
_body: AppInitRequest<'_>,
|
_body: AppInitRequest,
|
||||||
app_state: &NMAppState,
|
app_state: &NMAppState,
|
||||||
) -> Result<web::HttpResponse, AppError> {
|
) -> Result<web::HttpResponse, AppError> {
|
||||||
let _: () = app_state
|
let _: () = app_state
|
||||||
.redis_client
|
.redis_client
|
||||||
.set("test", 123, None, None, true)
|
.set("test", 123, None, None, true)
|
||||||
.await.context(app_error::ServerRedisSnafu)?;
|
.await
|
||||||
|
.context(app_error::ServerRedisSnafu)?;
|
||||||
|
|
||||||
Ok(web::HttpResponse::build(StatusCode::OK).body("Hello world!"))
|
Ok(web::HttpResponse::build(StatusCode::OK).body("Hello world!"))
|
||||||
}
|
}
|
||||||
|
|
|
@ -3,50 +3,66 @@
|
||||||
mod handlers;
|
mod handlers;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
use ntex::web::types::State;
|
|
||||||
use ntex::util::Bytes;
|
|
||||||
use ntex::web;
|
|
||||||
use nom::AsBytes;
|
|
||||||
use snafu::ResultExt;
|
|
||||||
use crate::web_server::app_error::AppError;
|
use crate::web_server::app_error::AppError;
|
||||||
use crate::web_server::NMAppState;
|
|
||||||
use crate::web_server::old_app_api::handlers::{app_init, version};
|
use crate::web_server::old_app_api::handlers::{app_init, version};
|
||||||
use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams};
|
use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams};
|
||||||
use crate::web_server::utils::redis::is_api_key_valid;
|
use crate::web_server::utils::redis::is_api_key_valid;
|
||||||
|
use crate::web_server::NMAppState;
|
||||||
|
use ntex::util::Bytes;
|
||||||
|
use ntex::web;
|
||||||
|
use ntex::web::types::State;
|
||||||
|
use ntex::web::HttpRequest;
|
||||||
|
use snafu::{whatever, ResultExt};
|
||||||
|
|
||||||
use super::app_error;
|
use super::app_error;
|
||||||
|
|
||||||
|
|
||||||
/// Обработчик запросов от приложений.
|
/// Обработчик запросов от приложений.
|
||||||
///
|
///
|
||||||
/// Отвечает за разделение на функции по `cmd`.
|
/// Отвечает за разделение на функции по `cmd`.
|
||||||
///
|
///
|
||||||
/// Вызывается напрямую из ntex приложения.
|
/// Вызывается напрямую из ntex приложения.
|
||||||
pub async fn old_api_handler(
|
pub async fn old_api_handler(
|
||||||
|
request: HttpRequest,
|
||||||
app_state: State<NMAppState>,
|
app_state: State<NMAppState>,
|
||||||
body_bytes: Bytes,
|
body_bytes: Bytes,
|
||||||
) -> Result<impl web::Responder, AppError> {
|
) -> Result<impl web::Responder, AppError> {
|
||||||
|
let headers = request.headers();
|
||||||
|
|
||||||
if body_bytes.len() > 10 * 1024 {
|
if body_bytes.len() > 10 * 1024 {
|
||||||
// 10 KiB
|
// 10 KiB
|
||||||
return Err(AppError::RequestTooLarge);
|
return Err(AppError::RequestTooLarge);
|
||||||
}
|
}
|
||||||
|
|
||||||
let body_bytes = body_bytes.as_bytes();
|
let mandatory_params: MandatoryParams =
|
||||||
|
serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON
|
||||||
|
|
||||||
let mandatory_params: MandatoryParams<'_> = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON
|
// Тут все cmd которые могут быть вызваны без api ключа
|
||||||
|
if mandatory_params.cmd == "version" {
|
||||||
|
return version((), &app_state).await;
|
||||||
|
}
|
||||||
|
|
||||||
// Ignore clippy singlematch
|
let api_key: String;
|
||||||
if mandatory_params.cmd.as_ref() == "version" { return version((), &app_state).await }
|
|
||||||
|
|
||||||
is_api_key_valid(&app_state.redis_client, mandatory_params.api_key.as_ref()).await?;
|
if let Some(key) = mandatory_params.api_key {
|
||||||
|
api_key = key;
|
||||||
|
} else if let Some(key) = headers.get("Narodmon-Api-Key") {
|
||||||
|
api_key = key.to_str().with_whatever_context(|_| "asd")?.to_string();
|
||||||
|
} else {
|
||||||
|
whatever!("No API key found")
|
||||||
|
}
|
||||||
|
|
||||||
match mandatory_params.cmd.as_ref() {
|
is_api_key_valid(&app_state.redis_client, api_key).await?;
|
||||||
|
|
||||||
|
match mandatory_params.cmd.as_str() {
|
||||||
"appInit" => {
|
"appInit" => {
|
||||||
let body: AppInitRequest = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?;
|
let body: AppInitRequest =
|
||||||
|
serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?;
|
||||||
|
|
||||||
app_init(body, &app_state).await
|
app_init(body, &app_state).await
|
||||||
}
|
}
|
||||||
_ => Err(AppError::UnknownMethod { method: mandatory_params.cmd.to_string() }),
|
_ => Err(AppError::UnknownMethod {
|
||||||
|
method: mandatory_params.cmd.to_string(),
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
//Ok("fuck")
|
//Ok("fuck")
|
||||||
|
|
|
@ -1,18 +1,16 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use std::borrow::Cow;
|
use std::borrow::Cow;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
// fn<'de, D>(D) -> Result<T, D::Error> where D: Deserializer<'de>
|
// fn<'de, D>(D) -> Result<T, D::Error> where D: Deserializer<'de>
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct AppInitRequest<'a> {
|
pub struct AppInitRequest {
|
||||||
#[serde(borrow)]
|
pub version: String,
|
||||||
pub version: Cow<'a, str>,
|
|
||||||
|
|
||||||
#[serde(borrow)]
|
pub platform: String,
|
||||||
pub platform: Cow<'a, str>,
|
|
||||||
|
|
||||||
#[serde(borrow)]
|
pub model: String,
|
||||||
pub model: Cow<'a, str>,
|
|
||||||
|
|
||||||
pub width: u64,
|
pub width: u64,
|
||||||
}
|
}
|
||||||
|
@ -29,21 +27,20 @@ pub struct AddLikeRequest {
|
||||||
/// получить [MandatoryParams], другой в зависимости от `cmd`. Это позволяет не добвалять поля из
|
/// получить [MandatoryParams], другой в зависимости от `cmd`. Это позволяет не добвалять поля из
|
||||||
/// этой структуры в каждом специфичном типе.
|
/// этой структуры в каждом специфичном типе.
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct MandatoryParams<'a> {
|
pub struct MandatoryParams {
|
||||||
#[serde(borrow)]
|
pub cmd: String,
|
||||||
pub cmd: Cow<'a, str>,
|
|
||||||
|
|
||||||
#[serde(borrow)]
|
pub lang: String,
|
||||||
pub lang: Cow<'a, str>,
|
|
||||||
|
|
||||||
/// Уникальный ID клиента.
|
/// Уникальный ID клиента.
|
||||||
///
|
///
|
||||||
/// Используется на подобии как куки PHPSESSID в php.
|
/// Используется на подобии как куки PHPSESSID в php.
|
||||||
///
|
///
|
||||||
/// См. также: <https://www.php.net/manual/en/book.session.php>
|
/// См. также: <https://www.php.net/manual/en/book.session.php>
|
||||||
#[serde(borrow)]
|
pub uuid: Uuid,
|
||||||
pub uuid: Cow<'a, str>,
|
|
||||||
|
|
||||||
#[serde(borrow)]
|
/// API ключ приложения
|
||||||
pub api_key: Cow<'a, str>,
|
///
|
||||||
|
/// Может быть указан в теле запроса, а может быть и заголовке Narodmon-Api-Key.
|
||||||
|
pub api_key: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,17 +20,13 @@ use super::NMAppState;
|
||||||
#[snafu(visibility(pub))]
|
#[snafu(visibility(pub))]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[snafu(display("Device not found"))]
|
#[snafu(display("Device not found"))]
|
||||||
DeviceNotFound {
|
DeviceNotFound { mac: String },
|
||||||
mac: String
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("Time sent with the device is way too behind now"))]
|
#[snafu(display("Time sent with the device is way too behind now"))]
|
||||||
TimeIsLongBehindNow,
|
TimeIsLongBehindNow,
|
||||||
|
|
||||||
#[snafu(display("{source}"))]
|
#[snafu(display("{source}"))]
|
||||||
QSParser {
|
QSParser { source: QSParserError },
|
||||||
source: QSParserError
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Обработчик данных датчиков с устройств.
|
/// Обработчик данных датчиков с устройств.
|
||||||
|
|
|
@ -19,23 +19,19 @@ pub enum QSParserError {
|
||||||
#[snafu(display("asd"))]
|
#[snafu(display("asd"))]
|
||||||
SerdeQS {
|
SerdeQS {
|
||||||
#[snafu(source(from(serde_qs::Error, convert_to_arc::<serde_qs::Error>)))]
|
#[snafu(source(from(serde_qs::Error, convert_to_arc::<serde_qs::Error>)))]
|
||||||
source: Arc<serde_qs::Error>
|
source: Arc<serde_qs::Error>,
|
||||||
},
|
},
|
||||||
#[snafu(display("asd"))]
|
#[snafu(display("asd"))]
|
||||||
Parsing {
|
Parsing { context: String },
|
||||||
context: String
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("asd"))]
|
#[snafu(display("asd"))]
|
||||||
FloatP {
|
FloatP {
|
||||||
#[snafu(source)]
|
#[snafu(source)]
|
||||||
source: ParseFloatError
|
source: ParseFloatError,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("failed to parse into decimal"))]
|
#[snafu(display("failed to parse into decimal"))]
|
||||||
DecimalParse {
|
DecimalParse { source: rust_decimal::Error },
|
||||||
source: rust_decimal::Error
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("asd"))]
|
#[snafu(display("asd"))]
|
||||||
NoMAC,
|
NoMAC,
|
||||||
|
@ -43,7 +39,9 @@ pub enum QSParserError {
|
||||||
|
|
||||||
impl From<Error<&str>> for QSParserError {
|
impl From<Error<&str>> for QSParserError {
|
||||||
fn from(value: Error<&str>) -> Self {
|
fn from(value: Error<&str>) -> Self {
|
||||||
QSParserError::Parsing { context: format!("{:?}", value)}
|
QSParserError::Parsing {
|
||||||
|
context: format!("{:?}", value),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,7 +64,7 @@ pub fn qs_rest_to_values(
|
||||||
for (key, value) in parsed {
|
for (key, value) in parsed {
|
||||||
hashset.insert(SensorValue {
|
hashset.insert(SensorValue {
|
||||||
mac: key,
|
mac: key,
|
||||||
value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu{})?,
|
value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu {})?,
|
||||||
|
|
||||||
time: None,
|
time: None,
|
||||||
unit: None,
|
unit: None,
|
||||||
|
@ -82,7 +80,9 @@ pub fn parse_decimal_if_exists(
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Option<Decimal>, QSParserError> {
|
) -> Result<Option<Decimal>, QSParserError> {
|
||||||
if let Some(unwrapped_value) = parsed.remove(key) {
|
if let Some(unwrapped_value) = parsed.remove(key) {
|
||||||
Ok(Some(Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu{})?))
|
Ok(Some(
|
||||||
|
Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu {})?,
|
||||||
|
))
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
@ -93,14 +93,16 @@ pub fn parse_epoch_if_exists(
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Option<Epoch>, QSParserError> {
|
) -> Result<Option<Epoch>, QSParserError> {
|
||||||
if let Some(unwrapped_value) = parsed.remove(key) {
|
if let Some(unwrapped_value) = parsed.remove(key) {
|
||||||
Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse().context(FloatPSnafu{})?)))
|
Ok(Some(Epoch::from_unix_seconds(
|
||||||
|
unwrapped_value.parse().context(FloatPSnafu {})?,
|
||||||
|
)))
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSParserError> {
|
pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSParserError> {
|
||||||
let mut parsed: HashMap<String, String> = serde_qs::from_str(input).context(SerdeQSSnafu{})?;
|
let mut parsed: HashMap<String, String> = serde_qs::from_str(input).context(SerdeQSSnafu {})?;
|
||||||
|
|
||||||
let (_, device_mac) = if let Some(id) = parsed.get("ID") {
|
let (_, device_mac) = if let Some(id) = parsed.get("ID") {
|
||||||
parse_mac_address(id)?
|
parse_mac_address(id)?
|
||||||
|
|
|
@ -1,18 +1,22 @@
|
||||||
//! Сборник утилит для работы с Redis.
|
//! Сборник утилит для работы с Redis.
|
||||||
|
|
||||||
use crate::web_server::app_error::{AppError, ServerRedisSnafu};
|
use crate::{
|
||||||
use fred::prelude::*;
|
uformat,
|
||||||
|
web_server::app_error::{AppError, ServerRedisSnafu},
|
||||||
|
};
|
||||||
use fred::clients::Client as RedisClient;
|
use fred::clients::Client as RedisClient;
|
||||||
|
use fred::prelude::*;
|
||||||
use heapless::String as HeaplessString;
|
use heapless::String as HeaplessString;
|
||||||
use lazy_static::lazy_static;
|
use lazy_static::lazy_static;
|
||||||
use regex::Regex;
|
use regex::Regex;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use snafu::ResultExt;
|
use snafu::ResultExt;
|
||||||
use ufmt::uwrite;
|
use ufmt::uwrite;
|
||||||
|
use uuid::Uuid;
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
/// Разрешённые знаки для API ключа.
|
/// Разрешённые знаки для API ключа.
|
||||||
static ref ALLOWED_API_KEY_CHARACTERS: Regex = Regex::new("[a-zA-Z0-9]{13}").unwrap();
|
static ref ALLOWED_API_KEY_CHARACTERS: Regex = Regex::new("[a-zA-Z0-9\\-]{13,36}").unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Описание полей в KV DB у `apikey_{}`.
|
/// Описание полей в KV DB у `apikey_{}`.
|
||||||
|
@ -25,18 +29,18 @@ pub struct ApiKeyDescription {
|
||||||
/// Проверка API ключа на валидность.
|
/// Проверка API ключа на валидность.
|
||||||
pub async fn is_api_key_valid(
|
pub async fn is_api_key_valid(
|
||||||
client: &RedisClient,
|
client: &RedisClient,
|
||||||
api_key: &str,
|
api_key: String,
|
||||||
) -> Result<ApiKeyDescription, AppError> {
|
) -> Result<ApiKeyDescription, AppError> {
|
||||||
if !ALLOWED_API_KEY_CHARACTERS.is_match(api_key) {
|
if !ALLOWED_API_KEY_CHARACTERS.is_match(&api_key) {
|
||||||
return Err(AppError::ApiKeyInvalid {
|
return Err(AppError::ApiKeyInvalid {
|
||||||
reason: "Invalid characters present in the API key.",
|
reason: "Invalid characters present in the API key.",
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut key_buffer = HeaplessString::<{ 7 + 13 }>::new();
|
let valid: Option<i64> = client
|
||||||
uwrite!(key_buffer, "apikey_{}", api_key).expect("TODO"); // TODO: Error handling
|
.hget(uformat!("apikey_{}", api_key), "owner")
|
||||||
|
.await
|
||||||
let valid: Option<i64> = client.hget(key_buffer.as_str(), "owner").await.context(ServerRedisSnafu)?;
|
.context(ServerRedisSnafu)?;
|
||||||
|
|
||||||
valid
|
valid
|
||||||
.map(|uid| ApiKeyDescription { apikey_owner: uid })
|
.map(|uid| ApiKeyDescription { apikey_owner: uid })
|
||||||
|
|
Loading…
Add table
Reference in a new issue