Compare commits

..

No commits in common. "custom-parser" and "master" have entirely different histories.

8 changed files with 181 additions and 243 deletions

14
Cargo.lock generated
View file

@ -225,6 +225,17 @@ dependencies = [
"syn 2.0.95", "syn 2.0.95",
] ]
[[package]]
name = "bstr"
version = "1.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "531a9155a481e2ee699d4f98f43c0ca4ff8ee1bfd55c31e9e98fb29d2b176fe0"
dependencies = [
"memchr",
"regex-automata",
"serde",
]
[[package]] [[package]]
name = "bumpalo" name = "bumpalo"
version = "3.16.0" version = "3.16.0"
@ -1070,6 +1081,7 @@ dependencies = [
name = "iotishnik-server" name = "iotishnik-server"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"bstr",
"bytes 1.9.0", "bytes 1.9.0",
"chrono", "chrono",
"clap", "clap",
@ -1081,6 +1093,7 @@ dependencies = [
"hex", "hex",
"hifitime", "hifitime",
"lazy_static", "lazy_static",
"nom",
"ntex", "ntex",
"phf", "phf",
"regex", "regex",
@ -1091,6 +1104,7 @@ dependencies = [
"serde_with", "serde_with",
"smallstr", "smallstr",
"snafu", "snafu",
"thiserror",
"tokio", "tokio",
"ufmt", "ufmt",
] ]

View file

@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies] [dependencies]
bstr = { version = "1.9.0", features = ["serde"] }
bytes = { version = "1.4.0", features = ["serde"] } bytes = { version = "1.4.0", features = ["serde"] }
chrono = { version = "0.4.26", features = ["serde"] } chrono = { version = "0.4.26", features = ["serde"] }
clap = { version = "4.3.8", features = ["derive", "env"] } clap = { version = "4.3.8", features = ["derive", "env"] }
@ -15,6 +16,7 @@ heapless = { version = "0.8.0", features = ["ufmt"] }
hex = { version = "0.4.3", default-features = false, features = ["std", "alloc"] } hex = { version = "0.4.3", default-features = false, features = ["std", "alloc"] }
hifitime = "4.0.2" hifitime = "4.0.2"
lazy_static = "1.4.0" lazy_static = "1.4.0"
nom = { version = "7.1.3", default-features = false, features = ["std", "alloc"] }
ntex = { version = "2.10.0", features = ["tokio", "cookie", "url"] } ntex = { version = "2.10.0", features = ["tokio", "cookie", "url"] }
phf = { version = "0.11.2", features = ["serde", "macros"] } phf = { version = "0.11.2", features = ["serde", "macros"] }
regex = "1.8.4" regex = "1.8.4"
@ -24,6 +26,7 @@ serde_json = "1.0.99"
serde_qs = "0.13.0" serde_qs = "0.13.0"
serde_with = { version = "3.6.1", features = ["hex"] } serde_with = { version = "3.6.1", features = ["hex"] }
smallstr = { version = "0.3.0", features = ["std", "union"] } smallstr = { version = "0.3.0", features = ["std", "union"] }
thiserror = "1.0.40"
tokio = { version = "1.28.2", features = ["full"] } tokio = { version = "1.28.2", features = ["full"] }
ufmt = { version = "0.2.0", features = ["std"] } ufmt = { version = "0.2.0", features = ["std"] }
futures-util = { version = "0.3.30", features = ["tokio-io"] } futures-util = { version = "0.3.30", features = ["tokio-io"] }

View file

