Docstrings, logging in websocket example

This commit is contained in:
DarkCat09 2022-06-17 12:30:58 +04:00
parent 72135f41fc
commit 9f0610e5a7
9 changed files with 423 additions and 58 deletions

View file

@ -1,9 +1,15 @@
import asyncio
import logging
from getpass import getpass
from python_aternos import Client, atwss
user = input('Username: ')
pswd = getpass('Password: ')
logs = input('Show detailed logs? (y/n) ').strip().lower() == 'y'
if logs:
logging.basicConfig(level=logging.DEBUG)
aternos = Client.from_credentials(user, pswd)
s = aternos.list_servers()[0]

View file

@ -1,15 +1,16 @@
import enum
import re
import lxml.html
from typing import Any, Dict, List, Optional
from typing import Any, Dict, List, Union, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .atserver import AternosServer
#
# server.options
class ServerOpts(enum.Enum):
"""server.options file"""
players = 'max-players'
gm = 'gamemode'
difficulty = 'difficulty'
@ -31,15 +32,19 @@ class ServerOpts(enum.Enum):
DAT_PREFIX = 'Data:'
DAT_GR_PREFIX = 'Data:GameRules:'
# level.dat
class WorldOpts(enum.Enum):
"""level.dat file"""
seed12 = 'randomseed'
seed = 'seed'
hardcore = 'hardcore'
difficulty = 'Difficulty'
# /gamerule
class WorldRules(enum.Enum):
"""/gamerule list"""
advs = 'announceAdvancements'
univanger = 'universalAnger'
cmdout = 'commandBlockOutput'
@ -76,41 +81,24 @@ class WorldRules(enum.Enum):
spectchunkgen = 'spectatorsGenerateChunks'
cmdfb = 'sendCommandFeedback'
DAT_TYPE_WORLD = 0
DAT_TYPE_GR = 1
class Gamemode(enum.IntEnum):
"""/gamemode numeric list"""
survival = 0
creative = 1
adventure = 2
spectator = 3
class Difficulty(enum.IntEnum):
"""/difficulty numeric list"""
peaceful = 0
easy = 1
normal = 2
hard = 3
#
# jre types for set_java
javatype = {
'jdk': 'openjdk:{ver}',
'openj9-1': 'adoptopenjdk:{ver}-jre-openj9-bionic',
'openj9-2': 'ibm-semeru-runtimes:open-{ver}-jre'
}
# checking java version format
javacheck = re.compile(
''.join(
list(
map(
# create a regexp for each jre type,
# e.g.: (^openjdk:\d+$)|
lambda i: '(^' + javatype[i].format(ver=r'\d+') + '$)|',
javatype
)
)
).rstrip('|')
)
# checking timezone format
tzcheck = re.compile(r'(^[A-Z]\w+\/[A-Z]\w+$)|^UTC$')
# options types converting
@ -120,15 +108,26 @@ convert = {
'config-option-toggle': bool
}
# MAIN CLASS
class AternosConfig:
"""Class for editing server settings
:param atserv: :class:`python_aternos.atserver.AternosServer` object
:type atserv: python_aternos.atserver.AternosServer
"""
def __init__(self, atserv:'AternosServer') -> None:
self.atserv = atserv
def get_timezone(self) -> str:
"""Parses timezone from options page
:return: Area/Location
:rtype: str
"""
optreq = self.atserv.atserver_request(
'https://aternos.org/options', 'GET'
)
@ -140,6 +139,14 @@ class AternosConfig:
def set_timezone(self, value:str) -> None:
"""Sets new timezone
:param value: New timezone
:type value: str
:raises ValueError: If given string
doesn't match Area/Location format
"""
matches_tz = tzcheck.search(value)
if not matches_tz:
raise ValueError('Timezone must match zoneinfo format: Area/Location')
@ -150,7 +157,13 @@ class AternosConfig:
sendtoken=True
)
def get_java(self) -> str:
def get_java(self) -> int:
"""Parses Java version from options page
:return: Java image version
:rtype: int
"""
optreq = self.atserv.atserver_request(
'https://aternos.org/options', 'GET'
@ -158,17 +171,21 @@ class AternosConfig:
opttree = lxml.html.fromstring(optreq)
imgopt = opttree.xpath('//div[@class="options-other-input image-switch"]')[0]
imgver = imgopt.xpath('.//div[@class="option current"]/@data-value')[0]
return imgver
def set_java(self, value:str) -> None:
jdkver = str(imgver or '').removeprefix('openjdk:')
return int(jdkver)
matches_jdkver = javacheck.search(value)
if not matches_jdkver:
raise ValueError('Incorrect Java image version format!')
def set_java(self, value:int) -> None:
"""Sets new Java version
:param value: New Java image version
:type value: int
"""
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/image.php',
'POST', data={'image': value},
'POST', data={'image': f'openjdk:{value}'},
sendtoken=True
)
@ -176,15 +193,42 @@ class AternosConfig:
# server.properties
#
def set_server_prop(self, option:str, value:Any) -> None:
"""Sets server.properties option
:param option: Option name
:type option: str
:param value: New value
:type value: Any
"""
self.__set_prop(
'/server.properties',
option, value
)
def get_server_props(self, proptyping:bool=True) -> Dict[str,Any]:
"""Parses all server.properties from options page
:param proptyping: If the returned dict should contain value
that matches property type (e.g. max-players will be int)
instead of string, defaults to True
:type proptyping: bool, optional
:return: Server.properties dict
:rtype: Dict[str,Any]
"""
return self.__get_all_props('https://aternos.org/options', proptyping)
def set_server_props(self, props:Dict[str,Any]) -> None:
"""Updates server.properties options with the given dict
:param props: Dict with properties `{key:value}`
:type props: Dict[str,Any]
"""
for key in props:
self.set_server_prop(key, props[key])
@ -192,11 +236,26 @@ class AternosConfig:
# level.dat
#
def set_world_prop(
self, option:str, value:Any,
proptype:int, world:str='world') -> None:
self, option:Union[WorldOpts,WorldRules],
value:Any, gamerule:bool=False,
world:str='world') -> None:
"""Sets level.dat option for specified world
:param option: Option name
:type option: Union[WorldOpts,WorldRules]
:param value: New value
:type value: Any
:param gamerule: If the option
is a gamerule, defaults to False
:type gamerule: bool, optional
:param world: Name of the world which
level.dat must be edited, defaults to 'world'
:type world: str, optional
"""
prefix = DAT_PREFIX
if proptype == DAT_TYPE_GR:
if gamerule:
prefix = DAT_GR_PREFIX
self.__set_prop(
@ -209,6 +268,18 @@ class AternosConfig:
self, world:str='world',
proptyping:bool=True) -> Dict[str,Any]:
"""Parses level.dat from specified world's options page
:param world: Name of the world, defaults to 'world'
:type world: str, optional
:param proptyping: If the returned dict should contain the value
that matches property type (e.g. randomTickSpeed will be bool)
instead of string, defaults to True
:type proptyping: bool, optional
:return: Level.dat dict
:rtype: Dict[str,Any]
"""
self.__get_all_props(
f'https://aternos.org/files/{world}/level.dat',
proptyping, [DAT_PREFIX, DAT_GR_PREFIX]

View file

@ -12,6 +12,10 @@ REQUA = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko)
class AternosConnect:
"""
Class for sending API requests bypass Cloudflare
and parsing responses"""
def __init__(self) -> None:
self.session = CloudScraper()
@ -19,6 +23,17 @@ class AternosConnect:
def parse_token(self) -> str:
"""Parses Aternos ajax token that
is needed for most requests
:raises RuntimeWarning: If the parser
can not find <head> tag in HTML response
:raises CredentialsError: If the parser
is unable to extract ajax token in HTML
:return: Aternos ajax token
:rtype: str
"""
loginpage = self.request_cloudflare(
f'https://aternos.org/go/', 'GET'
).content
@ -55,6 +70,13 @@ class AternosConnect:
def generate_sec(self) -> str:
"""Generates Aternos SEC token which
is also needed for most API requests
:return: Random SEC key:value string
:rtype: str
"""
randkey = self.generate_aternos_rand()
randval = self.generate_aternos_rand()
self.sec = f'{randkey}:{randval}'
@ -67,6 +89,15 @@ class AternosConnect:
def generate_aternos_rand(self, randlen:int=16) -> str:
"""Generates a random string using
Aternos algorithm from main.js file
:param randlen: Random string length, defaults to 16
:type randlen: int, optional
:return: Random string for SEC token
:rtype: str
"""
# a list with randlen+1 empty strings:
# generate a string with spaces,
# then split it by space
@ -81,6 +112,20 @@ class AternosConnect:
self, num:Union[int,float,str],
base:int, frombase:int=10) -> str:
"""Converts an integer to specified base
:param num: Integer in any base to convert.
If it is a float started with `0,`,
zero and comma will be removed to get int
:type num: Union[int,float,str]
:param base: New base
:type base: int
:param frombase: Given number base, defaults to 10
:type frombase: int, optional
:return: Number converted to a specified base
:rtype: str
"""
if isinstance(num, str):
num = int(num, frombase)
@ -101,9 +146,41 @@ class AternosConnect:
self, url:str, method:str,
params:Optional[dict]=None, data:Optional[dict]=None,
headers:Optional[dict]=None, reqcookies:Optional[dict]=None,
sendtoken:bool=False, redirect:bool=True, retry:int=0) -> Response:
sendtoken:bool=False, redirect:bool=True, retry:int=3) -> Response:
if retry > 3:
"""Sends a request to Aternos API bypass Cloudflare
:param url: Request URL
:type url: str
:param method: Request method, must be GET or POST
:type method: str
:param params: URL parameters, defaults to None
:type params: Optional[dict], optional
:param data: POST request data, if the method is GET,
this dict will be combined with params, defaults to None
:type data: Optional[dict], optional
:param headers: Custom headers, defaults to None
:type headers: Optional[dict], optional
:param reqcookies: Cookies only for this request, defaults to None
:type reqcookies: Optional[dict], optional
:param sendtoken: If the ajax and SEC token
should be sent, defaults to False
:type sendtoken: bool, optional
:param redirect: If requests lib should follow
Location header in 3xx responses, defaults to True
:type redirect: bool, optional
:param retry: How many times parser must retry
connection to API bypass Cloudflare, defaults to 3
:type retry: int, optional
:raises CloudflareError:
When the parser has exceeded retries count
:raises NotImplementedError:
When the specified method is not GET or POST
:return: API response
:rtype: requests.Response
"""
if retry <= 0:
raise CloudflareError('Unable to bypass Cloudflare protection')
try:
@ -157,7 +234,7 @@ class AternosConnect:
params, data,
headers, reqcookies,
sendtoken, redirect,
retry + 1
retry - 1
)
logging.info(

View file

@ -9,14 +9,31 @@ if TYPE_CHECKING:
from .atserver import AternosServer
class FileType(enum.IntEnum):
"""File or dierctory"""
file = 0
directory = 1
class AternosFile:
"""File class which contains info about its path, type and size
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
:param path: Path to the file
:type path: str
:param name: Filename
:type name: str
:param ftype: File or directory
:type ftype: python_aternos.atfile.FileType
:param size: File size, defaults to 0
:type size: Union[int,float], optional
"""
def __init__(
self, atserv:'AternosServer',
path:str, name:str, ftype:int=FileType.file,
path:str, name:str, ftype:FileType=FileType.file,
size:Union[int,float]=0) -> None:
self.atserv = atserv
@ -28,6 +45,8 @@ class AternosFile:
def delete(self) -> None:
"""Deletes the file"""
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/delete.php',
'POST', data={'file': self._full},
@ -36,6 +55,14 @@ class AternosFile:
def get_content(self) -> bytes:
"""Requests file content in bytes (downloads it)
:raises FileError: If downloading
the file is not allowed by Aternos
:return: File content
:rtype: bytes
"""
file = self.atserv.atserver_request(
'https://aternos.org/panel/ajax/files/download.php',
'GET', params={
@ -48,6 +75,12 @@ class AternosFile:
def set_content(self, value:bytes) -> None:
"""Modifies the file content
:param value: New content
:type value: bytes
"""
self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/save.php',
'POST', data={
@ -58,6 +91,13 @@ class AternosFile:
def get_text(self) -> str:
"""Requests editing the file as a text
(try it if downloading is disallowed)
:return: File text content
:rtype: str
"""
editor = self.atserv.atserver_request(
f'https://aternos.org/files/{self._full.lstrip("/")}', 'GET'
)
@ -68,6 +108,14 @@ class AternosFile:
def set_text(self, value:str) -> None:
"""Modifies the file content,
but unlike set_content takes
a string as a new value
:param value: New content
:type value: str
"""
self.set_content(value.encode('utf-8'))
@property

View file

@ -1,5 +1,5 @@
import lxml.html
from typing import Union, List
from typing import Union, Optional, List
from typing import TYPE_CHECKING
from .atfile import AternosFile, FileType
@ -8,12 +8,28 @@ if TYPE_CHECKING:
class FileManager:
"""Aternos file manager class for viewing files structure
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
"""
def __init__(self, atserv:'AternosServer') -> None:
self.atserv = atserv
def listdir(self, path:str='') -> List[AternosFile]:
"""Requests a list of files
in the specified directory
:param path: Directory
(an empty string means root), defaults to ''
:type path: str, optional
:return: List of :class:`python_aternos.atfile.AternosFile`
:rtype: List[AternosFile]
"""
path = path.lstrip('/')
filesreq = self.atserv.atserver_request(
f'https://aternos.org/files/{path}', 'GET'
@ -57,6 +73,16 @@ class FileManager:
def convert_size(self, num:Union[int,float], measure:str) -> float:
"""Converts "human" file size to size in bytes
:param num: Size
:type num: Union[int,float]
:param measure: Units (B, kB, MB, GB)
:type measure: str
:return: Size in bytes
:rtype: float
"""
measure_match = {
'B': 1,
'kB': 1000,
@ -68,7 +94,16 @@ class FileManager:
except KeyError:
return -1
def get_file(self, path:str) -> Union[AternosFile,None]:
def get_file(self, path:str) -> Optional[AternosFile]:
"""Returns :class:`python_aternos.atfile.AternosFile`
instance by its path
:param path: Path to file including its filename
:type path: str
:return: _description_
:rtype: Optional[AternosFile]
"""
filepath = path[:path.rfind('/')]
filename = path[path.rfind('/'):]
@ -82,6 +117,14 @@ class FileManager:
def dl_file(self, path:str) -> bytes:
"""Returns the file content in bytes (downloads it)
:param path: Path to file including its filename
:type path: str
:return: File content
:rtype: bytes
"""
file = self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/files/download.php?' + \
f'file={path.replace("/","%2F")}',
@ -92,6 +135,15 @@ class FileManager:
def dl_world(self, world:str='world') -> bytes:
"""Returns the world zip file content
by its name (downloads it)
:param world: Name of world, defaults to 'world'
:type world: str, optional
:return: Zip file content
:rtype: bytes
"""
world = self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/worlds/download.php?' + \
f'world={world.replace("/","%2F")}',

View file

@ -7,6 +7,15 @@ from typing import Any
arrowexp = regex.compile(r'\w[^\}]*+')
def to_ecma5_function(f:str) -> str:
"""Converts a ECMA6 function to ECMA5 format (without arrow expressions)
:param f: ECMA6 function
:type f: str
:return: ECMA5 function
:rtype: str
"""
match = arrowexp.search(f)
conv = '(function(){' + match.group(0) + '})()'
return regex.sub(
@ -19,6 +28,15 @@ def atob(s:str) -> str:
return base64.standard_b64decode(str(s)).decode('utf-8')
def exec(f:str) -> Any:
"""Executes a JavaScript function
:param f: ECMA6 function
:type f: str
:return: JavaScript interpreter context
:rtype: Any
"""
ctx = js2py.EvalJs({'atob': atob})
ctx.execute('window.document = { };')
ctx.execute('window.Map = function(_i){ };')

View file

@ -15,6 +15,15 @@ class Lists(enum.Enum):
class PlayersList:
"""Class for managing operators, whitelist and banned players lists
:param lst: Players list type, must be
:class:`python_aternos.atplayers.Lists` enum value
:type lst: Union[str,Lists]
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
"""
def __init__(self, lst:Union[str,Lists], atserv:'AternosServer') -> None:
self.atserv = atserv
@ -24,6 +33,14 @@ class PlayersList:
def list_players(self, cache:bool=True) -> List[str]:
"""Parse a players list
:param cache: If the function can return cached list (highly recommended), defaults to True
:type cache: bool, optional
:return: List of players nicknames
:rtype: List[str]
"""
if cache and self.parsed:
return self.players
@ -47,6 +64,12 @@ class PlayersList:
def add(self, name:str) -> None:
"""Appends a player to the list by the nickname
:param name: Player's nickname
:type name: str
"""
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/players/add.php',
'POST', data={
@ -59,6 +82,12 @@ class PlayersList:
def remove(self, name:str) -> None:
"""Removes a player from the list by the nickname
:param name: Player's nickname
:type name: str
"""
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/players/remove.php',
'POST', data={

View file

@ -208,7 +208,7 @@ class AternosServer:
def config(self) -> AternosConfig:
"""Returns :class:`python_aternos.atconf.AternosConfig`
instance for changing server settings
instance for editing server settings
:return: :class:`python_aternos.atconf.AternosConfig` object
:rtype: python_aternos.atconf.AternosConfig
@ -219,7 +219,7 @@ class AternosServer:
def players(self, lst:Lists) -> PlayersList:
"""Returns :class:`python_aternos.atplayers.PlayersList`
instance for managing operators, whitelist or banned players list
instance for managing operators, whitelist and banned players lists
:param lst: Players list type, must be
the :class:`python_aternos.atplayers.Lists` enum value
@ -231,7 +231,7 @@ class AternosServer:
return PlayersList(lst, self)
def atserver_request(
self, url:str, method:int,
self, url:str, method:str,
params:Optional[dict]=None,
data:Optional[dict]=None,
headers:Optional[dict]=None,
@ -244,15 +244,16 @@ class AternosServer:
:type url: str
:param method: Request method, must be GET or POST
:type method: str
:param params: URL parameters, defaults to an empty dictionary
:type params: dict, optional
:param data: POST request data. If the method is set to GET,
it will be combined with params. Defaults to an empty dictionary
:type data: dict, optional
:param headers: Custom headers, defaults to an empty dictionary
:type headers: dict, optional
:param sendtoken: Send ajax token in params
:type sendtoken: bool
:param params: URL parameters, defaults to None
:type params: Optional[dict], optional
:param data: POST request data, if the method is GET,
this dict will be combined with params, defaults to None
:type data: Optional[dict], optional
:param headers: Custom headers, defaults to None
:type headers: Optional[dict], optional
:param sendtoken: If the ajax and SEC token
should be sent, defaults to False
:type sendtoken: bool, optional
:return: API response
:rtype: requests.Response
"""

View file

@ -12,6 +12,8 @@ if TYPE_CHECKING:
class Streams(enum.Enum):
"""WebSocket streams types"""
status = (0,None)
queue = (1,None)
console = (2,'console')
@ -24,6 +26,15 @@ class Streams(enum.Enum):
class AternosWss:
"""Class for managing websocket connection
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
:param autoconfirm: Automatically start server status listener
when AternosWss connects to API to confirm server launching, defaults to `False`
:type autoconfirm: bool, optional
"""
def __init__(self, atserv:'AternosServer', autoconfirm:bool=False) -> None:
self.atserv = atserv
@ -36,15 +47,32 @@ class AternosWss:
async def confirm(self) -> None:
"""Simple way to call AternosServer.confirm from this class"""
self.atserv.confirm()
def wssreceiver(self, stream:Streams, *args:Any) -> Callable[[Callable[[Any],Coroutine[Any,Any,None]]],Any]:
"""Decorator that marks your function as a stream receiver.
When websocket receives message from the specified stream,
it calls all listeners created with this decorator.
:param stream: Stream that your function should listen
:type stream: python_aternos.atwss.Streams
:param args: Arguments which will be passed to your function
:type args: tuple, optional
:return: ...
:rtype: Callable[[Callable[[Any],Coroutine[Any,Any,None]]],Any]
"""
def decorator(func:Callable[[Any],Coroutine[Any,Any,None]]) -> None:
self.recv[stream] = (func, args)
return decorator
async def connect(self) -> None:
"""Connect to the websocket server and start all stream listeners"""
headers = [
('Host', 'aternos.org'),
('User-Agent', REQUA),
@ -62,7 +90,9 @@ class AternosWss:
@self.wssreceiver(Streams.status)
async def confirmfunc(msg):
# Autoconfirm
"""Automatically confirm Minecraft server launching"""
if not self.autoconfirm:
return
if msg['class'] == 'queueing' \
@ -72,6 +102,22 @@ class AternosWss:
@self.wssreceiver(Streams.status)
async def streamsfunc(msg):
"""Automatically starts streams. Detailed description:
According to the websocket messages from the web site,
Aternos can't receive any data from a stream (e.g. console) until
it requests this stream via the special message to the websocket server:
`{"stream":"console","type":"start"}`
on which the server responses with: `{"type":"connected"}`
Also, there are RAM (used heap) and TPS (ticks per second)
streams that must be enabled before trying to get information.
Enabling the stream for listening the server status is not needed,
these data is sent from API by default, so there's None value in
the second item of its stream type tuple (`<Streams.status: (0, None)>`).
https://github.com/DarkCat09/python-aternos/issues/22#issuecomment-1146788496
"""
if msg['status'] == 2:
# Automatically start streams
for strm in self.recv:
@ -88,6 +134,8 @@ class AternosWss:
async def close(self) -> None:
"""Closes websocket connection and stops all listeners"""
self.keep.cancel()
self.msgs.cancel()
await self.socket.close()
@ -95,6 +143,12 @@ class AternosWss:
async def send(self, obj:Union[Dict[str, Any],str]) -> None:
"""Sends a message to websocket server
:param obj: Message, may be a string or a dict
:type obj: Union[Dict[str, Any],str]
"""
if isinstance(obj, dict):
obj = json.dumps(obj)
@ -102,11 +156,17 @@ class AternosWss:
async def wssworker(self) -> None:
"""Starts async tasks in background
for receiving websocket messages
and sending keepalive ping"""
self.keep = asyncio.create_task(self.keepalive())
self.msgs = asyncio.create_task(self.receiver())
async def keepalive(self) -> None:
"""Each 49 seconds sends keepalive ping to websocket server"""
try:
while True:
await asyncio.sleep(49)
@ -117,6 +177,9 @@ class AternosWss:
async def receiver(self) -> None:
"""Receives messages from websocket servers
and calls user's streams listeners"""
try:
while True:
data = await self.socket.recv()