Compare commits

...
This repository has been archived on 2024-07-30. You can view files and clone it, but cannot push or open issues or pull requests.

3 commits

29 changed files with 187 additions and 4787 deletions

View file

@ -1,27 +1,23 @@
from getpass import getpass
from python_aternos import Client, atserver
from selenium.webdriver import Firefox
from python_aternos import Client
user = input('Username: ')
pswd = getpass('Password: ')
atclient = Client()
aternos = atclient.account
with Firefox() as driver:
atclient = Client(driver)
atclient.login(user, pswd)
srvs = aternos.list_servers()
servers = atclient.list_servers()
for srv in srvs:
print()
print('***', srv.servid, '***')
srv.fetch()
print(srv.domain)
print(srv.motd)
print('*** Status:', srv.status)
print('*** Full address:', srv.address)
print('*** Port:', srv.port)
print('*** Name:', srv.subdomain)
print('*** Minecraft:', srv.software, srv.version)
print('*** IsBedrock:', srv.edition == atserver.Edition.bedrock)
print('*** IsJava:', srv.edition == atserver.Edition.java)
print()
# for serv in servers:
# print(
# serv.id, serv.name,
# serv.software,
# serv.status,
# serv.players,
# )
list(map(print, servers))

View file

@ -1,11 +1 @@
"""Init"""
from .atclient import Client
from .atserver import AternosServer
from .atserver import Edition
from .atserver import Status
from .atplayers import PlayersList
from .atplayers import Lists
from .atwss import Streams
from .atjsparse import Js2PyInterpreter
from .atjsparse import NodeInterpreter
from .atclient import Client # noqa: F401

View file

