Compare commits

..

4 commits

Author SHA1 Message Date
04ba96108e
Edit readme + increment version 2024-02-15 18:05:51 +04:00
53abd2725b
Last version 2024-02-15 17:38:18 +04:00
5561ee133d
Merge pull request #99 from TeslaOwls/main
#97 fixing issue after aternos changed 'user' to 'username' in http GET data
2023-11-03 14:52:16 +04:00
Romain Hedouin
4f19e3395c fixing issue after aternos changed 'user' to 'username' in http GET data 2023-10-27 01:05:00 +02:00
30 changed files with 4809 additions and 203 deletions

View file

@ -1,7 +1,7 @@
<div align="center">
<img src="https://i.ibb.co/3RXcXJ1/aternos-400.png" alt="Python Aternos Logo">
<h1>
Python Aternos
[UNMAINTAINED] Python Aternos
<div>
<a href="https://pypi.org/project/python-aternos/">
<img src="https://img.shields.io/pypi/v/python-aternos">
@ -22,16 +22,21 @@
An unofficial Aternos API written in Python.
It uses [aternos](https://aternos.org/)' private API and html parsing.
> **WARNING**
> [!WARNING]
>
> Aternos now trying to detect python-aternos requests
> by finding bugs in custom JavaScript parser.
> This library is no longer maintained, because:
> 1. Aternos started detecting all automated requests (and, therefore, ToS violations)
> via JS code in `AJAX_TOKEN` which is executed incorrectly in Js2Py and
> requires a NodeJS DOM library (at least) or a browser engine.
> For details, see [#85](https://github.com/DarkCat09/python-aternos/issues/85).
> 2. Aternos frontend is protected with Cloudflare, so this library fails to parse pages
> in case of, for example, blocked or suspicious IP address (e.g. web hosting).
> CF shows IUAM page, often with captcha. We need a browser engine like undetected-chromedriver and an AI or a man solving captchas.
> 3. Last Aternos API update broke nearly everything.
> 4. I have no more motivation and not enough time to work on this, nor need in using Aternos.
>
> Please, always keep the library up-to-date.
>
> I'm going to use Selenium in this library later
> (but you'll still be able to choose between
> lightweight HTML parsing and running a browser).
> I'm so sorry. If you want to continue development of python-aternos,
> [contact me](https://url.dc09.ru/contact), but I think it's better to write from scratch.
Python Aternos supports:
@ -41,7 +46,7 @@ Python Aternos supports:
- Parsing Minecraft servers list
- Parsing server info by its ID
- Starting/stoping server, restarting, confirming/cancelling launch
- Updating server info in real-time (see [WebSocket API](https://aternos.dc09.ru/howto/websocket))
- Updating server info in real-time (see [WebSocket API](https://python-aternos.codeberg.page/howto/websocket))
- Changing server subdomain and MOTD (message-of-the-day)
- Managing files, settings, players (whitelist, operators, etc.)
@ -119,9 +124,9 @@ if testserv is not None:
## [More examples](https://github.com/DarkCat09/python-aternos/tree/main/examples)
## [Documentation](https://aternos.dc09.ru)
## [Documentation](https://python-aternos.codeberg.page)
## [How-To Guide](https://aternos.dc09.ru/howto/auth)
## [How-To Guide](https://python-aternos.codeberg.page/howto/auth)
## Changelog
|Version|Description |
@ -137,8 +142,9 @@ if testserv is not None:
|v2.1.x|Fixes in websockets API, atconnect (including cookie refreshing fix). Support for captcha solving services (view [#52](https://github.com/DarkCat09/python-aternos/issues/52)).|
|v2.2.x|Node.JS interpreter support.|
|v3.0.0|Partially rewritten, API updates.|
|v3.1.x|Full implementation of config API.|
|v3.2.x|Shared access API and maybe Google Drive backups.|
|v3.0.5|Unmaintained.|
|v3.1.x|TODO: Full implementation of config API.|
|v3.2.x|TODO: Shared access API and maybe Google Drive backups.|
## Reversed API Specification
Private Aternos API requests were captured into

View file

@ -1,23 +1,27 @@
from getpass import getpass
from selenium.webdriver import Firefox
from python_aternos import Client
from python_aternos import Client, atserver
user = input('Username: ')
pswd = getpass('Password: ')
with Firefox() as driver:
atclient = Client()
aternos = atclient.account
atclient.login(user, pswd)
atclient = Client(driver)
atclient.login(user, pswd)
srvs = aternos.list_servers()
servers = atclient.list_servers()
for srv in srvs:
print()
print('***', srv.servid, '***')
srv.fetch()
print(srv.domain)
print(srv.motd)
print('*** Status:', srv.status)
print('*** Full address:', srv.address)
print('*** Port:', srv.port)
print('*** Name:', srv.subdomain)
print('*** Minecraft:', srv.software, srv.version)
print('*** IsBedrock:', srv.edition == atserver.Edition.bedrock)
print('*** IsJava:', srv.edition == atserver.Edition.java)
# for serv in servers:
# print(
# serv.id, serv.name,
# serv.software,
# serv.status,
# serv.players,
# )
list(map(print, servers))
print()

View file

@ -1 +1,11 @@
from .atclient import Client # noqa: F401
"""Init"""
from .atclient import Client
from .atserver import AternosServer
from .atserver import Edition
from .atserver import Status
from .atplayers import PlayersList
from .atplayers import Lists
from .atwss import Streams
from .atjsparse import Js2PyInterpreter
from .atjsparse import NodeInterpreter

View file

@ -3,33 +3,38 @@ and allows to manage your account"""
import os
import re
from typing import Optional, List
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.remote.webelement import WebElement
from python_aternos.atserver import PartialServerInfo
from .atselenium import SeleniumHelper, Remote
from typing import Optional, Type
from .atlog import log, is_debug, set_debug
from .atmd5 import md5encode
from .ataccount import AternosAccount
from .atconnect import AternosConnect
from .atconnect import AJAX_URL
from .aterrors import CredentialsError
from .aterrors import TwoFactorAuthError
from . import atjsparse
from .atjsparse import Interpreter
from .atjsparse import Js2PyInterpreter
class Client:
"""Aternos API Client class, object
of which contains user's auth data"""
def __init__(self, driver: Remote) -> None:
self.se = SeleniumHelper(driver)
def __init__(self) -> None:
# Config
self.sessions_dir = '~'
self.js: Type[Interpreter] = Js2PyInterpreter
# ###
self.saved_session = '~/.aternos' # will be rewritten by login()
self.atconn = AternosConnect()
self.account = AternosAccount(self)
def login(
self,
@ -45,41 +50,73 @@ class Client:
code (Optional[int], optional): 2FA code
"""
self.se.load_page('/go')
self.login_hashed(
username,
md5encode(password),
code,
)
err_block = self.se.find_element(By.CLASS_NAME, 'login-error')
err_alert = self.se.find_element(By.CLASS_NAME, 'alert-wrapper')
def login_hashed(
self,
username: str,
md5: str,
code: Optional[int] = None) -> None:
"""Log in to your Aternos account
with a username and a hashed password
self.se.exec_js(f'''
document.getElementById('user').value = '{username}'
document.getElementById('password').value = '{password}'
document.getElementById('twofactor-code').value = '{code}'
login()
''')
Args:
username (str): Username
md5 (str): Password hashed with MD5
code (int): 2FA code
def logged_in_or_error(driver: Remote):
return (
driver.current_url.find('/servers') != -1 or
err_block.is_displayed() or
err_alert.is_displayed()
Raises:
TwoFactorAuthError: If the 2FA is enabled,
but `code` argument was not passed or is incorrect
CredentialsError: If the Aternos backend
returned empty session cookie
(usually because of incorrect credentials)
ValueError: _description_
"""
filename = self.session_filename(
username, self.sessions_dir
)
try:
self.restore_session(filename)
except (OSError, CredentialsError):
pass
atjsparse.get_interpreter(create=self.js)
self.atconn.parse_token()
self.atconn.generate_sec()
credentials = {
'username': username,
'password': md5,
}
if code is not None:
credentials['code'] = str(code)
loginreq = self.atconn.request_cloudflare(
f'{AJAX_URL}/account/login',
'POST', data=credentials, sendtoken=True,
)
if b'"show2FA":true' in loginreq.content:
raise TwoFactorAuthError('2FA code is required')
if 'ATERNOS_SESSION' not in loginreq.cookies:
raise CredentialsError(
'Check your username and password'
)
self.se.wait.until(logged_in_or_error)
if self.se.driver.current_url.find('/go') != -1:
if err_block.is_displayed():
raise CredentialsError(err_block.text)
if err_alert.is_displayed():
raise CredentialsError(err_alert.text)
self.se.wait.until(lambda d: d.title.find('Cloudflare') == -1)
if not self.se.get_cookie('ATERNOS_SESSION'):
raise CredentialsError('Session cookie is empty')
print(self.se.get_cookie('ATERNOS_SESSION')) # TODO: remove, this is for debug
self.saved_session = filename
try:
self.save_session(filename)
except OSError:
pass
def login_with_session(self, session: str) -> None:
"""Log in using ATERNOS_SESSION cookie
@ -88,50 +125,31 @@ class Client:
session (str): Session cookie value
"""
self.se.set_cookie('ATERNOS_SESSION', session)
self.atconn.parse_token()
self.atconn.generate_sec()
self.atconn.session.cookies['ATERNOS_SESSION'] = session
def logout(self) -> None:
"""Log out from the Aternos account"""
self.se.load_page('/servers')
self.se.find_element(By.CLASS_NAME, 'logout').click()
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/logout',
'GET', sendtoken=True,
)
self.remove_session(self.saved_session)
def list_servers(self) -> List[PartialServerInfo]:
CARD_CLASS = 'servercard'
self.se.load_page('/servers')
def create_obj(s: WebElement) -> PartialServerInfo:
return PartialServerInfo(
id=s.get_dom_attribute('data-id'),
name=s.get_dom_attribute('title'),
software='',
status=(
s
.get_dom_attribute('class')
.replace(CARD_CLASS, '')
.split()[0]
),
players=0,
se=self.se,
)
return list(map(
create_obj,
self.se.find_elements(By.CLASS_NAME, CARD_CLASS),
))
def restore_session(self, file: str = '~/.aternos') -> None:
"""Restores ATERNOS_SESSION cookie from a session file
"""Restores ATERNOS_SESSION cookie and,
if included, servers list, from a session file
Args:
file (str, optional): Filename
Raises:
FileNotFoundError: If the file cannot be found
CredentialsError: If the session cookie
(or the file at all) has incorrect format
"""
file = os.path.expanduser(file)
@ -141,24 +159,48 @@ class Client:
raise FileNotFoundError()
with open(file, 'rt', encoding='utf-8') as f:
session = f.readline().strip()
saved = f.read() \
.strip() \
.replace('\r\n', '\n') \
.split('\n')
self.login_with_session(session)
session = saved[0].strip()
if session == '' or not session.isalnum():
raise CredentialsError(
'Session cookie is invalid or the file is empty'
)
if len(saved) > 1:
self.account.refresh_servers(saved[1:])
self.atconn.session.cookies['ATERNOS_SESSION'] = session
self.saved_session = file
def save_session(self, file: str = '~/.aternos') -> None:
def save_session(
self,
file: str = '~/.aternos',
incl_servers: bool = True) -> None:
"""Saves an ATERNOS_SESSION cookie to a file
Args:
file (str, optional):
File where the session cookie must be saved
file (str, optional): File where a session cookie must be saved
incl_servers (bool, optional): If the function
should include the servers IDs in this file
to reduce API requests count on the next restoration
(recommended)
"""
file = os.path.expanduser(file)
log.debug('Saving session to %s', file)
with open(file, 'wt', encoding='utf-8') as f:
f.write(self.se.get_cookie('ATERNOS_SESSION') + '\n')
f.write(self.atconn.atsession + '\n')
if not incl_servers:
return
for s in self.account.servers:
f.write(s.servid + '\n')
def remove_session(self, file: str = '~/.aternos') -> None:
"""Removes a file which contains

View file

@ -1,15 +1,40 @@
"""Stores API session and sends requests"""
import re
import time
import string
import secrets
from functools import partial
from typing import Optional
from typing import Dict, Any
from typing import List, Dict, Any
import requests
from cloudscraper import CloudScraper
from .atlog import log, is_debug
from . import atjsparse
from .aterrors import TokenError
from .aterrors import CloudflareError
from .aterrors import AternosPermissionError
BASE_URL = 'https://aternos.org'
AJAX_URL = f'{BASE_URL}/ajax'
REQUA = \
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
'(KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36 OPR/85.0.4341.47'
ARROW_FN_REGEX = r'\(\(\).*?\)\(\);'
SCRIPT_TAG_REGEX = (
rb'<script type=([\'"]?)text/javascript\1>.+?</script>'
)
SEC_ALPHABET = string.ascii_lowercase + string.digits
@ -19,15 +44,108 @@ class AternosConnect:
def __init__(self) -> None:
self.session = CloudScraper()
self.sec = ''
self.token = ''
self.atcookie = ''
def refresh_session(self) -> None:
"""Creates a new CloudScraper
session object and copies all cookies.
Required for bypassing Cloudflare"""
old_cookies = self.session.cookies
captcha_kwarg = self.session.captcha
self.session = CloudScraper(captcha=captcha_kwarg)
self.session.cookies.update(old_cookies)
del old_cookies
def parse_token(self) -> str:
return ''
"""Parses Aternos ajax token that
is needed for most requests
Raises:
TokenError: If the parser is unable
to extract ajax token from HTML
Returns:
Aternos ajax token
"""
loginpage = self.request_cloudflare(
f'{BASE_URL}/go/', 'GET'
).content
# Using the standard string methods
# instead of the expensive xml parsing
head = b'<head>'
headtag = loginpage.find(head)
headend = loginpage.find(b'</head>', headtag + len(head))
# Some checks
if headtag < 0 or headend < 0:
pagehead = loginpage
log.warning(
'Unable to find <head> tag, parsing the whole page'
)
else:
# Extracting <head> content
headtag = headtag + len(head)
pagehead = loginpage[headtag:headend]
js_code: Optional[List[Any]] = None
try:
text = pagehead.decode('utf-8', 'replace')
js_code = re.findall(ARROW_FN_REGEX, text)
token_func = js_code[0]
if len(js_code) > 1:
token_func = js_code[1]
js = atjsparse.get_interpreter()
js.exec_js(token_func)
self.token = js['AJAX_TOKEN']
except (IndexError, TypeError) as err:
log.warning('---')
log.warning('Unable to parse AJAX_TOKEN!')
log.warning('Please, insert the info below')
log.warning('to the GitHub issue description:')
log.warning('---')
log.warning('JavaScript: %s', js_code)
log.warning(
'All script tags: %s',
re.findall(SCRIPT_TAG_REGEX, pagehead)
)
log.warning('---')
raise TokenError(
'Unable to parse TOKEN from the page'
) from err
return self.token
def generate_sec(self) -> str:
return 'a:b'
"""Generates Aternos SEC token which
is also needed for most API requests
Returns:
Random SEC `key:value` string
"""
randkey = self.generate_sec_part()
randval = self.generate_sec_part()
self.sec = f'{randkey}:{randval}'
self.session.cookies.set(
f'ATERNOS_SEC_{randkey}', randval,
domain='aternos.org'
)
return self.sec
def generate_sec_part(self) -> str:
"""Generates a part for SEC token"""
@ -45,8 +163,124 @@ class AternosConnect:
reqcookies: Optional[Dict[Any, Any]] = None,
sendtoken: bool = False,
retries: int = 5,
timeout: int = 4) -> Any:
return None
timeout: int = 4) -> requests.Response:
"""Sends a request to Aternos API bypass Cloudflare
Args:
url (str): Request URL
method (str): Request method, must be GET or POST
params (Optional[Dict[Any, Any]], optional): URL parameters
data (Optional[Dict[Any, Any]], optional): POST request data,
if the method is GET, this dict will be combined with params
headers (Optional[Dict[Any, Any]], optional): Custom headers
reqcookies (Optional[Dict[Any, Any]], optional):
Cookies only for this request
sendtoken (bool, optional): If the ajax and SEC token
should be sent
retries (int, optional): How many times parser must retry
connection to API bypass Cloudflare
timeout (int, optional): Request timeout in seconds
Raises:
CloudflareError: When the parser has exceeded retries count
NotImplementedError: When the specified method is not GET or POST
Returns:
API response
"""
if retries <= 0:
raise CloudflareError('Unable to bypass Cloudflare protection')
try:
self.atcookie = self.session.cookies['ATERNOS_SESSION']
except KeyError:
pass
self.refresh_session()
params = params or {}
data = data or {}
headers = headers or {}
reqcookies = reqcookies or {}
method = method or 'GET'
method = method.upper().strip()
if method not in ('GET', 'POST'):
raise NotImplementedError('Only GET and POST are available')
if sendtoken:
params['TOKEN'] = self.token
params['SEC'] = self.sec
headers['X-Requested-With'] = 'XMLHttpRequest'
# requests.cookies.CookieConflictError bugfix
reqcookies['ATERNOS_SESSION'] = self.atcookie
del self.session.cookies['ATERNOS_SESSION']
if is_debug():
reqcookies_dbg = {
k: str(v or '')[:3]
for k, v in reqcookies.items()
}
session_cookies_dbg = {
k: str(v or '')[:3]
for k, v in self.session.cookies.items()
}
log.debug('Requesting(%s)%s', method, url)
log.debug('headers=%s', headers)
log.debug('params=%s', params)
log.debug('data=%s', data)
log.debug('req-cookies=%s', reqcookies_dbg)
log.debug('session-cookies=%s', session_cookies_dbg)
if method == 'POST':
sendreq = partial(
self.session.post,
params=params,
data=data,
)
else:
sendreq = partial(
self.session.get,
params={**params, **data},
)
req = sendreq(
url,
headers=headers,
cookies=reqcookies,
timeout=timeout,
)
resp_type = req.headers.get('content-type', '')
html_type = resp_type.find('text/html') != -1
cloudflare = req.status_code == 403
if html_type and cloudflare:
log.info('Retrying to bypass Cloudflare')
time.sleep(0.3)
return self.request_cloudflare(
url, method,
params, data,
headers, reqcookies,
sendtoken, retries - 1
)
log.debug('AternosConnect received: %s', req.text[:65])
log.info(
'%s completed with %s status',
method, req.status_code
)
if req.status_code == 402:
raise AternosPermissionError
req.raise_for_status()
return req
@property
def atsession(self) -> str:

View file

@ -1,50 +0,0 @@
from selenium.webdriver import Remote
from selenium.webdriver.support.wait import WebDriverWait
BASE_URL = 'https://aternos.org'
RM_SCRIPTS = '''
const rmScripts = () => {
const lst = document.querySelectorAll("script")
for (let js of lst) {
if (
js.src.includes('googletagmanager.com') ||
js.src.includes('cloudflareinsights.com') ||
js.innerText.includes('LANGUAGE_VARIABLES')
) {
js.remove()
}
}
}
addEventListener('DOMContentLoaded', rmScripts)
rmScripts()
'''
class SeleniumHelper:
def __init__(self, driver: Remote) -> None:
self.driver = driver
self.wait = WebDriverWait(driver, 8.0)
self.find_element = self.driver.find_element
self.find_elements = self.driver.find_elements
def load_page(self, path: str) -> None:
self.driver.get(f'{BASE_URL}{path}')
self.driver.execute_script(RM_SCRIPTS)
def exec_js(self, script: str) -> None:
self.driver.execute_script(script)
def get_cookie(self, name: str) -> str:
cookie = self.driver.get_cookie(name)
if cookie is None:
return ''
return cookie.get('value') or ''
def set_cookie(self, name: str, value: str) -> None:
self.driver.add_cookie({
'name': name,
'value': value,
})

View file

@ -1,26 +1,26 @@
"""Aternos Minecraft server"""
import re
import json
import enum
from typing import List
from typing import Any, Dict, List
from functools import partial
from .atselenium import SeleniumHelper
from .atconnect import BASE_URL, AJAX_URL
from .atconnect import AternosConnect
from .atwss import AternosWss
from .atconnect import AJAX_URL
from .atplayers import PlayersList
from .atplayers import Lists
from .atstubs import PlayersList
from .atstubs import Lists
from .atstubs import FileManager
from .atstubs import AternosConfig
from .atfm import FileManager
from .atconf import AternosConfig
from .aterrors import AternosError
from .aterrors import ServerStartError
from dataclasses import dataclass
SERVER_URL = f'{AJAX_URL}/server'
status_re = re.compile(
r'<script>\s*var lastStatus\s*?=\s*?(\{.+?\});?\s*<\/script>'
@ -55,25 +55,67 @@ class Status(enum.IntEnum):
confirm = 10
@dataclass
class PartialServerInfo:
se: SeleniumHelper
id: str
name: str = ''
software: str = ''
status: str = ''
players: int = 0
def use(self) -> None:
self.se.set_cookie('ATERNOS_SERVER', self.id)
self.se.load_page('/server')
@dataclass
class AternosServer:
"""Class for controlling your Aternos Minecraft server"""
def __init__(
self, servid: str,
atconn: AternosConnect,
autofetch: bool = False) -> None:
"""Class for controlling your Aternos Minecraft server
Args:
servid (str): Unique server IDentifier
atconn (AternosConnect):
AternosConnect instance with initialized Aternos session
autofetch (bool, optional): Automatically call
`fetch()` to get all info
"""
self.servid = servid
self.atconn = atconn
self._info: Dict[str, Any] = {}
self.atserver_request = partial(
self.atconn.request_cloudflare,
reqcookies={
'ATERNOS_SERVER': self.servid,
}
)
if autofetch:
self.fetch()
def fetch(self) -> None:
"""Get all server info"""
page = self.atserver_request(
f'{BASE_URL}/server', 'GET'
)
match = status_re.search(page.text)
if match is None:
raise AternosError('Unable to parse lastStatus object')
self._info = json.loads(match[1])
def wss(self, autoconfirm: bool = False) -> AternosWss:
"""Returns AternosWss instance for
listening server streams in real-time
Args:
autoconfirm (bool, optional):
Automatically start server status listener
when AternosWss connects to API to confirm
server launching
Returns:
AternosWss object
"""
return AternosWss(self, autoconfirm)
def start(
self,
headstart: bool = False,

View file

@ -1,7 +0,0 @@
from typing import Any
PlayersList = type('PlayersList', (), {})
FileManager = type('FileManager', (), {})
AternosConfig = type('AternosConfig', (), {})
Lists = Any

67
python_aternos/data/package-lock.json generated Normal file
View file

@ -0,0 +1,67 @@
{
"name": "data",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"dependencies": {
"vm2": "^3.9.13"
}
},
"node_modules/acorn": {
"version": "8.8.1",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz",
"integrity": "sha512-7zFpHzhnqYKrkYdUjF1HI1bzd0VygEGX8lFk4k5zVMqHEoES+P+7TKI+EvLO9WVMJ8eekdO0aDEK044xTXwPPA==",
"bin": {
"acorn": "bin/acorn"
},
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/acorn-walk": {
"version": "8.2.0",
"resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz",
"integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==",
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/vm2": {
"version": "3.9.19",
"resolved": "https://registry.npmjs.org/vm2/-/vm2-3.9.19.tgz",
"integrity": "sha512-J637XF0DHDMV57R6JyVsTak7nIL8gy5KH4r1HiwWLf/4GBbb5MKL5y7LpmF4A8E2nR6XmzpmMFQ7V7ppPTmUQg==",
"dependencies": {
"acorn": "^8.7.0",
"acorn-walk": "^8.2.0"
},
"bin": {
"vm2": "bin/vm2"
},
"engines": {
"node": ">=6.0"
}
}
},
"dependencies": {
"acorn": {
"version": "8.8.1",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz",
"integrity": "sha512-7zFpHzhnqYKrkYdUjF1HI1bzd0VygEGX8lFk4k5zVMqHEoES+P+7TKI+EvLO9WVMJ8eekdO0aDEK044xTXwPPA=="
},
"acorn-walk": {
"version": "8.2.0",
"resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz",
"integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA=="
},
"vm2": {
"version": "3.9.19",
"resolved": "https://registry.npmjs.org/vm2/-/vm2-3.9.19.tgz",
"integrity": "sha512-J637XF0DHDMV57R6JyVsTak7nIL8gy5KH4r1HiwWLf/4GBbb5MKL5y7LpmF4A8E2nR6XmzpmMFQ7V7ppPTmUQg==",
"requires": {
"acorn": "^8.7.0",
"acorn-walk": "^8.2.0"
}
}
}
}

View file

@ -0,0 +1,5 @@
{
"dependencies": {
"vm2": "^3.9.13"
}
}

View file

@ -0,0 +1,49 @@
const http = require('http')
const process = require('process')
const { VM } = require('vm2')
const args = process.argv.slice(2)
const port = args[0] || 8000
const host = args[1] || 'localhost'
const stubFunc = (_i) => {}
const vm = new VM({
timeout: 2000,
allowAsync: false,
sandbox: {
atob: atob,
setTimeout: stubFunc,
setInterval: stubFunc,
document: {
getElementById: stubFunc,
prepend: stubFunc,
append: stubFunc,
appendChild: stubFunc,
doctype: {},
currentScript: {},
},
},
})
vm.run('var window = global')
const listener = (req, res) => {
if (req.method != 'POST')
res.writeHead(405) & res.end()
let body = ''
req.on('data', chunk => (body += chunk))
req.on('end', () => {
let resp
try { resp = JSON.stringify(vm.run(body)) }
catch (ex) { resp = ex.message }
res.writeHead(200)
res.end(resp)
})
}
const server = http.createServer(listener)
server.listen(port, host, () => console.log('OK'))

View file

@ -1 +1,5 @@
selenium==4.11.0
cloudscraper==1.2.71
Js2Py==0.74
lxml==4.9.2
regex==2023.6.3
websockets==11.0.3

View file

@ -5,9 +5,9 @@ with open('README.md', 'rt') as readme:
setuptools.setup(
name='python-aternos',
version='4.0.0',
author='Chechkenev Andrey (@DarkCat09)',
author_email='aacd0709@mail.ru',
version='3.0.6',
author='Andrey @DarkCat09',
author_email='py@dc09.ru',
description='An unofficial Aternos API',
long_description=long_description,
long_description_content_type='text/markdown',
@ -19,18 +19,36 @@ setuptools.setup(
},
classifiers=[
'Development Status :: 4 - Beta',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
'Operating System :: Microsoft :: Windows',
'Operating System :: POSIX :: Linux',
'Operating System :: MacOS',
'Intended Audience :: Developers',
'Topic :: Internet',
'Typing :: Typed',
],
install_requires=[
'selenium==4.11.0',
'cloudscraper==1.2.71',
'Js2Py==0.74',
'lxml==4.9.2',
'regex==2023.6.3',
'websockets==11.0.3',
],
extras_require={
'dev': [
'ruff==0.0.281',
'autopep8==2.0.2',
'pycodestyle==2.10.0',
'mypy==1.4.1',
'pylint==2.17.4',
'requests-mock==1.11.0',
'types-requests==2.31.0.1',
],
'pypi': [
'build==0.10.0',
@ -43,4 +61,5 @@ setuptools.setup(
},
packages=['python_aternos'],
python_requires=">=3.7",
include_package_data=True,
)

0
tests/__init__.py Normal file
View file

32
tests/files.py Normal file
View file

@ -0,0 +1,32 @@
from pathlib import Path
from typing import List
abs_dir = Path(__file__).absolute().parent
samples = abs_dir / 'samples'
htmls = samples / 'html'
def read_sample(name: str) -> List[str]:
path = samples / name
if not path.exists():
return []
with path.open('rt', encoding='utf-8') as file:
return file \
.read() \
.strip() \
.replace('\r\n', '\n') \
.split('\n')
def read_html(name: str) -> bytes:
path = samples / 'html' / name
if not path.exists():
return b''
with path.open('rb') as file:
return file.read()

102
tests/js_samples.py Executable file
View file

@ -0,0 +1,102 @@
#!/usr/bin/env python3
# How to use
# *******************************
# 1. Open DevTools at aternos.org
# 2. Get AJAX_TOKEN variable value (without quotes)
#
# 3. Pass it to this script as an argument, e.g.:
# python3 js_samples.py xKflIsKHxlv96fLc1tht
#
# 4. The script will request the token 100 times
# and check it with different built-in interpreters
# (now there are only js2py and nodejs)
# 5. Array "errored" which is printed at the end
# contains indexes of incorrectly executed JS functions
# 6. Enter this index in the opened console
# or enter "exit" to exit
import re
import sys
from python_aternos.atconnect import AternosConnect
from python_aternos.atconnect import BASE_URL
from python_aternos import Js2PyInterpreter
from python_aternos import NodeInterpreter
TIMES = 100
js = re.compile(r'\(\(\).*?\)\(\);')
conn = AternosConnect()
jsi1 = Js2PyInterpreter()
jsi2 = NodeInterpreter()
token = sys.argv[1]
samples = []
errored = []
def get_code() -> bool:
r = conn.request_cloudflare(
f'{BASE_URL}/go', 'GET'
)
if r.status_code != 200:
print(r.status_code)
code = js.search(r.text)
if code is None:
print('No match!')
return False
sample = code.group(0)
samples.append(sample)
print(sample)
print('***')
jsi1.exec_js(sample)
jsi2.exec_js(sample)
var1 = jsi1['AJAX_TOKEN']
var2 = jsi2['AJAX_TOKEN']
print(var1)
print(var2)
print('***')
print()
print()
return var1 == var2 == token
def main() -> None:
print()
for i in range(TIMES):
print(i)
if not get_code():
errored.append(i)
print('Errored:', errored)
print('Choose sample number:')
while True:
try:
print('>', end=' ')
cmd = input()
if cmd.strip().lower() in ('exit', 'quit'):
print('Quit')
break
print(samples[int(cmd)])
except KeyboardInterrupt:
print()
print('Quit')
break
except Exception as err:
print(err)
if __name__ == '__main__':
main()

46
tests/mock.py Normal file
View file

@ -0,0 +1,46 @@
from requests_mock import Mocker
from python_aternos.atconnect import BASE_URL, AJAX_URL
from tests import files
mock = Mocker()
with mock:
mock.get(
f'{BASE_URL}/go/',
content=files.read_html('aternos_go'),
)
mock.get(
f'{BASE_URL}/servers/',
content=files.read_html('aternos_servers'),
)
mock.get(
f'{BASE_URL}/server',
content=files.read_html('aternos_server1'),
)
mock.post(
f'{AJAX_URL}/account/login',
json={
'success': True,
'error': None,
'message': None,
'show2FA': False,
},
cookies={
'ATERNOS_SESSION': '0123abcd',
},
)
mock.get(
f'{BASE_URL}/players/',
content=files.read_html('aternos_players'),
)
mock.get(
f'{BASE_URL}/files/',
content=files.read_html('aternos_file_root'),
)

1
tests/requirements.txt Normal file
View file

@ -0,0 +1 @@
requests-mock>=1.10.0

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -0,0 +1,16 @@
(() => {window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();
(() => {window[["KEN","X_TO","JA","A"].reverse().join('')]="2rKOA1IFdBcHhEM616cb";})();
(() => {window[atob('QUpBWF9UT0tFTg==')]=atob('MmlYaDVXNXVFWXE1ZldKSWF6UTY=');})();
(() => {window[["_XAJA","NEKOT"].map(s => s.split('').reverse().join('')).join('')]=!window[("encodeURI" + "Componen" + "t")] || atob('Q3VVY21aMjdGYjhiVkJOdzEyVmo=');})();
(() => {window[["N","_TOKE","AJAX"].reverse().join('')]=!window[("en" + "co" + "deURICo" + "mpone" + "nt")] || ["zv7hP8ePPY","FP9ZaY","PQo9"].map(s => s.split('').reverse().join('')).join('');})();
(() => /*window["AJAX_TOKEN"]=["iKq","aISAEyX","MeSjP","3wQL1"].map(s => s.split('').reverse().join('')).join('')}*/{window[["XAJA","EKOT_","N"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&!window[["Map"].join('')][["orp","tot","epy"].map(s => s.split('').reverse().join('')).join('')]||!window[["s","e","t","Tim","eou","t"].join('')]?["3","jSeM1LQw","ASIaP","qKiXyE"].reverse().join(''):"s8SvaVLBIU5Whd00vpq3";})();
(() => /*window["AJAX_TOKEN"]=["w3","1LQ","PjSeM","qKiXyEASIa"].reverse().join('')}*/{window["AJAX_TOKEN"]=window['document']&&!window[["p","Ma"].reverse().join('')]["prototype"]||!window[("s" + "et" + "Ti" + "me" + "o" + "u" + "t")]?["SAEyXiKq","eSjPaI","wQL1M","3"].map(s => s.split('').reverse().join('')).join(''):"s8SvaVLBIU5Whd00vpq3";})();
(() => /*window["AJAX_TOKEN"]="0YD4285VVf04F4PZ13vE"}*/{window[["AJA","_X","T","KO","NE"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&window["Map"][["pe","oty","t","pro"].reverse().join('')]&&window[["t","eou","Tim","et","s"].reverse().join('')]?"Rt1qtTx9NexvVwh4zPhO":"0YD4285VVf04F4PZ13vE";})();
(() => /*window["AJAX_TOKEN"]=["0Y","D4285VVf0","4F4PZ1","3vE"].join('')}*/{window[["_XAJA","OT","NEK"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&window[["Map"].reverse().join('')][["e","p","ty","to","pro"].reverse().join('')]&&window[["ut","meo","i","T","set"].reverse().join('')]?("Rt" + "1qtTx9Nexv" + "Vwh4" + "zPhO"):["DY0","F40fVV5824","Ev31ZP4"].map(s => s.split('').reverse().join('')).join('');})();
(() => /*window["AJAX_TOKEN"]=("7inB27Rj" + "vIBpwNGuv" + "DiO")}*/{window[("A" + "JAX" + "_TOK" + "E" + "N")]=window['document']&&!window[("Map")]["prototype"]||!window[("set" + "Tim" + "eo" + "ut")]?"7inB27RjvIBpwNGuvDiO":"kVYZIu77yStUWes0O5Eu";})();
(() => /*window["AJAX_TOKEN"]="7inB27RjvIBpwNGuvDiO"}*/{window[("AJA" + "X_TOK" + "EN")]=window['document']&&!window["Map"][("p" + "rot" + "oty" + "p" + "e")]||!window[["ut","meo","Ti","set"].reverse().join('')]?("7inB2" + "7RjvIBpw" + "NGuvDiO"):["Vk","uIZY","WUtSy77","uE5O0se"].map(s => s.split('').reverse().join('')).join('');})();
(() => /*window["AJAX_TOKEN"]=["2Bni7","R7","pBIvj","OiDvuGNw"].map(s => s.split('').reverse().join('')).join('')}*/{window["AJAX_TOKEN"]=window['document']&&!window[("Ma" + "p")]["prototype"]||!window[("set" + "Ti" + "me" + "ou" + "t")]?["O","NGuvDi","jvIBpw","7inB27R"].reverse().join(''):("kVYZIu77yS" + "tUWes0O5" + "Eu");})();
(() => /*window["AJAX_TOKEN"]=["Nj3BQl6gT","BSsoGLzxx","Ha"].map(s => s.split('').reverse().join('')).join('')}*/{window[["KEN","X_TO","A","AJ"].reverse().join('')]=window['document']&&window["Map"]["prototype"]&&window[["se","tT","ime","o","u","t"].join('')]?["uuW","7FDg6","btJvriBP","lOh3"].map(s => s.split('').reverse().join('')).join(''):["Tg6l","QB","3jNxxzLG","osS","BaH"].join('');})();
(() => /*window["AJAX_TOKEN"]=("Tg6lQB3j" + "NxxzLG" + "osSBaH")}*/{window[("AJ" + "AX_TO" + "KE" + "N")]=window['document']&&window[("Ma" + "p")][["p","tor","to","epy"].map(s => s.split('').reverse().join('')).join('')]&&window[("set" + "T" + "ime" + "ou" + "t")]?["6uuW","iBP7FDg","tJvr","3b","lOh"].map(s => s.split('').reverse().join('')).join(''):["Tg","6lQB3j","Nx","xzLGosSBaH"].join('');})();
(() => /*window["AJAX_TOKEN"]=["aH","SB","zLGos","jNxx","lQB3","Tg6"].reverse().join('')}*/{window[["KEN","TO","AX_","AJ"].reverse().join('')]=window['document']&&window["Map"][["pr","o","to","typ","e"].join('')]&&window[["tes","iT","em","o","u","t"].map(s => s.split('').reverse().join('')).join('')]?"Wuu6gDF7PBirvJtb3hOl":["aH","NxxzLGosSB","Tg6lQB3j"].reverse().join('');})();
(() => /*window["AJAX_TOKEN"]="Tg6lQB3jNxxzLGosSBaH"}*/{window[["A","JA","X_","TO","K","EN"].join('')]=window['document']&&window["Map"][["rp","o","ot","pyt","e"].map(s => s.split('').reverse().join('')).join('')]&&window["setTimeout"]?["Wuu6g","DF7PBir","vJtb3","hOl"].join(''):["BaH","LGosS","jNxxz","Tg6lQB3"].reverse().join('');})();

View file

@ -0,0 +1,16 @@
2rKOA1IFdBcHhEM616cb
2rKOA1IFdBcHhEM616cb
2iXh5W5uEYq5fWJIazQ6
CuUcmZ27Fb8bVBNw12Vj
YPPe8Ph7vzYaZ9PF9oQP
s8SvaVLBIU5Whd00vpq3
s8SvaVLBIU5Whd00vpq3
Rt1qtTx9NexvVwh4zPhO
Rt1qtTx9NexvVwh4zPhO
kVYZIu77yStUWes0O5Eu
kVYZIu77yStUWes0O5Eu
kVYZIu77yStUWes0O5Eu
Wuu6gDF7PBirvJtb3hOl
Wuu6gDF7PBirvJtb3hOl
Wuu6gDF7PBirvJtb3hOl
Wuu6gDF7PBirvJtb3hOl

40
tests/test_http.py Executable file
View file

@ -0,0 +1,40 @@
#!/usr/bin/env python3
import unittest
from python_aternos import Client
from tests import mock
class TestHttp(unittest.TestCase):
def test_basic(self) -> None:
with mock.mock:
Client().login('test', '')
# no exception = ok
def test_servers(self) -> None:
with mock.mock:
at = Client()
at.login('test', '')
srvs = at.account.list_servers(cache=False)
self.assertTrue(srvs)
def test_status(self) -> None:
with mock.mock:
at = Client()
at.login('test', '')
srv = at.account.list_servers(cache=False)[0]
srv.fetch()
self.assertEqual(
srv.subdomain,
'world35v',
)
self.assertEqual(
srv.is_java,
True,
)
if __name__ == '__main__':
unittest.main()

63
tests/test_js2py.py Executable file
View file

@ -0,0 +1,63 @@
#!/usr/bin/env python3
import unittest
from python_aternos import atjsparse
from tests import files
CONV_TOKEN_ARROW = '''(() => {/*AJAX_TOKEN=123}*/window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();'''
CONV_TOKEN_FUNC = '''(function(){window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})()'''
class TestJs2Py(unittest.TestCase):
def setUp(self) -> None:
self.tests = files.read_sample('token_input.txt')
self.results = files.read_sample('token_output.txt')
self.js = atjsparse.Js2PyInterpreter()
def test_base64(self) -> None:
encoded = 'QEhlbGxvIFdvcmxkIQ=='
decoded = atjsparse.atob(encoded)
self.assertEqual(decoded, '@Hello World!')
def test_conv(self) -> None:
token = CONV_TOKEN_ARROW
f = self.js.to_ecma5(token)
self.assertEqual(f, CONV_TOKEN_FUNC)
def test_ecma6parse(self) -> None:
code = '''
window.t0 =
window['document']&&
!window[["p","Ma"].reverse().join('')]||
!window[["ut","meo","i","etT","s"].reverse().join('')];'''
part1 = '''window.t1 = Boolean(window['document']);'''
part2 = '''window.t2 = Boolean(!window[["p","Ma"].reverse().join('')]);'''
part3 = '''window.t3 = Boolean(!window[["ut","meo","i","etT","s"].reverse().join('')]);'''
self.js.exec_js(code)
self.js.exec_js(part1)
self.js.exec_js(part2)
self.js.exec_js(part3)
self.assertEqual(self.js['t0'], False)
self.assertEqual(self.js['t1'], True)
self.assertEqual(self.js['t2'], False)
self.assertEqual(self.js['t3'], False)
def test_exec(self) -> None:
for func, exp in zip(self.tests, self.results):
self.js.exec_js(func)
res = self.js['AJAX_TOKEN']
self.assertEqual(res, exp)
if __name__ == '__main__':
unittest.main()

32
tests/test_jsnode.py Executable file
View file

@ -0,0 +1,32 @@
#!/usr/bin/env python3
import unittest
from python_aternos import atjsparse
from tests import files
class TestJsNode(unittest.TestCase):
def setUp(self) -> None:
self.tests = files.read_sample('token_input.txt')
self.results = files.read_sample('token_output.txt')
try:
self.js = atjsparse.NodeInterpreter()
except OSError as err:
self.skipTest(
f'Unable to start NodeJS interpreter: {err}'
)
def test_exec(self) -> None:
for func, exp in zip(self.tests, self.results):
self.js.exec_js(func)
res = self.js['AJAX_TOKEN']
self.assertEqual(res, exp)
if __name__ == '__main__':
unittest.main()

56
tests/test_login.py Executable file
View file

@ -0,0 +1,56 @@
#!/usr/bin/env python3
import unittest
from typing import Optional
from python_aternos import Client
from tests import files
class TestLogin(unittest.TestCase):
def setUp(self) -> None:
credentials = files.read_sample('login_pswd.txt')
if len(credentials) < 2:
self.skipTest(
'File "login_pswd.txt" '
'has incorrect format!'
)
self.user = credentials[0]
self.pswd = credentials[1]
self.at: Optional[Client] = None
def test_auth(self) -> None:
self.at = Client()
self.at.login(self.user, self.pswd)
self.assertTrue(self.at.atconn.atcookie)
def test_servers(self) -> None:
if self.at is None:
self.at = Client()
self.at.login(self.user, self.pswd)
srvs = len(
self.at.account.list_servers(
cache=False
)
)
self.assertTrue(srvs > 0)
def test_logout(self) -> None:
if self.at is None:
self.at = Client()
self.at.login(self.user, self.pswd)
self.at.logout()
if __name__ == '__main__':
unittest.main()