WIP: custom #ingest#protocol parser #23
2 changed files with 182 additions and 137 deletions
|
@ -1,28 +1,25 @@
|
|||
use nom::error::VerboseError;
|
||||
use std::fmt::Debug;
|
||||
use std::num::ParseFloatError;
|
||||
use thiserror::Error as ThisError;
|
||||
use snafu::Snafu;
|
||||
|
||||
///
|
||||
/// Тип ошибки Ingest протокола
|
||||
///
|
||||
/// К сожалению, не может быть переделан на Snafu, так как
|
||||
/// Snafu не поддерживает generic типы как source ошибки,
|
||||
/// не приделывая 'static лайфтайм.
|
||||
///
|
||||
/// См. https://github.com/shepmaster/snafu/issues/99
|
||||
///
|
||||
#[allow(dead_code)]
|
||||
#[derive(Debug, ThisError)]
|
||||
pub enum Error<I: Debug> {
|
||||
#[error("Nom error: {0}")]
|
||||
Nom(#[from] nom::Err<VerboseError<I>>),
|
||||
#[derive(Debug, Snafu)]
|
||||
#[snafu(visibility(pub))]
|
||||
pub enum Error {
|
||||
#[snafu(display("Cannot parse packet: {msg}"))]
|
||||
PacketParseError { msg: &'static str },
|
||||
|
||||
#[error("Failed to parse a timestamp")]
|
||||
TimestampParseError(ParseFloatError),
|
||||
#[snafu(display("Cannot parse MAC address: unexpected byte {value}"))]
|
||||
MacInvalidChar { value: u8 },
|
||||
|
||||
#[error("Unknown unit")]
|
||||
UnknownUnit(I),
|
||||
#[error("Failed to parse a number")]
|
||||
DecimalParsing(#[from] rust_decimal::Error),
|
||||
#[snafu(display("Cannot decode as UTF-8 String"))]
|
||||
Utf8Error { source: std::string::FromUtf8Error },
|
||||
|
||||
// TODO: can we merge these two errors?
|
||||
#[snafu(display("Cannot decode as UTF-8 &str"))]
|
||||
Utf8StrError { source: std::str::Utf8Error },
|
||||
|
||||
#[snafu(display("Cannot parse timestamp"))]
|
||||
TimestampParseError { source: std::num::ParseFloatError },
|
||||
|
||||
#[snafu(display("Cannot parse number"))]
|
||||
DecimalParseError { source: rust_decimal::Error },
|
||||
}
|
||||
|
|
|
@ -1,136 +1,184 @@
|
|||
use hifitime::Epoch;
|
||||
use nom::bytes::complete::tag;
|
||||
use nom::bytes::complete::take_until1;
|
||||
use nom::bytes::complete::{take, take_while, take_while1};
|
||||
use nom::character::complete::hex_digit1;
|
||||
use std::str::FromStr;
|
||||
|
||||
use crate::ingest_protocol::error::Error;
|
||||
use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue};
|
||||
use nom::combinator::{map_parser, opt};
|
||||
use nom::error::context;
|
||||
use nom::multi::{count, separated_list0};
|
||||
use nom::sequence::{delimited, preceded};
|
||||
use hifitime::Epoch;
|
||||
use rust_decimal::Decimal;
|
||||
|
||||
type MyIError<I, O> = Result<(I, O), Error<I>>;
|
||||
use bstr::ByteSlice;
|
||||
use snafu::ResultExt;
|
||||
|
||||
pub fn parse_mac_address(input: &str) -> MyIError<&str, [u8; 6]> {
|
||||
let mut mac = [0u8; 6];
|
||||
use super::error::{DecimalParseSnafu, TimestampParseSnafu, Utf8Snafu, Utf8StrSnafu};
|
||||
use crate::ingest_protocol::error::Error;
|
||||
use crate::ingest_protocol::{NMDeviceDataPacket, SensorValue};
|
||||
|
||||
let mut counter = 0;
|
||||
pub fn parse_packet(mut input: &[u8]) -> Result<NMDeviceDataPacket, Error> {
|
||||
|
||||
input = input
|
||||
.find_byte(b'#')
|
||||
.and_then(|idx| input.get(idx + 1..))
|
||||
.ok_or(Error::PacketParseError { msg: "# expected" })?;
|
||||
|
||||
let (leftovers, i) =
|
||||
context("17 символов для MAC адреса", take(17usize))(input)?;
|
||||
let (mac, name) = {
|
||||
let idx_lf = input.find_byte(b'\n').ok_or(Error::PacketParseError {
|
||||
msg: "newline expected",
|
||||
})?;
|
||||
|
||||
let (_, out) = count(
|
||||
|inp| {
|
||||
let (mut i, o) = context("Октет", map_parser(take(2usize), hex_digit1))(inp)?;
|
||||
if counter != 5 {
|
||||
(i, _) = tag("-")(i)?;
|
||||
let mut args = input[..idx_lf].split(|b| b == &b'#');
|
||||
|
||||
let mac = parse_mac_address(args.next().ok_or(Error::PacketParseError {
|
||||
msg: "device MAC field expected",
|
||||
})?)?;
|
||||
|
||||
let name =
|
||||
match args.next() {
|
||||
// TODO: optimize? one extra alloc
|
||||
Some(value) => Some(String::from_utf8(value.to_vec()).map_err(|_| {
|
||||
Error::PacketParseError {
|
||||
msg: "device name is not UTF-8",
|
||||
}
|
||||
counter += 1;
|
||||
Ok((i, o))
|
||||
},
|
||||
6,
|
||||
)(i)?;
|
||||
|
||||
hex::decode_to_slice(out.join(""), &mut mac).unwrap();
|
||||
|
||||
Ok((leftovers, mac))
|
||||
}
|
||||
|
||||
fn handle_special_sensor_macs<'a>(
|
||||
mac: &'a str,
|
||||
sensor_value: &str,
|
||||
packet: &mut NMDeviceDataPacket,
|
||||
) -> Result<bool, Error<&'a str>> {
|
||||
match mac.to_uppercase().as_str() {
|
||||
"LAT" => packet.lat = Some(Decimal::from_str(sensor_value)?),
|
||||
"LON" => packet.lon = Some(Decimal::from_str(sensor_value)?),
|
||||
"ALT" => packet.alt = Some(Decimal::from_str(sensor_value)?),
|
||||
_ => {
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
Ok(true)
|
||||
}
|
||||
|
||||
pub fn parse_packet_body<'a>(
|
||||
line: &'a str,
|
||||
packet: &mut NMDeviceDataPacket,
|
||||
) -> MyIError<&'a str, ()> {
|
||||
let (line, _) = tag("#")(line)?;
|
||||
let (line, sensor_mac) = take_while1(|c| c != '\n' && c != '#')(line)?;
|
||||
|
||||
let (line, _) = tag("#")(line)?;
|
||||
|
||||
match sensor_mac {
|
||||
"OWNER" => {
|
||||
let (_line, owner_value) = take_while1(|c| c != '\n')(line)?;
|
||||
|
||||
packet.owner = Some(owner_value.into())
|
||||
|
||||
//hs.insert(NarodMonValues::Owner(owner_value.into()));
|
||||
//let (line, _) = tag("\n")(line)?;
|
||||
}
|
||||
_ => {
|
||||
let (line, sensor_value) = take_while1(|c| c != '\n' && c != '#')(line)?;
|
||||
let (line, sensor_time) = opt(preceded(
|
||||
tag("#"),
|
||||
take_while1(|c| c != '\n' && c != '#' && "1234567890.".contains(c)),
|
||||
))(line)?;
|
||||
let (_line, sensor_name) = opt(preceded(tag("#"), take_while1(|c| c != '\n')))(line)?;
|
||||
|
||||
let sensor_time = match sensor_time {
|
||||
Some(v) => Some(Epoch::from_unix_seconds(
|
||||
v.parse().map_err(Error::TimestampParseError)?,
|
||||
)),
|
||||
})?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
if !handle_special_sensor_macs(sensor_mac, sensor_value, packet)? {
|
||||
packet.values.insert(SensorValue {
|
||||
mac: sensor_mac.into(),
|
||||
value: Decimal::from_str(sensor_value)?,
|
||||
time: sensor_time.map(|v| v.into()), // TODO
|
||||
unit: None,
|
||||
name: sensor_name.map(|v| v.to_string()),
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok((line, ()))
|
||||
}
|
||||
|
||||
pub fn parse_packet(input: &str) -> MyIError<&str, NMDeviceDataPacket> {
|
||||
let (input, _) = tag("#")(input)?;
|
||||
|
||||
let (input, device_mac) = parse_mac_address(input)?;
|
||||
|
||||
let (input, opt_name) = opt(delimited(tag("#"), take_while(|c| c != '\n'), tag("\n")))(input)?;
|
||||
input = &input[idx_lf..];
|
||||
(mac, name)
|
||||
};
|
||||
|
||||
let mut packet = NMDeviceDataPacket {
|
||||
mac: device_mac,
|
||||
mac,
|
||||
name,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let (input, lines) = context(
|
||||
"Получение значений до тега терминатора",
|
||||
map_parser(
|
||||
take_until1("##"),
|
||||
separated_list0(tag("\n"), take_while1(|c| c != '\n')),
|
||||
),
|
||||
)(input)?;
|
||||
loop {
|
||||
input = input
|
||||
.find_byte(b'#')
|
||||
.and_then(|idx| input.get(idx + 1..))
|
||||
.ok_or(Error::PacketParseError { msg: "# expected" })?;
|
||||
|
||||
for line in lines {
|
||||
parse_packet_body(line, &mut packet)?;
|
||||
// end of packet marker
|
||||
// ##
|
||||
if input.first() == Some(&b'#') {
|
||||
break;
|
||||
}
|
||||
|
||||
let (input, _) = tag("##")(input)?;
|
||||
let idx_lf = input.find_byte(b'\n').ok_or(Error::PacketParseError {
|
||||
msg: "newline expected",
|
||||
})?;
|
||||
|
||||
packet.name = opt_name.map(|v| v.to_string());
|
||||
let mut args = input[..idx_lf].split(|b| b == &b'#');
|
||||
|
||||
Ok((input, packet))
|
||||
let sensor_id = String::from_utf8(
|
||||
args.next()
|
||||
.ok_or(Error::PacketParseError {
|
||||
msg: "sensor ID expected",
|
||||
})?
|
||||
.to_vec(), // TODO: optimize? one extra alloc
|
||||
)
|
||||
.context(Utf8Snafu)?;
|
||||
|
||||
let value = std::str::from_utf8(args.next().ok_or(Error::PacketParseError {
|
||||
msg: "sensor value expected",
|
||||
})?)
|
||||
.context(Utf8StrSnafu)?;
|
||||
|
||||
match sensor_id.as_str() {
|
||||
"OWNER" => {
|
||||
packet.owner = Some(value.to_owned());
|
||||
}
|
||||
"LAT" => {
|
||||
packet.lat = Some(Decimal::from_str(value).context(DecimalParseSnafu)?);
|
||||
}
|
||||
"LON" => {
|
||||
packet.lon = Some(Decimal::from_str(value).context(DecimalParseSnafu)?);
|
||||
}
|
||||
"ALT" => {
|
||||
packet.alt = Some(Decimal::from_str(value).context(DecimalParseSnafu)?);
|
||||
}
|
||||
_ => {
|
||||
let name_or_time = args.next();
|
||||
|
||||
let (time, name) = if let Some(arg) = args.next() {
|
||||
let timestamp = std::str::from_utf8(name_or_time.unwrap())
|
||||
.map_err(|_| Error::PacketParseError {
|
||||
msg: "timestamp is not UTF-8",
|
||||
})?
|
||||
.parse()
|
||||
.context(TimestampParseSnafu)?;
|
||||
let time = Epoch::from_unix_seconds(timestamp);
|
||||
|
||||
// TODO: optimize? one extra alloc
|
||||
let name =
|
||||
String::from_utf8(arg.to_vec()).map_err(|_| Error::PacketParseError {
|
||||
msg: "device name is not UTF-8",
|
||||
})?;
|
||||
|
||||
(Some(time), Some(name))
|
||||
} else {
|
||||
let name = match name_or_time {
|
||||
// TODO: optimize?
|
||||
Some(value) => Some(String::from_utf8(value.to_vec()).map_err(|_| {
|
||||
Error::PacketParseError {
|
||||
msg: "device name is not UTF-8",
|
||||
}
|
||||
})?),
|
||||
None => None,
|
||||
};
|
||||
|
||||
(None, name)
|
||||
};
|
||||
|
||||
packet.values.insert(SensorValue {
|
||||
mac: sensor_id,
|
||||
value: Decimal::from_str(value).context(DecimalParseSnafu)?,
|
||||
time: time.map(|t| t.into()), // TODO: is it ok to cast Epoch -> EpochUTC?
|
||||
unit: None,
|
||||
name,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
input = &input[idx_lf..];
|
||||
}
|
||||
|
||||
Ok(packet)
|
||||
}
|
||||
|
||||
pub fn parse_mac_address(input: &[u8]) -> Result<[u8; 6], Error> {
|
||||
nm17
commented
Точно ли хорошая идея избавиться от nom-like нотации поглощения и возвращения результата и остатков? Она как минимум нужна будет если захотим сделать парсинг нескольких пакетов в одном TCP подключении ( #11 ). Точно ли хорошая идея избавиться от nom-like нотации поглощения и возвращения результата и остатков? Она как минимум нужна будет если захотим сделать парсинг нескольких пакетов в одном TCP подключении ( #11 ).
DarkCat09
commented
Как это будет в коде тцп-сервера? По-моему ты там всё равно будешь читать из потока до Как это будет в коде тцп-сервера? По-моему ты там всё равно будешь читать из потока до `##` и передавать вот эти байты в парсер, у него не будет остатков
|
||||
const MAC_LEN: usize = 6;
|
||||
let mut mac = [0u8; MAC_LEN];
|
||||
|
||||
let mut idx = 0;
|
||||
let mut cur_byte = 0usize; // index of current byte in `mac`
|
||||
let mut least_bit = false;
|
||||
loop {
|
||||
if idx >= input.len() || cur_byte >= MAC_LEN {
|
||||
break;
|
||||
}
|
||||
|
||||
if !least_bit && input[idx] == b'-' {
|
||||
continue;
|
||||
}
|
||||
|
||||
let bit = if (b'0'..b'9').contains(&input[idx]) {
|
||||
input[idx] - b'0'
|
||||
} else if (b'A'..b'F').contains(&input[idx]) {
|
||||
input[idx] - b'A'
|
||||
} else if (b'a'..b'f').contains(&input[idx]) {
|
||||
input[idx] - b'a'
|
||||
} else {
|
||||
return Err(Error::MacInvalidChar { value: input[idx] });
|
||||
};
|
||||
|
||||
if !least_bit {
|
||||
mac[cur_byte] = bit << 1;
|
||||
least_bit = true;
|
||||
} else {
|
||||
mac[cur_byte] |= bit;
|
||||
least_bit = false;
|
||||
cur_byte += 1;
|
||||
}
|
||||
|
||||
idx += 1;
|
||||
}
|
||||
|
||||
Ok(mac)
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue
Может быть всё-таки будем использовать Bytes для этого всего? Таким образом в Error сможем добавлять на каком именно куске всё пошло не по плану)
// из обсуждения в телеграме: я предлагал вариант с чтением напрямую из impl AsyncReadExt, но Даня верно заметил, что парсер не совсем этим должен заниматься => делаем с
bytes::Bytes