Rewritten atclient, some methods moved to ataccount, corrected docs

This commit is contained in:
DarkCat09 2023-05-24 20:03:09 +04:00
parent 134a27b43d
commit 55ce48819e
9 changed files with 378 additions and 439 deletions

229
python_aternos/ataccount.py Normal file
View file

@ -0,0 +1,229 @@
"""Methods related to an Aternos account
including servers page parsing"""
import re
import base64
from typing import List, Dict
from typing import TYPE_CHECKING
import lxml.html
from .atlog import log
from .atmd5 import md5encode
from .atconnect import AternosConnect
from .atconnect import BASE_URL, AJAX_URL
from .atserver import AternosServer
if TYPE_CHECKING:
from .atclient import Client
email_re = re.compile(
r'^[A-Za-z0-9\-_+.]+@[A-Za-z0-9\-_+.]+\.[A-Za-z0-9\-]+$|^$'
)
class AternosAccount:
"""Methods related to an Aternos account
including servers page parsing"""
def __init__(self, atclient: 'Client') -> None:
"""Should not be instantiated manually,
the entrypoint is `atclient.Client`
Args:
atconn (AternosConnect): AternosConnect object
"""
self.atclient = atclient
self.atconn: AternosConnect = atclient.atconn
self.parsed = False
self.servers: List[AternosServer] = []
def list_servers(self, cache: bool = True) -> List[AternosServer]:
"""Parses a servers list
Args:
cache (bool, optional): If the function should use
cached servers list (recommended)
Returns:
List of AternosServer objects
"""
if cache and self.parsed:
return self.servers
serverspage = self.atconn.request_cloudflare(
f'{BASE_URL}/servers/', 'GET'
)
serverstree = lxml.html.fromstring(serverspage.content)
servers = serverstree.xpath(
'//div[@class="server-body"]/@data-id'
)
self.refresh_servers(servers)
# Update session file (add servers)
try:
self.atclient.save_session(self.atclient.saved_session)
except OSError as err:
log.warning('Unable to save servers list to file: %s', err)
return self.servers
def refresh_servers(self, ids: List[str]) -> None:
"""Replaces the cached servers list
creating AternosServer objects by given IDs
Args:
ids (List[str]): Servers unique identifiers
"""
self.servers = []
for s in ids:
servid = s.strip()
if servid == '':
continue
log.debug('Adding server %s', servid)
srv = AternosServer(servid, self.atconn)
self.servers.append(srv)
self.parsed = True
def get_server(self, servid: str) -> AternosServer:
"""Creates a server object from the server ID.
Use this instead of `list_servers` if you know
the server IDentifier
Returns:
AternosServer object
"""
return AternosServer(servid, self.atconn)
def change_username(self, value: str) -> None:
"""Changes a username in your Aternos account
Args:
value (str): New username
"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/username.php',
'POST', data={'username': value},
sendtoken=True,
)
def change_email(self, value: str) -> None:
"""Changes an e-mail in your Aternos account
Args:
value (str): New e-mail
Raises:
ValueError: If an invalid e-mail address
was passed to the function
"""
if not email_re.match(value):
raise ValueError('Invalid e-mail')
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/email.php',
'POST', data={'email': value},
sendtoken=True,
)
def change_password(self, old: str, new: str) -> None:
"""Changes a password in your Aternos account
Args:
old (str): Old password
new (str): New password
"""
self.change_password_hashed(
md5encode(old),
md5encode(new),
)
def change_password_hashed(self, old: str, new: str) -> None:
"""Changes a password in your Aternos account.
Unlike `change_password`, this function
takes hashed passwords as the arguments
Args:
old (str): Old password hashed with MD5
new (str): New password hashed with MD5
"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/password.php',
'POST', data={
'oldpassword': old,
'newpassword': new,
},
sendtoken=True,
)
def qrcode_2fa(self) -> Dict[str, str]:
"""Requests a secret code and
a QR code for enabling 2FA"""
return self.atconn.request_cloudflare(
f'{AJAX_URL}/account/secret.php',
'GET', sendtoken=True,
).json()
def save_qr(self, qrcode: str, filename: str) -> None:
"""Writes a 2FA QR code into a png-file
Args:
qrcode (str): Base64 encoded png image from `qrcode_2fa()`
filename (str): Where the QR code image must be saved.
Existing file will be rewritten.
"""
data = qrcode.removeprefix('data:image/png;base64,')
png = base64.b64decode(data)
with open(filename, 'wb') as f:
f.write(png)
def enable_2fa(self, code: int) -> None:
"""Enables Two-Factor Authentication
Args:
code (int): 2FA code
"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/twofactor.php',
'POST', data={'code': code},
sendtoken=True,
)
def disable_2fa(self, code: int) -> None:
"""Disables Two-Factor Authentication
Args:
code (int): 2FA code
"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/disbaleTwofactor.php',
'POST', data={'code': code},
sendtoken=True,
)
def logout(self) -> None:
"""The same as `atclient.Client.logout`"""
self.atclient.logout()

