From 6090b2b47f9fad03e773722696679c7ec154a3b7 Mon Sep 17 00:00:00 2001 From: nm17 Date: Sat, 1 Jun 2024 00:57:44 +0400 Subject: [PATCH] fix: add proper epoch serialization --- src/ingest_protocol/packet_types.rs | 9 ++- src/ingest_protocol/parser.rs | 2 +- src/ingest_protocol/tests/mod.rs | 13 ++-- src/main.rs | 3 +- src/utils/hifitime_serde.rs | 64 +++++++++++++++++++ src/{utils.rs => utils/mod.rs} | 14 ++-- src/web_server/old_device_sensor_api/mod.rs | 7 +- .../old_device_sensor_api/qs_parser.rs | 18 +----- 8 files changed, 95 insertions(+), 35 deletions(-) create mode 100644 src/utils/hifitime_serde.rs rename src/{utils.rs => utils/mod.rs} (93%) diff --git a/src/ingest_protocol/packet_types.rs b/src/ingest_protocol/packet_types.rs index e1c07b5..3e3ad5d 100644 --- a/src/ingest_protocol/packet_types.rs +++ b/src/ingest_protocol/packet_types.rs @@ -6,11 +6,10 @@ // #[serde(rename = "...")] // #[serde(alias = "...")] -use crate::utils::SupportedUnit; +use crate::utils::{EpochUTC, SupportedUnit}; use crate::ingest_protocol::error::Error; use crate::ingest_protocol::parser::parse_mac_address; -use hifitime::Epoch; use rust_decimal::Decimal; use serde::{Deserialize, Serialize}; use serde_with::serde_as; @@ -33,7 +32,7 @@ use std::hash::{Hash, Hasher}; pub struct SensorValue { pub mac: String, pub value: Decimal, - pub time: Option, + pub time: Option, pub unit: Option, pub name: Option, } @@ -59,7 +58,7 @@ serde_with::serde_conv!( ); #[serde_as] -#[derive(Debug, Clone, Default, PartialEq, Eq, Deserialize)] +#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct NMDeviceDataPacket { #[serde(alias = "ID")] #[serde_as(as = "MacAsArray")] @@ -80,7 +79,7 @@ pub struct NMDeviceDataPacket { /// - Если в values есть хотябы один times и этот time не None => Игнорируем этот time /// - Если time нет у values => используем этот time (если он None, то соотв. считаем что информации о времени нет) /// TODO: В базе всё должно храниться как секунды по TAI спустя J1900 для возможности сортировки по времени позже. - pub time: Option, + pub time: Option, } #[derive(Debug, Clone, Default, PartialEq, Deserialize)] diff --git a/src/ingest_protocol/parser.rs b/src/ingest_protocol/parser.rs index ffc4133..81d557c 100644 --- a/src/ingest_protocol/parser.rs +++ b/src/ingest_protocol/parser.rs @@ -94,7 +94,7 @@ pub fn parse_packet_body<'a>( packet.values.insert(SensorValue { mac: sensor_mac.into(), value: Decimal::from_str(sensor_value)?, - time: sensor_time, // TODO + time: sensor_time.map(|v| v.into()), // TODO unit: None, name: sensor_name.map(|v| v.to_string()), }); diff --git a/src/ingest_protocol/tests/mod.rs b/src/ingest_protocol/tests/mod.rs index 12e5967..278926b 100644 --- a/src/ingest_protocol/tests/mod.rs +++ b/src/ingest_protocol/tests/mod.rs @@ -1,3 +1,4 @@ +use hifitime::Epoch; use crate::ingest_protocol::parser::{parse_mac_address, parse_packet}; #[test] @@ -5,7 +6,7 @@ fn test_asd() { let asd = r#" #A2-C6-47-01-DF-E1#Метео #OWNER#unknown -#T1#13.44#Outdoor +#T1#13.44#1000000#Outdoor #T2#27.74#Indoor #P1#691.02#Barometer #LAT#55.738178 @@ -14,14 +15,18 @@ fn test_asd() { ## "# .trim(); - dbg!(parse_packet(asd)); + let packet = parse_packet(asd).unwrap().1; + assert!(serde_json::to_string_pretty(&packet).unwrap() + .contains(r#""time": 1000000.0"#)); } #[test] fn test_mac() { - //assert_eq!(parse_mac_address("12-34-AA-12-55-AA"), Ok(("", [18, 52, 170, 18, 85, 170])) ); + assert_eq!(parse_mac_address("12-34-AA-12-55-AA").unwrap(), ("", [18, 52, 170, 18, 85, 170]) ); - println!("{:?}", parse_mac_address("12-34-AA-12-55-AA")); + dbg!(Epoch::now().unwrap().to_unix_seconds()); + + //println!("{:?}", parse_mac_address("12-34-AA-12-55-AA")); } #[test] diff --git a/src/main.rs b/src/main.rs index 5539a55..2b5fb61 100644 --- a/src/main.rs +++ b/src/main.rs @@ -3,9 +3,10 @@ #![feature(try_blocks)] extern crate core; -mod utils; mod ingest_protocol; mod web_server; +mod utils; + use crate::web_server::server_main; struct Params {} diff --git a/src/utils/hifitime_serde.rs b/src/utils/hifitime_serde.rs new file mode 100644 index 0000000..75044bb --- /dev/null +++ b/src/utils/hifitime_serde.rs @@ -0,0 +1,64 @@ +use std::fmt; +use std::fmt::{Formatter, Write}; +use hifitime::Epoch; +use serde::{de, Deserialize, Deserializer, Serialize, Serializer}; + +#[derive(PartialOrd, PartialEq, Ord, Eq, Clone, Copy, Debug, Default)] +#[repr(transparent)] +pub struct EpochUTC(pub Epoch); + +impl From for EpochUTC { + #[inline] + fn from(value: Epoch) -> Self { + EpochUTC(value) + } +} + +impl Serialize for EpochUTC { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + serializer.serialize_f64(self.0.to_unix_seconds()) + } +} + +impl<'de> Deserialize<'de> for EpochUTC { + fn deserialize(deserializer: D) -> Result + where + D: Deserializer<'de>, + { + struct EpochVisitor; + + impl<'de> de::Visitor<'de> for EpochVisitor { + type Value = EpochUTC; + + fn expecting(&self, formatter: &mut Formatter) -> fmt::Result { + formatter.write_str("a string or a float") + } + + fn visit_i64(self, v: i64) -> Result where E: de::Error { + return Ok(Epoch::from_unix_seconds(v as f64).into()) + } + + fn visit_f32(self, v: f32) -> Result where E: de::Error { + return Ok(Epoch::from_unix_seconds(v as f64).into()) + } + + fn visit_f64(self, v: f64) -> Result where E: de::Error { + return Ok(Epoch::from_unix_seconds(v).into()) + } + + fn visit_str(self, value: &str) -> Result + where + E: de::Error, + { + return Ok(Epoch::from_unix_seconds( + value.parse().map_err(de::Error::custom)? + ).into()) + } + } + + deserializer.deserialize_any(EpochVisitor) + } +} \ No newline at end of file diff --git a/src/utils.rs b/src/utils/mod.rs similarity index 93% rename from src/utils.rs rename to src/utils/mod.rs index 0f4e19a..9a91b6b 100644 --- a/src/utils.rs +++ b/src/utils/mod.rs @@ -2,10 +2,14 @@ //! +mod hifitime_serde; + use phf::phf_map; use serde::{Deserialize, Deserializer, Serialize, Serializer}; use std::borrow::Cow; +pub use hifitime_serde::EpochUTC; + /// Поддерживаемые типы. /// @@ -31,8 +35,8 @@ pub enum SupportedUnit { impl Serialize for SupportedUnit { fn serialize(&self, serializer: S) -> Result - where - S: Serializer, + where + S: Serializer, { serializer.serialize_str(match self { SupportedUnit::Celsius => "C", @@ -54,8 +58,8 @@ impl Serialize for SupportedUnit { impl<'de> Deserialize<'de> for SupportedUnit { fn deserialize<'a, D>(deserializer: D) -> Result - where - D: Deserializer<'de>, + where + D: Deserializer<'de>, { let color_str = Cow::<'a, str>::deserialize(deserializer)?; match STR_TO_UNITS.get(color_str.as_ref()) { @@ -78,5 +82,5 @@ static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { "V" => SupportedUnit::Volts, "W" => SupportedUnit::Watts, "s" => SupportedUnit::Seconds, - "KWh" => SupportedUnit::Seconds, + "KWh" => SupportedUnit::KWh, }; diff --git a/src/web_server/old_device_sensor_api/mod.rs b/src/web_server/old_device_sensor_api/mod.rs index 9a140df..6eddb27 100644 --- a/src/web_server/old_device_sensor_api/mod.rs +++ b/src/web_server/old_device_sensor_api/mod.rs @@ -6,9 +6,8 @@ use crate::ingest_protocol::{NMDeviceDataPacket, NMJsonPacket}; use crate::web_server::app_error::AppError; -use fred::types::RedisMap; use ntex::http::{HttpMessage, StatusCode}; -use ntex::util::{Bytes, HashMap}; +use ntex::util::Bytes; use ntex::{http, web}; use fred::prelude::*; use hifitime::Epoch; @@ -80,7 +79,7 @@ pub async fn device_handler<'a>( for device in real_body.devices { let mut device_key_str = String::new(); - let now = Epoch::now().unwrap(); + let now = Epoch::now().unwrap().into(); let mut device_time = device.time .unwrap_or(now); @@ -90,7 +89,7 @@ pub async fn device_handler<'a>( device_time = now; } - let device_tai_timestamp = device_time + let device_tai_timestamp = device_time.0 .to_duration_since_j1900() .to_seconds(); diff --git a/src/web_server/old_device_sensor_api/qs_parser.rs b/src/web_server/old_device_sensor_api/qs_parser.rs index 1bf1e3e..e0524d5 100644 --- a/src/web_server/old_device_sensor_api/qs_parser.rs +++ b/src/web_server/old_device_sensor_api/qs_parser.rs @@ -85,32 +85,20 @@ pub fn parse_epoch_if_exists(parsed: &mut HashMap, key: &str) -> pub async fn parse_nm_qs_format(input: &str) -> Result { let mut parsed: HashMap = serde_qs::from_str(input)?; - let (_, _device_mac) = if let Some(id) = parsed.get("ID") { + let (_, device_mac) = if let Some(id) = parsed.get("ID") { parse_mac_address(id)? } else { return Err(QSParserError::NoMAC); }; - for keys in parsed.keys() { - if keys.starts_with("") { - - } - } - - let lat = if let Some(lat) = parsed.remove("lat") { - Some(Decimal::from_str(lat.as_str())?) - } else { - None - }; - let device_data = NMDeviceDataPacket { - mac: _device_mac, + mac: device_mac, name: parsed.remove("name").map(|v| v.to_owned()), owner: parsed.remove("owner").map(|v| v.to_owned()), lat: parse_decimal_if_exists(&mut parsed, "lat")?, lon: parse_decimal_if_exists(&mut parsed, "lon")?, alt: parse_decimal_if_exists(&mut parsed, "alt")?, - time: parse_epoch_if_exists(&mut parsed, "time")?, + time: parse_epoch_if_exists(&mut parsed, "time")?.map(|v| v.into()), values: qs_rest_to_values(parsed)?, };