Added file manager, static typization, bugfixes

This commit is contained in:
Andrey 2021-10-14 17:56:01 +04:00 committed by GitHub
parent b010b8a4c7
commit bf1bc9c553
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 241 additions and 74 deletions

View file

@ -22,7 +22,7 @@ class Client:
self.atconn = atconnect.AternosConnect()
self.token = self.atconn.get_token()
self.token = self.atconn.parse_token()
self.sec = self.atconn.generate_sec()
self.credentials = {
@ -33,7 +33,7 @@ class Client:
loginreq = self.atconn.request_cloudflare(
f'https://aternos.org/panel/ajax/account/login.php?' + \
f'SEC={self.sec}&TOKEN={self.token}',
self.atconn.REQPOST, data=self.credentials
atconnect.REQPOST, data=self.credentials
)
if loginreq.cookies.get('ATERNOS_SESSION', None) == None:
@ -41,11 +41,11 @@ class Client:
'Check your username and password'
)
def get_servers(self):
@property
def servers(self):
serverspage = self.atconn.request_cloudflare(
'https://aternos.org/servers/',
self.atconn.REQGET
atconnect.REQGET
)
serverstree = lxml.html.fromstring(serverspage.content)
serverslist = serverstree.xpath('//div[@class="servers"]/div')

View file

@ -2,26 +2,27 @@ import re
import time
import random
import lxml.html
from requests import Response
from cloudscraper import CloudScraper
from typing import Optional, Union
from . import aterrors
class AternosConnect:
REQGET = 0
REQPOST = 1
REQUA = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:68.0) Gecko/20100101 Goanna/4.8 Firefox/68.0 PaleMoon/29.4.0.2'
def __init__(self):
class AternosConnect:
def __init__(self) -> None:
pass
def get_token(self, response=None):
def parse_token(self, response:Optional[Union[str,bytes]]=None) -> str:
if response == None:
loginpage = self.request_cloudflare(
f'https://aternos.org/go/',
self.REQGET
f'https://aternos.org/go/', REQGET
).content
pagetree = lxml.html.fromstring(loginpage)
else:
@ -40,7 +41,7 @@ class AternosConnect:
return self.token
def generate_sec(self):
def generate_sec(self) -> str:
randkey = self.generate_aternos_rand()
randval = self.generate_aternos_rand()
@ -52,7 +53,7 @@ class AternosConnect:
return self.sec
def generate_aternos_rand(self, randlen=16):
def generate_aternos_rand(self, randlen:int=16) -> str:
rand_arr = []
for i in range(randlen+1):
@ -63,7 +64,7 @@ class AternosConnect:
'00000000000000000'
return (rand_alphanum[2:18].join(rand_arr)[:randlen])
def convert_num(self, num, base):
def convert_num(self, num:Union[int,float], base:int) -> str:
result = ''
while num > 0:
@ -72,9 +73,13 @@ class AternosConnect:
return result
def request_cloudflare(
self, url, method, retries=10,
params=None, data=None, headers=None,
reqcookies=None, sendtoken=False):
self, url:str, method:int,
retries:int=10,
params:Optional[dict]=None,
data:Optional[dict]=None,
headers:Optional[dict]=None,
reqcookies:Optional[dict]=None,
sendtoken:bool=False) -> Response:
cftitle = '<title>Please Wait... | Cloudflare</title>'
@ -86,7 +91,7 @@ class AternosConnect:
if headers == None:
headers = {}
headers['User-Agent'] = self.REQUA
headers['User-Agent'] = REQUA
try:
cookies = self.session.cookies
@ -97,7 +102,7 @@ class AternosConnect:
if cookies != None:
self.session.cookies = cookies
if method == self.REQPOST:
if method == REQPOST:
req = self.session.post(
url,
data=data,
@ -124,7 +129,7 @@ class AternosConnect:
self.session.cookies.set(cookiekey, reqcookies[cookiekey])
time.sleep(1)
if method == self.REQPOST:
if method == REQPOST:
req = self.session.post(
url,
data=data,
@ -141,7 +146,3 @@ class AternosConnect:
countdown -= 1
return req
def get_session(self):
return self.session

61
python_aternos/atfile.py Normal file
View file

@ -0,0 +1,61 @@
import lxml.html
from typing import Union
from . import atserver
from . import atconnect
FTYPE_FILE = 0
FTYPE_DIR = 1
class AternosFile:
def __init__(atserv:atserver.AternosServer, path:str, name:str, ftype:int=FTYPE_FILE, size:Union[]=0):
self.atserv = atserv
self._name = name
self._ftype = ftype
self._size = size
def delete(self):
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/delete.php',
atconnect.REQPOST, data={'file': self._name},
sendtoken=True
)
@property
def text(self):
editor = self.atserv.atserver_request(
f'https://aternos.org/files/{self._name}',
atconnect.REQGET
)
edittree = lxml.html.fromstring(editor.content)
editfield = edittree.xpath('//div[@class="ace_layer ace_text-layer"]')[0]
editlines = editfield.xpath('/div[@class="ace_line"]')
rawlines = []
for line in editlines:
rawlines.append(line.text)
return rawlines
@text.setter
def text(self, value):
self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/save.php',
atconnect.REQPOST, data={'content': value},
sendtoken=True
)
@property
def name(self):
return self._name
@property
def ftype(self):
return self._ftype
@property
def size(self):
return self._size

101
python_aternos/atfm.py Normal file
View file

@ -0,0 +1,101 @@
import lxml.html
from typing import Optional, Union, List
from . import atserver
from . import atconnect
from . import atfile
class AternosFileManager:
def __init__(atserv:atserver.AternosServer) -> None:
self.atserv = atserv
def listdir(self, path:str='') -> List[atfile.AternosFile]:
filesreq = self.atserv.atserver_request(
f'https://aternos.org/files/{path}',
atconnect.REQGET
)
filestree = lxml.html.fromstring(filesreq.content)
fileslist = filestree.xpath(
'//div[@class="files"]/div[@class="directory dropzone"]' + \
'/div[@class="file clickable"]'
)
files = []
for f in fileslist:
ftype_raw = f.xpath('/@data-type')
ftype = atfile.FTYPE_FILE \
if ftype_raw == 'file' \
else atfile.FTYPE_DIR
fsize_raw = f.xpath('/div[@class="filesize"]')
fsize = 0
if len(fsize_raw) > 0:
fsize_text = fsize_raw[0].text.strip()
fsize_num = fsize_text[:fsize_text.rfind(' ')]
fsize_msr = fsize_text[fsize_text.rfind(' ')+1:]
fsize = convert_size(fsize_num, fsize_msr)
fullpath = f.xpath('/@data-path')[0]
filepath = fullpath[:fullpath.rfind('/')]
filename = fullpath[fullpath.rfind('/'):]
files.append(
atfile.AternosFile(
self.atserv,
filepath, filename,
ftype, fsize
)
)
return files
def convert_size(self, num:Union[int,float], measure:str) -> float:
measure_match = {
'B': 1,
'kB': 1000,
'MB': 1000000,
'GB': 1000000000
}
try:
result = num * measure_match[measure]
except KeyError:
result = -1
return result
def get_file(self, path:str) -> Union[atfile.AternosFile,None]:
filepath = path[:path.rfind('/')]
filename = path[path.rfind('/'):]
filedir = listdir(filepath)
for file in filedir:
if file.name == filename:
return file
return None
def dl_file(self, path:str) -> bytes:
file = self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/files/download.php?' + \
f'file={path.replace('/','%2F')}',
atconnect.REQGET
)
return file.content
def dl_world(self, world:str='world') -> bytes:
world = self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/worlds/download.php?' + \
f'world={world.replace('/','%2F')}',
atconnect.REQGET
)
return world.content