@ -3,38 +3,33 @@ and allows to manage your account"""
import os
import re
from typing import Optional, Type
from typing import Optional, List
from selenium.webdriver.common.by import By
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.remote.webelement import WebElement
from python_aternos.atserver import PartialServerInfo
from .atselenium import SeleniumHelper, Remote
from .atlog import log, is_debug, set_debug
from .atmd5 import md5encode
from .ataccount import AternosAccount
from .atconnect import AternosConnect
from .atconnect import AJAX_URL
from .aterrors import CredentialsError
from .aterrors import TwoFactorAuthError
from . import atjsparse
from .atjsparse import Interpreter
from .atjsparse import Js2PyInterpreter
class Client:
"""Aternos API Client class, object
of which contains user's auth data"""
def __init__(self) -> None:
def __init__(self, driver: Remote) -> None:
self.se = SeleniumHelper(driver)
# Config
self.sessions_dir = '~'
self.js: Type[Interpreter] = Js2PyInterpreter
# ###
self.saved_session = '~/.aternos' # will be rewritten by login()
self.atconn = AternosConnect()
self.account = AternosAccount(self)
def login(
self,
@ -50,73 +45,41 @@ class Client:
code (Optional[int], optional): 2FA code
"""
self.login_hashed(
username,
md5encode(password),
code,
self.se.load_page('/go')
err_block = self.se.find_element(By.CLASS_NAME, 'login-error')
err_alert = self.se.find_element(By.CLASS_NAME, 'alert-wrapper')
self.se.exec_js(f'''
document.getElementById('user').value = '{username}'
document.getElementById('password').value = '{password}'
document.getElementById('twofactor-code').value = '{code}'
login()
''')
def logged_in_or_error(driver: Remote):
return (
driver.current_url.find('/servers') != -1 or
err_block.is_displayed() or
err_alert.is_displayed()
)
def login_hashed(
self,
username: str,
md5: str,
code: Optional[int] = None) -> None:
"""Log in to your Aternos account
with a username and a hashed password
self.se.wait.until(logged_in_or_error)
Args:
username (str): Username
md5 (str): Password hashed with MD5
code (int): 2FA code
if self.se.driver.current_url.find('/go') != -1:
Raises:
TwoFactorAuthError: If the 2FA is enabled,
but `code` argument was not passed or is incorrect
CredentialsError: If the Aternos backend
returned empty session cookie
(usually because of incorrect credentials)
ValueError: _description_
"""
if err_block.is_displayed():
raise CredentialsError(err_block.text)
filename = self.session_filename(
username, self.sessions_dir
)
if err_alert.is_displayed():
raise CredentialsError(err_alert.text)
try:
self.restore_session(filename)
except (OSError, CredentialsError):
pass
self.se.wait.until(lambda d: d.title.find('Cloudflare') == -1)
atjsparse.get_interpreter(create=self.js)
self.atconn.parse_token()
self.atconn.generate_sec()
if not self.se.get_cookie('ATERNOS_SESSION'):
raise CredentialsError('Session cookie is empty')
credentials = {
'user': username,
'password': md5,
}
if code is not None:
credentials['code'] = str(code)
loginreq = self.atconn.request_cloudflare(
f'{AJAX_URL}/account/login',
'POST', data=credentials, sendtoken=True,
)
if b'"show2FA":true' in loginreq.content:
raise TwoFactorAuthError('2FA code is required')
if 'ATERNOS_SESSION' not in loginreq.cookies:
raise CredentialsError(
'Check your username and password'
)
self.saved_session = filename
try:
self.save_session(filename)
except OSError:
pass
print(self.se.get_cookie('ATERNOS_SESSION')) # TODO: remove, this is for debug
def login_with_session(self, session: str) -> None:
"""Log in using ATERNOS_SESSION cookie
@ -125,31 +88,50 @@ class Client:
session (str): Session cookie value
"""
self.atconn.parse_token()
self.atconn.generate_sec()
self.atconn.session.cookies['ATERNOS_SESSION'] = session
self.se.set_cookie('ATERNOS_SESSION', session)
def logout(self) -> None:
"""Log out from the Aternos account"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/logout',
'GET', sendtoken=True,
)
self.se.load_page('/servers')
self.se.find_element(By.CLASS_NAME, 'logout').click()
self.remove_session(self.saved_session)
def list_servers(self) -> List[PartialServerInfo]:
CARD_CLASS = 'servercard'
self.se.load_page('/servers')
def create_obj(s: WebElement) -> PartialServerInfo:
return PartialServerInfo(
id=s.get_dom_attribute('data-id'),
name=s.get_dom_attribute('title'),
software='',
status=(
s
.get_dom_attribute('class')
.replace(CARD_CLASS, '')
.split()[0]
),
players=0,
se=self.se,
)
return list(map(
create_obj,
self.se.find_elements(By.CLASS_NAME, CARD_CLASS),
))
def restore_session(self, file: str = '~/.aternos') -> None:
"""Restores ATERNOS_SESSION cookie and,
if included, servers list, from a session file
"""Restores ATERNOS_SESSION cookie from a session file
Args:
file (str, optional): Filename
Raises:
FileNotFoundError: If the file cannot be found
CredentialsError: If the session cookie
(or the file at all) has incorrect format
"""
file = os.path.expanduser(file)
@ -159,48 +141,24 @@ class Client:
raise FileNotFoundError()
with open(file, 'rt', encoding='utf-8') as f:
saved = f.read() \
.strip() \
.replace('\r\n', '\n') \
.split('\n')
session = f.readline().strip()
session = saved[0].strip()
if session == '' or not session.isalnum():
raise CredentialsError(
'Session cookie is invalid or the file is empty'
)
if len(saved) > 1:
self.account.refresh_servers(saved[1:])
self.atconn.session.cookies['ATERNOS_SESSION'] = session
self.login_with_session(session)
self.saved_session = file
def save_session(
self,
file: str = '~/.aternos',
incl_servers: bool = True) -> None:
def save_session(self, file: str = '~/.aternos') -> None:
"""Saves an ATERNOS_SESSION cookie to a file
Args:
file (str, optional): File where a session cookie must be saved
incl_servers (bool, optional): If the function
should include the servers IDs in this file
to reduce API requests count on the next restoration
(recommended)
file (str, optional):
File where the session cookie must be saved
"""
file = os.path.expanduser(file)
log.debug('Saving session to %s', file)
with open(file, 'wt', encoding='utf-8') as f:
f.write(self.atconn.atsession + '\n')
if not incl_servers:
return
for s in self.account.servers:
f.write(s.servid + '\n')
f.write(self.se.get_cookie('ATERNOS_SESSION') + '\n')
def remove_session(self, file: str = '~/.aternos') -> None:
"""Removes a file which contains

View file

