80 lines
1.9 KiB
Python
80 lines
1.9 KiB
Python
import socket
|
|
from urllib.request import urlopen, Request
|
|
|
|
import unittest
|
|
from unittest import TestCase
|
|
|
|
from fixtures import device
|
|
|
|
|
|
TCP_ADDR = ('127.0.0.1', 8283)
|
|
UDP_ADDR = ('127.0.0.1', 8283)
|
|
HTTP_URL = 'http://127.0.0.1:8080'
|
|
|
|
|
|
class TestTCP(TestCase):
|
|
|
|
def setUp(self) -> None:
|
|
self.sock = socket.socket()
|
|
self.sock.connect(TCP_ADDR)
|
|
|
|
def test_nm(self) -> None:
|
|
self.sock.sendall(device.nm_req_data().encode())
|
|
self._wait_for_ok()
|
|
|
|
def test_json(self) -> None:
|
|
self.sock.sendall(device.json_req_data().encode())
|
|
self._wait_for_ok()
|
|
|
|
def _wait_for_ok(self) -> None:
|
|
data = self.sock.recv(1024)
|
|
self.assertEqual(data, b'OK')
|
|
|
|
def tearDown(self) -> None:
|
|
self.sock.close()
|
|
|
|
|
|
class TestUDP(TestCase):
|
|
|
|
def setUp(self) -> None:
|
|
self.sock = socket.socket(type=socket.SOCK_DGRAM)
|
|
|
|
def test_nm(self) -> None:
|
|
self.sock.sendto(device.nm_req_data().encode(), UDP_ADDR)
|
|
# aaand we can not check if server accepted the data
|
|
# so this TestCase is absolutely useless
|
|
# until client API is implemented
|
|
|
|
def test_json(self) -> None:
|
|
self.sock.sendto(device.json_req_data().encode(), UDP_ADDR)
|
|
|
|
def tearDown(self) -> None:
|
|
self.sock.close()
|
|
|
|
|
|
class TestHTTP(TestCase):
|
|
|
|
def test_get(self) -> None:
|
|
self._send_and_check(Request(
|
|
HTTP_URL + '/get?' + device.http_req_data(),
|
|
))
|
|
|
|
def test_post(self) -> None:
|
|
self._send_and_check(Request(
|
|
HTTP_URL + '/post',
|
|
data=device.http_req_data().encode(),
|
|
))
|
|
|
|
def test_json(self) -> None:
|
|
self._send_and_check(Request(
|
|
HTTP_URL + '/json',
|
|
data=device.json_req_data().encode(),
|
|
))
|
|
|
|
def _send_and_check(self, req: Request) -> None:
|
|
with urlopen(req) as resp:
|
|
self.assertEqual(resp.status, 200)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|