@ -1,39 +1,28 @@
use snafu::Snafu; use nom::error::VerboseError;
use std::fmt::Debug;
use std::num::ParseFloatError;
use thiserror::Error as ThisError;
///
/// Тип ошибки Ingest протокола /// Тип ошибки Ingest протокола
#[derive(Debug, Snafu)] ///
#[snafu(visibility(pub))] /// К сожалению, не может быть переделан на Snafu, так как
pub enum Error { /// Snafu не поддерживает generic типы как source ошибки,
Incomplete, /// не приделывая 'static лайфтайм.
///
/// См. https://github.com/shepmaster/snafu/issues/99
///
#[allow(dead_code)]
#[derive(Debug, ThisError)]
pub enum Error<I: Debug> {
#[error("Nom error: {0}")]
Nom(#[from] nom::Err<VerboseError<I>>),
#[snafu(display("Cannot parse packet: {msg}"))] #[error("Failed to parse a timestamp")]
PacketParseError { TimestampParseError(ParseFloatError),
msg: &'static str,
},
#[snafu(display("Cannot parse MAC address: unexpected byte {value}"))] #[error("Unknown unit")]
MacInvalidChar { UnknownUnit(I),
value: u8, #[error("Failed to parse a number")]
}, DecimalParsing(#[from] rust_decimal::Error),
#[snafu(display("Cannot decode as UTF-8 String"))]
Utf8Error {
source: std::string::FromUtf8Error,
},
// TODO: can we merge these two errors?
#[snafu(display("Cannot decode as UTF-8 &str"))]
Utf8StrError {
source: std::str::Utf8Error,
},
#[snafu(display("Cannot parse timestamp"))]
TimestampParseError {
source: std::num::ParseFloatError,
},
#[snafu(display("Cannot parse number"))]
DecimalParseError {
source: rust_decimal::Error,
},
} }

View file

@ -52,8 +52,8 @@ impl Hash for SensorValue {
/// Функция-помощник для [MacAsArray], предназначенный для использования с [serde_with]. /// Функция-помощник для [MacAsArray], предназначенный для использования с [serde_with].
/// ///
/// Преобразует MAC-адрес. /// Преобразует MAC-адрес.
fn mac_as_array(value: &str) -> Result<[u8; 6], Error> { fn mac_as_array(value: &str) -> Result<[u8; 6], Error<&str>> {
parse_mac_address(value.as_bytes()) Ok(parse_mac_address(value)?.1)
} }
serde_with::serde_conv!( serde_with::serde_conv!(

View file

@ -1,196 +1,136 @@
use hifitime::Epoch;
use nom::bytes::complete::tag;
use nom::bytes::complete::take_until1;
use nom::bytes::complete::{take, take_while, take_while1};
use nom::character::complete::hex_digit1;
use std::str::FromStr; use std::str::FromStr;
use hifitime::Epoch;
use rust_decimal::Decimal;
use snafu::ResultExt;
use super::error::{DecimalParseSnafu, TimestampParseSnafu, Utf8Snafu, Utf8StrSnafu};
use crate::ingest_protocol::error::Error; use crate::ingest_protocol::error::Error;
use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue}; use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue};
use nom::combinator::{map_parser, opt};
use nom::error::context;
use nom::multi::{count, separated_list0};
use nom::sequence::{delimited, preceded};
use rust_decimal::Decimal;
pub fn parse_packet(input: impl AsRef<[u8]>) -> Result<NMDeviceDataPacket, Error> { type MyIError<I, O> = Result<(I, O), Error<I>>;
let input = input.as_ref();
let input = if input.first().ok_or(Error::Incomplete)? != &b'#' { pub fn parse_mac_address(input: &str) -> MyIError<&str, [u8; 6]> {
return Err(Error::PacketParseError { let mut mac = [0u8; 6];
msg: "# expected at beginning",
})?;
} else {
&input[1..]
};
let (input, mac, name) = { let mut counter = 0;
let idx_lf = find_byte(input, b'\n')?;
let mut args = input[..idx_lf].split(|b| b == &b'#'); let (leftovers, i) =
context("17 символов для MAC адреса", take(17usize))(input)?;
let mac = parse_mac_address(args.next().ok_or(Error::PacketParseError { let (_, out) = count(
msg: "device MAC field expected", |inp| {
})?)?; let (mut i, o) = context("Октет", map_parser(take(2usize), hex_digit1))(inp)?;
if counter != 5 {
let name = (i, _) = tag("-")(i)?;
match args.next() {
// TODO: optimize? one extra alloc
Some(value) => Some(String::from_utf8(value.to_vec()).map_err(|_| {
Error::PacketParseError {
msg: "device name is not UTF-8",
} }
})?), counter += 1;
None => None, Ok((i, o))
}; },
6,
)(i)?;
(&input[idx_lf..], mac, name) hex::decode_to_slice(out.join(""), &mut mac).unwrap();
};
let mut packet = NMDeviceDataPacket { Ok((leftovers, mac))
mac, }
name,
..Default::default()
};
let mut input = input; fn handle_special_sensor_macs<'a>(
mac: &'a str,
loop { sensor_value: &str,
// TODO: searching for # and \n duplicates code above packet: &mut NMDeviceDataPacket,
) -> Result<bool, Error<&'a str>> {
if input.first().ok_or(Error::Incomplete)? != &b'#' { match mac.to_uppercase().as_str() {
return Err(Error::PacketParseError { msg: "# expected" })?; "LAT" => packet.lat = Some(Decimal::from_str(sensor_value)?),
"LON" => packet.lon = Some(Decimal::from_str(sensor_value)?),
"ALT" => packet.alt = Some(Decimal::from_str(sensor_value)?),
_ => {
return Ok(false);
} }
// end of packet marker
// ##
if input.first() == Some(&b'#') {
break;
} }
Ok(true)
}
let idx_lf = find_byte(input, b'\n')?; pub fn parse_packet_body<'a>(
line: &'a str,
packet: &mut NMDeviceDataPacket,
) -> MyIError<&'a str, ()> {
let (line, _) = tag("#")(line)?;
let (line, sensor_mac) = take_while1(|c| c != '\n' && c != '#')(line)?;
let mut args = input[..idx_lf].split(|b| b == &b'#'); let (line, _) = tag("#")(line)?;
let sensor_id = String::from_utf8( match sensor_mac {
args.next()
.ok_or(Error::PacketParseError {
msg: "sensor ID expected",
})?
.to_vec(), // TODO: optimize? one extra alloc
)
.context(Utf8Snafu)?;
let value = std::str::from_utf8(args.next().ok_or(Error::PacketParseError {
msg: "sensor value expected",
})?)
.context(Utf8StrSnafu)?;
match sensor_id.as_str() {
"OWNER" => { "OWNER" => {
packet.owner = Some(value.to_owned()); let (_line, owner_value) = take_while1(|c| c != '\n')(line)?;
}
"LAT" => { packet.owner = Some(owner_value.into())
packet.lat = Some(Decimal::from_str(value).context(DecimalParseSnafu)?);
} //hs.insert(NarodMonValues::Owner(owner_value.into()));
"LON" => { //let (line, _) = tag("\n")(line)?;
packet.lon = Some(Decimal::from_str(value).context(DecimalParseSnafu)?);
}
"ALT" => {
packet.alt = Some(Decimal::from_str(value).context(DecimalParseSnafu)?);
} }
_ => { _ => {
let name_or_time = args.next(); let (line, sensor_value) = take_while1(|c| c != '\n' && c != '#')(line)?;
let (line, sensor_time) = opt(preceded(
tag("#"),
take_while1(|c| c != '\n' && c != '#' && "1234567890.".contains(c)),
))(line)?;
let (_line, sensor_name) = opt(preceded(tag("#"), take_while1(|c| c != '\n')))(line)?;
let (time, name) = if let Some(arg) = args.next() { let sensor_time = match sensor_time {
let timestamp = std::str::from_utf8(name_or_time.unwrap()) Some(v) => Some(Epoch::from_unix_seconds(
.map_err(|_| Error::PacketParseError { v.parse().map_err(Error::TimestampParseError)?,
msg: "timestamp is not UTF-8", )),
})?
.parse()
.context(TimestampParseSnafu)?;
let time = Epoch::from_unix_seconds(timestamp);
// TODO: optimize? one extra alloc
let name =
String::from_utf8(arg.to_vec()).map_err(|_| Error::PacketParseError {
msg: "device name is not UTF-8",
})?;
(Some(time), Some(name))
} else {
let name = match name_or_time {
// TODO: optimize?
Some(value) => Some(String::from_utf8(value.to_vec()).map_err(|_| {
Error::PacketParseError {
msg: "device name is not UTF-8",
}
})?),
None => None, None => None,
}; };
(None, name) if !handle_special_sensor_macs(sensor_mac, sensor_value, packet)? {
};
packet.values.insert(SensorValue { packet.values.insert(SensorValue {
mac: sensor_id, mac: sensor_mac.into(),
value: Decimal::from_str(value).context(DecimalParseSnafu)?, value: Decimal::from_str(sensor_value)?,
time: time.map(|t| t.into()), // TODO: is it ok to cast Epoch -> EpochUTC? time: sensor_time.map(|v| v.into()), // TODO
unit: None, unit: None,
name, name: sensor_name.map(|v| v.to_string()),
}); });
} }
} }
input = &input[idx_lf..];
} }
Ok(packet) Ok((line, ()))
} }
pub fn parse_mac_address(input: &[u8]) -> Result<[u8; 6], Error> { pub fn parse_packet(input: &str) -> MyIError<&str, NMDeviceDataPacket> {
const MAC_LEN: usize = 6; let (input, _) = tag("#")(input)?;
let mut mac = [0u8; MAC_LEN];
let mut idx = 0; let (input, device_mac) = parse_mac_address(input)?;
let mut cur_byte = 0usize; // index of current byte in `mac`
let mut least_bit = false;
loop {
if idx >= input.len() || cur_byte >= MAC_LEN {
break;
}
if !least_bit && input[idx] == b'-' { let (input, opt_name) = opt(delimited(tag("#"), take_while(|c| c != '\n'), tag("\n")))(input)?;
idx += 1;
continue;
}
let bit = if (b'0'..=b'9').contains(&input[idx]) { let mut packet = NMDeviceDataPacket {
input[idx] - b'0' mac: device_mac,
} else if (b'A'..=b'F').contains(&input[idx]) { ..Default::default()
input[idx] - b'A' + 10
} else if (b'a'..=b'f').contains(&input[idx]) {
input[idx] - b'a' + 10
} else {
return Err(Error::MacInvalidChar { value: input[idx] });
}; };
if !least_bit { let (input, lines) = context(
mac[cur_byte] = bit * 16; "Получение значений до тега терминатора",
least_bit = true; map_parser(
} else { take_until1("##"),
mac[cur_byte] |= bit; separated_list0(tag("\n"), take_while1(|c| c != '\n')),
least_bit = false; ),
cur_byte += 1; )(input)?;
for line in lines {
parse_packet_body(line, &mut packet)?;
} }
idx += 1; let (input, _) = tag("##")(input)?;
}
Ok(mac) packet.name = opt_name.map(|v| v.to_string());
}
Ok((input, packet))
fn find_byte(haystack: &[u8], needle: u8) -> Result<usize, Error> {
for (idx, byte) in haystack.as_ref().iter().enumerate() {
if byte == &needle {
return Ok(idx);
}
}
Err(Error::Incomplete)
} }