@ -1,40 +1,15 @@
"""Stores API session and sends requests"""
import re
import time
import string
import secrets
from functools import partial
from typing import Optional
from typing import List, Dict, Any
import requests
from cloudscraper import CloudScraper
from .atlog import log, is_debug
from . import atjsparse
from .aterrors import TokenError
from .aterrors import CloudflareError
from .aterrors import AternosPermissionError
from typing import Dict, Any
BASE_URL = 'https://aternos.org'
AJAX_URL = f'{BASE_URL}/ajax'
REQUA = \
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
'(KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36 OPR/85.0.4341.47'
ARROW_FN_REGEX = r'\(\(\).*?\)\(\);'
SCRIPT_TAG_REGEX = (
rb'<script type=([\'"]?)text/javascript\1>.+?</script>'
)
SEC_ALPHABET = string.ascii_lowercase + string.digits
@ -44,108 +19,15 @@ class AternosConnect:
def __init__(self) -> None:
self.session = CloudScraper()
self.sec = ''
self.token = ''
self.atcookie = ''
def refresh_session(self) -> None:
"""Creates a new CloudScraper
session object and copies all cookies.
Required for bypassing Cloudflare"""
old_cookies = self.session.cookies
captcha_kwarg = self.session.captcha
self.session = CloudScraper(captcha=captcha_kwarg)
self.session.cookies.update(old_cookies)
del old_cookies
def parse_token(self) -> str:
"""Parses Aternos ajax token that
is needed for most requests
Raises:
TokenError: If the parser is unable
to extract ajax token from HTML
Returns:
Aternos ajax token
"""
loginpage = self.request_cloudflare(
f'{BASE_URL}/go/', 'GET'
).content
# Using the standard string methods
# instead of the expensive xml parsing
head = b'<head>'
headtag = loginpage.find(head)
headend = loginpage.find(b'</head>', headtag + len(head))
# Some checks
if headtag < 0 or headend < 0:
pagehead = loginpage
log.warning(
'Unable to find <head> tag, parsing the whole page'
)
else:
# Extracting <head> content
headtag = headtag + len(head)
pagehead = loginpage[headtag:headend]
js_code: Optional[List[Any]] = None
try:
text = pagehead.decode('utf-8', 'replace')
js_code = re.findall(ARROW_FN_REGEX, text)
token_func = js_code[0]
if len(js_code) > 1:
token_func = js_code[1]
js = atjsparse.get_interpreter()
js.exec_js(token_func)
self.token = js['AJAX_TOKEN']
except (IndexError, TypeError) as err:
log.warning('---')
log.warning('Unable to parse AJAX_TOKEN!')
log.warning('Please, insert the info below')
log.warning('to the GitHub issue description:')
log.warning('---')
log.warning('JavaScript: %s', js_code)
log.warning(
'All script tags: %s',
re.findall(SCRIPT_TAG_REGEX, pagehead)
)
log.warning('---')
raise TokenError(
'Unable to parse TOKEN from the page'
) from err
return self.token
return ''
def generate_sec(self) -> str:
"""Generates Aternos SEC token which
is also needed for most API requests
Returns:
Random SEC `key:value` string
"""
randkey = self.generate_sec_part()
randval = self.generate_sec_part()
self.sec = f'{randkey}:{randval}'
self.session.cookies.set(
f'ATERNOS_SEC_{randkey}', randval,
domain='aternos.org'
)
return self.sec
return 'a:b'
def generate_sec_part(self) -> str:
"""Generates a part for SEC token"""
@ -163,124 +45,8 @@ class AternosConnect:
reqcookies: Optional[Dict[Any, Any]] = None,
sendtoken: bool = False,
retries: int = 5,
timeout: int = 4) -> requests.Response:
"""Sends a request to Aternos API bypass Cloudflare
Args:
url (str): Request URL
method (str): Request method, must be GET or POST
params (Optional[Dict[Any, Any]], optional): URL parameters
data (Optional[Dict[Any, Any]], optional): POST request data,
if the method is GET, this dict will be combined with params
headers (Optional[Dict[Any, Any]], optional): Custom headers
reqcookies (Optional[Dict[Any, Any]], optional):
Cookies only for this request
sendtoken (bool, optional): If the ajax and SEC token
should be sent
retries (int, optional): How many times parser must retry
connection to API bypass Cloudflare
timeout (int, optional): Request timeout in seconds
Raises:
CloudflareError: When the parser has exceeded retries count
NotImplementedError: When the specified method is not GET or POST
Returns:
API response
"""
if retries <= 0:
raise CloudflareError('Unable to bypass Cloudflare protection')
try:
self.atcookie = self.session.cookies['ATERNOS_SESSION']
except KeyError:
pass
self.refresh_session()
params = params or {}
data = data or {}
headers = headers or {}
reqcookies = reqcookies or {}
method = method or 'GET'
method = method.upper().strip()
if method not in ('GET', 'POST'):
raise NotImplementedError('Only GET and POST are available')
if sendtoken:
params['TOKEN'] = self.token
params['SEC'] = self.sec
headers['X-Requested-With'] = 'XMLHttpRequest'
# requests.cookies.CookieConflictError bugfix
reqcookies['ATERNOS_SESSION'] = self.atcookie
del self.session.cookies['ATERNOS_SESSION']
if is_debug():
reqcookies_dbg = {
k: str(v or '')[:3]
for k, v in reqcookies.items()
}
session_cookies_dbg = {
k: str(v or '')[:3]
for k, v in self.session.cookies.items()
}
log.debug('Requesting(%s)%s', method, url)
log.debug('headers=%s', headers)
log.debug('params=%s', params)
log.debug('data=%s', data)
log.debug('req-cookies=%s', reqcookies_dbg)
log.debug('session-cookies=%s', session_cookies_dbg)
if method == 'POST':
sendreq = partial(
self.session.post,
params=params,
data=data,
)
else:
sendreq = partial(
self.session.get,
params={**params, **data},
)
req = sendreq(
url,
headers=headers,
cookies=reqcookies,
timeout=timeout,
)
resp_type = req.headers.get('content-type', '')
html_type = resp_type.find('text/html') != -1
cloudflare = req.status_code == 403
if html_type and cloudflare:
log.info('Retrying to bypass Cloudflare')
time.sleep(0.3)
return self.request_cloudflare(
url, method,
params, data,
headers, reqcookies,
sendtoken, retries - 1
)
log.debug('AternosConnect received: %s', req.text[:65])
log.info(
'%s completed with %s status',
method, req.status_code
)
if req.status_code == 402:
raise AternosPermissionError
req.raise_for_status()
return req
timeout: int = 4) -> Any:
return None
@property
def atsession(self) -> str:

