Bugfixes in atfile,wss,connect

This commit is contained in:
DarkCat09 2022-03-25 16:45:38 +04:00
parent 48f8090d2a
commit cec2938804
10 changed files with 234 additions and 229 deletions

View file

@ -28,7 +28,7 @@ class Client:
loginreq = atconn.request_cloudflare( loginreq = atconn.request_cloudflare(
f'https://aternos.org/panel/ajax/account/login.php', f'https://aternos.org/panel/ajax/account/login.php',
atconnect.REQPOST, data=credentials, 'POST', data=credentials,
sendtoken=True sendtoken=True
) )
@ -51,7 +51,7 @@ class Client:
def from_session(cls, session:str): def from_session(cls, session:str):
atconn = AternosConnect() atconn = AternosConnect()
atconn.session.cookies.set('ATERNOS_SESSION', session) atconn.session.cookies['ATERNOS_SESSION'] = session
atconn.parse_token() atconn.parse_token()
atconn.generate_sec() atconn.generate_sec()
@ -63,14 +63,13 @@ class Client:
atconn = AternosConnect() atconn = AternosConnect()
auth = atconn.request_cloudflare( auth = atconn.request_cloudflare(
'https://aternos.org/auth/google-login', 'https://aternos.org/auth/google-login',
atconnect.REQGET, redirect=False 'GET', redirect=False
) )
return auth.headers['Location'] return auth.headers['Location']
def list_servers(self) -> List[atserver.AternosServer]: def list_servers(self) -> List[atserver.AternosServer]:
serverspage = self.atconn.request_cloudflare( serverspage = self.atconn.request_cloudflare(
'https://aternos.org/servers/', 'https://aternos.org/servers/', 'GET'
atconnect.REQGET
) )
serverstree = lxml.html.fromstring(serverspage.content) serverstree = lxml.html.fromstring(serverspage.content)
serverslist = serverstree.xpath('//div[contains(@class,"servers ")]/div') serverslist = serverstree.xpath('//div[contains(@class,"servers ")]/div')

View file