View file

@ -17,9 +17,8 @@ fn test_asd() {
#ALT#38 #ALT#38
## ##
"# "#
.trim() .trim();
.as_bytes(); let packet = parse_packet(asd).unwrap().1;
let packet = parse_packet(asd).unwrap();
assert!(serde_json::to_string_pretty(&packet) assert!(serde_json::to_string_pretty(&packet)
.unwrap() .unwrap()
.contains(r#""time": 1000000.0"#)); .contains(r#""time": 1000000.0"#));
@ -28,8 +27,8 @@ fn test_asd() {
#[test] #[test]
fn test_mac() { fn test_mac() {
assert_eq!( assert_eq!(
parse_mac_address(b"12-34-Aa-1255-fA").unwrap(), parse_mac_address("12-34-AA-12-55-AA").unwrap(),
[18, 52, 170, 18, 85, 250], ("", [18, 52, 170, 18, 85, 170])
); );
dbg!(Epoch::now().unwrap().to_unix_seconds()); dbg!(Epoch::now().unwrap().to_unix_seconds());
@ -46,7 +45,7 @@ fn test_packet() {
#T2#1.2#3400005345 #T2#1.2#3400005345
##"#; ##"#;
println!("{:#?}", parse_packet(inp.as_bytes())); println!("{:#?}", parse_packet(inp));
} }
#[test] #[test]

View file

@ -3,19 +3,20 @@
mod handlers; mod handlers;
mod types; mod types;
use ntex::web::types::State;
use ntex::util::Bytes;
use ntex::web;
use nom::AsBytes;
use snafu::ResultExt;
use crate::web_server::app_error::AppError; use crate::web_server::app_error::AppError;
use crate::web_server::NMAppState;
use crate::web_server::old_app_api::handlers::{app_init, version}; use crate::web_server::old_app_api::handlers::{app_init, version};
use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams}; use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams};
use crate::web_server::utils::redis::is_api_key_valid; use crate::web_server::utils::redis::is_api_key_valid;
use crate::web_server::NMAppState;
use bstr::ByteSlice;
use ntex::util::Bytes;
use ntex::web;
use ntex::web::types::State;
use snafu::ResultExt;
use super::app_error; use super::app_error;
/// Обработчик запросов от приложений. /// Обработчик запросов от приложений.
/// ///
/// Отвечает за разделение на функции по `cmd`. /// Отвечает за разделение на функции по `cmd`.
@ -30,28 +31,22 @@ pub async fn old_api_handler(
return Err(AppError::RequestTooLarge); return Err(AppError::RequestTooLarge);
} }
let body_bytes = body_bytes.as_bstr(); let body_bytes = body_bytes.as_bytes();
let mandatory_params: MandatoryParams<'_> = let mandatory_params: MandatoryParams<'_> = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON
serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?; // TODO: Simd-JSON
// Ignore clippy singlematch // Ignore clippy singlematch
if mandatory_params.cmd.as_ref() == "version" { if mandatory_params.cmd.as_ref() == "version" { return version((), &app_state).await }
return version((), &app_state).await;
}
is_api_key_valid(&app_state.redis_client, mandatory_params.api_key.as_ref()).await?; is_api_key_valid(&app_state.redis_client, mandatory_params.api_key.as_ref()).await?;
match mandatory_params.cmd.as_ref() { match mandatory_params.cmd.as_ref() {
"appInit" => { "appInit" => {
let body: AppInitRequest = let body: AppInitRequest = serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?;
serde_json::from_slice(body_bytes).context(app_error::JsonSnafu {})?;
app_init(body, &app_state).await app_init(body, &app_state).await
} }
_ => Err(AppError::UnknownMethod { _ => Err(AppError::UnknownMethod { method: mandatory_params.cmd.to_string() }),
method: mandatory_params.cmd.to_string(),
}),
} }
//Ok("fuck") //Ok("fuck")

View file

@ -19,29 +19,31 @@ pub enum QSParserError {
#[snafu(display("asd"))] #[snafu(display("asd"))]
SerdeQS { SerdeQS {
#[snafu(source(from(serde_qs::Error, convert_to_arc::<serde_qs::Error>)))] #[snafu(source(from(serde_qs::Error, convert_to_arc::<serde_qs::Error>)))]
source: Arc<serde_qs::Error>, source: Arc<serde_qs::Error>
}, },
#[snafu(display("asd"))] #[snafu(display("asd"))]
Parsing { context: String }, Parsing {
context: String
},
#[snafu(display("asd"))] #[snafu(display("asd"))]
FloatP { FloatP {
#[snafu(source)] #[snafu(source)]
source: ParseFloatError, source: ParseFloatError
}, },
#[snafu(display("failed to parse into decimal"))] #[snafu(display("failed to parse into decimal"))]
DecimalParse { source: rust_decimal::Error }, DecimalParse {
source: rust_decimal::Error
},
#[snafu(display("asd"))] #[snafu(display("asd"))]
NoMAC, NoMAC,
} }
impl From<Error> for QSParserError { impl From<Error<&str>> for QSParserError {
fn from(value: Error) -> Self { fn from(value: Error<&str>) -> Self {
QSParserError::Parsing { QSParserError::Parsing { context: format!("{:?}", value)}
context: format!("{:?}", value),
}
} }
} }
@ -64,7 +66,7 @@ pub fn qs_rest_to_values(
for (key, value) in parsed { for (key, value) in parsed {
hashset.insert(SensorValue { hashset.insert(SensorValue {
mac: key, mac: key,
value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu {})?, value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu{})?,
time: None, time: None,
unit: None, unit: None,
@ -80,9 +82,7 @@ pub fn parse_decimal_if_exists(
key: &str, key: &str,
) -> Result<Option<Decimal>, QSParserError> { ) -> Result<Option<Decimal>, QSParserError> {
if let Some(unwrapped_value) = parsed.remove(key) { if let Some(unwrapped_value) = parsed.remove(key) {
Ok(Some( Ok(Some(Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu{})?))
Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu {})?,
))
} else { } else {
Ok(None) Ok(None)
} }
@ -93,19 +93,17 @@ pub fn parse_epoch_if_exists(
key: &str, key: &str,
) -> Result<Option<Epoch>, QSParserError> { ) -> Result<Option<Epoch>, QSParserError> {
if let Some(unwrapped_value) = parsed.remove(key) { if let Some(unwrapped_value) = parsed.remove(key) {
Ok(Some(Epoch::from_unix_seconds( Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse().context(FloatPSnafu{})?)))
unwrapped_value.parse().context(FloatPSnafu {})?,
)))
} else { } else {
Ok(None) Ok(None)
} }
} }
pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSParserError> { pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSParserError> {
let mut parsed: HashMap<String, String> = serde_qs::from_str(input).context(SerdeQSSnafu {})?; let mut parsed: HashMap<String, String> = serde_qs::from_str(input).context(SerdeQSSnafu{})?;
let device_mac = if let Some(id) = parsed.get("ID") { let (_, device_mac) = if let Some(id) = parsed.get("ID") {
parse_mac_address(id.as_bytes())? parse_mac_address(id)?
} else { } else {
return Err(QSParserError::NoMAC); return Err(QSParserError::NoMAC);
}; };