This repository has been archived on 2024-07-30. You can view files and clone it, but cannot push or open issues or pull requests.
python-aternos/python_aternos/atserver.py

248 lines
5.2 KiB
Python
Raw Normal View History

import enum
import re
import json
2021-10-08 18:35:20 +03:00
import lxml.html
import websockets
from requests import Response
from typing import Optional, Dict
2021-10-08 18:35:20 +03:00
from . import atconnect
2021-10-08 18:35:20 +03:00
from . import aterrors
from . import atfm
from . import atconf
from . import atplayers
2022-01-22 14:10:30 +03:00
from .atwss import AternosWss
JAVA = 0
BEDROCK = 1
class Lists(enum.Enum):
whl = 'whitelist'
ops = 'ops'
ban = 'banned-players'
ips = 'banned-ips'
class Status(enum.IntEnum):
off = 0
on = 1
loading = 2
shutdown = 3
2022-01-22 14:10:30 +03:00
unknown = 6
error = 7
2022-01-22 14:10:30 +03:00
confirm = 10
2021-10-08 18:35:20 +03:00
class AternosServer:
def __init__(
self, servid:str,
atconn:atconnect.AternosConnect,
savelog:bool=True) -> None:
2021-10-08 18:35:20 +03:00
self.servid = servid
self.atconn = atconn
self.savelog = savelog
self.log = []
2021-10-08 18:35:20 +03:00
servreq = self.atserver_request(
'https://aternos.org/server',
atconnect.REQGET
2021-10-08 18:35:20 +03:00
)
servtree = lxml.html.fromstring(servreq.content)
self._info = json.loads(
re.search(
r'var\s*lastStatus\s*=\s*({.*})',
2021-10-15 18:31:47 +03:00
servtree.head.text_content()
)[1]
2021-10-08 18:35:20 +03:00
)
self._ram = 0
self._tps = 0
2021-10-08 18:35:20 +03:00
self.atconn.parse_token(servreq.content)
2021-10-08 18:35:20 +03:00
self.atconn.generate_sec()
2022-01-22 14:10:30 +03:00
async def wss(self) -> AternosWss:
return AternosWss(
self.atconn.session.cookies,
self.servid
)
def start(self, headstart:bool=False, accepteula:bool=True) -> None:
2021-10-08 18:35:20 +03:00
startreq = self.atserver_request(
'https://aternos.org/panel/ajax/start.php',
atconnect.REQGET, params={'headstart': int(headstart)},
sendtoken=True
2021-10-08 18:35:20 +03:00
)
startresult = startreq.json()
if startresult['success']:
return
error = startresult['error']
2021-10-08 18:35:20 +03:00
if error == 'eula' and accepteula:
self.eula()
self.start(accepteula=False)
2021-10-08 18:35:20 +03:00
elif error == 'eula':
raise aterrors.AternosServerStartError(
'EULA was not accepted. Use start(accepteula=True)'
)
2021-10-08 18:35:20 +03:00
elif error == 'already':
raise aterrors.AternosServerStartError(
'Server is already running'
)
elif error == 'wrongversion':
raise aterrors.AternosServerStartError(
'Incorrect software version installed'
)
elif error == 'file':
raise aterrors.AternosServerStartError(
'File server is unavailbale, view status.aternos.gmbh'
)
elif error == 'size':
raise aterrors.AternosServerStartError(
f'Available storage size is 4GB, ' + \
f'your server used: {startresult["size"]}'
)
2021-10-08 18:35:20 +03:00
else:
raise aterrors.AternosServerStartError(
f'Unable to start server, code: {error}'
2021-10-08 18:35:20 +03:00
)
def confirm(self) -> None:
2021-10-08 18:35:20 +03:00
self.atserver_request(
'https://aternos.org/panel/ajax/confirm.php',
atconnect.REQGET, sendtoken=True
2021-10-08 18:35:20 +03:00
)
def stop(self) -> None:
2021-10-08 18:35:20 +03:00
self.atserver_request(
'https://aternos.org/panel/ajax/stop.php',
atconnect.REQGET, sendtoken=True
2021-10-08 18:35:20 +03:00
)
def cancel(self) -> None:
2021-10-08 18:35:20 +03:00
self.atserver_request(
'https://aternos.org/panel/ajax/cancel.php',
atconnect.REQGET, sendtoken=True
2021-10-08 18:35:20 +03:00
)
def restart(self) -> None:
2021-10-08 18:35:20 +03:00
self.atserver_request(
'https://aternos.org/panel/ajax/restart.php',
atconnect.REQGET, sendtoken=True
2021-10-08 18:35:20 +03:00
)
def eula(self) -> None:
2021-10-08 18:35:20 +03:00
self.atserver_request(
'https://aternos.org/panel/ajax/eula.php',
atconnect.REQGET, sendtoken=True
2021-10-08 18:35:20 +03:00
)
def files(self) -> atfm.AternosFileManager:
2021-10-08 18:35:20 +03:00
return atfm.AternosFileManager(self)
2021-10-08 18:35:20 +03:00
def config(self) -> atconf.AternosConfig:
return atconf.AternosConfig(self)
def players(self, lst:str) -> atplayers.AternosPlayersList:
correct = False
for lsttype in Lists:
if lsttype.value == lst:
correct = True
if not correct:
raise AttributeError('Incorrect players list type! Use Lists enum')
return atplayers.AternosPlayersList(lst, self)
2021-10-08 18:35:20 +03:00
def atserver_request(
self, url:str, method:int,
params:Optional[dict]=None,
data:Optional[dict]=None,
headers:Optional[dict]=None,
sendtoken:bool=False) -> Response:
2021-10-08 18:35:20 +03:00
return self.atconn.request_cloudflare(
url=url, method=method,
params=params, data=data,
headers=headers,
reqcookies={
'ATERNOS_SERVER': self.servid
},
sendtoken=sendtoken
)
@property
def subdomain(self) -> str:
atdomain = self.domain
return atdomain[:atdomain.find('.')]
@subdomain.setter
def subdomain(self, value:str) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/options/subdomain.php',
atconnect.REQGET, params={'subdomain': value},
sendtoken=True
)
@property
def motd(self) -> str:
return self._info['motd']
@motd.setter
def motd(self, value:str) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/options/motd.php',
2021-11-01 17:04:19 +03:00
atconnect.REQPOST, data={'motd': value},
sendtoken=True
)
@property
def address(self) -> str:
return f'{self.domain}:{self.port}'
2021-10-08 18:35:20 +03:00
@property
def domain(self) -> str:
return self._info['ip']
2021-10-08 18:35:20 +03:00
@property
def port(self) -> int:
return self._info['port']
2021-10-08 18:35:20 +03:00
@property
def edition(self) -> int:
soft_type = self._info['bedrock']
return int(soft_type)
2021-10-08 18:35:20 +03:00
@property
def software(self) -> str:
return self._info['software']
2021-10-08 18:35:20 +03:00
@property
def version(self) -> str:
return self._info['version']
@property
def status(self) -> int:
return int(self._info['status'])
@property
def ram(self) -> int:
return self._ram