@ -4,11 +4,11 @@ import lxml.html
from typing import Any, Dict, List, Optional from typing import Any, Dict, List, Optional
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from . import atconnect
if TYPE_CHECKING: if TYPE_CHECKING:
from .atserver import AternosServer from .atserver import AternosServer
#
# server.options
class ServerOpts(enum.Enum): class ServerOpts(enum.Enum):
players = 'max-players' players = 'max-players'
gm = 'gamemode' gm = 'gamemode'
@ -25,16 +25,20 @@ class ServerOpts(enum.Enum):
forcegm = 'force-gamemode' forcegm = 'force-gamemode'
spawnlock = 'spawn-protection' spawnlock = 'spawn-protection'
cmds = 'allow-cheats' cmds = 'allow-cheats'
packreq = 'require-resource-pack'
pack = 'resource-pack' pack = 'resource-pack'
DAT_PREFIX = 'Data:' DAT_PREFIX = 'Data:'
DAT_GR_PREFIX = 'Data:GameRules:' DAT_GR_PREFIX = 'Data:GameRules:'
# level.dat
class WorldOpts(enum.Enum): class WorldOpts(enum.Enum):
seed = 'randomseed' seed12 = 'randomseed'
seed = 'seed'
hardcore = 'hardcore' hardcore = 'hardcore'
difficulty = 'difficulty' difficulty = 'Difficulty'
# /gamerule
class WorldRules(enum.Enum): class WorldRules(enum.Enum):
advs = 'announceAdvancements' advs = 'announceAdvancements'
univanger = 'universalAnger' univanger = 'universalAnger'
@ -56,6 +60,7 @@ class WorldRules(enum.Enum):
drowndmg = 'drowningDamage' drowndmg = 'drowningDamage'
falldmg = 'fallDamage' falldmg = 'fallDamage'
firedmg = 'fireDamage' firedmg = 'fireDamage'
snowdmg = 'freezeDamage'
forgive = 'forgiveDeadPlayers' forgive = 'forgiveDeadPlayers'
keepinv = 'keepInventory' keepinv = 'keepInventory'
deathmsg = 'showDeathMessages' deathmsg = 'showDeathMessages'
@ -64,7 +69,8 @@ class WorldRules(enum.Enum):
entcram = 'maxEntityCramming' entcram = 'maxEntityCramming'
mobgrief = 'mobGriefing' mobgrief = 'mobGriefing'
regen = 'naturalRegeneration' regen = 'naturalRegeneration'
rndtick = 'randomTickspeed' sleeppct = 'playersSleepingPercentage'
rndtick = 'randomTickSpeed'
spawnradius = 'spawnRadius' spawnradius = 'spawnRadius'
reducedf3 = 'reducedDebugInfo' reducedf3 = 'reducedDebugInfo'
spectchunkgen = 'spectatorsGenerateChunks' spectchunkgen = 'spectatorsGenerateChunks'
@ -85,24 +91,43 @@ class Difficulty(enum.IntEnum):
normal = 2 normal = 2
hard = 3 hard = 3
JDK = 'openjdk:{}' #
OJ9 = 'adoptopenjdk:{}-jre-openj9-bionic' # jre types for set_java
javatype = {
'jdk': 'openjdk:{ver}',
'openj9-1': 'adoptopenjdk:{ver}-jre-openj9-bionic',
'openj9-2': 'ibm-semeru-runtimes:open-{ver}-jre'
}
# checking java version format
javacheck = re.compile(
''.join(
list(
map(
# create a regexp for each jre type,
# e.g.: (^openjdk:\d+$)|
lambda i: '(^' + javatype[i].format(ver=r'\d+') + '$)|',
javatype
)
)
).rstrip('|')
)
# checking timezone format
tzcheck = re.compile(r'(^[A-Z]\w+\/[A-Z]\w+$)|^UTC$')
# options types converting
convert = { convert = {
'config-option-number': int, 'config-option-number': int,
'config-option-select': int, 'config-option-select': int,
'config-option-toggle': bool 'config-option-toggle': bool
} }
FLAG_PROP_TYPE = 1 # MAIN CLASS
class AternosConfig: class AternosConfig:
def __init__(self, atserv:'AternosServer') -> None: def __init__(self, atserv:'AternosServer') -> None:
self.atserv = atserv self.atserv = atserv
@property def get_timezone(self) -> str:
def timezone(self) -> str:
optreq = self.atserv.atserver_request( optreq = self.atserv.atserver_request(
'https://aternos.org/options', 'GET' 'https://aternos.org/options', 'GET'
@ -113,12 +138,11 @@ class AternosConfig:
tztext = tzopt.xpath('.//div[@class="option current"]')[0].text tztext = tzopt.xpath('.//div[@class="option current"]')[0].text
return tztext.strip() return tztext.strip()
@timezone.setter def set_timezone(self, value:str) -> None:
def timezone(self, value:str) -> None:
matches_tz = re.search(r'(?:^[A-Z]\w+\/[A-Z]\w+$)|^UTC$', value) matches_tz = tzcheck.search(value)
if matches_tz == None: if not matches_tz:
raise AttributeError('Timezone must match zoneinfo format: Area/Location') raise ValueError('Timezone must match zoneinfo format: Area/Location')
self.atserv.atserver_request( self.atserv.atserver_request(
'https://aternos.org/panel/ajax/timezone.php', 'https://aternos.org/panel/ajax/timezone.php',
@ -126,24 +150,21 @@ class AternosConfig:
sendtoken=True sendtoken=True
) )
@property def get_java(self) -> str:
def java_version(self) -> str:
optreq = self.atserv.atserver_request( optreq = self.atserv.atserver_request(
'https://aternos.org/options', 'https://aternos.org/options', 'GET'
'GET'
) )
opttree = lxml.html.fromstring(optreq) opttree = lxml.html.fromstring(optreq)
imgopt = opttree.xpath('//div[@class="options-other-input image-switch"]')[0] imgopt = opttree.xpath('//div[@class="options-other-input image-switch"]')[0]
imgver = imgopt.xpath('.//div[@class="option current"]/@data-value')[0] imgver = imgopt.xpath('.//div[@class="option current"]/@data-value')[0]
return imgver return imgver
@java_version.setter def set_java(self, value:str) -> None:
def java_version(self, value:str) -> None:
matches_jdkver = re.search(r'^(?:adopt)?openjdk:(\d+)(?:-jre-openj9-bionic)?$', value) matches_jdkver = javacheck.search(value)
if matches_jdkver == None: if not matches_jdkver:
raise AttributeError('Incorrect Java image version format!') raise ValueError('Incorrect Java image version format!')
self.atserv.atserver_request( self.atserv.atserver_request(
'https://aternos.org/panel/ajax/image.php', 'https://aternos.org/panel/ajax/image.php',
@ -160,8 +181,8 @@ class AternosConfig:
option, value option, value
) )
def get_server_props(self, flags:int=FLAG_PROP_TYPE) -> Dict[str,Any]: def get_server_props(self, proptyping:bool=True) -> Dict[str,Any]:
return self.__get_all_props('https://aternos.org/options', flags) return self.__get_all_props('https://aternos.org/options', proptyping)
def set_server_props(self, props:Dict[str,Any]) -> None: def set_server_props(self, props:Dict[str,Any]) -> None:
for key in props: for key in props:
@ -186,11 +207,11 @@ class AternosConfig:
def get_world_props( def get_world_props(
self, world:str='world', self, world:str='world',
flags:int=FLAG_PROP_TYPE) -> Dict[str,Any]: proptyping:bool=True) -> Dict[str,Any]:
self.__get_all_props( self.__get_all_props(
f'https://aternos.org/files/{world}/level.dat', f'https://aternos.org/files/{world}/level.dat',
flags, [DAT_PREFIX, DAT_GR_PREFIX] proptyping, [DAT_PREFIX, DAT_GR_PREFIX]
) )
def set_world_props(self, props:Dict[str,Any]) -> None: def set_world_props(self, props:Dict[str,Any]) -> None:
@ -212,37 +233,28 @@ class AternosConfig:
) )
def __get_all_props( def __get_all_props(
self, url:str, flags:int=FLAG_PROP_TYPE, self, url:str, proptyping:bool=True,
prefixes:Optional[List[str]]=None) -> Dict[str,Any]: prefixes:Optional[List[str]]=None) -> Dict[str,Any]:
optreq = self.atserv.atserver_request( optreq = self.atserv.atserver_request(url, 'GET')
url,
'GET'
)
opttree = lxml.html.fromstring(optreq.content) opttree = lxml.html.fromstring(optreq.content)
configs = opttree.xpath('//div[@class="config-options"]') configs = opttree.xpath('//div[@class="config-options"]')
for i in range(len(configs)):
conf = configs[i] for i, conf in enumerate(configs):
opts = conf.xpath('/div[contains(@class,"config-option ")]') opts = conf.xpath('/div[contains(@class,"config-option ")]')
result = {} result = {}
for opt in opts: for opt in opts:
key = opt.xpath('.//span[@class="config-option-output-key"]')[0].text key = opt.xpath('.//span[@class="config-option-output-key"]')[0].text
value = opt.xpath('.//span[@class="config-option-output-value"]')[0].text value = opt.xpath('.//span[@class="config-option-output-value"]')[0].text
if prefixes != None: if prefixes != None:
key = f'{prefixes[i]}{key}' key = f'{prefixes[i]}{key}'
opttype = opt.xpath('/@class').split(' ')[1] opttype = opt.xpath('/@class').split(' ')[1]
if flags == FLAG_PROP_TYPE: if proptyping and opttype in convert:
value = convert[opttype](value)
if opttype == 'config-option-number'\
or opttype == 'config-option-select':
value = int(value)
elif opttype == 'config-option-toggle':
value = bool(value)
result[key] = value result[key] = value
return result return result