View file

@ -1,45 +1,41 @@
import re
import json
import lxml.html
from requests import Response
from typing import Optional, Dict
from . import atconnect
from . import aterrors
from . import atfiles
from . import atfm
class AternosServer:
def __init__(self, servid, atconn):
def __init__(self, servid:str, atconn:atconnect.AternosConnect) -> None:
self.servid = servid
self.atconn = atconn
servreq = self.atserver_request(
'https://aternos.org/server',
self.atconn.REQGET
atconnect.REQGET
)
servtree = lxml.html.fromstring(servreq.content)
servinfo = servtree.xpath(
'//div[@class="server-bottom-info server-info"]' + \
'/div[@class="server-info-container"]' + \
'/div[@class="server-info-box"]' + \
'/div[@class="server-info-box-body"]' + \
'/div[@class="server-info-box-value"]/span'
self._info = json.loads(
re.search(
r'var\s*lastStatus\s*=\s*({.*})',
servtree.head.text
)[1]
)
fullip = servinfo[0].text
self._address = fullip
self._domain = fullip[:fullip.rfind(':')]
self._port = fullip[fullip.rfind(':')+1:]
self._software = servinfo[1].text
self._version = servinfo[2].text
self.atconn.get_token(servreq.content)
self.atconn.parse_token(servreq.content)
self.atconn.generate_sec()
def start(self, accepteula=True):
def start(self, accepteula:bool=True) -> None:
startreq = self.atserver_request(
'https://aternos.org/panel/ajax/start.php',
self.atconn.REQGET, sendtoken=True
atconnect.REQGET, sendtoken=True
)
startresult = startreq.json()
@ -49,6 +45,7 @@ class AternosServer:
error = startresult['error']
if error == 'eula' and accepteula:
self.eula()
self.start(accepteula=False)
elif error == 'eula':
raise aterrors.AternosServerStartError(
'EULA was not accepted. Use start(accepteula=True)'
@ -62,48 +59,51 @@ class AternosServer:
f'Unable to start server. Code: {error}'
)
def confirm(self):
def confirm(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/confirm.php',
self.atconn.REQGET, sendtoken=True
atconnect.REQGET, sendtoken=True
)
def stop(self):
def stop(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/stop.php',
self.atconn.REQGET, sendtoken=True
atconnect.REQGET, sendtoken=True
)
def cancel(self):
def cancel(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/cancel.php',
self.atconn.REQGET, sendtoken=True
atconnect.REQGET, sendtoken=True
)
def restart(self):
def restart(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/restart.php',
self.atconn.REQGET, sendtoken=True
atconnect.REQGET, sendtoken=True
)
def eula(self):
def eula(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/eula.php',
self.atconn.REQGET, sendtoken=True
atconnect.REQGET, sendtoken=True
)
def files(self):
def files(self) -> atfm.AternosFileManager:
return AternosFileManager(self)
return atfm.AternosFileManager(self)
def atserver_request(
self, url, method, params=None,
data=None, headers=None, sendtoken=False):
self, url:str, method:int,
params:Optional[dict]=None,
data:Optional[dict]=None,
headers:Optional[dict]=None,
sendtoken:bool=False) -> Response:
return self.atconn.request_cloudflare(
url=url, method=method,
@ -116,21 +116,25 @@ class AternosServer:
)
@property
def address(self):
return self._address
def info(self) -> dict:
return self._info
@property
def domain(self):
return self._domain
def address(self) -> str:
return f'{self.domain}:{self.port}'
@property
def port(self):
return self._port
def domain(self) -> str:
return self._info['displayAddress']
@property
def software(self):
return self._software
def port(self) -> int:
return self._info['port']
@property
def version(self):
return self._version
def software(self) -> str:
return self._info['software']
@property
def version(self) -> str:
return self._info['version']