diff --git a/Cargo.lock b/Cargo.lock index 85ce0e4..eb37b9d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,6 +225,17 @@ dependencies = [ "syn 2.0.95", ] +[[package]] +name = "bstr" +version = "1.11.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "531a9155a481e2ee699d4f98f43c0ca4ff8ee1bfd55c31e9e98fb29d2b176fe0" +dependencies = [ + "memchr", + "regex-automata", + "serde", +] + [[package]] name = "bumpalo" version = "3.16.0" @@ -1070,6 +1081,7 @@ dependencies = [ name = "iotishnik-server" version = "0.1.0" dependencies = [ + "bstr", "bytes 1.9.0", "chrono", "clap", @@ -1081,6 +1093,7 @@ dependencies = [ "hex", "hifitime", "lazy_static", + "nom", "ntex", "phf", "regex", @@ -1091,6 +1104,7 @@ dependencies = [ "serde_with", "smallstr", "snafu", + "thiserror", "tokio", "ufmt", ] diff --git a/Cargo.toml b/Cargo.toml index befe4aa..d571bc0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +bstr = { version = "1.9.0", features = ["serde"] } bytes = { version = "1.4.0", features = ["serde"] } chrono = { version = "0.4.26", features = ["serde"] } clap = { version = "4.3.8", features = ["derive", "env"] } @@ -15,6 +16,7 @@ heapless = { version = "0.8.0", features = ["ufmt"] } hex = { version = "0.4.3", default-features = false, features = ["std", "alloc"] } hifitime = "4.0.2" lazy_static = "1.4.0" +nom = { version = "7.1.3", default-features = false, features = ["std", "alloc"] } ntex = { version = "2.10.0", features = ["tokio", "cookie", "url"] } phf = { version = "0.11.2", features = ["serde", "macros"] } regex = "1.8.4" @@ -24,6 +26,7 @@ serde_json = "1.0.99" serde_qs = "0.13.0" serde_with = { version = "3.6.1", features = ["hex"] } smallstr = { version = "0.3.0", features = ["std", "union"] } +thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["full"] } ufmt = { version = "0.2.0", features = ["std"] } futures-util = { version = "0.3.30", features = ["tokio-io"] } diff --git a/src/ingest_protocol/error.rs b/src/ingest_protocol/error.rs index a152d85..53e7e8f 100644 --- a/src/ingest_protocol/error.rs +++ b/src/ingest_protocol/error.rs @@ -1,39 +1,28 @@ -use snafu::Snafu; +use nom::error::VerboseError; +use std::fmt::Debug; +use std::num::ParseFloatError; +use thiserror::Error as ThisError; +/// /// Тип ошибки Ingest протокола -#[derive(Debug, Snafu, PartialEq)] -#[snafu(visibility(pub))] -pub enum Error { - Incomplete, +/// +/// К сожалению, не может быть переделан на Snafu, так как +/// Snafu не поддерживает generic типы как source ошибки, +/// не приделывая 'static лайфтайм. +/// +/// См. https://github.com/shepmaster/snafu/issues/99 +/// +#[allow(dead_code)] +#[derive(Debug, ThisError)] +pub enum Error { + #[error("Nom error: {0}")] + Nom(#[from] nom::Err>), - #[snafu(display("Cannot parse packet: {msg}"))] - PacketParseError { - msg: &'static str, - }, + #[error("Failed to parse a timestamp")] + TimestampParseError(ParseFloatError), - #[snafu(display("Cannot parse MAC address: unexpected byte {value}"))] - MacInvalidChar { - value: u8, - }, - - #[snafu(display("Cannot decode as UTF-8 String"))] - Utf8Error { - source: std::string::FromUtf8Error, - }, - - // TODO: can we merge these two errors? - #[snafu(display("Cannot decode as UTF-8 &str"))] - Utf8StrError { - source: std::str::Utf8Error, - }, - - #[snafu(display("Cannot parse timestamp"))] - TimestampParseError { - source: std::num::ParseFloatError, - }, - - #[snafu(display("Cannot parse number"))] - DecimalParseError { - source: rust_decimal::Error, - }, + #[error("Unknown unit")] + UnknownUnit(I), + #[error("Failed to parse a number")] + DecimalParsing(#[from] rust_decimal::Error), } diff --git a/src/ingest_protocol/packet_types.rs b/src/ingest_protocol/packet_types.rs index 2c0db43..de8e5c8 100644 --- a/src/ingest_protocol/packet_types.rs +++ b/src/ingest_protocol/packet_types.rs @@ -30,9 +30,9 @@ use std::hash::{Hash, Hasher}; /// NarodMon, `mac` может означать EUI-48 совместимый MAC адрес, или же просто /// уникальный идентификатор. /// -/// Парсинг этих данных отличается в разных транспортных протоколах. -/// Для HTTP /post или /get: см. [crate::web_server::old_device_sensor_api] -/// Для TCP/UDP: см. [crate::ingest_protocol] +/// Парсинг этих данных отличается в разных транспортных протоколах. +/// Для HTTP /post или /get: см. [crate::web_server::old_device_sensor_api] +/// Для TCP/UDP: TODO /// Для MQTT: TODO #[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)] pub struct SensorValue { @@ -52,8 +52,8 @@ impl Hash for SensorValue { /// Функция-помощник для [MacAsArray], предназначенный для использования с [serde_with]. /// /// Преобразует MAC-адрес. -fn mac_as_array(value: &str) -> Result<[u8; 6], Error> { - parse_mac_address(value.as_bytes()) +fn mac_as_array(value: &str) -> Result<[u8; 6], Error<&str>> { + Ok(parse_mac_address(value)?.1) } serde_with::serde_conv!( @@ -74,10 +74,6 @@ pub struct NMDeviceDataPacket { pub values: HashSet, - /// Владелец датчика - /// - /// Будет игнорироваться, так как в данном значении нет смысла, одни проблемы. - /// Автором датчика может быть только тот, кто его создал. pub owner: Option, pub lat: Option, diff --git a/src/ingest_protocol/parser.rs b/src/ingest_protocol/parser.rs index 250ba3b..1d49108 100644 --- a/src/ingest_protocol/parser.rs +++ b/src/ingest_protocol/parser.rs @@ -1,198 +1,136 @@ +use hifitime::Epoch; +use nom::bytes::complete::tag; +use nom::bytes::complete::take_until1; +use nom::bytes::complete::{take, take_while, take_while1}; +use nom::character::complete::hex_digit1; use std::str::FromStr; -use hifitime::Epoch; -use rust_decimal::Decimal; - -use snafu::ResultExt; - -use super::error::{DecimalParseSnafu, TimestampParseSnafu, Utf8Snafu, Utf8StrSnafu}; use crate::ingest_protocol::error::Error; use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue}; +use nom::combinator::{map_parser, opt}; +use nom::error::context; +use nom::multi::{count, separated_list0}; +use nom::sequence::{delimited, preceded}; +use rust_decimal::Decimal; -pub fn parse_packet(input: impl AsRef<[u8]>) -> Result { - let input = input.as_ref(); +type MyIError = Result<(I, O), Error>; - let input = if input.first().ok_or(Error::Incomplete)? != &b'#' { - return Err(Error::PacketParseError { - msg: "# expected at beginning", - })?; - } else { - &input[1..] - }; +pub fn parse_mac_address(input: &str) -> MyIError<&str, [u8; 6]> { + let mut mac = [0u8; 6]; - let (input, mac, name) = { - let idx_lf = find_byte(input, b'\n')?; + let mut counter = 0; - let mut args = input[..idx_lf].split(|b| b == &b'#'); + let (leftovers, i) = + context("17 символов для MAC адреса", take(17usize))(input)?; - let mac = parse_mac_address(args.next().ok_or(Error::PacketParseError { - msg: "device MAC field expected", - })?)?; + let (_, out) = count( + |inp| { + let (mut i, o) = context("Октет", map_parser(take(2usize), hex_digit1))(inp)?; + if counter != 5 { + (i, _) = tag("-")(i)?; + } + counter += 1; + Ok((i, o)) + }, + 6, + )(i)?; - let name = - match args.next() { - // TODO: optimize? one extra alloc - Some(value) => Some(String::from_utf8(value.to_vec()).map_err(|_| { - Error::PacketParseError { - msg: "device name is not UTF-8", - } - })?), + hex::decode_to_slice(out.join(""), &mut mac).unwrap(); + + Ok((leftovers, mac)) +} + +fn handle_special_sensor_macs<'a>( + mac: &'a str, + sensor_value: &str, + packet: &mut NMDeviceDataPacket, +) -> Result> { + match mac.to_uppercase().as_str() { + "LAT" => packet.lat = Some(Decimal::from_str(sensor_value)?), + "LON" => packet.lon = Some(Decimal::from_str(sensor_value)?), + "ALT" => packet.alt = Some(Decimal::from_str(sensor_value)?), + _ => { + return Ok(false); + } + } + Ok(true) +} + +pub fn parse_packet_body<'a>( + line: &'a str, + packet: &mut NMDeviceDataPacket, +) -> MyIError<&'a str, ()> { + let (line, _) = tag("#")(line)?; + let (line, sensor_mac) = take_while1(|c| c != '\n' && c != '#')(line)?; + + let (line, _) = tag("#")(line)?; + + match sensor_mac { + "OWNER" => { + let (_line, owner_value) = take_while1(|c| c != '\n')(line)?; + + packet.owner = Some(owner_value.into()) + + //hs.insert(NarodMonValues::Owner(owner_value.into())); + //let (line, _) = tag("\n")(line)?; + } + _ => { + let (line, sensor_value) = take_while1(|c| c != '\n' && c != '#')(line)?; + let (line, sensor_time) = opt(preceded( + tag("#"), + take_while1(|c| c != '\n' && c != '#' && "1234567890.".contains(c)), + ))(line)?; + let (_line, sensor_name) = opt(preceded(tag("#"), take_while1(|c| c != '\n')))(line)?; + + let sensor_time = match sensor_time { + Some(v) => Some(Epoch::from_unix_seconds( + v.parse().map_err(Error::TimestampParseError)?, + )), None => None, }; - (&input[idx_lf+1..], mac, name) - }; - - let mut packet = NMDeviceDataPacket { - mac, - name, - ..Default::default() - }; - - let mut input = input; - - loop { - // TODO: searching for # and \n duplicates code above - - if input.first().ok_or(Error::Incomplete)? != &b'#' { - return Err(Error::PacketParseError { msg: "# expected" })?; - } - - input = &input[1..]; - - // If we see another #, this means we hit a end of packet marker - // ## - if input.first() == Some(&b'#') { - break; - } - - let idx_lf = find_byte(input, b'\n')?; - - let mut args = input[..idx_lf].split(|b| b == &b'#'); - - let sensor_id = String::from_utf8( - args.next() - .ok_or(Error::PacketParseError { - msg: "sensor ID expected", - })? - .to_vec(), // TODO: optimize? one extra alloc - ) - .context(Utf8Snafu)?; - - let value = std::str::from_utf8(args.next().ok_or(Error::PacketParseError { - msg: "sensor value expected", - })?) - .context(Utf8StrSnafu)?; - - match sensor_id.as_str() { - "OWNER" => { - packet.owner = Some(value.to_owned()); - } - "LAT" => { - packet.lat = Some(Decimal::from_str(value).context(DecimalParseSnafu)?); - } - "LON" => { - packet.lon = Some(Decimal::from_str(value).context(DecimalParseSnafu)?); - } - "ALT" => { - packet.alt = Some(Decimal::from_str(value).context(DecimalParseSnafu)?); - } - _ => { - let name_or_time = args.next(); - - let (time, name) = if let Some(arg) = args.next() { - let timestamp = std::str::from_utf8(name_or_time.unwrap()) - .map_err(|_| Error::PacketParseError { - msg: "timestamp is not UTF-8", - })? - .parse() - .context(TimestampParseSnafu)?; - let time = Epoch::from_unix_seconds(timestamp); - - // TODO: optimize? one extra alloc - let name = - String::from_utf8(arg.to_vec()).map_err(|_| Error::PacketParseError { - msg: "device name is not UTF-8", - })?; - - (Some(time), Some(name)) - } else { - let name = match name_or_time { - // TODO: optimize? - Some(value) => Some(String::from_utf8(value.to_vec()).map_err(|_| { - Error::PacketParseError { - msg: "device name is not UTF-8", - } - })?), - None => None, - }; - - (None, name) - }; - + if !handle_special_sensor_macs(sensor_mac, sensor_value, packet)? { packet.values.insert(SensorValue { - mac: sensor_id, - value: Decimal::from_str(value).context(DecimalParseSnafu)?, - time: time.map(|t| t.into()), // TODO: is it ok to cast Epoch -> EpochUTC? + mac: sensor_mac.into(), + value: Decimal::from_str(sensor_value)?, + time: sensor_time.map(|v| v.into()), // TODO unit: None, - name, + name: sensor_name.map(|v| v.to_string()), }); } } - - input = &input[idx_lf+1..]; } - Ok(packet) + Ok((line, ())) } -pub fn parse_mac_address(input: &[u8]) -> Result<[u8; 6], Error> { - const MAC_LEN: usize = 6; - let mut mac = [0u8; MAC_LEN]; +pub fn parse_packet(input: &str) -> MyIError<&str, NMDeviceDataPacket> { + let (input, _) = tag("#")(input)?; - let mut idx = 0; - let mut cur_byte = 0usize; // index of current byte in `mac` - let mut least_bit = false; - loop { - if idx >= input.len() || cur_byte >= MAC_LEN { - break; - } + let (input, device_mac) = parse_mac_address(input)?; - if !least_bit && input[idx] == b'-' { - idx += 1; - continue; - } + let (input, opt_name) = opt(delimited(tag("#"), take_while(|c| c != '\n'), tag("\n")))(input)?; - let bit = if (b'0'..=b'9').contains(&input[idx]) { - input[idx] - b'0' - } else if (b'A'..=b'F').contains(&input[idx]) { - input[idx] - b'A' + 10 - } else if (b'a'..=b'f').contains(&input[idx]) { - input[idx] - b'a' + 10 - } else { - return Err(Error::MacInvalidChar { value: input[idx] }); - }; + let mut packet = NMDeviceDataPacket { + mac: device_mac, + ..Default::default() + }; - if !least_bit { - mac[cur_byte] = bit * 16; - least_bit = true; - } else { - mac[cur_byte] |= bit; - least_bit = false; - cur_byte += 1; - } + let (input, lines) = context( + "Получение значений до тега терминатора", + map_parser( + take_until1("##"), + separated_list0(tag("\n"), take_while1(|c| c != '\n')), + ), + )(input)?; - idx += 1; + for line in lines { + parse_packet_body(line, &mut packet)?; } - Ok(mac) -} + let (input, _) = tag("##")(input)?; -fn find_byte(haystack: &[u8], needle: u8) -> Result { - for (idx, byte) in haystack.as_ref().iter().enumerate() { - if byte == &needle { - return Ok(idx); - } - } - Err(Error::Incomplete) + packet.name = opt_name.map(|v| v.to_string()); + + Ok((input, packet)) } diff --git a/src/ingest_protocol/tests/mod.rs b/src/ingest_protocol/tests/mod.rs index 21767df..6a213ad 100644 --- a/src/ingest_protocol/tests/mod.rs +++ b/src/ingest_protocol/tests/mod.rs @@ -1,14 +1,13 @@ -use std::collections::HashSet; - -use crate::{ingest_protocol::{ - parser::{parse_mac_address, parse_packet}, NMDeviceDataPacket, SensorValue -}, utils::EpochUTC}; +use crate::ingest_protocol::{ + parser::{parse_mac_address, parse_packet}, + SensorValue, +}; use hifitime::Epoch; -use rust_decimal_macros::dec; #[test] fn test_asd() { - let asd = r#"#A2-C6-47-01-DF-E1#Метео + let asd = r#" +#A2-C6-47-01-DF-E1#Метео #OWNER#unknown #T1#13.44#1000000#Outdoor #T2#27.74#Indoor @@ -16,76 +15,20 @@ fn test_asd() { #LAT#55.738178 #LON#37.6068 #ALT#38 -##"# - .trim() - .as_bytes(); - let packet = parse_packet(asd).unwrap(); - - dbg!(&packet); - - let real_packet = NMDeviceDataPacket { - mac: [162, 198, 71, 1, 223, 225], - name: Some( - "Метео".to_owned(), - ), - values: HashSet::from_iter([ - SensorValue { - mac: "T2".to_owned(), - value: dec!(27.74), - time: None, - unit: None, - name: Some( - "Indoor".to_owned(), - ), - }, - SensorValue { - mac: "P1".to_owned(), - value: dec!(691.02), - time: None, - unit: None, - name: Some( - "Barometer".to_owned(), - ), - }, - SensorValue { - mac: "T1".to_owned(), - value: dec!(13.44), - time: Some( - EpochUTC(Epoch::from_unix_seconds(1000000.0)), - ), - unit: None, - name: Some( - "Outdoor".to_owned(), - ), - }, - ].into_iter()), - owner: Some( - "unknown".to_owned(), - ), - lat: Some( - dec!(55.738178), - ), - lon: Some( - dec!(37.6068), - ), - alt: Some( - dec!(38), - ), - commands: None, - time: None, - }; - - assert_eq!( - packet, - real_packet - ); +## +"# + .trim(); + let packet = parse_packet(asd).unwrap().1; + assert!(serde_json::to_string_pretty(&packet) + .unwrap() + .contains(r#""time": 1000000.0"#)); } #[test] fn test_mac() { assert_eq!( - parse_mac_address(b"12-34-Aa-1255-fA").unwrap(), - [18, 52, 170, 18, 85, 250], + parse_mac_address("12-34-AA-12-55-AA").unwrap(), + ("", [18, 52, 170, 18, 85, 170]) ); dbg!(Epoch::now().unwrap().to_unix_seconds()); @@ -102,7 +45,7 @@ fn test_packet() { #T2#1.2#3400005345 ##"#; - println!("{:#?}", parse_packet(inp.as_bytes())); + println!("{:#?}", parse_packet(inp)); } #[test] diff --git a/src/web_server/old_app_api/mod.rs b/src/web_server/old_app_api/mod.rs index 660a193..3430f7c 100644 --- a/src/web_server/old_app_api/mod.rs +++ b/src/web_server/old_app_api/mod.rs @@ -3,18 +3,20 @@ mod handlers; mod types; +use ntex::web::types::State; +use ntex::util::Bytes; +use ntex::web; +use nom::AsBytes; +use snafu::ResultExt; use crate::web_server::app_error::AppError; +use crate::web_server::NMAppState; use crate::web_server::old_app_api::handlers::{app_init, version}; use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams}; use crate::web_server::utils::redis::is_api_key_valid; -use crate::web_server::NMAppState; -use ntex::util::Bytes; -use ntex::web; -use ntex::web::types::State; -use snafu::ResultExt; use super::app_error; + /// Обработчик запросов от приложений. /// /// Отвечает за разделение на функции по `cmd`. @@ -29,26 +31,22 @@ pub async fn old_api_handler( return Err(AppError::RequestTooLarge); } - let mandatory_params: MandatoryParams<'_> = - serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON + let body_bytes = body_bytes.as_bytes(); + + let mandatory_params: MandatoryParams<'_> = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON // Ignore clippy singlematch - if mandatory_params.cmd.as_ref() == "version" { - return version((), &app_state).await; - } + if mandatory_params.cmd.as_ref() == "version" { return version((), &app_state).await } is_api_key_valid(&app_state.redis_client, mandatory_params.api_key.as_ref()).await?; match mandatory_params.cmd.as_ref() { "appInit" => { - let body: AppInitRequest = - serde_json::from_slice(&body_bytes).context(app_error::JsonSnafu {})?; + let body: AppInitRequest = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; app_init(body, &app_state).await } - _ => Err(AppError::UnknownMethod { - method: mandatory_params.cmd.to_string(), - }), + _ => Err(AppError::UnknownMethod { method: mandatory_params.cmd.to_string() }), } //Ok("fuck") diff --git a/src/web_server/old_device_sensor_api/qs_parser.rs b/src/web_server/old_device_sensor_api/qs_parser.rs index dd5f131..0b4ca09 100644 --- a/src/web_server/old_device_sensor_api/qs_parser.rs +++ b/src/web_server/old_device_sensor_api/qs_parser.rs @@ -19,29 +19,31 @@ pub enum QSParserError { #[snafu(display("asd"))] SerdeQS { #[snafu(source(from(serde_qs::Error, convert_to_arc::)))] - source: Arc, + source: Arc }, #[snafu(display("asd"))] - Parsing { context: String }, + Parsing { + context: String + }, #[snafu(display("asd"))] FloatP { #[snafu(source)] - source: ParseFloatError, + source: ParseFloatError }, #[snafu(display("failed to parse into decimal"))] - DecimalParse { source: rust_decimal::Error }, + DecimalParse { + source: rust_decimal::Error + }, #[snafu(display("asd"))] NoMAC, } -impl From for QSParserError { - fn from(value: Error) -> Self { - QSParserError::Parsing { - context: format!("{:?}", value), - } +impl From> for QSParserError { + fn from(value: Error<&str>) -> Self { + QSParserError::Parsing { context: format!("{:?}", value)} } } @@ -64,7 +66,7 @@ pub fn qs_rest_to_values( for (key, value) in parsed { hashset.insert(SensorValue { mac: key, - value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu {})?, + value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu{})?, time: None, unit: None, @@ -80,9 +82,7 @@ pub fn parse_decimal_if_exists( key: &str, ) -> Result, QSParserError> { if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some( - Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu {})?, - )) + Ok(Some(Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu{})?)) } else { Ok(None) } @@ -93,19 +93,17 @@ pub fn parse_epoch_if_exists( key: &str, ) -> Result, QSParserError> { if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some(Epoch::from_unix_seconds( - unwrapped_value.parse().context(FloatPSnafu {})?, - ))) + Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse().context(FloatPSnafu{})?))) } else { Ok(None) } } pub async fn parse_nm_qs_format(input: &str) -> Result { - let mut parsed: HashMap = serde_qs::from_str(input).context(SerdeQSSnafu {})?; + let mut parsed: HashMap = serde_qs::from_str(input).context(SerdeQSSnafu{})?; - let device_mac = if let Some(id) = parsed.get("ID") { - parse_mac_address(id.as_bytes())? + let (_, device_mac) = if let Some(id) = parsed.get("ID") { + parse_mac_address(id)? } else { return Err(QSParserError::NoMAC); };