View file

@ -7,7 +7,7 @@ from cloudscraper import CloudScraper
from typing import Optional, Union from typing import Optional, Union
from . import atjsparse from . import atjsparse
from .aterrors import CredentialsError, CloudflareError from .aterrors import CredentialsError
REQUA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:68.0) Gecko/20100101 Goanna/4.8 Firefox/68.0 PaleMoon/29.4.0.2' REQUA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:68.0) Gecko/20100101 Goanna/4.8 Firefox/68.0 PaleMoon/29.4.0.2'
@ -15,17 +15,14 @@ class AternosConnect:
def __init__(self) -> None: def __init__(self) -> None:
pass self.session = CloudScraper()
def parse_token(self, response:Optional[Union[str,bytes]]=None) -> str: def parse_token(self) -> str:
if response == None: loginpage = self.request_cloudflare(
loginpage = self.request_cloudflare( f'https://aternos.org/go/', 'GET'
f'https://aternos.org/go/', 'GET' ).content
).content pagetree = lxml.html.fromstring(loginpage)
pagetree = lxml.html.fromstring(loginpage)
else:
pagetree = lxml.html.fromstring(response)
try: try:
pagehead = pagetree.head pagehead = pagetree.head
@ -58,9 +55,10 @@ class AternosConnect:
def generate_aternos_rand(self, randlen:int=16) -> str: def generate_aternos_rand(self, randlen:int=16) -> str:
rand_arr = [] # a list with randlen+1 empty strings:
for i in range(randlen+1): # generate a string with spaces,
rand_arr.append('') # then split it by space
rand_arr = (' ' * (randlen+1)).split(' ')
rand = random.random() rand = random.random()
rand_alphanum = self.convert_num(rand, 36) + ('0' * 17) rand_alphanum = self.convert_num(rand, 36) + ('0' * 17)
@ -88,60 +86,36 @@ class AternosConnect:
return result return result
def request_cloudflare( def request_cloudflare(
self, url:str, method:str, retries:int=10, self, url:str, method:str,
params:Optional[dict]=None, data:Optional[dict]=None, params:Optional[dict]=None, data:Optional[dict]=None,
headers:Optional[dict]=None, reqcookies:Optional[dict]=None, headers:Optional[dict]=None, reqcookies:Optional[dict]=None,
sendtoken:bool=False, redirect:bool=True) -> Response: sendtoken:bool=False, redirect:bool=True) -> Response:
cftitle = '<title>Please Wait... | Cloudflare</title>' params = params if params else {}
data = data if data else {}
if params == None: headers = headers if headers else {}
params = {} reqcookies = reqcookies if reqcookies else {}
if headers == None:
headers = {}
headers['User-Agent'] = REQUA headers['User-Agent'] = REQUA
if sendtoken: if sendtoken:
url += f'?TOKEN={self.token}&SEC={self.sec}' params['TOKEN'] = self.token
params['SEC'] = self.sec
try: # requests.cookies.CookieConflictError bugfix
cookies = self.session.cookies reqcookies['ATERNOS_SESSION'] = self.session.cookies['ATERNOS_SESSION']
except AttributeError: del self.session.cookies['ATERNOS_SESSION']
cookies = None
countdown = retries if method == 'POST':
while True: req = self.session.post(
url, data=data, params=params,
self.session = CloudScraper() headers=headers, cookies=reqcookies,
if cookies != None: allow_redirects=redirect
self.session.cookies = cookies )
else:
if reqcookies != None: req = self.session.get(
for cookiekey in reqcookies: url, params={**params, **data},
self.session.cookies.set(cookiekey, reqcookies[cookiekey]) headers=headers, cookies=reqcookies,
allow_redirects=redirect
time.sleep(1) )
if method == 'POST':
req = self.session.post(
url, data=data, params=params,
headers=headers, cookies=reqcookies,
allow_redirects=redirect
)
else:
req = self.session.get(
url, params=params,
headers=headers, cookies=reqcookies,
allow_redirects=redirect
)
if not cftitle in req.text:
break
if not countdown > 0:
raise CloudflareError(
'The retries limit has been reached'
)
countdown -= 1
return req return req

