diff --git a/Cargo.lock b/Cargo.lock index eb37b9d..85ce0e4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -225,17 +225,6 @@ dependencies = [ "syn 2.0.95", ] -[[package]] -name = "bstr" -version = "1.11.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "531a9155a481e2ee699d4f98f43c0ca4ff8ee1bfd55c31e9e98fb29d2b176fe0" -dependencies = [ - "memchr", - "regex-automata", - "serde", -] - [[package]] name = "bumpalo" version = "3.16.0" @@ -1081,7 +1070,6 @@ dependencies = [ name = "iotishnik-server" version = "0.1.0" dependencies = [ - "bstr", "bytes 1.9.0", "chrono", "clap", @@ -1093,7 +1081,6 @@ dependencies = [ "hex", "hifitime", "lazy_static", - "nom", "ntex", "phf", "regex", @@ -1104,7 +1091,6 @@ dependencies = [ "serde_with", "smallstr", "snafu", - "thiserror", "tokio", "ufmt", ] diff --git a/Cargo.toml b/Cargo.toml index d571bc0..befe4aa 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -bstr = { version = "1.9.0", features = ["serde"] } bytes = { version = "1.4.0", features = ["serde"] } chrono = { version = "0.4.26", features = ["serde"] } clap = { version = "4.3.8", features = ["derive", "env"] } @@ -16,7 +15,6 @@ heapless = { version = "0.8.0", features = ["ufmt"] } hex = { version = "0.4.3", default-features = false, features = ["std", "alloc"] } hifitime = "4.0.2" lazy_static = "1.4.0" -nom = { version = "7.1.3", default-features = false, features = ["std", "alloc"] } ntex = { version = "2.10.0", features = ["tokio", "cookie", "url"] } phf = { version = "0.11.2", features = ["serde", "macros"] } regex = "1.8.4" @@ -26,7 +24,6 @@ serde_json = "1.0.99" serde_qs = "0.13.0" serde_with = { version = "3.6.1", features = ["hex"] } smallstr = { version = "0.3.0", features = ["std", "union"] } -thiserror = "1.0.40" tokio = { version = "1.28.2", features = ["full"] } ufmt = { version = "0.2.0", features = ["std"] } futures-util = { version = "0.3.30", features = ["tokio-io"] } diff --git a/src/ingest_protocol/error.rs b/src/ingest_protocol/error.rs index 53e7e8f..843eddf 100644 --- a/src/ingest_protocol/error.rs +++ b/src/ingest_protocol/error.rs @@ -1,28 +1,39 @@ -use nom::error::VerboseError; -use std::fmt::Debug; -use std::num::ParseFloatError; -use thiserror::Error as ThisError; +use snafu::Snafu; -/// /// Тип ошибки Ingest протокола -/// -/// К сожалению, не может быть переделан на Snafu, так как -/// Snafu не поддерживает generic типы как source ошибки, -/// не приделывая 'static лайфтайм. -/// -/// См. https://github.com/shepmaster/snafu/issues/99 -/// -#[allow(dead_code)] -#[derive(Debug, ThisError)] -pub enum Error { - #[error("Nom error: {0}")] - Nom(#[from] nom::Err>), +#[derive(Debug, Snafu)] +#[snafu(visibility(pub))] +pub enum Error { + Incomplete, - #[error("Failed to parse a timestamp")] - TimestampParseError(ParseFloatError), + #[snafu(display("Cannot parse packet: {msg}"))] + PacketParseError { + msg: &'static str, + }, - #[error("Unknown unit")] - UnknownUnit(I), - #[error("Failed to parse a number")] - DecimalParsing(#[from] rust_decimal::Error), + #[snafu(display("Cannot parse MAC address: unexpected byte {value}"))] + MacInvalidChar { + value: u8, + }, + + #[snafu(display("Cannot decode as UTF-8 String"))] + Utf8Error { + source: std::string::FromUtf8Error, + }, + + // TODO: can we merge these two errors? + #[snafu(display("Cannot decode as UTF-8 &str"))] + Utf8StrError { + source: std::str::Utf8Error, + }, + + #[snafu(display("Cannot parse timestamp"))] + TimestampParseError { + source: std::num::ParseFloatError, + }, + + #[snafu(display("Cannot parse number"))] + DecimalParseError { + source: rust_decimal::Error, + }, } diff --git a/src/ingest_protocol/packet_types.rs b/src/ingest_protocol/packet_types.rs index de8e5c8..f3409c6 100644 --- a/src/ingest_protocol/packet_types.rs +++ b/src/ingest_protocol/packet_types.rs @@ -52,8 +52,8 @@ impl Hash for SensorValue { /// Функция-помощник для [MacAsArray], предназначенный для использования с [serde_with]. /// /// Преобразует MAC-адрес. -fn mac_as_array(value: &str) -> Result<[u8; 6], Error<&str>> { - Ok(parse_mac_address(value)?.1) +fn mac_as_array(value: &str) -> Result<[u8; 6], Error> { + parse_mac_address(value.as_bytes()) } serde_with::serde_conv!( diff --git a/src/ingest_protocol/parser.rs b/src/ingest_protocol/parser.rs index 1d49108..7efa60a 100644 --- a/src/ingest_protocol/parser.rs +++ b/src/ingest_protocol/parser.rs @@ -1,136 +1,196 @@ -use hifitime::Epoch; -use nom::bytes::complete::tag; -use nom::bytes::complete::take_until1; -use nom::bytes::complete::{take, take_while, take_while1}; -use nom::character::complete::hex_digit1; use std::str::FromStr; -use crate::ingest_protocol::error::Error; -use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue}; -use nom::combinator::{map_parser, opt}; -use nom::error::context; -use nom::multi::{count, separated_list0}; -use nom::sequence::{delimited, preceded}; +use hifitime::Epoch; use rust_decimal::Decimal; -type MyIError = Result<(I, O), Error>; +use snafu::ResultExt; -pub fn parse_mac_address(input: &str) -> MyIError<&str, [u8; 6]> { - let mut mac = [0u8; 6]; +use super::error::{DecimalParseSnafu, TimestampParseSnafu, Utf8Snafu, Utf8StrSnafu}; +use crate::ingest_protocol::error::Error; +use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue}; - let mut counter = 0; +pub fn parse_packet(input: impl AsRef<[u8]>) -> Result { + let input = input.as_ref(); - let (leftovers, i) = - context("17 символов для MAC адреса", take(17usize))(input)?; + let input = if input.first().ok_or(Error::Incomplete)? != &b'#' { + return Err(Error::PacketParseError { + msg: "# expected at beginning", + })?; + } else { + &input[1..] + }; - let (_, out) = count( - |inp| { - let (mut i, o) = context("Октет", map_parser(take(2usize), hex_digit1))(inp)?; - if counter != 5 { - (i, _) = tag("-")(i)?; - } - counter += 1; - Ok((i, o)) - }, - 6, - )(i)?; + let (input, mac, name) = { + let idx_lf = find_byte(input, b'\n')?; - hex::decode_to_slice(out.join(""), &mut mac).unwrap(); + let mut args = input[..idx_lf].split(|b| b == &b'#'); - Ok((leftovers, mac)) -} + let mac = parse_mac_address(args.next().ok_or(Error::PacketParseError { + msg: "device MAC field expected", + })?)?; -fn handle_special_sensor_macs<'a>( - mac: &'a str, - sensor_value: &str, - packet: &mut NMDeviceDataPacket, -) -> Result> { - match mac.to_uppercase().as_str() { - "LAT" => packet.lat = Some(Decimal::from_str(sensor_value)?), - "LON" => packet.lon = Some(Decimal::from_str(sensor_value)?), - "ALT" => packet.alt = Some(Decimal::from_str(sensor_value)?), - _ => { - return Ok(false); - } - } - Ok(true) -} - -pub fn parse_packet_body<'a>( - line: &'a str, - packet: &mut NMDeviceDataPacket, -) -> MyIError<&'a str, ()> { - let (line, _) = tag("#")(line)?; - let (line, sensor_mac) = take_while1(|c| c != '\n' && c != '#')(line)?; - - let (line, _) = tag("#")(line)?; - - match sensor_mac { - "OWNER" => { - let (_line, owner_value) = take_while1(|c| c != '\n')(line)?; - - packet.owner = Some(owner_value.into()) - - //hs.insert(NarodMonValues::Owner(owner_value.into())); - //let (line, _) = tag("\n")(line)?; - } - _ => { - let (line, sensor_value) = take_while1(|c| c != '\n' && c != '#')(line)?; - let (line, sensor_time) = opt(preceded( - tag("#"), - take_while1(|c| c != '\n' && c != '#' && "1234567890.".contains(c)), - ))(line)?; - let (_line, sensor_name) = opt(preceded(tag("#"), take_while1(|c| c != '\n')))(line)?; - - let sensor_time = match sensor_time { - Some(v) => Some(Epoch::from_unix_seconds( - v.parse().map_err(Error::TimestampParseError)?, - )), + let name = + match args.next() { + // TODO: optimize? one extra alloc + Some(value) => Some(String::from_utf8(value.to_vec()).map_err(|_| { + Error::PacketParseError { + msg: "device name is not UTF-8", + } + })?), None => None, }; - if !handle_special_sensor_macs(sensor_mac, sensor_value, packet)? { - packet.values.insert(SensorValue { - mac: sensor_mac.into(), - value: Decimal::from_str(sensor_value)?, - time: sensor_time.map(|v| v.into()), // TODO - unit: None, - name: sensor_name.map(|v| v.to_string()), - }); - } - } - } - - Ok((line, ())) -} - -pub fn parse_packet(input: &str) -> MyIError<&str, NMDeviceDataPacket> { - let (input, _) = tag("#")(input)?; - - let (input, device_mac) = parse_mac_address(input)?; - - let (input, opt_name) = opt(delimited(tag("#"), take_while(|c| c != '\n'), tag("\n")))(input)?; + (&input[idx_lf..], mac, name) + }; let mut packet = NMDeviceDataPacket { - mac: device_mac, + mac, + name, ..Default::default() }; - let (input, lines) = context( - "Получение значений до тега терминатора", - map_parser( - take_until1("##"), - separated_list0(tag("\n"), take_while1(|c| c != '\n')), - ), - )(input)?; + let mut input = input; - for line in lines { - parse_packet_body(line, &mut packet)?; + loop { + // TODO: searching for # and \n duplicates code above + + if input.first().ok_or(Error::Incomplete)? != &b'#' { + return Err(Error::PacketParseError { msg: "# expected" })?; + } + + // end of packet marker + // ## + if input.first() == Some(&b'#') { + break; + } + + let idx_lf = find_byte(input, b'\n')?; + + let mut args = input[..idx_lf].split(|b| b == &b'#'); + + let sensor_id = String::from_utf8( + args.next() + .ok_or(Error::PacketParseError { + msg: "sensor ID expected", + })? + .to_vec(), // TODO: optimize? one extra alloc + ) + .context(Utf8Snafu)?; + + let value = std::str::from_utf8(args.next().ok_or(Error::PacketParseError { + msg: "sensor value expected", + })?) + .context(Utf8StrSnafu)?; + + match sensor_id.as_str() { + "OWNER" => { + packet.owner = Some(value.to_owned()); + } + "LAT" => { + packet.lat = Some(Decimal::from_str(value).context(DecimalParseSnafu)?); + } + "LON" => { + packet.lon = Some(Decimal::from_str(value).context(DecimalParseSnafu)?); + } + "ALT" => { + packet.alt = Some(Decimal::from_str(value).context(DecimalParseSnafu)?); + } + _ => { + let name_or_time = args.next(); + + let (time, name) = if let Some(arg) = args.next() { + let timestamp = std::str::from_utf8(name_or_time.unwrap()) + .map_err(|_| Error::PacketParseError { + msg: "timestamp is not UTF-8", + })? + .parse() + .context(TimestampParseSnafu)?; + let time = Epoch::from_unix_seconds(timestamp); + + // TODO: optimize? one extra alloc + let name = + String::from_utf8(arg.to_vec()).map_err(|_| Error::PacketParseError { + msg: "device name is not UTF-8", + })?; + + (Some(time), Some(name)) + } else { + let name = match name_or_time { + // TODO: optimize? + Some(value) => Some(String::from_utf8(value.to_vec()).map_err(|_| { + Error::PacketParseError { + msg: "device name is not UTF-8", + } + })?), + None => None, + }; + + (None, name) + }; + + packet.values.insert(SensorValue { + mac: sensor_id, + value: Decimal::from_str(value).context(DecimalParseSnafu)?, + time: time.map(|t| t.into()), // TODO: is it ok to cast Epoch -> EpochUTC? + unit: None, + name, + }); + } + } + + input = &input[idx_lf..]; } - let (input, _) = tag("##")(input)?; - - packet.name = opt_name.map(|v| v.to_string()); - - Ok((input, packet)) + Ok(packet) +} + +pub fn parse_mac_address(input: &[u8]) -> Result<[u8; 6], Error> { + const MAC_LEN: usize = 6; + let mut mac = [0u8; MAC_LEN]; + + let mut idx = 0; + let mut cur_byte = 0usize; // index of current byte in `mac` + let mut least_bit = false; + loop { + if idx >= input.len() || cur_byte >= MAC_LEN { + break; + } + + if !least_bit && input[idx] == b'-' { + idx += 1; + continue; + } + + let bit = if (b'0'..=b'9').contains(&input[idx]) { + input[idx] - b'0' + } else if (b'A'..=b'F').contains(&input[idx]) { + input[idx] - b'A' + 10 + } else if (b'a'..=b'f').contains(&input[idx]) { + input[idx] - b'a' + 10 + } else { + return Err(Error::MacInvalidChar { value: input[idx] }); + }; + + if !least_bit { + mac[cur_byte] = bit * 16; + least_bit = true; + } else { + mac[cur_byte] |= bit; + least_bit = false; + cur_byte += 1; + } + + idx += 1; + } + + Ok(mac) +} + +fn find_byte(haystack: &[u8], needle: u8) -> Result { + for (idx, byte) in haystack.as_ref().iter().enumerate() { + if byte == &needle { + return Ok(idx); + } + } + Err(Error::Incomplete) } diff --git a/src/ingest_protocol/tests/mod.rs b/src/ingest_protocol/tests/mod.rs index 6a213ad..e93b835 100644 --- a/src/ingest_protocol/tests/mod.rs +++ b/src/ingest_protocol/tests/mod.rs @@ -17,8 +17,9 @@ fn test_asd() { #ALT#38 ## "# - .trim(); - let packet = parse_packet(asd).unwrap().1; + .trim() + .as_bytes(); + let packet = parse_packet(asd).unwrap(); assert!(serde_json::to_string_pretty(&packet) .unwrap() .contains(r#""time": 1000000.0"#)); @@ -27,8 +28,8 @@ fn test_asd() { #[test] fn test_mac() { assert_eq!( - parse_mac_address("12-34-AA-12-55-AA").unwrap(), - ("", [18, 52, 170, 18, 85, 170]) + parse_mac_address(b"12-34-Aa-1255-fA").unwrap(), + [18, 52, 170, 18, 85, 250], ); dbg!(Epoch::now().unwrap().to_unix_seconds()); @@ -45,7 +46,7 @@ fn test_packet() { #T2#1.2#3400005345 ##"#; - println!("{:#?}", parse_packet(inp)); + println!("{:#?}", parse_packet(inp.as_bytes())); } #[test] diff --git a/src/web_server/old_app_api/mod.rs b/src/web_server/old_app_api/mod.rs index 3430f7c..c970829 100644 --- a/src/web_server/old_app_api/mod.rs +++ b/src/web_server/old_app_api/mod.rs @@ -3,20 +3,19 @@ mod handlers; mod types; -use ntex::web::types::State; -use ntex::util::Bytes; -use ntex::web; -use nom::AsBytes; -use snafu::ResultExt; use crate::web_server::app_error::AppError; -use crate::web_server::NMAppState; use crate::web_server::old_app_api::handlers::{app_init, version}; use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams}; use crate::web_server::utils::redis::is_api_key_valid; +use crate::web_server::NMAppState; +use bstr::ByteSlice; +use ntex::util::Bytes; +use ntex::web; +use ntex::web::types::State; +use snafu::ResultExt; use super::app_error; - /// Обработчик запросов от приложений. /// /// Отвечает за разделение на функции по `cmd`. @@ -31,22 +30,28 @@ pub async fn old_api_handler( return Err(AppError::RequestTooLarge); } - let body_bytes = body_bytes.as_bytes(); + let body_bytes = body_bytes.as_bstr(); - let mandatory_params: MandatoryParams<'_> = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON + let mandatory_params: MandatoryParams<'_> = + serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON // Ignore clippy singlematch - if mandatory_params.cmd.as_ref() == "version" { return version((), &app_state).await } + if mandatory_params.cmd.as_ref() == "version" { + return version((), &app_state).await; + } is_api_key_valid(&app_state.redis_client, mandatory_params.api_key.as_ref()).await?; match mandatory_params.cmd.as_ref() { "appInit" => { - let body: AppInitRequest = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; + let body: AppInitRequest = + serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; app_init(body, &app_state).await } - _ => Err(AppError::UnknownMethod { method: mandatory_params.cmd.to_string() }), + _ => Err(AppError::UnknownMethod { + method: mandatory_params.cmd.to_string(), + }), } //Ok("fuck") diff --git a/src/web_server/old_device_sensor_api/qs_parser.rs b/src/web_server/old_device_sensor_api/qs_parser.rs index 0b4ca09..dd5f131 100644 --- a/src/web_server/old_device_sensor_api/qs_parser.rs +++ b/src/web_server/old_device_sensor_api/qs_parser.rs @@ -19,31 +19,29 @@ pub enum QSParserError { #[snafu(display("asd"))] SerdeQS { #[snafu(source(from(serde_qs::Error, convert_to_arc::)))] - source: Arc + source: Arc, }, #[snafu(display("asd"))] - Parsing { - context: String - }, + Parsing { context: String }, #[snafu(display("asd"))] FloatP { #[snafu(source)] - source: ParseFloatError + source: ParseFloatError, }, #[snafu(display("failed to parse into decimal"))] - DecimalParse { - source: rust_decimal::Error - }, + DecimalParse { source: rust_decimal::Error }, #[snafu(display("asd"))] NoMAC, } -impl From> for QSParserError { - fn from(value: Error<&str>) -> Self { - QSParserError::Parsing { context: format!("{:?}", value)} +impl From for QSParserError { + fn from(value: Error) -> Self { + QSParserError::Parsing { + context: format!("{:?}", value), + } } } @@ -66,7 +64,7 @@ pub fn qs_rest_to_values( for (key, value) in parsed { hashset.insert(SensorValue { mac: key, - value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu{})?, + value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu {})?, time: None, unit: None, @@ -82,7 +80,9 @@ pub fn parse_decimal_if_exists( key: &str, ) -> Result, QSParserError> { if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some(Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu{})?)) + Ok(Some( + Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu {})?, + )) } else { Ok(None) } @@ -93,17 +93,19 @@ pub fn parse_epoch_if_exists( key: &str, ) -> Result, QSParserError> { if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse().context(FloatPSnafu{})?))) + Ok(Some(Epoch::from_unix_seconds( + unwrapped_value.parse().context(FloatPSnafu {})?, + ))) } else { Ok(None) } } pub async fn parse_nm_qs_format(input: &str) -> Result { - let mut parsed: HashMap = serde_qs::from_str(input).context(SerdeQSSnafu{})?; + let mut parsed: HashMap = serde_qs::from_str(input).context(SerdeQSSnafu {})?; - let (_, device_mac) = if let Some(id) = parsed.get("ID") { - parse_mac_address(id)? + let device_mac = if let Some(id) = parsed.get("ID") { + parse_mac_address(id.as_bytes())? } else { return Err(QSParserError::NoMAC); };