From d5488fd2eb368461378728ab6f8da13e4e1f2105 Mon Sep 17 00:00:00 2001 From: nm17 Date: Mon, 24 Feb 2025 20:13:59 +0400 Subject: [PATCH] style: rustfmt --- src/ingest_protocol/error.rs | 10 +-- src/ingest_socket_server/mod.rs | 89 ++++++++++++++----- src/web_server/old_app_api/mod.rs | 12 +-- .../old_device_sensor_api/qs_parser.rs | 28 +++--- 4 files changed, 92 insertions(+), 47 deletions(-) diff --git a/src/ingest_protocol/error.rs b/src/ingest_protocol/error.rs index 53e7e8f..8a61cd6 100644 --- a/src/ingest_protocol/error.rs +++ b/src/ingest_protocol/error.rs @@ -3,15 +3,15 @@ use std::fmt::Debug; use std::num::ParseFloatError; use thiserror::Error as ThisError; -/// +/// /// Тип ошибки Ingest протокола -/// +/// /// К сожалению, не может быть переделан на Snafu, так как /// Snafu не поддерживает generic типы как source ошибки, /// не приделывая 'static лайфтайм. -/// -/// См. https://github.com/shepmaster/snafu/issues/99 -/// +/// +/// См. https://github.com/shepmaster/snafu/issues/99 +/// #[allow(dead_code)] #[derive(Debug, ThisError)] pub enum Error { diff --git a/src/ingest_socket_server/mod.rs b/src/ingest_socket_server/mod.rs index 56e6e1c..f60bb34 100644 --- a/src/ingest_socket_server/mod.rs +++ b/src/ingest_socket_server/mod.rs @@ -11,36 +11,66 @@ // } // } -use std::{fmt::Display, io::Read, net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, ops::ControlFlow}; +use std::{ + fmt::Display, + io::Read, + net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, + ops::ControlFlow, +}; use bytes::{buf, Buf, Bytes, BytesMut}; use fred::prelude::Client; +use log::*; use ppp::v2::{self, Addresses, Command, Header, ParseError}; use snafu::{whatever, ResultExt, Whatever}; -use tokio::{io::{AsyncReadExt, AsyncWriteExt, ReadBuf}, net::{TcpSocket, TcpStream}}; -use log::{*}; +use tokio::{ + io::{AsyncReadExt, AsyncWriteExt, ReadBuf}, + net::{TcpSocket, TcpStream}, +}; -use crate::{business_logic::process_packet, cli::{Cli, SocketServerArgs}, ingest_protocol::{self, error::Error}, web_server::app_error::{AppError, StdIOSnafu}}; +use crate::{ + business_logic::process_packet, + cli::{Cli, SocketServerArgs}, + ingest_protocol::{self, error::Error}, + web_server::app_error::{AppError, StdIOSnafu}, +}; -pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> { +pub async fn socketserv_main( + args: Cli, + specific_args: SocketServerArgs, + client: Client, +) -> Result<(), AppError> { return Ok(()); } #[derive(Clone, PartialEq, Eq, Debug)] pub struct AddressesUnpacked { pub source: SocketAddr, - pub dest: SocketAddr + pub dest: SocketAddr, } pub fn get_sockaddr(addresses: Addresses) -> Option { match addresses { - Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new(addr.source_address, addr.source_port))), - Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new(addr.source_address, addr.source_port, 0, 0))), - _ => None + Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new( + addr.source_address, + addr.source_port, + ))), + Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new( + addr.source_address, + addr.source_port, + 0, + 0, + ))), + _ => None, } } -pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, actual_origin: &mut SocketAddr, proxy_data: &mut Option) -> Result<(), AppError> { +pub async fn tcp_proxy_handler( + mut buffer: BytesMut, + stream: &mut TcpStream, + actual_origin: &mut SocketAddr, + proxy_data: &mut Option, +) -> Result<(), AppError> { 'proxy_loop: loop { let len = stream.read(&mut buffer).await.context(StdIOSnafu)?; @@ -53,20 +83,19 @@ pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, act *proxy_data = Some(Bytes::copy_from_slice(&header.header)); break 'proxy_loop; } - }, + } _ => { // Продолажем работать как обычно (health check от прокси или т.п) } } - }, + } Err(err) => { - match err { ParseError::Incomplete(_) => { // Продолжаем заполнять буфер continue 'proxy_loop; - }, + } _ => { warn!("Получили неисправимую ошибку при парсинге proxy протокола. Убедитесь что никто не пытается подключиться к сервису напрямую без прокси. {error}", error = err); @@ -82,13 +111,19 @@ pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, act return Ok(()); } -pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> { +pub async fn socketserv_tcp( + args: Cli, + specific_args: SocketServerArgs, + client: Client, +) -> Result<(), AppError> { // TODO: errors should not break the server let mut buffer = BytesMut::with_capacity(16 * 1024); let socket = TcpSocket::new_v4().context(StdIOSnafu)?; - socket.bind(SocketAddr::new(specific_args.addr, specific_args.port)).context(StdIOSnafu)?; + socket + .bind(SocketAddr::new(specific_args.addr, specific_args.port)) + .context(StdIOSnafu)?; let listener = socket.listen(256).context(StdIOSnafu)?; @@ -98,7 +133,13 @@ pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: let mut proxy_data = None; if specific_args.proxy_protocol { - tcp_proxy_handler(buffer.clone(), &mut stream, &mut actual_origin, &mut proxy_data).await?; + tcp_proxy_handler( + buffer.clone(), + &mut stream, + &mut actual_origin, + &mut proxy_data, + ) + .await?; } 'protocol_loop: loop { @@ -109,12 +150,15 @@ pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: let resp = process_packet(client.clone(), packet, proxy_data.clone()).await; let _ = stream.write(resp.as_bytes()).await; let _ = stream.shutdown().await; - }, + } Err(err) if err == Error::Incomplete => { continue 'protocol_loop; - }, + } Err(err) => { - stream.write(format!("ERROR: {}", err).as_bytes()).await.context(StdIOSnafu)?; + stream + .write(format!("ERROR: {}", err).as_bytes()) + .await + .context(StdIOSnafu)?; continue 'tcp_loop; } } @@ -122,9 +166,6 @@ pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: } return Ok(()); - } -pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) { - -} +pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) {} diff --git a/src/web_server/old_app_api/mod.rs b/src/web_server/old_app_api/mod.rs index 55136a8..b940ddb 100644 --- a/src/web_server/old_app_api/mod.rs +++ b/src/web_server/old_app_api/mod.rs @@ -3,14 +3,14 @@ mod handlers; mod types; -use ntex::web::types::State; -use ntex::util::Bytes; -use ntex::web; use crate::web_server::app_error::AppError; -use crate::web_server::NMAppState; use crate::web_server::old_app_api::handlers::{app_init, version}; use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams}; use crate::web_server::utils::redis::is_api_key_valid; +use crate::web_server::NMAppState; +use ntex::util::Bytes; +use ntex::web; +use ntex::web::types::State; use ntex::web::HttpRequest; use snafu::{whatever, ResultExt}; @@ -60,7 +60,9 @@ pub async fn old_api_handler( app_init(body, &app_state).await } - _ => Err(AppError::UnknownMethod { method: mandatory_params.cmd.to_string() }), + _ => Err(AppError::UnknownMethod { + method: mandatory_params.cmd.to_string(), + }), } //Ok("fuck") diff --git a/src/web_server/old_device_sensor_api/qs_parser.rs b/src/web_server/old_device_sensor_api/qs_parser.rs index 0b4ca09..65e440d 100644 --- a/src/web_server/old_device_sensor_api/qs_parser.rs +++ b/src/web_server/old_device_sensor_api/qs_parser.rs @@ -19,23 +19,19 @@ pub enum QSParserError { #[snafu(display("asd"))] SerdeQS { #[snafu(source(from(serde_qs::Error, convert_to_arc::)))] - source: Arc + source: Arc, }, #[snafu(display("asd"))] - Parsing { - context: String - }, + Parsing { context: String }, #[snafu(display("asd"))] FloatP { #[snafu(source)] - source: ParseFloatError + source: ParseFloatError, }, #[snafu(display("failed to parse into decimal"))] - DecimalParse { - source: rust_decimal::Error - }, + DecimalParse { source: rust_decimal::Error }, #[snafu(display("asd"))] NoMAC, @@ -43,7 +39,9 @@ pub enum QSParserError { impl From> for QSParserError { fn from(value: Error<&str>) -> Self { - QSParserError::Parsing { context: format!("{:?}", value)} + QSParserError::Parsing { + context: format!("{:?}", value), + } } } @@ -66,7 +64,7 @@ pub fn qs_rest_to_values( for (key, value) in parsed { hashset.insert(SensorValue { mac: key, - value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu{})?, + value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu {})?, time: None, unit: None, @@ -82,7 +80,9 @@ pub fn parse_decimal_if_exists( key: &str, ) -> Result, QSParserError> { if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some(Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu{})?)) + Ok(Some( + Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu {})?, + )) } else { Ok(None) } @@ -93,14 +93,16 @@ pub fn parse_epoch_if_exists( key: &str, ) -> Result, QSParserError> { if let Some(unwrapped_value) = parsed.remove(key) { - Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse().context(FloatPSnafu{})?))) + Ok(Some(Epoch::from_unix_seconds( + unwrapped_value.parse().context(FloatPSnafu {})?, + ))) } else { Ok(None) } } pub async fn parse_nm_qs_format(input: &str) -> Result { - let mut parsed: HashMap = serde_qs::from_str(input).context(SerdeQSSnafu{})?; + let mut parsed: HashMap = serde_qs::from_str(input).context(SerdeQSSnafu {})?; let (_, device_mac) = if let Some(id) = parsed.get("ID") { parse_mac_address(id)?