View file

@ -0,0 +1,50 @@
from selenium.webdriver import Remote
from selenium.webdriver.support.wait import WebDriverWait
BASE_URL = 'https://aternos.org'
RM_SCRIPTS = '''
const rmScripts = () => {
const lst = document.querySelectorAll("script")
for (let js of lst) {
if (
js.src.includes('googletagmanager.com') ||
js.src.includes('cloudflareinsights.com') ||
js.innerText.includes('LANGUAGE_VARIABLES')
) {
js.remove()
}
}
}
addEventListener('DOMContentLoaded', rmScripts)
rmScripts()
'''
class SeleniumHelper:
def __init__(self, driver: Remote) -> None:
self.driver = driver
self.wait = WebDriverWait(driver, 8.0)
self.find_element = self.driver.find_element
self.find_elements = self.driver.find_elements
def load_page(self, path: str) -> None:
self.driver.get(f'{BASE_URL}{path}')
self.driver.execute_script(RM_SCRIPTS)
def exec_js(self, script: str) -> None:
self.driver.execute_script(script)
def get_cookie(self, name: str) -> str:
cookie = self.driver.get_cookie(name)
if cookie is None:
return ''
return cookie.get('value') or ''
def set_cookie(self, name: str, value: str) -> None:
self.driver.add_cookie({
'name': name,
'value': value,
})

View file

@ -1,26 +1,26 @@
"""Aternos Minecraft server"""
import re
import json
import enum
from typing import Any, Dict, List
from functools import partial
from typing import List
from .atconnect import BASE_URL, AJAX_URL
from .atconnect import AternosConnect
from .atwss import AternosWss
from .atselenium import SeleniumHelper
from .atplayers import PlayersList
from .atplayers import Lists
from .atconnect import AJAX_URL
from .atfm import FileManager
from .atconf import AternosConfig
from .atstubs import PlayersList
from .atstubs import Lists
from .atstubs import FileManager
from .atstubs import AternosConfig
from .aterrors import AternosError
from .aterrors import ServerStartError
from dataclasses import dataclass
SERVER_URL = f'{AJAX_URL}/server'
status_re = re.compile(
r'<script>\s*var lastStatus\s*?=\s*?(\{.+?\});?\s*<\/script>'
@ -55,67 +55,25 @@ class Status(enum.IntEnum):
confirm = 10
@dataclass
class PartialServerInfo:
se: SeleniumHelper
id: str
name: str = ''
software: str = ''
status: str = ''
players: int = 0
def use(self) -> None:
self.se.set_cookie('ATERNOS_SERVER', self.id)
self.se.load_page('/server')
@dataclass
class AternosServer:
"""Class for controlling your Aternos Minecraft server"""
def __init__(
self, servid: str,
atconn: AternosConnect,
autofetch: bool = False) -> None:
"""Class for controlling your Aternos Minecraft server
Args:
servid (str): Unique server IDentifier
atconn (AternosConnect):
AternosConnect instance with initialized Aternos session
autofetch (bool, optional): Automatically call
`fetch()` to get all info
"""
self.servid = servid
self.atconn = atconn
self._info: Dict[str, Any] = {}
self.atserver_request = partial(
self.atconn.request_cloudflare,
reqcookies={
'ATERNOS_SERVER': self.servid,
}
)
if autofetch:
self.fetch()
def fetch(self) -> None:
"""Get all server info"""
page = self.atserver_request(
f'{BASE_URL}/server', 'GET'
)
match = status_re.search(page.text)
if match is None:
raise AternosError('Unable to parse lastStatus object')
self._info = json.loads(match[1])
def wss(self, autoconfirm: bool = False) -> AternosWss:
"""Returns AternosWss instance for
listening server streams in real-time
Args:
autoconfirm (bool, optional):
Automatically start server status listener
when AternosWss connects to API to confirm
server launching
Returns:
AternosWss object
"""
return AternosWss(self, autoconfirm)
def start(
self,
headstart: bool = False,

View file

@ -0,0 +1,7 @@
from typing import Any
PlayersList = type('PlayersList', (), {})
FileManager = type('FileManager', (), {})
AternosConfig = type('AternosConfig', (), {})
Lists = Any

View file

@ -1,67 +0,0 @@
{
"name": "data",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"dependencies": {
"vm2": "^3.9.13"
}
},
"node_modules/acorn": {
"version": "8.8.1",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz",
"integrity": "sha512-7zFpHzhnqYKrkYdUjF1HI1bzd0VygEGX8lFk4k5zVMqHEoES+P+7TKI+EvLO9WVMJ8eekdO0aDEK044xTXwPPA==",
"bin": {
"acorn": "bin/acorn"
},
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/acorn-walk": {
"version": "8.2.0",
"resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz",
"integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==",
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/vm2": {
"version": "3.9.19",
"resolved": "https://registry.npmjs.org/vm2/-/vm2-3.9.19.tgz",
"integrity": "sha512-J637XF0DHDMV57R6JyVsTak7nIL8gy5KH4r1HiwWLf/4GBbb5MKL5y7LpmF4A8E2nR6XmzpmMFQ7V7ppPTmUQg==",
"dependencies": {
"acorn": "^8.7.0",
"acorn-walk": "^8.2.0"
},
"bin": {
"vm2": "bin/vm2"
},
"engines": {
"node": ">=6.0"
}
}
},
"dependencies": {
"acorn": {
"version": "8.8.1",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz",
"integrity": "sha512-7zFpHzhnqYKrkYdUjF1HI1bzd0VygEGX8lFk4k5zVMqHEoES+P+7TKI+EvLO9WVMJ8eekdO0aDEK044xTXwPPA=="
},
"acorn-walk": {
"version": "8.2.0",
"resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz",
"integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA=="
},
"vm2": {
"version": "3.9.19",
"resolved": "https://registry.npmjs.org/vm2/-/vm2-3.9.19.tgz",
"integrity": "sha512-J637XF0DHDMV57R6JyVsTak7nIL8gy5KH4r1HiwWLf/4GBbb5MKL5y7LpmF4A8E2nR6XmzpmMFQ7V7ppPTmUQg==",
"requires": {
"acorn": "^8.7.0",
"acorn-walk": "^8.2.0"
}
}
}
}