View file

@ -3,21 +3,16 @@ and allows to manage your account"""
import os
import re
import hashlib
import base64
from typing import List, Dict
from typing import Optional, Type
import lxml.html
from .atlog import log
from .atmd5 import md5encode
from .ataccount import AternosAccount
from .atconnect import AternosConnect
from .atconnect import BASE_URL, AJAX_URL
from .atconnect import AJAX_URL
from .atserver import AternosServer
from .aterrors import CredentialsError
from .aterrors import TwoFactorAuthError
@ -27,82 +22,75 @@ from .atjsparse import Js2PyInterpreter
class Client:
"""Aternos API Client class, object
of which contains user's auth data"""
def __init__(
self,
atconn: AternosConnect,
servers: Optional[List[str]] = None) -> None:
"""Aternos API Client class, object
of which contains user's auth data
def __init__(self) -> None:
Args:
atconn (AternosConnect):
AternosConnect instance with initialized Aternos session
servers (Optional[List[str]], optional):
List with servers IDs
"""
self.atconn = atconn
# Config
self.debug = False
self.sessions_dir = '~'
self.js: Type[Interpreter] = Js2PyInterpreter
# ###
self.saved_session = ''
self.atconn = AternosConnect()
self.account = AternosAccount(self)
self.parsed = False
self.servers: List[AternosServer] = []
if servers:
self.refresh_servers(servers)
@classmethod
def from_hashed(
cls,
def login(
self,
username: str,
md5: str,
code: Optional[int] = None,
sessions_dir: str = '~',
js: Type[Interpreter] = Js2PyInterpreter,
**custom_args):
"""Log in to an Aternos account with
a username and a hashed password
password: str,
code: Optional[int] = None) -> None:
"""Log in to your Aternos account
with a username and a plain password
Args:
username (str): Your username
md5 (str): Your password hashed with MD5
code (Optional[int]): 2FA code
sessions_dir (str): Path to the directory
where session will be automatically saved
js (Type[Interpreter]): Preferred JS interpreter,
any class from `atjsparse`
inheriting `Interpreter` class
**custom_args (tuple, optional): Keyword arguments
which will be passed to CloudScraper `__init__`
Raises:
CredentialsError: If the API didn't
return a valid session cookie
username (str): Username
password (str): Plain-text password
code (Optional[int], optional): 2FA code
"""
filename = cls.session_file(
username, sessions_dir
self.login_hashed(
username,
md5encode(password),
code,
)
def login_hashed(
self,
username: str,
md5: str,
code: Optional[int] = None) -> None:
"""Log in to your Aternos account
with a username and a hashed password
Args:
username (str): Username
md5 (str): Password hashed with MD5
code (int): 2FA code
Raises:
TwoFactorAuthError: If the 2FA is enabled,
but `code` argument was not passed or is incorrect
CredentialsError: If the Aternos backend
returned empty session cookie
(usually because of incorrect credentials)
ValueError: _description_
"""
filename = self.session_filename(
username, self.sessions_dir
)
try:
return cls.restore_session(
filename, **custom_args
)
self.restore_session(filename)
except (OSError, CredentialsError):
pass
atjsparse.get_interpreter(create=js)
atconn = AternosConnect()
if len(custom_args) > 0:
atconn.add_args(**custom_args)
atconn.parse_token()
atconn.generate_sec()
atjsparse.get_interpreter(create=self.js)
self.atconn.parse_token()
self.atconn.generate_sec()
credentials = {
'user': username,
@ -112,9 +100,9 @@ class Client:
if code is not None:
credentials['code'] = str(code)
loginreq = atconn.request_cloudflare(
loginreq = self.atconn.request_cloudflare(
f'{AJAX_URL}/account/login.php',
'POST', data=credentials, sendtoken=True
'POST', data=credentials, sendtoken=True,
)
if b'"show2FA":true' in loginreq.content:
@ -125,169 +113,58 @@ class Client:
'Check your username and password'
)
obj = cls(atconn)
obj.saved_session = filename
self.saved_session = filename
try:
obj.save_session(filename)
self.save_session(filename)
except OSError:
pass
return obj
def logout(self) -> None:
"""Log out from the Aternos account"""
@classmethod
def from_credentials(
cls,
username: str,
password: str,
code: Optional[int] = None,
sessions_dir: str = '~',
js: Type[Interpreter] = Js2PyInterpreter,
**custom_args):
"""Log in to Aternos with a username and a plain password
Args:
username (str): Your username
password (str): Your password without any encryption
code (Optional[int]): 2FA code
sessions_dir (str): Path to the directory
where session will be automatically saved
js (Type[Interpreter]): Preferred JS interpreter,
any class from `atjsparse`
inheriting `Interpreter` class
**custom_args (tuple, optional): Keyword arguments
which will be passed to CloudScraper `__init__`
"""
md5 = Client.md5encode(password)
return cls.from_hashed(
username, md5, code,
sessions_dir, js,
**custom_args
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/logout.php',
'GET', sendtoken=True,
)
@classmethod
def from_session(
cls,
session: str,
servers: Optional[List[str]] = None,
js: Type[Interpreter] = Js2PyInterpreter,
**custom_args):
"""Log in to Aternos using a session cookie value
self.remove_session(self.saved_session)
def restore_session(self, filename: str = '~/.aternos') -> None:
"""Restores ATERNOS_SESSION cookie and,
if included, servers list, from a session file
Args:
session (str): Value of ATERNOS_SESSION cookie
servers (Optional[List[str]]): List of cached servers IDs.
js (Type[Interpreter]): Preferred JS interpreter,
any class from `atjsparse`
inheriting `Interpreter` class
**custom_args (tuple, optional): Keyword arguments
which will be passed to CloudScraper `__init__`
filename (str, optional): Filename
Raises:
FileNotFoundError: If the file cannot be found
CredentialsError: If the session cookie
(or the file at all) has incorrect format
"""
atjsparse.get_interpreter(create=js)
atconn = AternosConnect()
filename = os.path.expanduser(filename)
log.debug('Restoring session from %s', filename)
atconn.add_args(**custom_args)
atconn.session.cookies['ATERNOS_SESSION'] = session
atconn.parse_token()
atconn.generate_sec()
return cls(atconn, servers)
@classmethod
def restore_session(
cls,
file: str = '~/.aternos',
js: Type[Interpreter] = Js2PyInterpreter,
**custom_args):
"""Log in to Aternos using
a saved ATERNOS_SESSION cookie
Args:
file (str, optional): File where a session cookie was saved
js (Type[Interpreter]): Preferred JS interpreter,
any class from `atjsparse`
inheriting `Interpreter` class
**custom_args (tuple, optional): Keyword arguments
which will be passed to CloudScraper `__init__`
"""
file = os.path.expanduser(file)
log.debug('Restoring session from %s', file)
if not os.path.exists(file):
if not os.path.exists(filename):
raise FileNotFoundError()
with open(file, 'rt', encoding='utf-8') as f:
with open(filename, 'rt', encoding='utf-8') as f:
saved = f.read() \
.strip() \
.replace('\r\n', '\n') \
.split('\n')
session = saved[0].strip()
if session == '':
if session == '' or not session.isalnum():
raise CredentialsError(
'Unable to read session cookie, '
'the first line is empty'
'Session cookie is invalid or the file is empty'
)
if len(saved) > 1:
obj = cls.from_session(
session=session,
servers=saved[1:],
js=js,
**custom_args
)
else:
obj = cls.from_session(
session,
js=js,
**custom_args
)
self.account.refresh_servers(saved[1:])
obj.saved_session = file
return obj
@staticmethod
def md5encode(passwd: str) -> str:
"""Encodes the given string with MD5
Args:
passwd (str): String to encode
Returns:
Hexdigest hash of the string in lowercase
"""
encoded = hashlib.md5(passwd.encode('utf-8'))
return encoded.hexdigest().lower()
@staticmethod
def session_file(username: str, sessions_dir: str = '~') -> str:
"""Generates session file name
for authenticated user
Args:
username (str): Authenticated user
sessions_dir (str, optional): Path to directory
with automatically saved sessions
Returns:
Filename
"""
# unsafe symbols replacement
repl = '_'
secure = re.sub(
r'[^A-Za-z0-9_-]',
repl, username
)
return f'{sessions_dir}/.at_{secure}'
self.atconn.session.cookies['ATERNOS_SESSION'] = session
self.saved_session = filename
def save_session(
self,
@ -298,8 +175,9 @@ class Client:
Args:
file (str, optional): File where a session cookie must be saved
incl_servers (bool, optional): If the function
should include the servers IDs to
reduce API requests count (recommended)
should include the servers IDs in this file
to reduce API requests count on the next restoration
(recommended)
"""
file = os.path.expanduser(file)
@ -311,7 +189,7 @@ class Client:
if not incl_servers:
return
for s in self.servers:
for s in self.account.servers:
f.write(s.servid + '\n')
def remove_session(self, file: str = '~/.aternos') -> None:
@ -331,193 +209,25 @@ class Client:
except OSError as err:
log.warning('Unable to delete session file: %s', err)
def list_servers(self, cache: bool = True) -> List[AternosServer]:
"""Parses a list of your servers from Aternos website
@staticmethod
def session_filename(username: str, sessions_dir: str = '~') -> str:
"""Generates a session file name
Args:
cache (bool, optional): If the function should use
cached servers list (recommended)
username (str): Authenticated user
sessions_dir (str, optional): Path to directory
with automatically saved sessions
Returns:
List of AternosServer objects
Filename
"""
if cache and self.parsed:
return self.servers
# unsafe symbols replacement
repl = '_'
serverspage = self.atconn.request_cloudflare(
f'{BASE_URL}/servers/', 'GET'
)
serverstree = lxml.html.fromstring(serverspage.content)
servers = serverstree.xpath(
'//div[@class="server-body"]/@data-id'
)
self.refresh_servers(servers)
# Update session file (add servers)
try:
self.save_session(self.saved_session)
except OSError as err:
log.warning('Unable to save servers list to file: %s', err)
return self.servers
def refresh_servers(self, ids: List[str]) -> None:
"""Replaces cached servers list creating
AternosServer objects by given IDs
Args:
ids (List[str]): Servers unique identifiers
"""
self.servers = []
for s in ids:
servid = s.strip()
if servid == '':
continue
log.debug('Adding server %s', servid)
srv = AternosServer(servid, self.atconn)
self.servers.append(srv)
self.parsed = True
def get_server(self, servid: str) -> AternosServer:
"""Creates a server object from the server ID.
Use this instead of list_servers
if you know the ID to save some time.
Returns:
AternosServer object
"""
return AternosServer(servid, self.atconn)
def logout(self) -> None:
"""Log out from Aternos account"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/logout.php',
'GET', sendtoken=True
secure = re.sub(
r'[^A-Za-z0-9_-]',
repl, username,
)
self.remove_session(self.saved_session)
def change_username(self, value: str) -> None:
"""Changes a username in your Aternos account
Args:
value (str): New username
"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/username.php',
'POST', data={'username': value}, sendtoken=True
)
def change_email(self, value: str) -> None:
"""Changes an e-mail in your Aternos account
Args:
value (str): New e-mail
Raises:
ValueError: If an invalid e-mail address
was passed to the function
"""
email = re.compile(
r'^[A-Za-z0-9\-_+.]+@[A-Za-z0-9\-_+.]+\.[A-Za-z0-9\-]+$|^$'
)
if not email.match(value):
raise ValueError('Invalid e-mail!')
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/email.php',
'POST', data={'email': value}, sendtoken=True
)
def change_password(self, old: str, new: str) -> None:
"""Changes a password in your Aternos account
Args:
old (str): Old password
new (str): New password
"""
self.change_password_hashed(
Client.md5encode(old),
Client.md5encode(new),
)
def change_password_hashed(self, old: str, new: str) -> None:
"""Changes a password in your Aternos account.
Unlike `change_password`, this function
takes hashed passwords as arguments
Args:
old (str): Old password hashed with MD5
new (str): New password hashed with MD5
"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/password.php',
'POST', data={
'oldpassword': old,
'newpassword': new,
}, sendtoken=True
)
def qrcode_2fa(self) -> Dict[str, str]:
"""Requests a secret code and
a QR code for enabling 2FA"""
return self.atconn.request_cloudflare(
f'{AJAX_URL}/account/secret.php',
'GET', sendtoken=True
).json()
def save_qr(self, qrcode: str, filename: str) -> None:
"""Writes a 2FA QR code into a png-file
Args:
qrcode (str): Base64 encoded png image from `qrcode_2fa()`
filename (str): Where the QR code image must be saved.
Existing file will be rewritten.
"""
data = qrcode.removeprefix('data:image/png;base64,')
png = base64.b64decode(data)
with open(filename, 'wb') as f:
f.write(png)
def enable_2fa(self, code: int) -> None:
"""Enables Two-Factor Authentication
Args:
code (int): 2FA code
"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/twofactor.php',
'POST', data={
'code': code
}, sendtoken=True
)
def disable_2fa(self, code: int) -> None:
"""Disables Two-Factor Authentication
Args:
code (int): 2FA code
"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/disbaleTwofactor.php',
'POST', data={
'code': code
}, sendtoken=True
)
return f'{sessions_dir}/.at_{secure}'

View file

@ -1,5 +1,6 @@
"""Stores API session and sends requests"""
import logging
import re
import time
@ -39,7 +40,6 @@ SEC_ALPHABET = string.ascii_lowercase + string.digits
class AternosConnect:
"""Class for sending API requests,
bypassing Cloudflare and parsing responses"""
@ -51,28 +51,6 @@ class AternosConnect:
self.token = ''
self.atcookie = ''
def add_args(self, **kwargs) -> None:
"""Pass arguments to CloudScraper
session object __init__
if kwargs is not empty
"""
if len(kwargs) < 1:
log.debug('**kwargs is empty')
return
log.debug('New args for CloudScraper: %s', kwargs)
self.cf_init = partial(CloudScraper, **kwargs)
self.refresh_session()
def clear_args(self) -> None:
"""Clear CloudScarper object __init__ arguments
which was set using add_args method"""
log.debug('Creating session object with no keywords')
self.cf_init = partial(CloudScraper)
self.refresh_session()
def refresh_session(self) -> None:
"""Creates a new CloudScraper
session object and copies all cookies.
@ -88,8 +66,6 @@ class AternosConnect:
is needed for most requests
Raises:
RuntimeWarning: If the parser can not
find `<head>` tag in HTML response
TokenError: If the parser is unable
to extract ajax token from HTML
@ -120,6 +96,7 @@ class AternosConnect:
pagehead = loginpage[headtag:headend]
js_code: Optional[List[Any]] = None
try:
text = pagehead.decode('utf-8', 'replace')
js_code = re.findall(ARROW_FN_REGEX, text)
@ -174,7 +151,10 @@ class AternosConnect:
def generate_sec_part(self) -> str:
"""Generates a part for SEC token"""
return ''.join(secrets.choice(SEC_ALPHABET) for _ in range(11)) + ('0' * 5)
return ''.join(
secrets.choice(SEC_ALPHABET)
for _ in range(11)
) + ('0' * 5)
def request_cloudflare(
self, url: str, method: str,
@ -237,22 +217,24 @@ class AternosConnect:
reqcookies['ATERNOS_SESSION'] = self.atcookie
del self.session.cookies['ATERNOS_SESSION']
reqcookies_dbg = {
k: str(v or '')[:3]
for k, v in reqcookies.items()
}
if log.level == logging.DEBUG:
session_cookies_dbg = {
k: str(v or '')[:3]
for k, v in self.session.cookies.items()
}
reqcookies_dbg = {
k: str(v or '')[:3]
for k, v in reqcookies.items()
}
log.debug('Requesting(%s)%s', method, url)
log.debug('headers=%s', headers)
log.debug('params=%s', params)
log.debug('data=%s', data)
log.debug('req-cookies=%s', reqcookies_dbg)
log.debug('session-cookies=%s', session_cookies_dbg)
session_cookies_dbg = {
k: str(v or '')[:3]
for k, v in self.session.cookies.items()
}
log.debug('Requesting(%s)%s', method, url)
log.debug('headers=%s', headers)
log.debug('params=%s', params)
log.debug('data=%s', data)
log.debug('req-cookies=%s', reqcookies_dbg)
log.debug('session-cookies=%s', session_cookies_dbg)
if method == 'POST':
sendreq = partial(

View file

@ -115,7 +115,8 @@ class NodeInterpreter(Interpreter):
self.proc.communicate()
except AttributeError:
log.warning(
'NodeJS process was not initialized'
'NodeJS process was not initialized, '
'but __del__ was called'
)

View file

@ -1,2 +1,4 @@
"""Creates a logger"""
import logging
log = logging.getLogger('aternos')

17
python_aternos/atmd5.py Normal file
View file

@ -0,0 +1,17 @@
"""Contains a function for hashing"""
import hashlib
def md5encode(passwd: str) -> str:
"""Encodes the given string with MD5
Args:
passwd (str): String to encode
Returns:
Hexdigest hash of the string in lowercase
"""
encoded = hashlib.md5(passwd.encode('utf-8'))
return encoded.hexdigest().lower()

View file

@ -50,8 +50,8 @@ class PlayersList:
# whl_je = whitelist for java
# whl_be = whitelist for bedrock
# whl = common whitelist
common_whl = (self.lst == Lists.whl)
bedrock = (atserv.is_bedrock)
common_whl = self.lst == Lists.whl
bedrock = atserv.is_bedrock
if common_whl and bedrock:
self.lst = Lists.whl_be

View file

@ -87,8 +87,6 @@ class AternosServer:
page = self.atserver_request(
f'{BASE_URL}/server', 'GET'
)
with open('server.html', 'wt') as f:
f.write(page.text)
match = status_re.search(page.text)
if match is None:

View file

@ -157,8 +157,8 @@ class AternosWss:
if not self.autoconfirm:
return
in_queue = (msg['class'] == 'queueing')
pending = (msg['queue']['pending'] == 'pending')
in_queue = msg['class'] == 'queueing'
pending = msg['queue']['pending'] == 'pending'
confirmation = in_queue and pending
if confirmation and not self.confirmed: