dev-nm17-2 #25
7 changed files with 177 additions and 44 deletions
|
@ -1,3 +1,5 @@
|
||||||
|
use std::net::IpAddr;
|
||||||
|
|
||||||
use clap::{Args, Parser, Subcommand};
|
use clap::{Args, Parser, Subcommand};
|
||||||
|
|
||||||
#[derive(Parser, Clone)]
|
#[derive(Parser, Clone)]
|
||||||
|
@ -32,9 +34,12 @@ pub struct WebServerArgs {
|
||||||
|
|
||||||
#[derive(Args, Clone)]
|
#[derive(Args, Clone)]
|
||||||
pub struct SocketServerArgs {
|
pub struct SocketServerArgs {
|
||||||
#[arg(short, long, default_value = "localhost")]
|
#[arg(short, long)]
|
||||||
pub addr: String,
|
pub addr: IpAddr,
|
||||||
|
|
||||||
#[arg(short = 'p', default_value_t = 8283)]
|
#[arg(short = 'p', default_value_t = 8283)]
|
||||||
pub port: u16,
|
pub port: u16,
|
||||||
|
|
||||||
|
#[arg(long, default_value_t = false)]
|
||||||
|
pub proxy_protocol: bool,
|
||||||
}
|
}
|
|
@ -11,10 +11,120 @@
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
|
use std::{fmt::Display, io::Read, net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, ops::ControlFlow};
|
||||||
|
|
||||||
|
use bytes::{buf, Buf, Bytes, BytesMut};
|
||||||
use fred::prelude::Client;
|
use fred::prelude::Client;
|
||||||
|
use ppp::v2::{self, Addresses, Command, Header, ParseError};
|
||||||
|
use snafu::{whatever, ResultExt, Whatever};
|
||||||
|
use tokio::{io::{AsyncReadExt, AsyncWriteExt, ReadBuf}, net::{TcpSocket, TcpStream}};
|
||||||
|
use log::{*};
|
||||||
|
|
||||||
use crate::cli::{Cli, SocketServerArgs};
|
use crate::{business_logic::process_packet, cli::{Cli, SocketServerArgs}, ingest_protocol::{self, error::Error}, web_server::app_error::{AppError, StdIOSnafu}};
|
||||||
|
|
||||||
|
pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||||
|
pub struct AddressesUnpacked {
|
||||||
|
pub source: SocketAddr,
|
||||||
|
pub dest: SocketAddr
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_sockaddr(addresses: Addresses) -> Option<SocketAddr> {
|
||||||
|
match addresses {
|
||||||
|
Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new(addr.source_address, addr.source_port))),
|
||||||
|
Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new(addr.source_address, addr.source_port, 0, 0))),
|
||||||
|
_ => None
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, actual_origin: &mut SocketAddr, proxy_data: &mut Option<Bytes>) -> Result<(), AppError> {
|
||||||
|
'proxy_loop: loop {
|
||||||
|
let len = stream.read(&mut buffer).await.context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
match v2::Header::try_from(buffer.as_ref()) {
|
||||||
|
Ok(header) => {
|
||||||
|
match header.command {
|
||||||
|
Command::Proxy => {
|
||||||
|
if let Some(new_socket_addr) = get_sockaddr(header.addresses) {
|
||||||
|
*actual_origin = new_socket_addr;
|
||||||
|
*proxy_data = Some(Bytes::copy_from_slice(&header.header));
|
||||||
|
break 'proxy_loop;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
// Продолажем работать как обычно (health check от прокси или т.п)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
|
||||||
|
match err {
|
||||||
|
ParseError::Incomplete(_) => {
|
||||||
|
// Продолжаем заполнять буфер
|
||||||
|
|
||||||
|
continue 'proxy_loop;
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
warn!("Получили неисправимую ошибку при парсинге proxy протокола. Убедитесь что никто не пытается подключиться к сервису напрямую без прокси. {error}", error = err);
|
||||||
|
|
||||||
|
whatever!("No proxy headers detected");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
buffer = buffer.split_off(proxy_data.as_ref().unwrap().len());
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> {
|
||||||
|
// TODO: errors should not break the server
|
||||||
|
|
||||||
|
let mut buffer = BytesMut::with_capacity(16 * 1024);
|
||||||
|
let socket = TcpSocket::new_v4().context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
socket.bind(SocketAddr::new(specific_args.addr, specific_args.port)).context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
let listener = socket.listen(256).context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
'tcp_loop: while let Ok((mut stream, socketaddr)) = listener.accept().await {
|
||||||
|
// Нужно для правильного применения рейт лимита
|
||||||
|
let mut actual_origin = socketaddr;
|
||||||
|
let mut proxy_data = None;
|
||||||
|
|
||||||
|
if specific_args.proxy_protocol {
|
||||||
|
tcp_proxy_handler(buffer.clone(), &mut stream, &mut actual_origin, &mut proxy_data).await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
'protocol_loop: loop {
|
||||||
|
let len = stream.read(&mut buffer).await.context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
match ingest_protocol::parser::parse_packet(&buffer) {
|
||||||
|
Ok(packet) => {
|
||||||
|
let resp = process_packet(client.clone(), packet, proxy_data.clone()).await;
|
||||||
|
let _ = stream.write(resp.as_bytes()).await;
|
||||||
|
let _ = stream.shutdown().await;
|
||||||
|
},
|
||||||
|
Err(err) if err == Error::Incomplete => {
|
||||||
|
continue 'protocol_loop;
|
||||||
|
},
|
||||||
|
Err(err) => {
|
||||||
|
stream.write(format!("ERROR: {}", err).as_bytes()).await.context(StdIOSnafu)?;
|
||||||
|
continue 'tcp_loop;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) {
|
||||||
|
|
||||||
pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) {
|
|
||||||
todo!()
|
|
||||||
}
|
}
|
||||||
|
|
10
src/main.rs
10
src/main.rs
|
@ -1,4 +1,6 @@
|
||||||
#![doc = include_str!("../README.md")]
|
#![doc = include_str!("../README.md")]
|
||||||
|
#![feature(try_blocks)]
|
||||||
|
|
||||||
extern crate core;
|
extern crate core;
|
||||||
|
|
||||||
mod business_logic;
|
mod business_logic;
|
||||||
|
@ -15,11 +17,13 @@ use fred::prelude::{
|
||||||
ReconnectPolicy, Server, ServerConfig,
|
ReconnectPolicy, Server, ServerConfig,
|
||||||
};
|
};
|
||||||
use ingest_socket_server::socketserv_main;
|
use ingest_socket_server::socketserv_main;
|
||||||
|
use web_server::app_error::AppError;
|
||||||
|
|
||||||
use crate::web_server::server_main;
|
use crate::web_server::server_main;
|
||||||
|
|
||||||
|
#[snafu::report]
|
||||||
#[ntex::main]
|
#[ntex::main]
|
||||||
async fn main() {
|
async fn main() -> Result<(), AppError> {
|
||||||
let result = Cli::parse();
|
let result = Cli::parse();
|
||||||
|
|
||||||
let mut config = RedisConfig::default();
|
let mut config = RedisConfig::default();
|
||||||
|
@ -40,7 +44,9 @@ async fn main() {
|
||||||
server_main(result.clone(), specific_args.clone(), redis).await;
|
server_main(result.clone(), specific_args.clone(), redis).await;
|
||||||
}
|
}
|
||||||
MyCommand::SocketServer(specific_args) => {
|
MyCommand::SocketServer(specific_args) => {
|
||||||
socketserv_main(result.clone(), specific_args.clone(), redis).await;
|
socketserv_main(result.clone(), specific_args.clone(), redis).await?;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,9 +65,7 @@ impl<'de> Deserialize<'de> for EpochUTC {
|
||||||
) as f64)
|
) as f64)
|
||||||
.into());
|
.into());
|
||||||
}
|
}
|
||||||
Ok(
|
Ok(Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into())
|
||||||
Epoch::from_unix_seconds(value.parse().map_err(de::Error::custom)?).into(),
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,7 @@ impl<'de> Deserialize<'de> for SupportedUnit {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Таблица преобразования текстового представления единиц в значения [SupportedUnit].
|
/// Таблица преобразования текстового представления единиц в значения [SupportedUnit].
|
||||||
static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! {
|
pub static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! {
|
||||||
"C" => SupportedUnit::Celsius,
|
"C" => SupportedUnit::Celsius,
|
||||||
"%" => SupportedUnit::Percentage,
|
"%" => SupportedUnit::Percentage,
|
||||||
"mmHg" => SupportedUnit::MillimeterHg,
|
"mmHg" => SupportedUnit::MillimeterHg,
|
||||||
|
@ -85,6 +85,12 @@ static STR_TO_UNITS: phf::Map<&'static str, SupportedUnit> = phf_map! {
|
||||||
"KWh" => SupportedUnit::KWh,
|
"KWh" => SupportedUnit::KWh,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub fn convert_to_arc<T: std::error::Error>(error: T) -> Arc<T> {
|
pub fn convert_to_arc<T: core::error::Error>(error: T) -> Arc<T> {
|
||||||
Arc::new(error)
|
Arc::new(error)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn convert_to_arced_dynerror(
|
||||||
|
error: Option<Box<dyn core::error::Error>>,
|
||||||
|
) -> Option<Arc<dyn core::error::Error>> {
|
||||||
|
error.map(|el| Arc::from(el))
|
||||||
|
}
|
||||||
|
|
|
@ -8,42 +8,36 @@ use ntex::web::{HttpRequest, HttpResponse};
|
||||||
use snafu::Snafu;
|
use snafu::Snafu;
|
||||||
|
|
||||||
use crate::insert_header;
|
use crate::insert_header;
|
||||||
use crate::utils::convert_to_arc;
|
use crate::utils::{convert_to_arc, convert_to_arced_dynerror};
|
||||||
use crate::web_server::old_device_sensor_api::qs_parser::QSParserError;
|
use crate::web_server::old_device_sensor_api::qs_parser::QSParserError;
|
||||||
use rust_decimal::Decimal;
|
use rust_decimal::Decimal;
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
|
|
||||||
use super::old_device_sensor_api::qs_parser;
|
use super::old_device_sensor_api::qs_parser;
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
/// Главный объект ошибки [std::error::Error] для всего Web API.
|
/// Главный объект ошибки [std::error::Error] для всего Web API.
|
||||||
///
|
///
|
||||||
/// В целом, все Result у Web сервера должны использовать этот Error.
|
/// В целом, все Result у Web сервера должны использовать этот Error.
|
||||||
#[derive(Debug, Snafu, Clone)]
|
#[derive(Debug, Snafu)]
|
||||||
#[snafu(visibility(pub))]
|
#[snafu(visibility(pub))]
|
||||||
pub enum AppError {
|
pub enum AppError {
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
JsonError {
|
JsonError {
|
||||||
#[snafu(source(from(serde_json::Error, convert_to_arc::<serde_json::Error>)))]
|
#[snafu(source(from(serde_json::Error, convert_to_arc::<serde_json::Error>)))]
|
||||||
source: Arc<serde_json::Error>
|
source: Arc<serde_json::Error>,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
QSError {
|
QSError { source: QSParserError },
|
||||||
source: QSParserError
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
ServerRedisError {
|
ServerRedisError { source: fred::error::Error },
|
||||||
source: fred::error::Error
|
|
||||||
},
|
#[snafu(display("Could not parse decimal"))]
|
||||||
|
Decimal { source: rust_decimal::Error },
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
UnknownMethod {
|
UnknownMethod { method: String },
|
||||||
method: String
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
RequestTooLarge,
|
RequestTooLarge,
|
||||||
|
@ -58,9 +52,10 @@ pub enum AppError {
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("UTF-8 Error"))]
|
#[snafu(display("UTF-8 Error"))]
|
||||||
Utf8Error {
|
Utf8Error { source: std::str::Utf8Error },
|
||||||
source: std::str::Utf8Error
|
|
||||||
},
|
#[snafu(display("String cannot be parced into a UUID"))]
|
||||||
|
Uuid { source: uuid::Error },
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
UnknownBody {
|
UnknownBody {
|
||||||
|
@ -69,12 +64,24 @@ pub enum AppError {
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
DeviceNotFound {
|
DeviceNotFound { mac: String },
|
||||||
mac: String
|
|
||||||
|
#[snafu(display("Std IO error"))]
|
||||||
|
StdIO {
|
||||||
|
#[snafu(source(from(std::io::Error, convert_to_arc::<std::io::Error>)))]
|
||||||
|
source: Arc<std::io::Error>,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("Could not read file"))]
|
#[snafu(display("Could not read file"))]
|
||||||
TimeIsLongBehindNow
|
TimeIsLongBehindNow,
|
||||||
|
|
||||||
|
#[snafu(display("Could not read file"))]
|
||||||
|
#[snafu(whatever)]
|
||||||
|
Whatever {
|
||||||
|
#[snafu(source(from(Box<dyn std::error::Error>, Some)))]
|
||||||
|
source: Option<Box<dyn core::error::Error>>,
|
||||||
|
message: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
impl web::error::WebResponseError for AppError {
|
impl web::error::WebResponseError for AppError {
|
||||||
|
@ -90,7 +97,12 @@ impl web::error::WebResponseError for AppError {
|
||||||
AppError::UnknownBody { .. } => StatusCode::BAD_REQUEST,
|
AppError::UnknownBody { .. } => StatusCode::BAD_REQUEST,
|
||||||
AppError::QSError { .. } => StatusCode::BAD_REQUEST,
|
AppError::QSError { .. } => StatusCode::BAD_REQUEST,
|
||||||
AppError::DeviceNotFound { .. } => StatusCode::BAD_REQUEST,
|
AppError::DeviceNotFound { .. } => StatusCode::BAD_REQUEST,
|
||||||
AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST
|
AppError::TimeIsLongBehindNow { .. } => StatusCode::BAD_REQUEST,
|
||||||
|
|
||||||
|
// Не знаю что лучше тут использовать
|
||||||
|
AppError::Whatever { .. } => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
|
|
||||||
|
_ => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +120,8 @@ impl web::error::WebResponseError for AppError {
|
||||||
}
|
}
|
||||||
AppError::QSError { .. } => "UrlEncoded body or query params are incorrect",
|
AppError::QSError { .. } => "UrlEncoded body or query params are incorrect",
|
||||||
AppError::DeviceNotFound { .. } => "Device not found",
|
AppError::DeviceNotFound { .. } => "Device not found",
|
||||||
AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be"
|
AppError::TimeIsLongBehindNow { .. } => "Time is long behind what it should be",
|
||||||
|
_ => "Internal server error. Please make sure to tell the devs they are stupid :p",
|
||||||
};
|
};
|
||||||
|
|
||||||
let status_code = self.status_code();
|
let status_code = self.status_code();
|
||||||
|
@ -121,7 +134,6 @@ impl web::error::WebResponseError for AppError {
|
||||||
let mut resp = HttpResponse::build(status_code).json(&body);
|
let mut resp = HttpResponse::build(status_code).json(&body);
|
||||||
let headers = resp.headers_mut();
|
let headers = resp.headers_mut();
|
||||||
|
|
||||||
|
|
||||||
match self {
|
match self {
|
||||||
AppError::JsonError { source } => {
|
AppError::JsonError { source } => {
|
||||||
insert_header!(headers, "X-Error-Line", source.line());
|
insert_header!(headers, "X-Error-Line", source.line());
|
||||||
|
@ -132,7 +144,7 @@ impl web::error::WebResponseError for AppError {
|
||||||
source.to_string().escape_default().collect::<String>()
|
source.to_string().escape_default().collect::<String>()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
AppError::UnknownMethod {method} => {
|
AppError::UnknownMethod { method } => {
|
||||||
insert_header!(
|
insert_header!(
|
||||||
headers,
|
headers,
|
||||||
"X-Unknown-Cmd",
|
"X-Unknown-Cmd",
|
||||||
|
|
|
@ -20,17 +20,13 @@ use super::NMAppState;
|
||||||
#[snafu(visibility(pub))]
|
#[snafu(visibility(pub))]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[snafu(display("Device not found"))]
|
#[snafu(display("Device not found"))]
|
||||||
DeviceNotFound {
|
DeviceNotFound { mac: String },
|
||||||
mac: String
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("Time sent with the device is way too behind now"))]
|
#[snafu(display("Time sent with the device is way too behind now"))]
|
||||||
TimeIsLongBehindNow,
|
TimeIsLongBehindNow,
|
||||||
|
|
||||||
#[snafu(display("{source}"))]
|
#[snafu(display("{source}"))]
|
||||||
QSParser {
|
QSParser { source: QSParserError },
|
||||||
source: QSParserError
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Обработчик данных датчиков с устройств.
|
/// Обработчик данных датчиков с устройств.
|
||||||
|
|
Loading…
Add table
Reference in a new issue