View file

@ -9,6 +9,3 @@ class ServerError(AternosError):
class FileError(AternosError): class FileError(AternosError):
pass pass
class CloudflareError(AternosError):
pass

View file

@ -17,51 +17,53 @@ class AternosFile:
def __init__( def __init__(
self, atserv:'AternosServer', self, atserv:'AternosServer',
path:str, name:str, ftype:int=FileType.file, path:str, name:str, ftype:int=FileType.file,
size:Union[int,float]=0, dlallowed:bool=False) -> None: size:Union[int,float]=0) -> None:
self.atserv = atserv self.atserv = atserv
self._path = path self._path = path
self._name = name self._name = name
self._full = path + name
self._ftype = ftype self._ftype = ftype
self._size = float(size) self._size = float(size)
self._dlallowed = dlallowed
def delete(self) -> None: def delete(self) -> None:
self.atserv.atserver_request( self.atserv.atserver_request(
'https://aternos.org/panel/ajax/delete.php', 'https://aternos.org/panel/ajax/delete.php',
'POST', data={'file': self._name}, 'POST', data={'file': self._full},
sendtoken=True sendtoken=True
) )
def get_content(self) -> bytes: def get_content(self) -> bytes:
file = self.atserv.atserver_request( file = self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/files/download.php', 'https://aternos.org/panel/ajax/files/download.php',
'GET', params={ 'GET', params={
'file': self._path 'file': self._full
} }
) )
if not self._dlallowed: if file.content == b'{"success":false}':
raise FileError('Downloading this file is not allowed. Try to get text') raise FileError('Unable to download the file. Try to get text')
return file.content return file.content
def set_content(self, value:bytes) -> None: def set_content(self, value:bytes) -> None:
self.atserv.atserver_request( self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/save.php', f'https://aternos.org/panel/ajax/save.php',
'POST', data={ 'POST', data={
'file': self._path, 'file': self._full,
'content': value 'content': value
}, sendtoken=True }, sendtoken=True
) )
def get_text(self) -> str: def get_text(self) -> str:
editor = self.atserv.atserver_request( editor = self.atserv.atserver_request(
f'https://aternos.org/files/{self._name}', 'GET' f'https://aternos.org/files/{self._name}', 'GET'
) )
edittree = lxml.html.fromstring(editor.content) edittree = lxml.html.fromstring(editor.content)
editfield = edittree.xpath('//div[@class="ace_layer ace_text-layer"]')[0] editlines = edittree.xpath('//div[@class="ace_line"]')
editlines = editfield.xpath('/div[@class="ace_line"]')
rawlines = [] rawlines = []
for line in editlines: for line in editlines:
@ -69,6 +71,7 @@ class AternosFile:
return rawlines return rawlines
def set_text(self, value:str) -> None: def set_text(self, value:str) -> None:
self.set_content(value.encode('utf-8')) self.set_content(value.encode('utf-8'))
@property @property
@ -79,6 +82,10 @@ class AternosFile:
def name(self) -> str: def name(self) -> str:
return self._name return self._name
@property
def full(self) -> str:
return self._full
@property @property
def is_dir(self) -> bool: def is_dir(self) -> bool:
if self._ftype == FileType.directory: if self._ftype == FileType.directory:
@ -94,7 +101,3 @@ class AternosFile:
@property @property
def size(self) -> float: def size(self) -> float:
return self._size return self._size
@property
def dlallowed(self) -> bool:
return self._dlallowed