View file

@ -1,5 +0,0 @@
{
"dependencies": {
"vm2": "^3.9.13"
}
}

View file

@ -1,49 +0,0 @@
const http = require('http')
const process = require('process')
const { VM } = require('vm2')
const args = process.argv.slice(2)
const port = args[0] || 8000
const host = args[1] || 'localhost'
const stubFunc = (_i) => {}
const vm = new VM({
timeout: 2000,
allowAsync: false,
sandbox: {
atob: atob,
setTimeout: stubFunc,
setInterval: stubFunc,
document: {
getElementById: stubFunc,
prepend: stubFunc,
append: stubFunc,
appendChild: stubFunc,
doctype: {},
currentScript: {},
},
},
})
vm.run('var window = global')
const listener = (req, res) => {
if (req.method != 'POST')
res.writeHead(405) & res.end()
let body = ''
req.on('data', chunk => (body += chunk))
req.on('end', () => {
let resp
try { resp = JSON.stringify(vm.run(body)) }
catch (ex) { resp = ex.message }
res.writeHead(200)
res.end(resp)
})
}
const server = http.createServer(listener)
server.listen(port, host, () => console.log('OK'))

View file

@ -1,5 +1 @@
cloudscraper==1.2.71
Js2Py==0.74
lxml==4.9.2
regex==2023.6.3
websockets==11.0.3
selenium==4.11.0

View file

