diff --git a/src/business_logic/device.rs b/src/business_logic/device.rs new file mode 100644 index 0000000..1f5c43a --- /dev/null +++ b/src/business_logic/device.rs @@ -0,0 +1,161 @@ +use std::str::FromStr; + +use fred::prelude::{Client as RedisClient, HashesInterface, KeysInterface}; +use hifitime::Epoch; +use rust_decimal::Decimal; +use snafu::ResultExt; +use uuid::{self, Uuid}; + +use crate::{ + ingest_protocol::NMDeviceDataPacket, + uformat, + web_server::{ + app_error::{self, AppError, DecimalSnafu, ServerRedisSnafu, UuidSnafu}, + old_device_sensor_api::qs_parser::DecimalParseSnafu, + }, +}; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)] +pub struct Device { + pub device_id: Uuid, + + /// ID владельца + pub owner: Uuid, + + pub lat: Option, + pub lon: Option, + pub alt: Option, + + /// Данные датчика могут быть переданы только по TLS + pub tls_only: bool, + + /// Данные датчика могут быть переданы только по Mutual TLS + pub mtls_only: bool, +} + +impl Device { + pub fn new(id: Uuid, owner: Uuid) -> Device { + Device { + device_id: id, + owner, + lat: None, + lon: None, + alt: None, + tls_only: false, + mtls_only: false, + } + } + + pub async fn get_by_id(redis: &RedisClient, id: Uuid) -> Result, AppError> { + let key = uformat!("devices_{}", id.to_string()); + + if !redis.exists(&[&key]).await.context(ServerRedisSnafu)? { + return Ok(None); + } + + return Ok(Some(Device { + device_id: id, + owner: redis + .hget::, _, _>(&key, "lat") + .await + .context(ServerRedisSnafu)? + .map(|el| Uuid::from_str(&el).context(UuidSnafu)) + .transpose()? + .unwrap(), + lat: redis + .hget::, _, _>(&key, "lat") + .await + .context(ServerRedisSnafu)? + .map(|el| el.parse().context(DecimalSnafu)) + .transpose()?, + lon: redis + .hget::, _, _>(&key, "lon") + .await + .context(ServerRedisSnafu)? + .map(|el| el.parse().context(DecimalSnafu)) + .transpose()?, + alt: redis + .hget::, _, _>(&key, "alt") + .await + .context(ServerRedisSnafu)? + .map(|el| el.parse().context(DecimalSnafu)) + .transpose()?, + tls_only: redis + .hget::, _, _>(&key, "tls_only") + .await + .context(ServerRedisSnafu)? + .unwrap(), + mtls_only: redis + .hget::, _, _>(&key, "mtls_only") + .await + .context(ServerRedisSnafu)? + .unwrap(), + })); + } + + pub async fn save_to_db( + packet: &NMDeviceDataPacket, + redis: &RedisClient, + ) -> Result<(), AppError> { + let device_mac_enc = hex::encode(packet.mac); + let device_id_key = uformat!("devices_mac{}", &device_mac_enc); + + let device_id: Option = redis.get(device_id_key).await.context(ServerRedisSnafu)?; + + let device_id = device_id.ok_or_else(|| AppError::DeviceNotFound { + mac: device_mac_enc.clone(), + })?; + + let now = Epoch::now().unwrap().into(); + let mut device_time = packet.time.unwrap_or(now); + + // TODO: Добавить гистерезис + // Отчёт совместимости: отсутствует + if device_time > now { + device_time = now; + } + + let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds(); + + let key = uformat!("devices_{}", device_id); + + let device_exists: Option = redis + .hget(key.as_str(), "exists") + .await + .context(ServerRedisSnafu)?; + + if !device_exists.is_some_and(|v| v) { + return Err(AppError::DeviceNotFound { + mac: hex::encode(packet.mac), + }); + } + + // devices_{device_id}_{tai_timestamp}_{sensor_id} + for sensor in &packet.values { + let key = uformat!( + "devices_{}_sensor{}_{}", + device_id, + sensor.mac, + device_tai_timestamp.to_string(), + ); + + () = redis + .set(key.as_str(), sensor.value.to_string(), None, None, false) + .await + .context(ServerRedisSnafu)?; + } + + if let Some(commands) = &packet.commands { + for (cmd_key, cmd_value) in commands { + let key = uformat!("devices_{}_cmds_{}", device_id, cmd_key); + + () = redis + .set(key.as_str(), cmd_value, None, None, false) + .await + .context(ServerRedisSnafu)?; + } + } + + Ok(()) + } +} diff --git a/src/business_logic/mod.rs b/src/business_logic/mod.rs new file mode 100644 index 0000000..4a2a358 --- /dev/null +++ b/src/business_logic/mod.rs @@ -0,0 +1,37 @@ +use bytes::Bytes; +use device::Device; +use fred::prelude::Client; +use ntex::web::guard::Header; +use ppp::v2::Header; +use snafu::ResultExt; + +pub mod device; + +use crate::{ + ingest_protocol::NMDeviceDataPacket, + web_server::app_error::{AppError, ServerRedisSnafu}, +}; + +pub async fn process_packet( + r_client: Client, + packet: NMDeviceDataPacket, + proxy_header: Option, +) -> String { + let proxy_ref = proxy_header.as_ref().map(|el| el.as_ref()); + let proxy_header_parsed = proxy_ref.map(|el| Header::try_from(el).unwrap()); + // proxy_header_parsed.unwrap().tlvs() + + let asd: Result = (|| async { + // TODO: Auth code here. Check if tls only is enabled. + + Device::save_to_db(&packet, &r_client).await?; + + return Ok("".to_string()); + })() + .await; + + return match asd { + Ok(resp) => resp, + Err(err) => format!("ERROR: {}", err), + }; +} diff --git a/src/business_logic/user.rs b/src/business_logic/user.rs new file mode 100644 index 0000000..e69de29 diff --git a/src/ingest_protocol/packet_types.rs b/src/ingest_protocol/packet_types.rs index de8e5c8..723aeeb 100644 --- a/src/ingest_protocol/packet_types.rs +++ b/src/ingest_protocol/packet_types.rs @@ -6,6 +6,7 @@ // #[serde(rename = "...")] // #[serde(alias = "...")] +use crate::business_logic::device::Device; use crate::ingest_protocol::error::Error; use crate::ingest_protocol::parser::parse_mac_address; use crate::uformat; @@ -98,64 +99,6 @@ pub struct NMDeviceDataPacket { pub time: Option, } -impl NMDeviceDataPacket { - pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> { - let device_mac_enc = hex::encode(self.mac); - - let now = Epoch::now().unwrap().into(); - let mut device_time = self.time.unwrap_or(now); - - // TODO: Добавить гистерезис - // Отчёт совместимости: отсутствует - if device_time > now { - device_time = now; - } - - let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds(); - - let key = uformat!("devices_{}", device_mac_enc); - - let device_exists: Option = redis - .hget(key.as_str(), "exists") - .await - .context(ServerRedisSnafu)?; - - if !device_exists.is_some_and(|v| v) { - return Err(AppError::DeviceNotFound { - mac: hex::encode(self.mac), - }); - } - - // devices_{device_id}_{tai_timestamp}_{sensor_id} - for sensor in &self.values { - let key = uformat!( - "devices_{}_{}_{}", - device_mac_enc, - device_tai_timestamp.to_string(), - sensor.mac - ); - - redis - .set(key.as_str(), sensor.value.to_string(), None, None, false) - .await - .context(ServerRedisSnafu)?; - } - - if let Some(commands) = &self.commands { - for (cmd_key, cmd_value) in commands { - let key = uformat!("devices_{}_cmds_{}", device_mac_enc, cmd_key); - - redis - .set(key.as_str(), cmd_value, None, None, false) - .await - .context(ServerRedisSnafu)?; - } - } - - Ok(()) - } -} - #[derive(Debug, Clone, Default, PartialEq, Deserialize)] pub struct NMJsonPacket { pub devices: Vec, @@ -164,7 +107,7 @@ pub struct NMJsonPacket { impl NMJsonPacket { pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> { for device in &self.devices { - device.save_to_db(redis).await?; + Device::save_to_db(&device, redis).await?; } Ok(()) diff --git a/src/main.rs b/src/main.rs index 26a4243..cc3fa32 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,7 @@ #![doc = include_str!("../README.md")] extern crate core; +mod business_logic; mod cli; mod ingest_protocol; mod ingest_socket_server;