View file

@ -18,21 +18,18 @@ class FileManager:
f'https://aternos.org/files/{path}', 'GET' f'https://aternos.org/files/{path}', 'GET'
) )
filestree = lxml.html.fromstring(filesreq.content) filestree = lxml.html.fromstring(filesreq.content)
fileslist = filestree.xpath( fileslist = filestree.xpath('//div[@class="file clickable"]')
'//div[@class="files"]' + \
'/div[@class="directory dropzone"]' + \
'/div[@class="file clickable"]'
)
files = [] files = []
for f in fileslist: for f in fileslist:
ftype_raw = f.xpath('/@data-type') ftype_raw = f.xpath('@data-type')[0]
ftype = FileType.file \ ftype = FileType.file \
if ftype_raw == 'file' \ if ftype_raw == 'file' \
else FileType.directory else FileType.directory
fsize_raw = f.xpath('/div[@class="filesize"]') fsize_raw = f.xpath('./div[@class="filesize"]')
print(fsize_raw)
fsize = 0 fsize = 0
if len(fsize_raw) > 0: if len(fsize_raw) > 0:
@ -45,19 +42,14 @@ class FileManager:
except ValueError: except ValueError:
fsize = -1 fsize = -1
dlbutton = f.xpath('/div[contains(@class,"js-download-file ")]') fullpath = f.xpath('@data-path')[0]
dlallowed = False
if len(dlbutton) > 0:
dlallowed = True
fullpath = f.xpath('/@data-path')
filepath = fullpath[:fullpath.rfind('/')] filepath = fullpath[:fullpath.rfind('/')]
filename = fullpath[fullpath.rfind('/'):] filename = fullpath[fullpath.rfind('/'):]
files.append( files.append(
AternosFile( AternosFile(
self.atserv, self.atserv,
filepath, filename, filepath, filename,
ftype, fsize, dlallowed ftype, fsize
) )
) )

View file

@ -1,6 +1,6 @@
import enum import enum
import lxml.html import lxml.html
from typing import List from typing import List, Union
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
if TYPE_CHECKING: if TYPE_CHECKING:
@ -15,40 +15,34 @@ class Lists(enum.Enum):
class PlayersList: class PlayersList:
def __init__(self, lst:str, atserv:'AternosServer') -> None: def __init__(self, lst:Union[str,Lists], atserv:'AternosServer') -> None:
for ltype in Lists:
if ltype.value == lst:
break
else:
raise ValueError(
'Incorrect players list type! ' + \
'Use atplayers.Lists enum'
)
self.atserv = atserv self.atserv = atserv
self.lst = lst self.lst = Lists(lst)
self.players = [] self.players = []
self.parsed = False
def list_players(self, cache:bool=True) -> List[str]: def list_players(self, cache:bool=True) -> List[str]:
if cache: if cache and self.parsed:
return self.players return self.players
listreq = self.atserv.atserver_request( listreq = self.atserv.atserver_request(
f'https://aternos.org/players/{self.lst}', 'GET' f'https://aternos.org/players/{self.lst.value}',
'GET'
) )
listtree = lxml.html.fromstring(listreq.content) listtree = lxml.html.fromstring(listreq.content)
items = listtree.xpath( items = listtree.xpath(
'//div[@class="player-list"]' + \ '//div[@class="list-item"]'
'/div[@class="list-item-container"]' + \
'/div[@class="list-item"]'
) )
result = [] result = []
for i in items: for i in items:
name = i.xpath('./div[@class="list-name"]') name = i.xpath('./div[@class="list-name"]')
result.append(name) result.append(name[0].text.strip())
self.players = result
self.parsed = True
return result return result
def add(self, name:str) -> None: def add(self, name:str) -> None:
@ -56,9 +50,9 @@ class PlayersList:
self.atserv.atserver_request( self.atserv.atserver_request(
'https://aternos.org/panel/ajax/players/add.php', 'https://aternos.org/panel/ajax/players/add.php',
'POST', data={ 'POST', data={
'list': self.lst, 'list': self.lst.value,
'name': name 'name': name
} }, sendtoken=True
) )
self.players.append(name) self.players.append(name)
@ -68,9 +62,9 @@ class PlayersList:
self.atserv.atserver_request( self.atserv.atserver_request(
'https://aternos.org/panel/ajax/players/remove.php', 'https://aternos.org/panel/ajax/players/remove.php',
'POST', data={ 'POST', data={
'list': self.lst, 'list': self.lst.value,
'name': name 'name': name
} }, sendtoken=True
) )
for i, j in enumerate(self.players): for i, j in enumerate(self.players):

View file

@ -1,7 +1,5 @@
import enum import enum
import re
import json import json
import lxml.html
from requests import Response from requests import Response
from typing import Optional from typing import Optional
@ -12,9 +10,6 @@ from .atconf import AternosConfig
from .atplayers import PlayersList from .atplayers import PlayersList
from .atwss import AternosWss from .atwss import AternosWss
JAVA = 0
BEDROCK = 1
class Edition(enum.IntEnum): class Edition(enum.IntEnum):
java = 0 java = 0
bedrock = 1 bedrock = 1
@ -32,37 +27,22 @@ class AternosServer:
def __init__( def __init__(
self, servid:str, self, servid:str,
atconn:AternosConnect, atconn:AternosConnect) -> None:
savelog:bool=True) -> None:
self.servid = servid self.servid = servid
self.atconn = atconn self.atconn = atconn
self.savelog = savelog
self.log = [] def fetch(self) -> None:
servreq = self.atserver_request( servreq = self.atserver_request(
'https://aternos.org/server', 'GET' 'https://aternos.org/panel/ajax/status.php',
'GET', sendtoken=True
) )
servtree = lxml.html.fromstring(servreq.content) self._info = json.loads(servreq.content)
self._info = json.loads( def wss(self, autoconfirm:bool=False) -> AternosWss:
re.search(
r'var\s*lastStatus\s*=\s*({.*})',
servtree.head.text_content()
)[1]
)
self._ram = 0
self._tps = 0
self.atconn.parse_token(servreq.content) return AternosWss(self, autoconfirm)
self.atconn.generate_sec()
async def wss(self) -> AternosWss:
return AternosWss(
self.atconn.session.cookies,
self.servid
)
def start(self, headstart:bool=False, accepteula:bool=True) -> None: def start(self, headstart:bool=False, accepteula:bool=True) -> None:
@ -98,7 +78,7 @@ class AternosServer:
elif error == 'file': elif error == 'file':
raise ServerError( raise ServerError(
'File server is unavailbale, view status.aternos.gmbh' 'File server is unavailbale, view https://status.aternos.gmbh'
) )
elif error == 'size': elif error == 'size':
@ -155,7 +135,7 @@ class AternosServer:
return AternosConfig(self) return AternosConfig(self)
def get_players(self, lst:str) -> PlayersList: def players(self, lst:str) -> PlayersList:
return PlayersList(lst, self) return PlayersList(lst, self)
@ -227,9 +207,13 @@ class AternosServer:
return self._info['version'] return self._info['version']
@property @property
def status(self) -> int: def status(self) -> str:
return self._info['class']
@property
def status_num(self) -> int:
return int(self._info['status']) return int(self._info['status'])
@property @property
def ram(self) -> int: def ram(self) -> int:
return self._ram return int(self._info['ram'])

View file