@ -5,7 +5,7 @@ with open('README.md', 'rt') as readme:
setuptools.setup(
name='python-aternos',
version='3.0.4',
version='4.0.0',
author='Chechkenev Andrey (@DarkCat09)',
author_email='aacd0709@mail.ru',
description='An unofficial Aternos API',
@ -19,36 +19,18 @@ setuptools.setup(
},
classifiers=[
'Development Status :: 4 - Beta',
'Programming Language :: Python :: 3',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
'Operating System :: Microsoft :: Windows',
'Operating System :: POSIX :: Linux',
'Operating System :: MacOS',
'Intended Audience :: Developers',
'Topic :: Internet',
'Typing :: Typed',
],
install_requires=[
'cloudscraper==1.2.71',
'Js2Py==0.74',
'lxml==4.9.2',
'regex==2023.6.3',
'websockets==11.0.3',
'selenium==4.11.0',
],
extras_require={
'dev': [
'autopep8==2.0.2',
'pycodestyle==2.10.0',
'mypy==1.4.1',
'pylint==2.17.4',
'requests-mock==1.11.0',
'types-requests==2.31.0.1',
'ruff==0.0.281',
],
'pypi': [
'build==0.10.0',
@ -61,5 +43,4 @@ setuptools.setup(
},
packages=['python_aternos'],
python_requires=">=3.7",
include_package_data=True,
)

View file

View file

@ -1,32 +0,0 @@
from pathlib import Path
from typing import List
abs_dir = Path(__file__).absolute().parent
samples = abs_dir / 'samples'
htmls = samples / 'html'
def read_sample(name: str) -> List[str]:
path = samples / name
if not path.exists():
return []
with path.open('rt', encoding='utf-8') as file:
return file \
.read() \
.strip() \
.replace('\r\n', '\n') \
.split('\n')
def read_html(name: str) -> bytes:
path = samples / 'html' / name
if not path.exists():
return b''
with path.open('rb') as file:
return file.read()

View file

@ -1,102 +0,0 @@
#!/usr/bin/env python3
# How to use
# *******************************
# 1. Open DevTools at aternos.org
# 2. Get AJAX_TOKEN variable value (without quotes)
#
# 3. Pass it to this script as an argument, e.g.:
# python3 js_samples.py xKflIsKHxlv96fLc1tht
#
# 4. The script will request the token 100 times
# and check it with different built-in interpreters
# (now there are only js2py and nodejs)
# 5. Array "errored" which is printed at the end
# contains indexes of incorrectly executed JS functions
# 6. Enter this index in the opened console
# or enter "exit" to exit
import re
import sys
from python_aternos.atconnect import AternosConnect
from python_aternos.atconnect import BASE_URL
from python_aternos import Js2PyInterpreter
from python_aternos import NodeInterpreter
TIMES = 100
js = re.compile(r'\(\(\).*?\)\(\);')
conn = AternosConnect()
jsi1 = Js2PyInterpreter()
jsi2 = NodeInterpreter()
token = sys.argv[1]
samples = []
errored = []
def get_code() -> bool:
r = conn.request_cloudflare(
f'{BASE_URL}/go', 'GET'
)
if r.status_code != 200:
print(r.status_code)
code = js.search(r.text)
if code is None:
print('No match!')
return False
sample = code.group(0)
samples.append(sample)
print(sample)
print('***')
jsi1.exec_js(sample)
jsi2.exec_js(sample)
var1 = jsi1['AJAX_TOKEN']
var2 = jsi2['AJAX_TOKEN']
print(var1)
print(var2)
print('***')
print()
print()
return var1 == var2 == token
def main() -> None:
print()
for i in range(TIMES):
print(i)
if not get_code():
errored.append(i)
print('Errored:', errored)
print('Choose sample number:')
while True:
try:
print('>', end=' ')
cmd = input()
if cmd.strip().lower() in ('exit', 'quit'):
print('Quit')
break
print(samples[int(cmd)])
except KeyboardInterrupt:
print()
print('Quit')
break
except Exception as err:
print(err)
if __name__ == '__main__':
main()

View file

@ -1,46 +0,0 @@
from requests_mock import Mocker
from python_aternos.atconnect import BASE_URL, AJAX_URL
from tests import files
mock = Mocker()
with mock:
mock.get(
f'{BASE_URL}/go/',
content=files.read_html('aternos_go'),
)
mock.get(
f'{BASE_URL}/servers/',
content=files.read_html('aternos_servers'),
)
mock.get(
f'{BASE_URL}/server',
content=files.read_html('aternos_server1'),
)
mock.post(
f'{AJAX_URL}/account/login',
json={
'success': True,
'error': None,
'message': None,
'show2FA': False,
},
cookies={
'ATERNOS_SESSION': '0123abcd',
},
)
mock.get(
f'{BASE_URL}/players/',
content=files.read_html('aternos_players'),
)
mock.get(
f'{BASE_URL}/files/',
content=files.read_html('aternos_file_root'),
)

View file

@ -1 +0,0 @@
requests-mock>=1.10.0

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -1,16 +0,0 @@
(() => {window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();
(() => {window[["KEN","X_TO","JA","A"].reverse().join('')]="2rKOA1IFdBcHhEM616cb";})();
(() => {window[atob('QUpBWF9UT0tFTg==')]=atob('MmlYaDVXNXVFWXE1ZldKSWF6UTY=');})();
(() => {window[["_XAJA","NEKOT"].map(s => s.split('').reverse().join('')).join('')]=!window[("encodeURI" + "Componen" + "t")] || atob('Q3VVY21aMjdGYjhiVkJOdzEyVmo=');})();
(() => {window[["N","_TOKE","AJAX"].reverse().join('')]=!window[("en" + "co" + "deURICo" + "mpone" + "nt")] || ["zv7hP8ePPY","FP9ZaY","PQo9"].map(s => s.split('').reverse().join('')).join('');})();
(() => /*window["AJAX_TOKEN"]=["iKq","aISAEyX","MeSjP","3wQL1"].map(s => s.split('').reverse().join('')).join('')}*/{window[["XAJA","EKOT_","N"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&!window[["Map"].join('')][["orp","tot","epy"].map(s => s.split('').reverse().join('')).join('')]||!window[["s","e","t","Tim","eou","t"].join('')]?["3","jSeM1LQw","ASIaP","qKiXyE"].reverse().join(''):"s8SvaVLBIU5Whd00vpq3";})();
(() => /*window["AJAX_TOKEN"]=["w3","1LQ","PjSeM","qKiXyEASIa"].reverse().join('')}*/{window["AJAX_TOKEN"]=window['document']&&!window[["p","Ma"].reverse().join('')]["prototype"]||!window[("s" + "et" + "Ti" + "me" + "o" + "u" + "t")]?["SAEyXiKq","eSjPaI","wQL1M","3"].map(s => s.split('').reverse().join('')).join(''):"s8SvaVLBIU5Whd00vpq3";})();
(() => /*window["AJAX_TOKEN"]="0YD4285VVf04F4PZ13vE"}*/{window[["AJA","_X","T","KO","NE"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&window["Map"][["pe","oty","t","pro"].reverse().join('')]&&window[["t","eou","Tim","et","s"].reverse().join('')]?"Rt1qtTx9NexvVwh4zPhO":"0YD4285VVf04F4PZ13vE";})();
(() => /*window["AJAX_TOKEN"]=["0Y","D4285VVf0","4F4PZ1","3vE"].join('')}*/{window[["_XAJA","OT","NEK"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&window[["Map"].reverse().join('')][["e","p","ty","to","pro"].reverse().join('')]&&window[["ut","meo","i","T","set"].reverse().join('')]?("Rt" + "1qtTx9Nexv" + "Vwh4" + "zPhO"):["DY0","F40fVV5824","Ev31ZP4"].map(s => s.split('').reverse().join('')).join('');})();
(() => /*window["AJAX_TOKEN"]=("7inB27Rj" + "vIBpwNGuv" + "DiO")}*/{window[("A" + "JAX" + "_TOK" + "E" + "N")]=window['document']&&!window[("Map")]["prototype"]||!window[("set" + "Tim" + "eo" + "ut")]?"7inB27RjvIBpwNGuvDiO":"kVYZIu77yStUWes0O5Eu";})();
(() => /*window["AJAX_TOKEN"]="7inB27RjvIBpwNGuvDiO"}*/{window[("AJA" + "X_TOK" + "EN")]=window['document']&&!window["Map"][("p" + "rot" + "oty" + "p" + "e")]||!window[["ut","meo","Ti","set"].reverse().join('')]?("7inB2" + "7RjvIBpw" + "NGuvDiO"):["Vk","uIZY","WUtSy77","uE5O0se"].map(s => s.split('').reverse().join('')).join('');})();
(() => /*window["AJAX_TOKEN"]=["2Bni7","R7","pBIvj","OiDvuGNw"].map(s => s.split('').reverse().join('')).join('')}*/{window["AJAX_TOKEN"]=window['document']&&!window[("Ma" + "p")]["prototype"]||!window[("set" + "Ti" + "me" + "ou" + "t")]?["O","NGuvDi","jvIBpw","7inB27R"].reverse().join(''):("kVYZIu77yS" + "tUWes0O5" + "Eu");})();
(() => /*window["AJAX_TOKEN"]=["Nj3BQl6gT","BSsoGLzxx","Ha"].map(s => s.split('').reverse().join('')).join('')}*/{window[["KEN","X_TO","A","AJ"].reverse().join('')]=window['document']&&window["Map"]["prototype"]&&window[["se","tT","ime","o","u","t"].join('')]?["uuW","7FDg6","btJvriBP","lOh3"].map(s => s.split('').reverse().join('')).join(''):["Tg6l","QB","3jNxxzLG","osS","BaH"].join('');})();
(() => /*window["AJAX_TOKEN"]=("Tg6lQB3j" + "NxxzLG" + "osSBaH")}*/{window[("AJ" + "AX_TO" + "KE" + "N")]=window['document']&&window[("Ma" + "p")][["p","tor","to","epy"].map(s => s.split('').reverse().join('')).join('')]&&window[("set" + "T" + "ime" + "ou" + "t")]?["6uuW","iBP7FDg","tJvr","3b","lOh"].map(s => s.split('').reverse().join('')).join(''):["Tg","6lQB3j","Nx","xzLGosSBaH"].join('');})();
(() => /*window["AJAX_TOKEN"]=["aH","SB","zLGos","jNxx","lQB3","Tg6"].reverse().join('')}*/{window[["KEN","TO","AX_","AJ"].reverse().join('')]=window['document']&&window["Map"][["pr","o","to","typ","e"].join('')]&&window[["tes","iT","em","o","u","t"].map(s => s.split('').reverse().join('')).join('')]?"Wuu6gDF7PBirvJtb3hOl":["aH","NxxzLGosSB","Tg6lQB3j"].reverse().join('');})();
(() => /*window["AJAX_TOKEN"]="Tg6lQB3jNxxzLGosSBaH"}*/{window[["A","JA","X_","TO","K","EN"].join('')]=window['document']&&window["Map"][["rp","o","ot","pyt","e"].map(s => s.split('').reverse().join('')).join('')]&&window["setTimeout"]?["Wuu6g","DF7PBir","vJtb3","hOl"].join(''):["BaH","LGosS","jNxxz","Tg6lQB3"].reverse().join('');})();

View file

@ -1,16 +0,0 @@
2rKOA1IFdBcHhEM616cb
2rKOA1IFdBcHhEM616cb
2iXh5W5uEYq5fWJIazQ6
CuUcmZ27Fb8bVBNw12Vj
YPPe8Ph7vzYaZ9PF9oQP
s8SvaVLBIU5Whd00vpq3
s8SvaVLBIU5Whd00vpq3
Rt1qtTx9NexvVwh4zPhO
Rt1qtTx9NexvVwh4zPhO
kVYZIu77yStUWes0O5Eu
kVYZIu77yStUWes0O5Eu
kVYZIu77yStUWes0O5Eu
Wuu6gDF7PBirvJtb3hOl
Wuu6gDF7PBirvJtb3hOl
Wuu6gDF7PBirvJtb3hOl
Wuu6gDF7PBirvJtb3hOl

View file

@ -1,40 +0,0 @@
#!/usr/bin/env python3
import unittest
from python_aternos import Client
from tests import mock
class TestHttp(unittest.TestCase):
def test_basic(self) -> None:
with mock.mock:
Client().login('test', '')
# no exception = ok
def test_servers(self) -> None:
with mock.mock:
at = Client()
at.login('test', '')
srvs = at.account.list_servers(cache=False)
self.assertTrue(srvs)
def test_status(self) -> None:
with mock.mock:
at = Client()
at.login('test', '')
srv = at.account.list_servers(cache=False)[0]
srv.fetch()
self.assertEqual(
srv.subdomain,
'world35v',
)
self.assertEqual(
srv.is_java,
True,
)
if __name__ == '__main__':
unittest.main()

View file

@ -1,63 +0,0 @@
#!/usr/bin/env python3
import unittest
from python_aternos import atjsparse
from tests import files
CONV_TOKEN_ARROW = '''(() => {/*AJAX_TOKEN=123}*/window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();'''
CONV_TOKEN_FUNC = '''(function(){window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})()'''
class TestJs2Py(unittest.TestCase):
def setUp(self) -> None:
self.tests = files.read_sample('token_input.txt')
self.results = files.read_sample('token_output.txt')
self.js = atjsparse.Js2PyInterpreter()
def test_base64(self) -> None:
encoded = 'QEhlbGxvIFdvcmxkIQ=='
decoded = atjsparse.atob(encoded)
self.assertEqual(decoded, '@Hello World!')
def test_conv(self) -> None:
token = CONV_TOKEN_ARROW
f = self.js.to_ecma5(token)
self.assertEqual(f, CONV_TOKEN_FUNC)
def test_ecma6parse(self) -> None:
code = '''
window.t0 =
window['document']&&
!window[["p","Ma"].reverse().join('')]||
!window[["ut","meo","i","etT","s"].reverse().join('')];'''
part1 = '''window.t1 = Boolean(window['document']);'''
part2 = '''window.t2 = Boolean(!window[["p","Ma"].reverse().join('')]);'''
part3 = '''window.t3 = Boolean(!window[["ut","meo","i","etT","s"].reverse().join('')]);'''
self.js.exec_js(code)
self.js.exec_js(part1)
self.js.exec_js(part2)
self.js.exec_js(part3)
self.assertEqual(self.js['t0'], False)
self.assertEqual(self.js['t1'], True)
self.assertEqual(self.js['t2'], False)
self.assertEqual(self.js['t3'], False)
def test_exec(self) -> None:
for func, exp in zip(self.tests, self.results):
self.js.exec_js(func)
res = self.js['AJAX_TOKEN']
self.assertEqual(res, exp)
if __name__ == '__main__':
unittest.main()

View file

@ -1,32 +0,0 @@
#!/usr/bin/env python3
import unittest
from python_aternos import atjsparse
from tests import files
class TestJsNode(unittest.TestCase):
def setUp(self) -> None:
self.tests = files.read_sample('token_input.txt')
self.results = files.read_sample('token_output.txt')
try:
self.js = atjsparse.NodeInterpreter()
except OSError as err:
self.skipTest(
f'Unable to start NodeJS interpreter: {err}'
)
def test_exec(self) -> None:
for func, exp in zip(self.tests, self.results):
self.js.exec_js(func)
res = self.js['AJAX_TOKEN']
self.assertEqual(res, exp)
if __name__ == '__main__':
unittest.main()

View file

@ -1,56 +0,0 @@
#!/usr/bin/env python3
import unittest
from typing import Optional
from python_aternos import Client
from tests import files
class TestLogin(unittest.TestCase):
def setUp(self) -> None:
credentials = files.read_sample('login_pswd.txt')
if len(credentials) < 2:
self.skipTest(
'File "login_pswd.txt" '
'has incorrect format!'
)
self.user = credentials[0]
self.pswd = credentials[1]
self.at: Optional[Client] = None
def test_auth(self) -> None:
self.at = Client()
self.at.login(self.user, self.pswd)
self.assertTrue(self.at.atconn.atcookie)
def test_servers(self) -> None:
if self.at is None:
self.at = Client()
self.at.login(self.user, self.pswd)
srvs = len(
self.at.account.list_servers(
cache=False
)
)
self.assertTrue(srvs > 0)
def test_logout(self) -> None:
if self.at is None:
self.at = Client()
self.at.login(self.user, self.pswd)
self.at.logout()
if __name__ == '__main__':
unittest.main()