diff --git a/src/cli.rs b/src/cli.rs index e26834b..335f621 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,3 +1,5 @@ +use std::net::IpAddr; + use clap::{Args, Parser, Subcommand}; #[derive(Parser, Clone)] @@ -32,9 +34,12 @@ pub struct WebServerArgs { #[derive(Args, Clone)] pub struct SocketServerArgs { - #[arg(short, long, default_value = "localhost")] - pub addr: String, + #[arg(short, long)] + pub addr: IpAddr, #[arg(short = 'p', default_value_t = 8283)] pub port: u16, -} \ No newline at end of file + + #[arg(long, default_value_t = false)] + pub proxy_protocol: bool, +} diff --git a/src/ingest_socket_server/mod.rs b/src/ingest_socket_server/mod.rs index b93a9c8..56e6e1c 100644 --- a/src/ingest_socket_server/mod.rs +++ b/src/ingest_socket_server/mod.rs @@ -11,10 +11,120 @@ // } // } +use std::{fmt::Display, io::Read, net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, ops::ControlFlow}; + +use bytes::{buf, Buf, Bytes, BytesMut}; use fred::prelude::Client; +use ppp::v2::{self, Addresses, Command, Header, ParseError}; +use snafu::{whatever, ResultExt, Whatever}; +use tokio::{io::{AsyncReadExt, AsyncWriteExt, ReadBuf}, net::{TcpSocket, TcpStream}}; +use log::{*}; -use crate::cli::{Cli, SocketServerArgs}; +use crate::{business_logic::process_packet, cli::{Cli, SocketServerArgs}, ingest_protocol::{self, error::Error}, web_server::app_error::{AppError, StdIOSnafu}}; -pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) { - todo!() +pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> { + return Ok(()); +} + +#[derive(Clone, PartialEq, Eq, Debug)] +pub struct AddressesUnpacked { + pub source: SocketAddr, + pub dest: SocketAddr +} + +pub fn get_sockaddr(addresses: Addresses) -> Option { + match addresses { + Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new(addr.source_address, addr.source_port))), + Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new(addr.source_address, addr.source_port, 0, 0))), + _ => None + } +} + +pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, actual_origin: &mut SocketAddr, proxy_data: &mut Option) -> Result<(), AppError> { + 'proxy_loop: loop { + let len = stream.read(&mut buffer).await.context(StdIOSnafu)?; + + match v2::Header::try_from(buffer.as_ref()) { + Ok(header) => { + match header.command { + Command::Proxy => { + if let Some(new_socket_addr) = get_sockaddr(header.addresses) { + *actual_origin = new_socket_addr; + *proxy_data = Some(Bytes::copy_from_slice(&header.header)); + break 'proxy_loop; + } + }, + _ => { + // Продолажем работать как обычно (health check от прокси или т.п) + } + } + }, + Err(err) => { + + match err { + ParseError::Incomplete(_) => { + // Продолжаем заполнять буфер + + continue 'proxy_loop; + }, + _ => { + warn!("Получили неисправимую ошибку при парсинге proxy протокола. Убедитесь что никто не пытается подключиться к сервису напрямую без прокси. {error}", error = err); + + whatever!("No proxy headers detected"); + } + } + } + } + } + + buffer = buffer.split_off(proxy_data.as_ref().unwrap().len()); + + return Ok(()); +} + +pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> { + // TODO: errors should not break the server + + let mut buffer = BytesMut::with_capacity(16 * 1024); + let socket = TcpSocket::new_v4().context(StdIOSnafu)?; + + socket.bind(SocketAddr::new(specific_args.addr, specific_args.port)).context(StdIOSnafu)?; + + let listener = socket.listen(256).context(StdIOSnafu)?; + + 'tcp_loop: while let Ok((mut stream, socketaddr)) = listener.accept().await { + // Нужно для правильного применения рейт лимита + let mut actual_origin = socketaddr; + let mut proxy_data = None; + + if specific_args.proxy_protocol { + tcp_proxy_handler(buffer.clone(), &mut stream, &mut actual_origin, &mut proxy_data).await?; + } + + 'protocol_loop: loop { + let len = stream.read(&mut buffer).await.context(StdIOSnafu)?; + + match ingest_protocol::parser::parse_packet(&buffer) { + Ok(packet) => { + let resp = process_packet(client.clone(), packet, proxy_data.clone()).await; + let _ = stream.write(resp.as_bytes()).await; + let _ = stream.shutdown().await; + }, + Err(err) if err == Error::Incomplete => { + continue 'protocol_loop; + }, + Err(err) => { + stream.write(format!("ERROR: {}", err).as_bytes()).await.context(StdIOSnafu)?; + continue 'tcp_loop; + } + } + } + } + + return Ok(()); + +} + +pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) { + } diff --git a/src/main.rs b/src/main.rs index cc3fa32..a31ed14 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,4 +1,6 @@ #![doc = include_str!("../README.md")] +#![feature(try_blocks)] + extern crate core; mod business_logic; @@ -15,11 +17,13 @@ use fred::prelude::{ ReconnectPolicy, Server, ServerConfig, }; use ingest_socket_server::socketserv_main; +use web_server::app_error::AppError; use crate::web_server::server_main; +#[snafu::report] #[ntex::main] -async fn main() { +async fn main() -> Result<(), AppError> { let result = Cli::parse(); let mut config = RedisConfig::default(); @@ -40,7 +44,9 @@ async fn main() { server_main(result.clone(), specific_args.clone(), redis).await; } MyCommand::SocketServer(specific_args) => { - socketserv_main(result.clone(), specific_args.clone(), redis).await; + socketserv_main(result.clone(), specific_args.clone(), redis).await?; } }; + + return Ok(()); } diff --git a/src/utils/hifitime_serde.rs b/src/utils/hifitime_serde.rs index 16bcffc..4e605f8 100644 --- a/src/utils/hifitime_serde.rs +++ b/src/utils/hifitime_serde.rs @@ -65,9 +65,7 @@ impl<'de> Deserialize<'de> for EpochUTC { ) as f64) .into()); } - Ok( - Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into(), - ) + Ok(Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into()) } } diff --git a/src/utils/mod.rs b/src/utils/mod.rs index b7193a0..890ccb6 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -70,7 +70,7 @@ impl<'de> Deserialize<'de> for SupportedUnit { } /// Таблица преобразования текстового представления единиц в значения [SupportedUnit]. -static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { +pub static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { "C" => SupportedUnit::Celsius, "%" => SupportedUnit::Percentage, "mmHg" => SupportedUnit::MillimeterHg, @@ -85,6 +85,12 @@ static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! { "KWh" => SupportedUnit::KWh, }; -pub fn convert_to_arc(error: T) -> Arc { +pub fn convert_to_arc(error: T) -> Arc { Arc::new(error) } + +pub fn convert_to_arced_dynerror( + error: Option>, +) -> Option> { + error.map(|el| Arc::from(el)) +} diff --git a/src/web_server/app_error.rs b/src/web_server/app_error.rs index d13243c..78b39ed 100644 --- a/src/web_server/app_error.rs +++ b/src/web_server/app_error.rs @@ -8,42 +8,36 @@ use ntex::web::{HttpRequest, HttpResponse}; use snafu::Snafu; use crate::insert_header; -use crate::utils::convert_to_arc; +use crate::utils::{convert_to_arc, convert_to_arced_dynerror}; use crate::web_server::old_device_sensor_api::qs_parser::QSParserError; use rust_decimal::Decimal; use serde_json::json; use super::old_device_sensor_api::qs_parser; - - - /// Главный объект ошибки [std::error::Error] для всего Web API. /// /// В целом, все Result у Web сервера должны использовать этот Error. -#[derive(Debug, Snafu, Clone)] +#[derive(Debug, Snafu)] #[snafu(visibility(pub))] pub enum AppError { #[snafu(display("Could not read file"))] JsonError { #[snafu(source(from(serde_json::Error, convert_to_arc::)))] - source: Arc + source: Arc, }, #[snafu(display("Could not read file"))] - QSError { - source: QSParserError - }, + QSError { source: QSParserError }, #[snafu(display("Could not read file"))] - ServerRedisError { - source: fred::error::Error - }, + ServerRedisError { source: fred::error::Error }, + + #[snafu(display("Could not parse decimal"))] + Decimal { source: rust_decimal::Error }, #[snafu(display("Could not read file"))] - UnknownMethod { - method: String - }, + UnknownMethod { method: String }, #[snafu(display("Could not read file"))] RequestTooLarge, @@ -58,9 +52,10 @@ pub enum AppError { }, #[snafu(display("UTF-8 Error"))] - Utf8Error { - source: std::str::Utf8Error - }, + Utf8Error { source: std::str::Utf8Error }, + + #[snafu(display("String cannot be parced into a UUID"))] + Uuid { source: uuid::Error }, #[snafu(display("Could not read file"))] UnknownBody { @@ -69,12 +64,24 @@ pub enum AppError { }, #[snafu(display("Could not read file"))] - DeviceNotFound { - mac: String + DeviceNotFound { mac: String }, + + #[snafu(display("Std IO error"))] + StdIO { + #[snafu(source(from(std::io::Error, convert_to_arc::)))] + source: Arc, }, #[snafu(display("Could not read file"))] - TimeIsLongBehindNow + TimeIsLongBehindNow, + + #[snafu(display("Could not read file"))] + #[snafu(whatever)] + Whatever { + #[snafu(source(from(Box, Some)))] + source: Option>, + message: String, + }, } impl web::error::WebResponseError for AppError { @@ -90,7 +97,12 @@ impl web::error::WebResponseError for AppError { AppError::UnknownBody { .. } => StatusCode::BAD_REQUEST, AppError::QSError { .. } => StatusCode::BAD_REQUEST, AppError::DeviceNotFound { .. } => StatusCode::BAD_REQUEST, - AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST + AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST, + + // Не знаю что лучше тут использовать + AppError::Whatever { .. } => StatusCode::INTERNAL_SERVER_ERROR, + + _ => StatusCode::INTERNAL_SERVER_ERROR, } } @@ -108,7 +120,8 @@ impl web::error::WebResponseError for AppError { } AppError::QSError { .. } => "UrlEncoded body or query params are incorrect", AppError::DeviceNotFound { .. } => "Device not found", - AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be" + AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be", + _ => "Internal server error. Please make sure to tell the devs they are stupid :p", }; let status_code = self.status_code(); @@ -121,7 +134,6 @@ impl web::error::WebResponseError for AppError { let mut resp = HttpResponse::build(status_code).json(&body); let headers = resp.headers_mut(); - match self { AppError::JsonError { source } => { insert_header!(headers, "X-Error-Line", source.line()); @@ -132,7 +144,7 @@ impl web::error::WebResponseError for AppError { source.to_string().escape_default().collect::() ); } - AppError::UnknownMethod {method} => { + AppError::UnknownMethod { method } => { insert_header!( headers, "X-Unknown-Cmd", diff --git a/src/web_server/old_device_sensor_api/mod.rs b/src/web_server/old_device_sensor_api/mod.rs index 9f2053b..bcf1c65 100644 --- a/src/web_server/old_device_sensor_api/mod.rs +++ b/src/web_server/old_device_sensor_api/mod.rs @@ -20,17 +20,13 @@ use super::NMAppState; #[snafu(visibility(pub))] pub enum Error { #[snafu(display("Device not found"))] - DeviceNotFound { - mac: String - }, + DeviceNotFound { mac: String }, #[snafu(display("Time sent with the device is way too behind now"))] TimeIsLongBehindNow, #[snafu(display("{source}"))] - QSParser { - source: QSParserError - }, + QSParser { source: QSParserError }, } /// Обработчик данных датчиков с устройств.