@ -3,8 +3,11 @@ import json
import asyncio import asyncio
import websockets import websockets
from typing import Union, Any, Dict, Callable, Coroutine from typing import Union, Any, Dict, Callable, Coroutine
from typing import TYPE_CHECKING
from .atconnect import REQUA from .atconnect import REQUA
if TYPE_CHECKING:
from .atserver import AternosServer
class Streams(enum.IntEnum): class Streams(enum.IntEnum):
status = 0 status = 0
@ -15,11 +18,19 @@ class Streams(enum.IntEnum):
class AternosWss: class AternosWss:
def __init__(self, session:str, servid:str) -> None: def __init__(self, atserv:'AternosServer', autoconfirm:bool=False) -> None:
self.session = session self.atserv = atserv
self.servid = servid self.cookies = atserv.atconn.session.cookies
self.session = self.cookies['ATERNOS_SESSION']
self.servid = self.cookies['ATERNOS_SERVER']
self.recv = {} self.recv = {}
self.autoconfirm = autoconfirm
self.confirmed = False
async def confirm(self) -> None:
self.atserv.confirm()
def wssreceiver(self, stream:int) -> Callable[[Callable[[Any],Coroutine[Any,Any,None]]],Any]: def wssreceiver(self, stream:int) -> Callable[[Callable[[Any],Coroutine[Any,Any,None]]],Any]:
def decorator(func:Callable[[Any],Coroutine[Any,Any,None]]) -> None: def decorator(func:Callable[[Any],Coroutine[Any,Any,None]]) -> None:
@ -29,18 +40,21 @@ class AternosWss:
async def connect(self) -> None: async def connect(self) -> None:
headers = [ headers = [
('Host', 'aternos.org'),
('User-Agent', REQUA), ('User-Agent', REQUA),
( (
'Cookie', 'Cookie',
f'ATERNOS_SESSION={self.session}; ' +\ f'ATERNOS_SESSION={self.session}; ' + \
f'ATERNOS_SERVER={self.servid}' f'ATERNOS_SERVER={self.servid}'
) )
] ]
self.socket = await websockets.connect( self.socket = await websockets.connect(
'wss://aternos.org/hermes', 'wss://aternos.org/hermes/',
origin='https://aternos.org',
extra_headers=headers extra_headers=headers
) )
asyncio.run(wssworker())
await self.wssworker()
async def close(self) -> None: async def close(self) -> None:
@ -56,8 +70,8 @@ class AternosWss:
async def wssworker(self) -> None: async def wssworker(self) -> None:
keep = asyncio.create_task(keepalive()) keep = asyncio.create_task(self.keepalive())
msgs = asyncio.create_task(receiver()) msgs = asyncio.create_task(self.receiver())
await keep await keep
await msgs await msgs
@ -90,6 +104,16 @@ class AternosWss:
msgtype = Streams.status msgtype = Streams.status
msg = json.loads(obj['message']) msg = json.loads(obj['message'])
if not self.autoconfirm:
continue
if msg['class'] == 'queueing' \
and msg['queue']['pending'] == 'pending'\
and not self.confirmed:
t = asyncio.create_task(
self.confirm()
)
await t
if msgtype in self.recv: if msgtype in self.recv:
t = asyncio.create_task( t = asyncio.create_task(
self.recv[msgtype](msg) self.recv[msgtype](msg)

View file

@ -1,30 +1,56 @@
import re import re
import unittest
from python_aternos import atjsparse from python_aternos import atjsparse
# Use tests from a file class TestJs2Py(unittest.TestCase):
tests = []
with open('../token.txt', 'rt') as f:
lines = re.split(r'[\r\n]', f.read())
del lines[len(lines)-1] # Remove empty string
tests = lines
for f in tests: def setUp(self) -> None:
ctx = atjsparse.exec(f)
print(ctx.window['AJAX_TOKEN'])
# 2rKOA1IFdBcHhEM616cb self.tests = []
# 2rKOA1IFdBcHhEM616cb with open('../token.txt', 'rt') as f:
# 2rKOA1IFdBcHhEM616cb lines = re.split(r'[\r\n]', f.read())
# 2rKOA1IFdBcHhEM616cb del lines[len(lines)-1] # Remove empty string
# 2rKOA1IFdBcHhEM616cb self.tests = lines
# 2rKOA1IFdBcHhEM616cb
# 2rKOA1IFdBcHhEM616cb self.results = [
# 2rKOA1IFdBcHhEM616cb '2rKOA1IFdBcHhEM616cb'
# 2rKOA1IFdBcHhEM616cb '2rKOA1IFdBcHhEM616cb'
# 2iXh5W5uEYq5fWJIazQ6 '2rKOA1IFdBcHhEM616cb'
# CuUcmZ27Fb8bVBNw12Vj '2rKOA1IFdBcHhEM616cb'
# YPPe8Ph7vzYaZ9PF9oQP '2rKOA1IFdBcHhEM616cb'
# UfLlemvKEE16ltk0hZNM '2rKOA1IFdBcHhEM616cb'
# q6pYdP6r7xiVHhbotvlN '2rKOA1IFdBcHhEM616cb'
# q6pYdP6r7xiVHhbotvlN '2rKOA1IFdBcHhEM616cb'
# XAIbksgkVX9JYboMDI7D '2rKOA1IFdBcHhEM616cb'
'2iXh5W5uEYq5fWJIazQ6'
'CuUcmZ27Fb8bVBNw12Vj'
'YPPe8Ph7vzYaZ9PF9oQP'
'UfLlemvKEE16ltk0hZNM'
'q6pYdP6r7xiVHhbotvlN'
'q6pYdP6r7xiVHhbotvlN'
'XAIbksgkVX9JYboMDI7D'
]
def test_base64(self) -> None:
encoded = 'QEhlbGxvIFdvcmxkIQ=='
decoded = atjsparse.atob(encoded)
self.assertEqual(decoded, '@Hello World!')
def test_conv(self) -> None:
token = '(() => {window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();'
f = atjsparse.to_ecma5_function(token)
self.assertEqual(f, '(function(){window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})()')
def test_exec(self) -> None:
for i, f in enumerate(self.tests):
ctx = atjsparse.exec(f)
res = ctx.window['AJAX_TOKEN']
self.assertEqual(res, self.results[i])
def tearDown(self) -> None:
del self.tests
del self.results