diff --git a/python_aternos/ataccount.py b/python_aternos/ataccount.py new file mode 100644 index 0000000..6aac518 --- /dev/null +++ b/python_aternos/ataccount.py @@ -0,0 +1,229 @@ +"""Methods related to an Aternos account +including servers page parsing""" + +import re +import base64 + +from typing import List, Dict +from typing import TYPE_CHECKING + +import lxml.html + +from .atlog import log +from .atmd5 import md5encode + +from .atconnect import AternosConnect +from .atconnect import BASE_URL, AJAX_URL + +from .atserver import AternosServer + +if TYPE_CHECKING: + from .atclient import Client + + +email_re = re.compile( + r'^[A-Za-z0-9\-_+.]+@[A-Za-z0-9\-_+.]+\.[A-Za-z0-9\-]+$|^$' +) + + +class AternosAccount: + """Methods related to an Aternos account + including servers page parsing""" + + def __init__(self, atclient: 'Client') -> None: + """Should not be instantiated manually, + the entrypoint is `atclient.Client` + + Args: + atconn (AternosConnect): AternosConnect object + """ + + self.atclient = atclient + self.atconn: AternosConnect = atclient.atconn + + self.parsed = False + self.servers: List[AternosServer] = [] + + def list_servers(self, cache: bool = True) -> List[AternosServer]: + """Parses a servers list + + Args: + cache (bool, optional): If the function should use + cached servers list (recommended) + + Returns: + List of AternosServer objects + """ + + if cache and self.parsed: + return self.servers + + serverspage = self.atconn.request_cloudflare( + f'{BASE_URL}/servers/', 'GET' + ) + serverstree = lxml.html.fromstring(serverspage.content) + + servers = serverstree.xpath( + '//div[@class="server-body"]/@data-id' + ) + self.refresh_servers(servers) + + # Update session file (add servers) + try: + self.atclient.save_session(self.atclient.saved_session) + except OSError as err: + log.warning('Unable to save servers list to file: %s', err) + + return self.servers + + def refresh_servers(self, ids: List[str]) -> None: + """Replaces the cached servers list + creating AternosServer objects by given IDs + + Args: + ids (List[str]): Servers unique identifiers + """ + + self.servers = [] + for s in ids: + + servid = s.strip() + if servid == '': + continue + + log.debug('Adding server %s', servid) + srv = AternosServer(servid, self.atconn) + self.servers.append(srv) + + self.parsed = True + + def get_server(self, servid: str) -> AternosServer: + """Creates a server object from the server ID. + Use this instead of `list_servers` if you know + the server IDentifier + + Returns: + AternosServer object + """ + + return AternosServer(servid, self.atconn) + + def change_username(self, value: str) -> None: + """Changes a username in your Aternos account + + Args: + value (str): New username + """ + + self.atconn.request_cloudflare( + f'{AJAX_URL}/account/username.php', + 'POST', data={'username': value}, + sendtoken=True, + ) + + def change_email(self, value: str) -> None: + """Changes an e-mail in your Aternos account + + Args: + value (str): New e-mail + + Raises: + ValueError: If an invalid e-mail address + was passed to the function + """ + + if not email_re.match(value): + raise ValueError('Invalid e-mail') + + self.atconn.request_cloudflare( + f'{AJAX_URL}/account/email.php', + 'POST', data={'email': value}, + sendtoken=True, + ) + + def change_password(self, old: str, new: str) -> None: + """Changes a password in your Aternos account + + Args: + old (str): Old password + new (str): New password + """ + + self.change_password_hashed( + md5encode(old), + md5encode(new), + ) + + def change_password_hashed(self, old: str, new: str) -> None: + """Changes a password in your Aternos account. + Unlike `change_password`, this function + takes hashed passwords as the arguments + + Args: + old (str): Old password hashed with MD5 + new (str): New password hashed with MD5 + """ + + self.atconn.request_cloudflare( + f'{AJAX_URL}/account/password.php', + 'POST', data={ + 'oldpassword': old, + 'newpassword': new, + }, + sendtoken=True, + ) + + def qrcode_2fa(self) -> Dict[str, str]: + """Requests a secret code and + a QR code for enabling 2FA""" + + return self.atconn.request_cloudflare( + f'{AJAX_URL}/account/secret.php', + 'GET', sendtoken=True, + ).json() + + def save_qr(self, qrcode: str, filename: str) -> None: + """Writes a 2FA QR code into a png-file + + Args: + qrcode (str): Base64 encoded png image from `qrcode_2fa()` + filename (str): Where the QR code image must be saved. + Existing file will be rewritten. + """ + + data = qrcode.removeprefix('data:image/png;base64,') + png = base64.b64decode(data) + + with open(filename, 'wb') as f: + f.write(png) + + def enable_2fa(self, code: int) -> None: + """Enables Two-Factor Authentication + + Args: + code (int): 2FA code + """ + + self.atconn.request_cloudflare( + f'{AJAX_URL}/account/twofactor.php', + 'POST', data={'code': code}, + sendtoken=True, + ) + + def disable_2fa(self, code: int) -> None: + """Disables Two-Factor Authentication + + Args: + code (int): 2FA code + """ + + self.atconn.request_cloudflare( + f'{AJAX_URL}/account/disbaleTwofactor.php', + 'POST', data={'code': code}, + sendtoken=True, + ) + + def logout(self) -> None: + """The same as `atclient.Client.logout`""" + + self.atclient.logout() diff --git a/python_aternos/atclient.py b/python_aternos/atclient.py index bf8a14a..db680c9 100644 --- a/python_aternos/atclient.py +++ b/python_aternos/atclient.py @@ -3,21 +3,16 @@ and allows to manage your account""" import os import re -import hashlib - -import base64 - -from typing import List, Dict from typing import Optional, Type -import lxml.html - from .atlog import log +from .atmd5 import md5encode + +from .ataccount import AternosAccount from .atconnect import AternosConnect -from .atconnect import BASE_URL, AJAX_URL +from .atconnect import AJAX_URL -from .atserver import AternosServer from .aterrors import CredentialsError from .aterrors import TwoFactorAuthError @@ -27,82 +22,75 @@ from .atjsparse import Js2PyInterpreter class Client: - """Aternos API Client class, object of which contains user's auth data""" - def __init__( - self, - atconn: AternosConnect, - servers: Optional[List[str]] = None) -> None: - """Aternos API Client class, object - of which contains user's auth data + def __init__(self) -> None: - Args: - atconn (AternosConnect): - AternosConnect instance with initialized Aternos session - servers (Optional[List[str]], optional): - List with servers IDs - """ - - self.atconn = atconn + # Config + self.debug = False + self.sessions_dir = '~' + self.js: Type[Interpreter] = Js2PyInterpreter + # ### self.saved_session = '' + self.atconn = AternosConnect() + self.account = AternosAccount(self) - self.parsed = False - self.servers: List[AternosServer] = [] - - if servers: - self.refresh_servers(servers) - - @classmethod - def from_hashed( - cls, + def login( + self, username: str, - md5: str, - code: Optional[int] = None, - sessions_dir: str = '~', - js: Type[Interpreter] = Js2PyInterpreter, - **custom_args): - """Log in to an Aternos account with - a username and a hashed password + password: str, + code: Optional[int] = None) -> None: + """Log in to your Aternos account + with a username and a plain password Args: - username (str): Your username - md5 (str): Your password hashed with MD5 - code (Optional[int]): 2FA code - sessions_dir (str): Path to the directory - where session will be automatically saved - js (Type[Interpreter]): Preferred JS interpreter, - any class from `atjsparse` - inheriting `Interpreter` class - **custom_args (tuple, optional): Keyword arguments - which will be passed to CloudScraper `__init__` - - Raises: - CredentialsError: If the API didn't - return a valid session cookie + username (str): Username + password (str): Plain-text password + code (Optional[int], optional): 2FA code """ - filename = cls.session_file( - username, sessions_dir + self.login_hashed( + username, + md5encode(password), + code, + ) + + def login_hashed( + self, + username: str, + md5: str, + code: Optional[int] = None) -> None: + """Log in to your Aternos account + with a username and a hashed password + + Args: + username (str): Username + md5 (str): Password hashed with MD5 + code (int): 2FA code + + Raises: + TwoFactorAuthError: If the 2FA is enabled, + but `code` argument was not passed or is incorrect + CredentialsError: If the Aternos backend + returned empty session cookie + (usually because of incorrect credentials) + ValueError: _description_ + """ + + filename = self.session_filename( + username, self.sessions_dir ) try: - return cls.restore_session( - filename, **custom_args - ) + self.restore_session(filename) except (OSError, CredentialsError): pass - atjsparse.get_interpreter(create=js) - atconn = AternosConnect() - - if len(custom_args) > 0: - atconn.add_args(**custom_args) - - atconn.parse_token() - atconn.generate_sec() + atjsparse.get_interpreter(create=self.js) + self.atconn.parse_token() + self.atconn.generate_sec() credentials = { 'user': username, @@ -112,9 +100,9 @@ class Client: if code is not None: credentials['code'] = str(code) - loginreq = atconn.request_cloudflare( + loginreq = self.atconn.request_cloudflare( f'{AJAX_URL}/account/login.php', - 'POST', data=credentials, sendtoken=True + 'POST', data=credentials, sendtoken=True, ) if b'"show2FA":true' in loginreq.content: @@ -125,169 +113,58 @@ class Client: 'Check your username and password' ) - obj = cls(atconn) - obj.saved_session = filename - + self.saved_session = filename try: - obj.save_session(filename) + self.save_session(filename) except OSError: pass - return obj + def logout(self) -> None: + """Log out from the Aternos account""" - @classmethod - def from_credentials( - cls, - username: str, - password: str, - code: Optional[int] = None, - sessions_dir: str = '~', - js: Type[Interpreter] = Js2PyInterpreter, - **custom_args): - """Log in to Aternos with a username and a plain password - - Args: - username (str): Your username - password (str): Your password without any encryption - code (Optional[int]): 2FA code - sessions_dir (str): Path to the directory - where session will be automatically saved - js (Type[Interpreter]): Preferred JS interpreter, - any class from `atjsparse` - inheriting `Interpreter` class - **custom_args (tuple, optional): Keyword arguments - which will be passed to CloudScraper `__init__` - """ - - md5 = Client.md5encode(password) - return cls.from_hashed( - username, md5, code, - sessions_dir, js, - **custom_args + self.atconn.request_cloudflare( + f'{AJAX_URL}/account/logout.php', + 'GET', sendtoken=True, ) - @classmethod - def from_session( - cls, - session: str, - servers: Optional[List[str]] = None, - js: Type[Interpreter] = Js2PyInterpreter, - **custom_args): - """Log in to Aternos using a session cookie value + self.remove_session(self.saved_session) + + def restore_session(self, filename: str = '~/.aternos') -> None: + """Restores ATERNOS_SESSION cookie and, + if included, servers list, from a session file Args: - session (str): Value of ATERNOS_SESSION cookie - servers (Optional[List[str]]): List of cached servers IDs. - js (Type[Interpreter]): Preferred JS interpreter, - any class from `atjsparse` - inheriting `Interpreter` class - **custom_args (tuple, optional): Keyword arguments - which will be passed to CloudScraper `__init__` + filename (str, optional): Filename + + Raises: + FileNotFoundError: If the file cannot be found + CredentialsError: If the session cookie + (or the file at all) has incorrect format """ - atjsparse.get_interpreter(create=js) - atconn = AternosConnect() + filename = os.path.expanduser(filename) + log.debug('Restoring session from %s', filename) - atconn.add_args(**custom_args) - atconn.session.cookies['ATERNOS_SESSION'] = session - - atconn.parse_token() - atconn.generate_sec() - - return cls(atconn, servers) - - @classmethod - def restore_session( - cls, - file: str = '~/.aternos', - js: Type[Interpreter] = Js2PyInterpreter, - **custom_args): - """Log in to Aternos using - a saved ATERNOS_SESSION cookie - - Args: - file (str, optional): File where a session cookie was saved - js (Type[Interpreter]): Preferred JS interpreter, - any class from `atjsparse` - inheriting `Interpreter` class - **custom_args (tuple, optional): Keyword arguments - which will be passed to CloudScraper `__init__` - """ - - file = os.path.expanduser(file) - log.debug('Restoring session from %s', file) - - if not os.path.exists(file): + if not os.path.exists(filename): raise FileNotFoundError() - with open(file, 'rt', encoding='utf-8') as f: + with open(filename, 'rt', encoding='utf-8') as f: saved = f.read() \ .strip() \ .replace('\r\n', '\n') \ .split('\n') session = saved[0].strip() - if session == '': + if session == '' or not session.isalnum(): raise CredentialsError( - 'Unable to read session cookie, ' - 'the first line is empty' + 'Session cookie is invalid or the file is empty' ) if len(saved) > 1: - obj = cls.from_session( - session=session, - servers=saved[1:], - js=js, - **custom_args - ) - else: - obj = cls.from_session( - session, - js=js, - **custom_args - ) + self.account.refresh_servers(saved[1:]) - obj.saved_session = file - - return obj - - @staticmethod - def md5encode(passwd: str) -> str: - """Encodes the given string with MD5 - - Args: - passwd (str): String to encode - - Returns: - Hexdigest hash of the string in lowercase - """ - - encoded = hashlib.md5(passwd.encode('utf-8')) - return encoded.hexdigest().lower() - - @staticmethod - def session_file(username: str, sessions_dir: str = '~') -> str: - """Generates session file name - for authenticated user - - Args: - username (str): Authenticated user - sessions_dir (str, optional): Path to directory - with automatically saved sessions - - Returns: - Filename - """ - - # unsafe symbols replacement - repl = '_' - - secure = re.sub( - r'[^A-Za-z0-9_-]', - repl, username - ) - - return f'{sessions_dir}/.at_{secure}' + self.atconn.session.cookies['ATERNOS_SESSION'] = session + self.saved_session = filename def save_session( self, @@ -298,8 +175,9 @@ class Client: Args: file (str, optional): File where a session cookie must be saved incl_servers (bool, optional): If the function - should include the servers IDs to - reduce API requests count (recommended) + should include the servers IDs in this file + to reduce API requests count on the next restoration + (recommended) """ file = os.path.expanduser(file) @@ -311,7 +189,7 @@ class Client: if not incl_servers: return - for s in self.servers: + for s in self.account.servers: f.write(s.servid + '\n') def remove_session(self, file: str = '~/.aternos') -> None: @@ -331,193 +209,25 @@ class Client: except OSError as err: log.warning('Unable to delete session file: %s', err) - def list_servers(self, cache: bool = True) -> List[AternosServer]: - """Parses a list of your servers from Aternos website + @staticmethod + def session_filename(username: str, sessions_dir: str = '~') -> str: + """Generates a session file name Args: - cache (bool, optional): If the function should use - cached servers list (recommended) + username (str): Authenticated user + sessions_dir (str, optional): Path to directory + with automatically saved sessions Returns: - List of AternosServer objects + Filename """ - if cache and self.parsed: - return self.servers + # unsafe symbols replacement + repl = '_' - serverspage = self.atconn.request_cloudflare( - f'{BASE_URL}/servers/', 'GET' - ) - serverstree = lxml.html.fromstring(serverspage.content) - - servers = serverstree.xpath( - '//div[@class="server-body"]/@data-id' - ) - self.refresh_servers(servers) - - # Update session file (add servers) - try: - self.save_session(self.saved_session) - except OSError as err: - log.warning('Unable to save servers list to file: %s', err) - - return self.servers - - def refresh_servers(self, ids: List[str]) -> None: - """Replaces cached servers list creating - AternosServer objects by given IDs - - Args: - ids (List[str]): Servers unique identifiers - """ - - self.servers = [] - for s in ids: - - servid = s.strip() - if servid == '': - continue - - log.debug('Adding server %s', servid) - srv = AternosServer(servid, self.atconn) - self.servers.append(srv) - - self.parsed = True - - def get_server(self, servid: str) -> AternosServer: - """Creates a server object from the server ID. - Use this instead of list_servers - if you know the ID to save some time. - - Returns: - AternosServer object - """ - - return AternosServer(servid, self.atconn) - - def logout(self) -> None: - """Log out from Aternos account""" - - self.atconn.request_cloudflare( - f'{AJAX_URL}/account/logout.php', - 'GET', sendtoken=True + secure = re.sub( + r'[^A-Za-z0-9_-]', + repl, username, ) - self.remove_session(self.saved_session) - - def change_username(self, value: str) -> None: - """Changes a username in your Aternos account - - Args: - value (str): New username - """ - - self.atconn.request_cloudflare( - f'{AJAX_URL}/account/username.php', - 'POST', data={'username': value}, sendtoken=True - ) - - def change_email(self, value: str) -> None: - """Changes an e-mail in your Aternos account - - Args: - value (str): New e-mail - - Raises: - ValueError: If an invalid e-mail address - was passed to the function - """ - - email = re.compile( - r'^[A-Za-z0-9\-_+.]+@[A-Za-z0-9\-_+.]+\.[A-Za-z0-9\-]+$|^$' - ) - if not email.match(value): - raise ValueError('Invalid e-mail!') - - self.atconn.request_cloudflare( - f'{AJAX_URL}/account/email.php', - 'POST', data={'email': value}, sendtoken=True - ) - - def change_password(self, old: str, new: str) -> None: - """Changes a password in your Aternos account - - Args: - old (str): Old password - new (str): New password - """ - - self.change_password_hashed( - Client.md5encode(old), - Client.md5encode(new), - ) - - def change_password_hashed(self, old: str, new: str) -> None: - """Changes a password in your Aternos account. - Unlike `change_password`, this function - takes hashed passwords as arguments - - Args: - old (str): Old password hashed with MD5 - new (str): New password hashed with MD5 - """ - - self.atconn.request_cloudflare( - f'{AJAX_URL}/account/password.php', - 'POST', data={ - 'oldpassword': old, - 'newpassword': new, - }, sendtoken=True - ) - - def qrcode_2fa(self) -> Dict[str, str]: - """Requests a secret code and - a QR code for enabling 2FA""" - - return self.atconn.request_cloudflare( - f'{AJAX_URL}/account/secret.php', - 'GET', sendtoken=True - ).json() - - def save_qr(self, qrcode: str, filename: str) -> None: - """Writes a 2FA QR code into a png-file - - Args: - qrcode (str): Base64 encoded png image from `qrcode_2fa()` - filename (str): Where the QR code image must be saved. - Existing file will be rewritten. - """ - - data = qrcode.removeprefix('data:image/png;base64,') - png = base64.b64decode(data) - - with open(filename, 'wb') as f: - f.write(png) - - def enable_2fa(self, code: int) -> None: - """Enables Two-Factor Authentication - - Args: - code (int): 2FA code - """ - - self.atconn.request_cloudflare( - f'{AJAX_URL}/account/twofactor.php', - 'POST', data={ - 'code': code - }, sendtoken=True - ) - - def disable_2fa(self, code: int) -> None: - """Disables Two-Factor Authentication - - Args: - code (int): 2FA code - """ - - self.atconn.request_cloudflare( - f'{AJAX_URL}/account/disbaleTwofactor.php', - 'POST', data={ - 'code': code - }, sendtoken=True - ) + return f'{sessions_dir}/.at_{secure}' diff --git a/python_aternos/atconnect.py b/python_aternos/atconnect.py index 9c32759..a564bd7 100644 --- a/python_aternos/atconnect.py +++ b/python_aternos/atconnect.py @@ -1,5 +1,6 @@ """Stores API session and sends requests""" +import logging import re import time @@ -39,7 +40,6 @@ SEC_ALPHABET = string.ascii_lowercase + string.digits class AternosConnect: - """Class for sending API requests, bypassing Cloudflare and parsing responses""" @@ -51,28 +51,6 @@ class AternosConnect: self.token = '' self.atcookie = '' - def add_args(self, **kwargs) -> None: - """Pass arguments to CloudScraper - session object __init__ - if kwargs is not empty - """ - - if len(kwargs) < 1: - log.debug('**kwargs is empty') - return - - log.debug('New args for CloudScraper: %s', kwargs) - self.cf_init = partial(CloudScraper, **kwargs) - self.refresh_session() - - def clear_args(self) -> None: - """Clear CloudScarper object __init__ arguments - which was set using add_args method""" - - log.debug('Creating session object with no keywords') - self.cf_init = partial(CloudScraper) - self.refresh_session() - def refresh_session(self) -> None: """Creates a new CloudScraper session object and copies all cookies. @@ -88,8 +66,6 @@ class AternosConnect: is needed for most requests Raises: - RuntimeWarning: If the parser can not - find `` tag in HTML response TokenError: If the parser is unable to extract ajax token from HTML @@ -120,6 +96,7 @@ class AternosConnect: pagehead = loginpage[headtag:headend] js_code: Optional[List[Any]] = None + try: text = pagehead.decode('utf-8', 'replace') js_code = re.findall(ARROW_FN_REGEX, text) @@ -170,11 +147,14 @@ class AternosConnect: ) return self.sec - + def generate_sec_part(self) -> str: """Generates a part for SEC token""" - return ''.join(secrets.choice(SEC_ALPHABET) for _ in range(11)) + ('0' * 5) + return ''.join( + secrets.choice(SEC_ALPHABET) + for _ in range(11) + ) + ('0' * 5) def request_cloudflare( self, url: str, method: str, @@ -237,22 +217,24 @@ class AternosConnect: reqcookies['ATERNOS_SESSION'] = self.atcookie del self.session.cookies['ATERNOS_SESSION'] - reqcookies_dbg = { - k: str(v or '')[:3] - for k, v in reqcookies.items() - } + if log.level == logging.DEBUG: - session_cookies_dbg = { - k: str(v or '')[:3] - for k, v in self.session.cookies.items() - } + reqcookies_dbg = { + k: str(v or '')[:3] + for k, v in reqcookies.items() + } - log.debug('Requesting(%s)%s', method, url) - log.debug('headers=%s', headers) - log.debug('params=%s', params) - log.debug('data=%s', data) - log.debug('req-cookies=%s', reqcookies_dbg) - log.debug('session-cookies=%s', session_cookies_dbg) + session_cookies_dbg = { + k: str(v or '')[:3] + for k, v in self.session.cookies.items() + } + + log.debug('Requesting(%s)%s', method, url) + log.debug('headers=%s', headers) + log.debug('params=%s', params) + log.debug('data=%s', data) + log.debug('req-cookies=%s', reqcookies_dbg) + log.debug('session-cookies=%s', session_cookies_dbg) if method == 'POST': sendreq = partial( diff --git a/python_aternos/atjsparse.py b/python_aternos/atjsparse.py index cc8d421..29aaf76 100644 --- a/python_aternos/atjsparse.py +++ b/python_aternos/atjsparse.py @@ -115,7 +115,8 @@ class NodeInterpreter(Interpreter): self.proc.communicate() except AttributeError: log.warning( - 'NodeJS process was not initialized' + 'NodeJS process was not initialized, ' + 'but __del__ was called' ) diff --git a/python_aternos/atlog.py b/python_aternos/atlog.py index 3816dc9..e209992 100644 --- a/python_aternos/atlog.py +++ b/python_aternos/atlog.py @@ -1,2 +1,4 @@ +"""Creates a logger""" + import logging log = logging.getLogger('aternos') diff --git a/python_aternos/atmd5.py b/python_aternos/atmd5.py new file mode 100644 index 0000000..7378f24 --- /dev/null +++ b/python_aternos/atmd5.py @@ -0,0 +1,17 @@ +"""Contains a function for hashing""" + +import hashlib + + +def md5encode(passwd: str) -> str: + """Encodes the given string with MD5 + + Args: + passwd (str): String to encode + + Returns: + Hexdigest hash of the string in lowercase + """ + + encoded = hashlib.md5(passwd.encode('utf-8')) + return encoded.hexdigest().lower() diff --git a/python_aternos/atplayers.py b/python_aternos/atplayers.py index 53ec3c5..4d73dc2 100644 --- a/python_aternos/atplayers.py +++ b/python_aternos/atplayers.py @@ -50,8 +50,8 @@ class PlayersList: # whl_je = whitelist for java # whl_be = whitelist for bedrock # whl = common whitelist - common_whl = (self.lst == Lists.whl) - bedrock = (atserv.is_bedrock) + common_whl = self.lst == Lists.whl + bedrock = atserv.is_bedrock if common_whl and bedrock: self.lst = Lists.whl_be diff --git a/python_aternos/atserver.py b/python_aternos/atserver.py index a194ccc..b55dbcb 100644 --- a/python_aternos/atserver.py +++ b/python_aternos/atserver.py @@ -87,8 +87,6 @@ class AternosServer: page = self.atserver_request( f'{BASE_URL}/server', 'GET' ) - with open('server.html', 'wt') as f: - f.write(page.text) match = status_re.search(page.text) if match is None: diff --git a/python_aternos/atwss.py b/python_aternos/atwss.py index 8d8b930..4fb6238 100644 --- a/python_aternos/atwss.py +++ b/python_aternos/atwss.py @@ -157,8 +157,8 @@ class AternosWss: if not self.autoconfirm: return - in_queue = (msg['class'] == 'queueing') - pending = (msg['queue']['pending'] == 'pending') + in_queue = msg['class'] == 'queueing' + pending = msg['queue']['pending'] == 'pending' confirmation = in_queue and pending if confirmation and not self.confirmed: