fix(ingest_protocol): slight rework

This commit is contained in:
Даниил 2025-02-24 19:53:24 +04:00
parent 1559122219
commit 7e024a79fa
Signed by: nm17
GPG key ID: 3303B70C59145CD4
5 changed files with 83 additions and 24 deletions

View file

@ -1,7 +1,7 @@
use snafu::Snafu;
/// Тип ошибки Ingest протокола
#[derive(Debug, Snafu)]
#[derive(Debug, Snafu, PartialEq)]
#[snafu(visibility(pub))]
pub enum Error {
Incomplete,

View file

@ -32,7 +32,7 @@ use std::hash::{Hash, Hasher};
///
/// Парсинг этих данных отличается в разных транспортных протоколах.
/// Для HTTP /post или /get: см. [crate::web_server::old_device_sensor_api]
/// Для TCP/UDP: TODO
/// Для TCP/UDP: см. [crate::ingest_protocol]
/// Для MQTT: TODO
#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
pub struct SensorValue {
@ -74,6 +74,10 @@ pub struct NMDeviceDataPacket {
pub values: HashSet<SensorValue>,
/// Владелец датчика
///
/// Будет игнорироваться, так как в данном значении нет смысла, одни проблемы.
/// Автором датчика может быть только тот, кто его создал.
pub owner: Option<String>,
pub lat: Option<Decimal>,

View file

@ -40,7 +40,7 @@ pub fn parse_packet(input: impl AsRef<[u8]>) -> Result<NMDeviceDataPacket, Error
None => None,
};
(&input[idx_lf..], mac, name)
(&input[idx_lf+1..], mac, name)
};
let mut packet = NMDeviceDataPacket {
@ -58,7 +58,9 @@ pub fn parse_packet(input: impl AsRef<[u8]>) -> Result<NMDeviceDataPacket, Error
return Err(Error::PacketParseError { msg: "# expected" })?;
}
// end of packet marker
input = &input[1..];
// If we see another #, this means we hit a end of packet marker
// ##
if input.first() == Some(&b'#') {
break;
@ -138,7 +140,7 @@ pub fn parse_packet(input: impl AsRef<[u8]>) -> Result<NMDeviceDataPacket, Error
}
}
input = &input[idx_lf..];
input = &input[idx_lf+1..];
}
Ok(packet)

View file

@ -1,13 +1,14 @@
use crate::ingest_protocol::{
parser::{parse_mac_address, parse_packet},
SensorValue,
};
use std::collections::HashSet;
use crate::{ingest_protocol::{
parser::{parse_mac_address, parse_packet}, NMDeviceDataPacket, SensorValue
}, utils::EpochUTC};
use hifitime::Epoch;
use rust_decimal_macros::dec;
#[test]
fn test_asd() {
let asd = r#"
#A2-C6-47-01-DF-E1#Метео
let asd = r#"#A2-C6-47-01-DF-E1#Метео
#OWNER#unknown
#T1#13.44#1000000#Outdoor
#T2#27.74#Indoor
@ -15,14 +16,69 @@ fn test_asd() {
#LAT#55.738178
#LON#37.6068
#ALT#38
##
"#
##"#
.trim()
.as_bytes();
let packet = parse_packet(asd).unwrap();
assert!(serde_json::to_string_pretty(&packet)
.unwrap()
.contains(r#""time": 1000000.0"#));
dbg!(&packet);
let real_packet = NMDeviceDataPacket {
mac: [162, 198, 71, 1, 223, 225],
name: Some(
"Метео".to_owned(),
),
values: HashSet::from_iter([
SensorValue {
mac: "T2".to_owned(),
value: dec!(27.74),
time: None,
unit: None,
name: Some(
"Indoor".to_owned(),
),
},
SensorValue {
mac: "P1".to_owned(),
value: dec!(691.02),
time: None,
unit: None,
name: Some(
"Barometer".to_owned(),
),
},
SensorValue {
mac: "T1".to_owned(),
value: dec!(13.44),
time: Some(
EpochUTC(Epoch::from_unix_seconds(1000000.0)),
),
unit: None,
name: Some(
"Outdoor".to_owned(),
),
},
].into_iter()),
owner: Some(
"unknown".to_owned(),
),
lat: Some(
dec!(55.738178),
),
lon: Some(
dec!(37.6068),
),
alt: Some(
dec!(38),
),
commands: None,
time: None,
};
assert_eq!(
packet,
real_packet
);
}
#[test]

View file

@ -8,7 +8,6 @@ use crate::web_server::old_app_api::handlers::{app_init, version};
use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams};
use crate::web_server::utils::redis::is_api_key_valid;
use crate::web_server::NMAppState;
use bstr::ByteSlice;
use ntex::util::Bytes;
use ntex::web;
use ntex::web::types::State;
@ -30,10 +29,8 @@ pub async fn old_api_handler(
return Err(AppError::RequestTooLarge);
}
let body_bytes = body_bytes.as_bstr();
let mandatory_params: MandatoryParams<'_> =
serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON
serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON
// Ignore clippy singlematch
if mandatory_params.cmd.as_ref() == "version" {
@ -45,7 +42,7 @@ pub async fn old_api_handler(
match mandatory_params.cmd.as_ref() {
"appInit" => {
let body: AppInitRequest =
serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?;
serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?;
app_init(body, &app_state).await
}