dev-nm17-2 #25
5 changed files with 201 additions and 59 deletions
161
src/business_logic/device.rs
Normal file
161
src/business_logic/device.rs
Normal file
|
@ -0,0 +1,161 @@
|
||||||
|
use std::str::FromStr;
|
||||||
|
|
||||||
|
use fred::prelude::{Client as RedisClient, HashesInterface, KeysInterface};
|
||||||
|
use hifitime::Epoch;
|
||||||
|
use rust_decimal::Decimal;
|
||||||
|
use snafu::ResultExt;
|
||||||
|
use uuid::{self, Uuid};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
ingest_protocol::NMDeviceDataPacket,
|
||||||
|
uformat,
|
||||||
|
web_server::{
|
||||||
|
app_error::{self, AppError, DecimalSnafu, ServerRedisSnafu, UuidSnafu},
|
||||||
|
old_device_sensor_api::qs_parser::DecimalParseSnafu,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Hash, serde::Deserialize, serde::Serialize)]
|
||||||
|
pub struct Device {
|
||||||
|
pub device_id: Uuid,
|
||||||
|
|
||||||
|
/// ID владельца
|
||||||
|
pub owner: Uuid,
|
||||||
|
|
||||||
|
pub lat: Option<Decimal>,
|
||||||
|
pub lon: Option<Decimal>,
|
||||||
|
pub alt: Option<Decimal>,
|
||||||
|
|
||||||
|
/// Данные датчика могут быть переданы только по TLS
|
||||||
|
pub tls_only: bool,
|
||||||
|
|
||||||
|
/// Данные датчика могут быть переданы только по Mutual TLS
|
||||||
|
pub mtls_only: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Device {
|
||||||
|
pub fn new(id: Uuid, owner: Uuid) -> Device {
|
||||||
|
Device {
|
||||||
|
device_id: id,
|
||||||
|
owner,
|
||||||
|
lat: None,
|
||||||
|
lon: None,
|
||||||
|
alt: None,
|
||||||
|
tls_only: false,
|
||||||
|
mtls_only: false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_by_id(redis: &RedisClient, id: Uuid) -> Result<Option<Device>, AppError> {
|
||||||
|
let key = uformat!("devices_{}", id.to_string());
|
||||||
|
|
||||||
|
if !redis.exists(&[&key]).await.context(ServerRedisSnafu)? {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(Some(Device {
|
||||||
|
device_id: id,
|
||||||
|
owner: redis
|
||||||
|
.hget::<Option<String>, _, _>(&key, "lat")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.map(|el| Uuid::from_str(&el).context(UuidSnafu))
|
||||||
|
.transpose()?
|
||||||
|
.unwrap(),
|
||||||
|
lat: redis
|
||||||
|
.hget::<Option<String>, _, _>(&key, "lat")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.map(|el| el.parse().context(DecimalSnafu))
|
||||||
|
.transpose()?,
|
||||||
|
lon: redis
|
||||||
|
.hget::<Option<String>, _, _>(&key, "lon")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.map(|el| el.parse().context(DecimalSnafu))
|
||||||
|
.transpose()?,
|
||||||
|
alt: redis
|
||||||
|
.hget::<Option<String>, _, _>(&key, "alt")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.map(|el| el.parse().context(DecimalSnafu))
|
||||||
|
.transpose()?,
|
||||||
|
tls_only: redis
|
||||||
|
.hget::<Option<bool>, _, _>(&key, "tls_only")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.unwrap(),
|
||||||
|
mtls_only: redis
|
||||||
|
.hget::<Option<bool>, _, _>(&key, "mtls_only")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?
|
||||||
|
.unwrap(),
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn save_to_db(
|
||||||
|
packet: &NMDeviceDataPacket,
|
||||||
|
redis: &RedisClient,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
|
let device_mac_enc = hex::encode(packet.mac);
|
||||||
|
let device_id_key = uformat!("devices_mac{}", &device_mac_enc);
|
||||||
|
|
||||||
|
let device_id: Option<String> = redis.get(device_id_key).await.context(ServerRedisSnafu)?;
|
||||||
|
|
||||||
|
let device_id = device_id.ok_or_else(|| AppError::DeviceNotFound {
|
||||||
|
mac: device_mac_enc.clone(),
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let now = Epoch::now().unwrap().into();
|
||||||
|
let mut device_time = packet.time.unwrap_or(now);
|
||||||
|
|
||||||
|
// TODO: Добавить гистерезис
|
||||||
|
// Отчёт совместимости: отсутствует
|
||||||
|
if device_time > now {
|
||||||
|
device_time = now;
|
||||||
|
}
|
||||||
|
|
||||||
|
let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds();
|
||||||
|
|
||||||
|
let key = uformat!("devices_{}", device_id);
|
||||||
|
|
||||||
|
let device_exists: Option<bool> = redis
|
||||||
|
.hget(key.as_str(), "exists")
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?;
|
||||||
|
|
||||||
|
if !device_exists.is_some_and(|v| v) {
|
||||||
|
return Err(AppError::DeviceNotFound {
|
||||||
|
mac: hex::encode(packet.mac),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// devices_{device_id}_{tai_timestamp}_{sensor_id}
|
||||||
|
for sensor in &packet.values {
|
||||||
|
let key = uformat!(
|
||||||
|
"devices_{}_sensor{}_{}",
|
||||||
|
device_id,
|
||||||
|
sensor.mac,
|
||||||
|
device_tai_timestamp.to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
() = redis
|
||||||
|
.set(key.as_str(), sensor.value.to_string(), None, None, false)
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(commands) = &packet.commands {
|
||||||
|
for (cmd_key, cmd_value) in commands {
|
||||||
|
let key = uformat!("devices_{}_cmds_{}", device_id, cmd_key);
|
||||||
|
|
||||||
|
() = redis
|
||||||
|
.set(key.as_str(), cmd_value, None, None, false)
|
||||||
|
.await
|
||||||
|
.context(ServerRedisSnafu)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
37
src/business_logic/mod.rs
Normal file
37
src/business_logic/mod.rs
Normal file
|
@ -0,0 +1,37 @@
|
||||||
|
use bytes::Bytes;
|
||||||
|
use device::Device;
|
||||||
|
use fred::prelude::Client;
|
||||||
|
use ntex::web::guard::Header;
|
||||||
|
use ppp::v2::Header;
|
||||||
|
use snafu::ResultExt;
|
||||||
|
|
||||||
|
pub mod device;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
ingest_protocol::NMDeviceDataPacket,
|
||||||
|
web_server::app_error::{AppError, ServerRedisSnafu},
|
||||||
|
};
|
||||||
|
|
||||||
|
pub async fn process_packet(
|
||||||
|
r_client: Client,
|
||||||
|
packet: NMDeviceDataPacket,
|
||||||
|
proxy_header: Option<Bytes>,
|
||||||
|
) -> String {
|
||||||
|
let proxy_ref = proxy_header.as_ref().map(|el| el.as_ref());
|
||||||
|
let proxy_header_parsed = proxy_ref.map(|el| Header::try_from(el).unwrap());
|
||||||
|
// proxy_header_parsed.unwrap().tlvs()
|
||||||
|
|
||||||
|
let asd: Result<String, AppError> = (|| async {
|
||||||
|
// TODO: Auth code here. Check if tls only is enabled.
|
||||||
|
|
||||||
|
Device::save_to_db(&packet, &r_client).await?;
|
||||||
|
|
||||||
|
return Ok("".to_string());
|
||||||
|
})()
|
||||||
|
.await;
|
||||||
|
|
||||||
|
return match asd {
|
||||||
|
Ok(resp) => resp,
|
||||||
|
Err(err) => format!("ERROR: {}", err),
|
||||||
|
};
|
||||||
|
}
|
0
src/business_logic/user.rs
Normal file
0
src/business_logic/user.rs
Normal file
|
@ -6,6 +6,7 @@
|
||||||
// #[serde(rename = "...")]
|
// #[serde(rename = "...")]
|
||||||
// #[serde(alias = "...")]
|
// #[serde(alias = "...")]
|
||||||
|
|
||||||
|
use crate::business_logic::device::Device;
|
||||||
use crate::ingest_protocol::error::Error;
|
use crate::ingest_protocol::error::Error;
|
||||||
use crate::ingest_protocol::parser::parse_mac_address;
|
use crate::ingest_protocol::parser::parse_mac_address;
|
||||||
use crate::uformat;
|
use crate::uformat;
|
||||||
|
@ -98,64 +99,6 @@ pub struct NMDeviceDataPacket {
|
||||||
pub time: Option<EpochUTC>,
|
pub time: Option<EpochUTC>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NMDeviceDataPacket {
|
|
||||||
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
|
|
||||||
let device_mac_enc = hex::encode(self.mac);
|
|
||||||
|
|
||||||
let now = Epoch::now().unwrap().into();
|
|
||||||
let mut device_time = self.time.unwrap_or(now);
|
|
||||||
|
|
||||||
// TODO: Добавить гистерезис
|
|
||||||
// Отчёт совместимости: отсутствует
|
|
||||||
if device_time > now {
|
|
||||||
device_time = now;
|
|
||||||
}
|
|
||||||
|
|
||||||
let device_tai_timestamp = device_time.0.to_duration_since_j1900().to_seconds();
|
|
||||||
|
|
||||||
let key = uformat!("devices_{}", device_mac_enc);
|
|
||||||
|
|
||||||
let device_exists: Option<bool> = redis
|
|
||||||
.hget(key.as_str(), "exists")
|
|
||||||
.await
|
|
||||||
.context(ServerRedisSnafu)?;
|
|
||||||
|
|
||||||
if !device_exists.is_some_and(|v| v) {
|
|
||||||
return Err(AppError::DeviceNotFound {
|
|
||||||
mac: hex::encode(self.mac),
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
// devices_{device_id}_{tai_timestamp}_{sensor_id}
|
|
||||||
for sensor in &self.values {
|
|
||||||
let key = uformat!(
|
|
||||||
"devices_{}_{}_{}",
|
|
||||||
device_mac_enc,
|
|
||||||
device_tai_timestamp.to_string(),
|
|
||||||
sensor.mac
|
|
||||||
);
|
|
||||||
|
|
||||||
redis
|
|
||||||
.set(key.as_str(), sensor.value.to_string(), None, None, false)
|
|
||||||
.await
|
|
||||||
.context(ServerRedisSnafu)?;
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(commands) = &self.commands {
|
|
||||||
for (cmd_key, cmd_value) in commands {
|
|
||||||
let key = uformat!("devices_{}_cmds_{}", device_mac_enc, cmd_key);
|
|
||||||
|
|
||||||
redis
|
|
||||||
.set(key.as_str(), cmd_value, None, None, false)
|
|
||||||
.await
|
|
||||||
.context(ServerRedisSnafu)?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
|
#[derive(Debug, Clone, Default, PartialEq, Deserialize)]
|
||||||
pub struct NMJsonPacket {
|
pub struct NMJsonPacket {
|
||||||
pub devices: Vec<NMDeviceDataPacket>,
|
pub devices: Vec<NMDeviceDataPacket>,
|
||||||
|
@ -164,7 +107,7 @@ pub struct NMJsonPacket {
|
||||||
impl NMJsonPacket {
|
impl NMJsonPacket {
|
||||||
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
|
pub async fn save_to_db(&self, redis: &RedisClient) -> Result<(), AppError> {
|
||||||
for device in &self.devices {
|
for device in &self.devices {
|
||||||
device.save_to_db(redis).await?;
|
Device::save_to_db(&device, redis).await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
#![doc = include_str!("../README.md")]
|
#![doc = include_str!("../README.md")]
|
||||||
extern crate core;
|
extern crate core;
|
||||||
|
|
||||||
|
mod business_logic;
|
||||||
mod cli;
|
mod cli;
|
||||||
mod ingest_protocol;
|
mod ingest_protocol;
|
||||||
mod ingest_socket_server;
|
mod ingest_socket_server;
|
||||||
|
|
Loading…
Add table
Reference in a new issue