Add narodmon format request builder
This commit is contained in:
parent
2ce161205c
commit
86dc8777bc
2 changed files with 36 additions and 3 deletions
15
main.py
15
main.py
|
@ -1,15 +1,26 @@
|
||||||
import socket
|
import socket
|
||||||
import urllib
|
import urllib
|
||||||
|
|
||||||
|
from functools import partial
|
||||||
from unittest import TestCase
|
from unittest import TestCase
|
||||||
|
|
||||||
from fixtures import device_mac, device_name, sensors
|
import models
|
||||||
|
import fixtures
|
||||||
|
|
||||||
|
|
||||||
TCP_ADDR = ('127.0.0.1', 8283)
|
TCP_ADDR = ('127.0.0.1', 8283)
|
||||||
UDP_ADDR = ('127.0.0.1', 8283)
|
UDP_ADDR = ('127.0.0.1', 8283)
|
||||||
HTTP_URL = 'http://127.0.0.1:8080'
|
HTTP_URL = 'http://127.0.0.1:8080'
|
||||||
|
|
||||||
|
nm_req = partial(
|
||||||
|
models.create_nm_req,
|
||||||
|
fixtures.device_mac,
|
||||||
|
fixtures.device_name,
|
||||||
|
fixtures.sensors,
|
||||||
|
)
|
||||||
|
|
||||||
|
json_req = None # TODO
|
||||||
|
|
||||||
|
|
||||||
class TestTCP(TestCase):
|
class TestTCP(TestCase):
|
||||||
|
|
||||||
|
@ -18,7 +29,7 @@ class TestTCP(TestCase):
|
||||||
self.sock.connect(TCP_ADDR)
|
self.sock.connect(TCP_ADDR)
|
||||||
|
|
||||||
def test_nm(self) -> None:
|
def test_nm(self) -> None:
|
||||||
self.sock.sendall(b'') # TODO
|
self.sock.sendall(nm_req().encode('utf-8'))
|
||||||
self._wait_for_ok()
|
self._wait_for_ok()
|
||||||
|
|
||||||
def test_json(self) -> None:
|
def test_json(self) -> None:
|
||||||
|
|
24
models.py
24
models.py
|
@ -1,5 +1,5 @@
|
||||||
from dataclasses import dataclass
|
from dataclasses import dataclass
|
||||||
from typing import Any
|
from typing import Any, Iterable
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
@dataclass
|
||||||
|
@ -8,3 +8,25 @@ class SensorData:
|
||||||
value: Any
|
value: Any
|
||||||
time: int | None = None
|
time: int | None = None
|
||||||
name: str | None = None
|
name: str | None = None
|
||||||
|
|
||||||
|
|
||||||
|
def create_nm_req(
|
||||||
|
dev_mac: str,
|
||||||
|
dev_name: str | None = None,
|
||||||
|
sensors: Iterable[SensorData] = ()) -> str:
|
||||||
|
'''Generate a NarodMon format (#mac#value) request'''
|
||||||
|
|
||||||
|
res = '#' + dev_mac
|
||||||
|
if dev_name is not None:
|
||||||
|
res += '#' + dev_name
|
||||||
|
res += '\n'
|
||||||
|
|
||||||
|
for s in sensors:
|
||||||
|
res += f'#{s.mac}#{s.value}'
|
||||||
|
if s.time is not None:
|
||||||
|
res += f'#{s.time}'
|
||||||
|
if s.name is not None:
|
||||||
|
res += '#' + s.name
|
||||||
|
res += '\n'
|
||||||
|
|
||||||
|
return res
|
||||||
|
|
Loading…
Add table
Reference in a new issue