style: rustfmt
This commit is contained in:
parent
812ac99e70
commit
d5488fd2eb
4 changed files with 92 additions and 47 deletions
|
@ -11,36 +11,66 @@
|
||||||
// }
|
// }
|
||||||
// }
|
// }
|
||||||
|
|
||||||
use std::{fmt::Display, io::Read, net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6}, ops::ControlFlow};
|
use std::{
|
||||||
|
fmt::Display,
|
||||||
|
io::Read,
|
||||||
|
net::{IpAddr, Ipv4Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
|
||||||
|
ops::ControlFlow,
|
||||||
|
};
|
||||||
|
|
||||||
use bytes::{buf, Buf, Bytes, BytesMut};
|
use bytes::{buf, Buf, Bytes, BytesMut};
|
||||||
use fred::prelude::Client;
|
use fred::prelude::Client;
|
||||||
|
use log::*;
|
||||||
use ppp::v2::{self, Addresses, Command, Header, ParseError};
|
use ppp::v2::{self, Addresses, Command, Header, ParseError};
|
||||||
use snafu::{whatever, ResultExt, Whatever};
|
use snafu::{whatever, ResultExt, Whatever};
|
||||||
use tokio::{io::{AsyncReadExt, AsyncWriteExt, ReadBuf}, net::{TcpSocket, TcpStream}};
|
use tokio::{
|
||||||
use log::{*};
|
io::{AsyncReadExt, AsyncWriteExt, ReadBuf},
|
||||||
|
net::{TcpSocket, TcpStream},
|
||||||
|
};
|
||||||
|
|
||||||
use crate::{business_logic::process_packet, cli::{Cli, SocketServerArgs}, ingest_protocol::{self, error::Error}, web_server::app_error::{AppError, StdIOSnafu}};
|
use crate::{
|
||||||
|
business_logic::process_packet,
|
||||||
|
cli::{Cli, SocketServerArgs},
|
||||||
|
ingest_protocol::{self, error::Error},
|
||||||
|
web_server::app_error::{AppError, StdIOSnafu},
|
||||||
|
};
|
||||||
|
|
||||||
pub async fn socketserv_main(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> {
|
pub async fn socketserv_main(
|
||||||
|
args: Cli,
|
||||||
|
specific_args: SocketServerArgs,
|
||||||
|
client: Client,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, PartialEq, Eq, Debug)]
|
#[derive(Clone, PartialEq, Eq, Debug)]
|
||||||
pub struct AddressesUnpacked {
|
pub struct AddressesUnpacked {
|
||||||
pub source: SocketAddr,
|
pub source: SocketAddr,
|
||||||
pub dest: SocketAddr
|
pub dest: SocketAddr,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_sockaddr(addresses: Addresses) -> Option<SocketAddr> {
|
pub fn get_sockaddr(addresses: Addresses) -> Option<SocketAddr> {
|
||||||
match addresses {
|
match addresses {
|
||||||
Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new(addr.source_address, addr.source_port))),
|
Addresses::IPv4(addr) => Some(SocketAddr::V4(SocketAddrV4::new(
|
||||||
Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new(addr.source_address, addr.source_port, 0, 0))),
|
addr.source_address,
|
||||||
_ => None
|
addr.source_port,
|
||||||
|
))),
|
||||||
|
Addresses::IPv6(addr) => Some(SocketAddr::V6(SocketAddrV6::new(
|
||||||
|
addr.source_address,
|
||||||
|
addr.source_port,
|
||||||
|
0,
|
||||||
|
0,
|
||||||
|
))),
|
||||||
|
_ => None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, actual_origin: &mut SocketAddr, proxy_data: &mut Option<Bytes>) -> Result<(), AppError> {
|
pub async fn tcp_proxy_handler(
|
||||||
|
mut buffer: BytesMut,
|
||||||
|
stream: &mut TcpStream,
|
||||||
|
actual_origin: &mut SocketAddr,
|
||||||
|
proxy_data: &mut Option<Bytes>,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
'proxy_loop: loop {
|
'proxy_loop: loop {
|
||||||
let len = stream.read(&mut buffer).await.context(StdIOSnafu)?;
|
let len = stream.read(&mut buffer).await.context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
@ -53,20 +83,19 @@ pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, act
|
||||||
*proxy_data = Some(Bytes::copy_from_slice(&header.header));
|
*proxy_data = Some(Bytes::copy_from_slice(&header.header));
|
||||||
break 'proxy_loop;
|
break 'proxy_loop;
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
_ => {
|
_ => {
|
||||||
// Продолажем работать как обычно (health check от прокси или т.п)
|
// Продолажем работать как обычно (health check от прокси или т.п)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
|
|
||||||
match err {
|
match err {
|
||||||
ParseError::Incomplete(_) => {
|
ParseError::Incomplete(_) => {
|
||||||
// Продолжаем заполнять буфер
|
// Продолжаем заполнять буфер
|
||||||
|
|
||||||
continue 'proxy_loop;
|
continue 'proxy_loop;
|
||||||
},
|
}
|
||||||
_ => {
|
_ => {
|
||||||
warn!("Получили неисправимую ошибку при парсинге proxy протокола. Убедитесь что никто не пытается подключиться к сервису напрямую без прокси. {error}", error = err);
|
warn!("Получили неисправимую ошибку при парсинге proxy протокола. Убедитесь что никто не пытается подключиться к сервису напрямую без прокси. {error}", error = err);
|
||||||
|
|
||||||
|
@ -82,13 +111,19 @@ pub async fn tcp_proxy_handler(mut buffer: BytesMut, stream: &mut TcpStream, act
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client: Client) -> Result<(), AppError> {
|
pub async fn socketserv_tcp(
|
||||||
|
args: Cli,
|
||||||
|
specific_args: SocketServerArgs,
|
||||||
|
client: Client,
|
||||||
|
) -> Result<(), AppError> {
|
||||||
// TODO: errors should not break the server
|
// TODO: errors should not break the server
|
||||||
|
|
||||||
let mut buffer = BytesMut::with_capacity(16 * 1024);
|
let mut buffer = BytesMut::with_capacity(16 * 1024);
|
||||||
let socket = TcpSocket::new_v4().context(StdIOSnafu)?;
|
let socket = TcpSocket::new_v4().context(StdIOSnafu)?;
|
||||||
|
|
||||||
socket.bind(SocketAddr::new(specific_args.addr, specific_args.port)).context(StdIOSnafu)?;
|
socket
|
||||||
|
.bind(SocketAddr::new(specific_args.addr, specific_args.port))
|
||||||
|
.context(StdIOSnafu)?;
|
||||||
|
|
||||||
let listener = socket.listen(256).context(StdIOSnafu)?;
|
let listener = socket.listen(256).context(StdIOSnafu)?;
|
||||||
|
|
||||||
|
@ -98,7 +133,13 @@ pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client:
|
||||||
let mut proxy_data = None;
|
let mut proxy_data = None;
|
||||||
|
|
||||||
if specific_args.proxy_protocol {
|
if specific_args.proxy_protocol {
|
||||||
tcp_proxy_handler(buffer.clone(), &mut stream, &mut actual_origin, &mut proxy_data).await?;
|
tcp_proxy_handler(
|
||||||
|
buffer.clone(),
|
||||||
|
&mut stream,
|
||||||
|
&mut actual_origin,
|
||||||
|
&mut proxy_data,
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
'protocol_loop: loop {
|
'protocol_loop: loop {
|
||||||
|
@ -109,12 +150,15 @@ pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client:
|
||||||
let resp = process_packet(client.clone(), packet, proxy_data.clone()).await;
|
let resp = process_packet(client.clone(), packet, proxy_data.clone()).await;
|
||||||
let _ = stream.write(resp.as_bytes()).await;
|
let _ = stream.write(resp.as_bytes()).await;
|
||||||
let _ = stream.shutdown().await;
|
let _ = stream.shutdown().await;
|
||||||
},
|
}
|
||||||
Err(err) if err == Error::Incomplete => {
|
Err(err) if err == Error::Incomplete => {
|
||||||
continue 'protocol_loop;
|
continue 'protocol_loop;
|
||||||
},
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
stream.write(format!("ERROR: {}", err).as_bytes()).await.context(StdIOSnafu)?;
|
stream
|
||||||
|
.write(format!("ERROR: {}", err).as_bytes())
|
||||||
|
.await
|
||||||
|
.context(StdIOSnafu)?;
|
||||||
continue 'tcp_loop;
|
continue 'tcp_loop;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -122,9 +166,6 @@ pub async fn socketserv_tcp(args: Cli, specific_args: SocketServerArgs, client:
|
||||||
}
|
}
|
||||||
|
|
||||||
return Ok(());
|
return Ok(());
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) {
|
pub async fn socketserv_udp(args: Cli, specific_args: SocketServerArgs, client: Client) {}
|
||||||
|
|
||||||
}
|
|
||||||
|
|
|
@ -3,14 +3,14 @@
|
||||||
mod handlers;
|
mod handlers;
|
||||||
mod types;
|
mod types;
|
||||||
|
|
||||||
use ntex::web::types::State;
|
|
||||||
use ntex::util::Bytes;
|
|
||||||
use ntex::web;
|
|
||||||
use crate::web_server::app_error::AppError;
|
use crate::web_server::app_error::AppError;
|
||||||
use crate::web_server::NMAppState;
|
|
||||||
use crate::web_server::old_app_api::handlers::{app_init, version};
|
use crate::web_server::old_app_api::handlers::{app_init, version};
|
||||||
use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams};
|
use crate::web_server::old_app_api::types::{AppInitRequest, MandatoryParams};
|
||||||
use crate::web_server::utils::redis::is_api_key_valid;
|
use crate::web_server::utils::redis::is_api_key_valid;
|
||||||
|
use crate::web_server::NMAppState;
|
||||||
|
use ntex::util::Bytes;
|
||||||
|
use ntex::web;
|
||||||
|
use ntex::web::types::State;
|
||||||
use ntex::web::HttpRequest;
|
use ntex::web::HttpRequest;
|
||||||
use snafu::{whatever, ResultExt};
|
use snafu::{whatever, ResultExt};
|
||||||
|
|
||||||
|
@ -60,7 +60,9 @@ pub async fn old_api_handler(
|
||||||
|
|
||||||
app_init(body, &app_state).await
|
app_init(body, &app_state).await
|
||||||
}
|
}
|
||||||
_ => Err(AppError::UnknownMethod { method: mandatory_params.cmd.to_string() }),
|
_ => Err(AppError::UnknownMethod {
|
||||||
|
method: mandatory_params.cmd.to_string(),
|
||||||
|
}),
|
||||||
}
|
}
|
||||||
|
|
||||||
//Ok("fuck")
|
//Ok("fuck")
|
||||||
|
|
|
@ -19,23 +19,19 @@ pub enum QSParserError {
|
||||||
#[snafu(display("asd"))]
|
#[snafu(display("asd"))]
|
||||||
SerdeQS {
|
SerdeQS {
|
||||||
#[snafu(source(from(serde_qs::Error, convert_to_arc::<serde_qs::Error>)))]
|
#[snafu(source(from(serde_qs::Error, convert_to_arc::<serde_qs::Error>)))]
|
||||||
source: Arc<serde_qs::Error>
|
source: Arc<serde_qs::Error>,
|
||||||
},
|
},
|
||||||
#[snafu(display("asd"))]
|
#[snafu(display("asd"))]
|
||||||
Parsing {
|
Parsing { context: String },
|
||||||
context: String
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("asd"))]
|
#[snafu(display("asd"))]
|
||||||
FloatP {
|
FloatP {
|
||||||
#[snafu(source)]
|
#[snafu(source)]
|
||||||
source: ParseFloatError
|
source: ParseFloatError,
|
||||||
},
|
},
|
||||||
|
|
||||||
#[snafu(display("failed to parse into decimal"))]
|
#[snafu(display("failed to parse into decimal"))]
|
||||||
DecimalParse {
|
DecimalParse { source: rust_decimal::Error },
|
||||||
source: rust_decimal::Error
|
|
||||||
},
|
|
||||||
|
|
||||||
#[snafu(display("asd"))]
|
#[snafu(display("asd"))]
|
||||||
NoMAC,
|
NoMAC,
|
||||||
|
@ -43,7 +39,9 @@ pub enum QSParserError {
|
||||||
|
|
||||||
impl From<Error<&str>> for QSParserError {
|
impl From<Error<&str>> for QSParserError {
|
||||||
fn from(value: Error<&str>) -> Self {
|
fn from(value: Error<&str>) -> Self {
|
||||||
QSParserError::Parsing { context: format!("{:?}", value)}
|
QSParserError::Parsing {
|
||||||
|
context: format!("{:?}", value),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -66,7 +64,7 @@ pub fn qs_rest_to_values(
|
||||||
for (key, value) in parsed {
|
for (key, value) in parsed {
|
||||||
hashset.insert(SensorValue {
|
hashset.insert(SensorValue {
|
||||||
mac: key,
|
mac: key,
|
||||||
value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu{})?,
|
value: Decimal::from_str(value.as_str()).context(DecimalParseSnafu {})?,
|
||||||
|
|
||||||
time: None,
|
time: None,
|
||||||
unit: None,
|
unit: None,
|
||||||
|
@ -82,7 +80,9 @@ pub fn parse_decimal_if_exists(
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Option<Decimal>, QSParserError> {
|
) -> Result<Option<Decimal>, QSParserError> {
|
||||||
if let Some(unwrapped_value) = parsed.remove(key) {
|
if let Some(unwrapped_value) = parsed.remove(key) {
|
||||||
Ok(Some(Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu{})?))
|
Ok(Some(
|
||||||
|
Decimal::from_str(unwrapped_value.as_str()).context(DecimalParseSnafu {})?,
|
||||||
|
))
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
|
@ -93,14 +93,16 @@ pub fn parse_epoch_if_exists(
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Option<Epoch>, QSParserError> {
|
) -> Result<Option<Epoch>, QSParserError> {
|
||||||
if let Some(unwrapped_value) = parsed.remove(key) {
|
if let Some(unwrapped_value) = parsed.remove(key) {
|
||||||
Ok(Some(Epoch::from_unix_seconds(unwrapped_value.parse().context(FloatPSnafu{})?)))
|
Ok(Some(Epoch::from_unix_seconds(
|
||||||
|
unwrapped_value.parse().context(FloatPSnafu {})?,
|
||||||
|
)))
|
||||||
} else {
|
} else {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSParserError> {
|
pub async fn parse_nm_qs_format(input: &str) -> Result<NMDeviceDataPacket, QSParserError> {
|
||||||
let mut parsed: HashMap<String, String> = serde_qs::from_str(input).context(SerdeQSSnafu{})?;
|
let mut parsed: HashMap<String, String> = serde_qs::from_str(input).context(SerdeQSSnafu {})?;
|
||||||
|
|
||||||
let (_, device_mac) = if let Some(id) = parsed.get("ID") {
|
let (_, device_mac) = if let Some(id) = parsed.get("ID") {
|
||||||
parse_mac_address(id)?
|
parse_mac_address(id)?
|
||||||
|
|
Loading…
Add table
Reference in a new issue