Compare commits

..

No commits in common. "main" and "2.0.1" have entirely different histories.
main ... 2.0.1

61 changed files with 1472 additions and 32265 deletions

6
.gitignore vendored
View file

@ -107,9 +107,3 @@ cython_debug/
# IDE
.vscode/
# Credentials for unittest
tests/samples/login_pswd.txt
# NPM
node_modules/

View file

@ -1,28 +1,16 @@
build:
python3 -m build
python -m build
upload:
python3 -m twine upload dist/*
doc:
python3 -m mkdocs build
python -m twine upload dist/*
clean:
rm -rf dist build python_aternos.egg-info
rm -rf dist python_aternos.egg-info
rm -rf python_aternos/__pycache__
rm -rf examples/__pycache__
rm -rf tests/__pycache__
rm -rf site .mypy_cache
test:
python3 -m unittest discover -v ./tests
check:
python3 -m mypy ./python_aternos
python3 -m pylint ./python_aternos
fullcheck:
chmod +x check.sh; bash check.sh
format:
python3 -m autopep8 -r --in-place ./python_aternos
chmod +x test.sh
bash test.sh

View file

@ -1,7 +1,7 @@
<div align="center">
<img src="https://i.ibb.co/3RXcXJ1/aternos-400.png" alt="Python Aternos Logo">
<h1>
[UNMAINTAINED] Python Aternos
Python Aternos
<div>
<a href="https://pypi.org/project/python-aternos/">
<img src="https://img.shields.io/pypi/v/python-aternos">
@ -22,34 +22,31 @@
An unofficial Aternos API written in Python.
It uses [aternos](https://aternos.org/)' private API and html parsing.
> [!WARNING]
>
> This library is no longer maintained, because:
> 1. Aternos started detecting all automated requests (and, therefore, ToS violations)
> via JS code in `AJAX_TOKEN` which is executed incorrectly in Js2Py and
> requires a NodeJS DOM library (at least) or a browser engine.
> For details, see [#85](https://github.com/DarkCat09/python-aternos/issues/85).
> 2. Aternos frontend is protected with Cloudflare, so this library fails to parse pages
> in case of, for example, blocked or suspicious IP address (e.g. web hosting).
> CF shows IUAM page, often with captcha. We need a browser engine like undetected-chromedriver and an AI or a man solving captchas.
> 3. Last Aternos API update broke nearly everything.
> 4. I have no more motivation and not enough time to work on this, nor need in using Aternos.
>
> I'm so sorry. If you want to continue development of python-aternos,
> [contact me](https://url.dc09.ru/contact), but I think it's better to write from scratch.
Python Aternos supports:
- Logging in to account with password (plain or hashed) or `ATERNOS_SESSION` cookie value
- Saving session to the file and restoring
- Changing username, email and password
- Parsing Minecraft servers list
- Parsing server info by its ID
- Starting/stoping server, restarting, confirming/cancelling launch
- Updating server info in real-time (see [WebSocket API](https://python-aternos.codeberg.page/howto/websocket))
- Changing server subdomain and MOTD (message-of-the-day)
- Logging in to account with password (plain or hashed) or `ATERNOS_SESSION` cookie value.
- Saving session to the file and restoring.
- Changing username, email and password.
- Parsing Minecraft servers list.
- Parsing server info by its ID.
- Starting/stoping server, restarting, confirming/cancelling launch.
- Updating server info in real-time (view WebSocket API).
- Changing server subdomain and MOTD (message-of-the-day).
- Managing files, settings, players (whitelist, operators, etc.)
> **Warning**
>
> According to the Aternos' [Terms of Service §5.2e](https://aternos.gmbh/en/aternos/terms#:~:text=Automatically%20accessing%20our%20website%20or%20automating%20actions%20on%20our%20website.),
> you must not use any software or APIs for automated access,
> beacuse they don't receive money from advertisting in this case.
>
> I always try to hide automated python-aternos requests
> using browser-specific headers/cookies,
> but you should make backups to restore your world
> if Aternos detects violation of ToS and bans your account
> (view issues [#16](https://github.com/DarkCat09/python-aternos/issues/16)
> and [#46](https://github.com/DarkCat09/python-aternos/issues/46)).
## Install
### Common
@ -66,12 +63,12 @@ $ pip install python-aternos
```bash
$ git clone https://github.com/DarkCat09/python-aternos.git
$ cd python-aternos
$ pip install -e .[dev]
$ pip install -e .
```
## Usage
To use Aternos API in your Python script, import it
and login with your username and password or its MD5 hash.
and login with your username and password or MD5.
Then request the servers list using `list_servers()`.
You can start/stop your Aternos server, calling `start()` or `stop()`.
@ -81,26 +78,17 @@ Here is an example how to use the API:
# Import
from python_aternos import Client
# Create object
atclient = Client()
# Log in
# with username and password
atclient.login('example', 'test123')
aternos = Client.from_credentials('example', 'test123')
# ----OR----
# with username and MD5 hashed password
atclient.login_hashed('example', 'cc03e747a6afbbcbf8be7668acfebee5')
aternos = Client.from_hashed('example', 'cc03e747a6afbbcbf8be7668acfebee5')
# ----OR----
# with session cookie
atclient.login_with_session('ATERNOS_SESSION cookie value')
aternos = Client.restore_session()
# Get AternosAccount object
aternos = atclient.account
# Get servers list
# Returns AternosServer list
servs = aternos.list_servers()
# Get the first server
# Get the first server by the 0 index
myserv = servs[0]
# Start
@ -115,7 +103,7 @@ for serv in servs:
testserv = serv
if testserv is not None:
# Prints the server software and its version
# Prints a server softaware and its version
# (for example, "Vanilla 1.12.2")
print(testserv.software, testserv.version)
# Starts server
@ -124,13 +112,15 @@ if testserv is not None:
## [More examples](https://github.com/DarkCat09/python-aternos/tree/main/examples)
## [Documentation](https://python-aternos.codeberg.page)
## [Documentation](https://python-aternos.codeberg.page/)
## [How-To Guide](https://python-aternos.codeberg.page/howto/auth)
## Changelog
|Version|Description |
|:-----:|:-----------|
|v0.1|The first release.|
|v0.2|Fixed import problem.|
|v0.3|Implemented files API, added typization.|
|v0.4|Implemented configuration API, some bugfixes.|
|v0.5|The API was updated corresponding to new Aternos security methods. Huge thanks to [lusm554](https://github.com/lusm554).|
@ -139,20 +129,10 @@ if testserv is not None:
|v1.1.x|Documentation, unit tests, pylint, bugfixes, changes in atwss.|
|**v1.1.2/v2.0.0**|Solution for [#25](https://github.com/DarkCat09/python-aternos/issues/25) (Cloudflare bypassing), bugfixes in JS parser.|
|v2.0.x|Documentation, automatically saving/restoring session, improvements in Files API.|
|v2.1.x|Fixes in websockets API, atconnect (including cookie refreshing fix). Support for captcha solving services (view [#52](https://github.com/DarkCat09/python-aternos/issues/52)).|
|v2.2.x|Node.JS interpreter support.|
|v3.0.0|Partially rewritten, API updates.|
|v3.0.5|Unmaintained.|
|v3.1.x|TODO: Full implementation of config API.|
|v3.2.x|TODO: Shared access API and maybe Google Drive backups.|
## Reversed API Specification
Private Aternos API requests were captured into
[this HAR file](https://github.com/DarkCat09/python-aternos/blob/main/aternos.har)
and were imported to
[a Postman Workspace](https://www.postman.com/darkcat09/workspace/aternos-api).
You can use both resources to explore the API.
Any help with improving this library is welcome.
|v2.1.x|Fixes in the implementation of websockets API.|
|**v2.2.x**|Using Node.js as a JS interpreter if it's installed.|
|v3.0.x|Full implementation of config and software API.|
|v3.1.x|Shared access API and Google Drive backups.|
## License
[License Notice:](https://github.com/DarkCat09/python-aternos/blob/main/NOTICE)

26605
aternos.har

File diff suppressed because one or more lines are too long

74
aternos.txt Normal file
View file

@ -0,0 +1,74 @@
aternos.org/software/v/**TYPE**/**MCVER**-latest|recommended|**SOFTVER**
(AternosSoftware) //div[@class="version-title"]
(software_name) /h1[@class="version-title-name"]
(software_id) /div[@id="install-software"]/@data-software
GET software/install.php
software: rLyATopqZP79WHHR
reinstall: 0 OR 1
GET confirm.php
GET config.php
file: /server.properties OR /world/level.dat
option: max-players OR resource-pack OR Data:hardcore OR Data:GameRules:commandBlockOutput
value: 20
GET timezone.php
timezone: Europe/Ulyanovsk
GET image.php
image: openjdk:8
GET mclogs.php
(save log to mclo.gs)
response.json().id
https://api.mclo.gs/1/raw/**ID**
POST create.php
file: /config/hello
type: directory OR file
POST delete.php
file: /config/123.txt
POST save.php
file: /config/123.txt
content: ... (x-www-form-urlencoded; charset=UTF-8)
GET files/download.php?file=**FILENAME_ABSOLUTE**
(ex. file=/world will download in ZIP all directory)
GET worlds/download.php?world=**WORLD_NAME**
GET players/add.php,remove.php
list: whitelist,ops,banned-players,banned-ips
name: CodePicker13 *OR* 1.2.3.4(in case of IP)
(list players) //div[@class="page-content page-players"]/div[@class="player-list"]/div[@class="list-item-container"]
(players[...]) ./div[@class="list-item"]/div[@class="list-name"] (and class="list-avatar")
POST friends/create.php
username: t3test
(LISTUSERIDs) //div[@class="friends-share-list list-players"]/div[@class="list-item-container"]/@data-id
POST friends/delete.php
id: **LISTUSERID**
POST friends/update.php
id: **LISTUSERID**
permissions: json(permissions)
GET driveBackup/autoBackups.php?enabled=**0or1**&amount=**AUTOBACKUPS_COUNT_LIMIT**
(list backups) //div[@class="backups"]/div[@class="file"]
(backups[...]) ./@id, re.search(r'backup-(\w+)', _)[1]
(backups[...]) ./div[@class="filename"] (/span[@class="backup-time js-date-time"], then /@data-date or content)
(backups[...]) ./div[@class="backup-user,filesize"]
POST driveBackup/create.php
name: MyBackup2
POST driveBackup/restore.php,delete.php
backupID: 5
GET /panel/img/skin.php?name=**NICKNAME**
(get player's head in png)

225
cloudflare.html Normal file
View file

@ -0,0 +1,225 @@
<!DOCTYPE html>
<!--[if lt IE 7]> <html class="no-js ie6 oldie" lang="en-US"> <![endif]-->
<!--[if IE 7]> <html class="no-js ie7 oldie" lang="en-US"> <![endif]-->
<!--[if IE 8]> <html class="no-js ie8 oldie" lang="en-US"> <![endif]-->
<!--[if gt IE 8]><!--> <html class="no-js" lang="en-US"> <!--<![endif]-->
<head>
<title>Please Wait... | Cloudflare</title>
<meta charset="UTF-8" />
<meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
<meta http-equiv="X-UA-Compatible" content="IE=Edge" />
<meta name="robots" content="noindex, nofollow" />
<meta name="viewport" content="width=device-width,initial-scale=1" />
<link rel="stylesheet" id="cf_styles-css" href="/cdn-cgi/styles/cf.errors.css" />
<!--[if lt IE 9]><link rel="stylesheet" id='cf_styles-ie-css' href="/cdn-cgi/styles/cf.errors.ie.css" /><![endif]-->
<style>body{margin:0;padding:0}</style>
<!--[if gte IE 10]><!-->
<script>
if (!navigator.cookieEnabled) {
window.addEventListener('DOMContentLoaded', function () {
var cookieEl = document.getElementById('cookie-alert');
cookieEl.style.display = 'block';
})
}
</script>
<!--<![endif]-->
<script>
//<![CDATA[
(function(){
window._cf_chl_opt={
cvId: "2",
cType: "managed",
cNounce: "60603",
cRay: "7302b406cd228696",
cHash: "d4e6954950a219e",
cUPMDTk: "\/go?__cf_chl_tk=kWYv_tRIC92k7YH4y2DjHk8cG6NxqiqrNFuOqRK8Fu8-1658729119-0-gaNycGzNBz0",
cFPWv: "b",
cTTimeMs: "1000",
cLt: "n",
cRq: {
ru: "aHR0cHM6Ly9hdGVybm9zLm9yZy9nbw==",
ra: "cHl0aG9uLXJlcXVlc3RzLzIuMjguMQ==",
rm: "R0VU",
d: "G2POstLbRi/mO4xX+XzRkyogW3AfxXQUGCScenji3bfh+Gf0fkmJhvO2MowSatfqGliJ6f2H5ImzWa6d81NiobXdtTyoMSI6SOKJhA1yEwnB0lK2vynRDfPlOnBg3DLD9ajpnwbdChhAW6VJJpNcknw4DjY3Xw6QrapPwPcNviXh2L36o8Q64RdNrmsU2E/dX/bdO93AnNCYELqZNSk5LxR9Mk6nx69h2OlTujAywh6RNNznAb4VIMG98OPs8ezG/uyJT0IMDAi0hxX2XLQMgJpZmH9Fq9mOxdQb4u0yNelPMBmydSH8eY06+T8igX9Gl+KO1GwncGXNJiiuSkzRFyeEmqH1s7BAM8RonqSLmY/QlQBpKfXA7fVz3ZXx3kz/FZ5mymbNQM4Px0H8t70Fr/cBvb3ouURH/auflic15vYcTxJVF0Bhcwixm9kaU0idAJ3D7QVmekRCYAuQFoR/WILD3f1izZeIw/OjRyMCMulbXKlts/r/DnEuGB58L/0yanKluI1NOzWYwfHnRX98+EbGwsUZtysTA0gASgIHkkTUDJ2jR62ttWzksiTMs5L86RQrU+GNAFdct5Duy1RNMYS57xvSLLSfBFU0L/20OCogWsOnX1JTAV3uDaIRpzkBdA/71c9CQYnFhPwXd0CVCXknnVjRsZ9qnS30ajgBleqiw7EBYAFjpmOJcvb7ngRH",
t: "MTY1ODcyOTExOS44MTEwMDA=",
m: "ebYu94kEblPB33GnXq/UkQwZzAYGEz384g11FsTA2/E=",
i1: "7RZsHiXQtnE3zcV5NeyZiw==",
i2: "kRVJdWyzuDqPITricsrSgA==",
zh: "QOULi3+m02zThjEHsshmSiTQdvN0z3mb6EpEHHD4wUE=",
uh: "SLdVolODg++SO356HusO5I/hbfOpiiOxQXj62i/MUkA=",
hh: "/sR0/WeF9SUzF+YeTskjqNWaIcUj72REMvL1SD3B2cE=",
}
};
}());
//]]>
</script>
<style>
#cf-wrapper #spinner {width:69px; margin: auto;}
#cf-wrapper #cf-please-wait{text-align:center}
.attribution {margin-top: 32px;}
.bubbles { background-color: #f58220; width:20px; height: 20px; margin:2px; border-radius:100%; display:inline-block; }
#cf-wrapper #challenge-form { padding-top:25px; padding-bottom:25px; }
#cf-hcaptcha-container { text-align:center;}
#cf-hcaptcha-container iframe { display: inline-block;}
@keyframes fader { 0% {opacity: 0.2;} 50% {opacity: 1.0;} 100% {opacity: 0.2;} }
#cf-wrapper #cf-bubbles { width:69px; }
@-webkit-keyframes fader { 0% {opacity: 0.2;} 50% {opacity: 1.0;} 100% {opacity: 0.2;} }
#cf-bubbles > .bubbles { animation: fader 1.6s infinite;}
#cf-bubbles > .bubbles:nth-child(2) { animation-delay: .2s;}
#cf-bubbles > .bubbles:nth-child(3) { animation-delay: .4s;}
</style>
</head>
<body>
<div id="cf-wrapper">
<div class="cf-alert cf-alert-error cf-cookie-error" id="cookie-alert" data-translate="enable_cookies">Please enable cookies.</div>
<div id="cf-error-details" class="cf-error-details-wrapper">
<div class="cf-wrapper cf-header cf-error-overview">
<h1 data-translate="managed_challenge_headline">Please wait...</h1>
<h2 class="cf-subheadline"><span data-translate="managed_checking_msg">We are checking your browser...</span> aternos.org</h2>
</div>
<div class="cf-section cf-highlight cf-captcha-container">
<div class="cf-wrapper">
<div class="cf-columns two">
<div class="cf-column">
<div class="cf-highlight-inverse cf-form-stacked">
<form id="challenge-form" class="challenge-form managed-form" action="/go?__cf_chl_f_tk=kWYv_tRIC92k7YH4y2DjHk8cG6NxqiqrNFuOqRK8Fu8-1658729119-0-gaNycGzNBz0" method="POST" enctype="application/x-www-form-urlencoded">
<div id='cf-please-wait'>
<div id='spinner'>
<div id="cf-bubbles">
<div class="bubbles"></div>
<div class="bubbles"></div>
<div class="bubbles"></div>
</div>
</div>
<p data-translate="please_wait" id="cf-spinner-please-wait">Please stand by, while we are checking your browser...</p>
<p data-translate="redirecting" id="cf-spinner-redirecting" style="display:none">Redirecting...</p>
</div>
<input type="hidden" name="md" value="T6wr36AHkZr.tdd5QxXPJrVtTgcyCpukpRLSzX85Gzk-1658729119-0-AU4sNK9LmhvpfyhJLVO7FKIPeYP_q3546w6eGP7CX052pVBKRgGfi-poBX5lJAZZcAPVBiTFTpgIeKds3zgvp3OGHEyRSd2gVvkihOGIKGfWDGzfU55OWKcWu1XyWfsVsPLlYO4Bg5Zo3T-FZYUED9JzDelnUiXRLM_3f1iPri0S8FWbOqRSCj-Z4BUcyfSMROTy8Hj6-yABkTMjs1ECZTmuxO6_fpsMJPAud_OMEtmkU0A8DynsAtnC2XlUi23dmFtHMauAjkmytX7z7qrn84yHUwlnHZYbmJgFHBR8zdx-eVd6S0Doo_teYrfIo85WNJM0BeffpD4pm4VqgMKYDit8hIjEJlib1hkTBh0Cxf1nynItdJScxtCHGEVa16hUzuPjsVRbMxoRCSyv3UqiXwBl-udM1bVxojRG6d4Kj_2LlVPObyd8yjzZlam1XECKKMYrHZ_dkBpm3VP0ajAOkluTdNh5nomG69I84OFMvpu0AiCAwIJ_qpzTENd4MVcdhG9WP_q5Gt1RAnN3VZovn4bxlI9z2OqkZBSHwJiN8dF7FfGTApWRJC9PMr8sE5TApZp9oLOHQQf8vBfOfiTkXDG3mAlQqx7kR7rasY2rqKY9ri2DwpjEKGd00g4kTOog5Q5dSyxPgrwtsjoCsb7Qc39xrDgUe2TMFuQT4KoK68iHuIlpwYAoJnvmwug0JJskrOT9dhysZqltW1Zd33U-VscHz61AemfCf4tzUcw6pBJN" />
<input type="hidden" name="r" value="oM2Fs_qXpHUduJSVgh0NNJ3XouhS8gVWmHtP3msVTqs-1658729119-0-AXx9R5jwWxuaTLY5Aax4BBWmdiAWJi5GKnFzi48pJJoEDEQFz3IIplivlgEYkN2FJ6jt1bGleQXzENqiuLfpt77bDU6WrHC4Xwq+pOESEAT4l5SwdxEGAXWNHD0beMlnQ9aJi7CP+hVzsKaVSFHHZtynSwsttTKA3g9zM/0xWg/uS0N3BH2MDheTw/ekLRDw81hqcfIxJDwgg1cHsKYHwui+X5aZ6eMP4eoSmZHwQ7NHnK16OAQnsnZHzDjoZ08O9lAqQevKheWJLtBRHFM0AjUtIcFWIIs5kOFT35JhviNGQjo9882odJMTG0dS0MPCjQ1L4d0bBqiC9yfMDDrhffeOzMA+tVpkvmkov21LtaydRz1VxeuKtsfTH1SFV6ZYld2xCwGKdRDHP770hg3FnEwvSSvT1oowWbASOi1jc261eVRiVCOL6EHvVdimglMSurBxJCSUSbxyCsaNotvMO8sQKEeA7aUpSmCZ8f7t/Dd3KRKMsP0eww5pqiEZ38oIVVXzXxoMFrcAp4Ogqk+osR0l56TQChKU0n4ehg2v+8n6HQL2Ed2KXBWdgB2nQJFrnJ4HCIWeGM8oxxfc8j7goSZpi43ErYgcs4NzyWjIH+lrD7SVCHUJtC4yxa1ZNkTKiDQftL4eyY3+A6EOTgh+SW+fzBRmubB+N/ieEX1hlWqeTLCJzYmU+GFoaGyBVxYLuRmqCiT79PRe5tcWtZPZjUtuHCCsJ3wlWhtRu1Pe+wXxBeOVyO4Ky9Ho2a6beibQ2H+9gmLJUjAtiIRp2Czls/Ubgrx4h+0kLHI0LvZogLeuR0i9XfpFS+hlY/PtufcKeZzpJni2XQxss9XJHhu2Np6hj4yN24e135e8c+j9saDhVrcp1Szx5a1PT8icv2ry+nKx8xmi2CXvG085aj3eGlBeYG4PiNeRBEYD1kdNx3puO6A+te+F5167mjV+X4pDiMZsHsCpxogUuNvcwfUm1UiIhGIy5rXOhoOk/6RoIrw5i5j1FRCmunzG/CTf5J2rO5lyivun1+OohJRrKt3W4n13z8g0FgDcIfuhJbRJLisHt/W2cWj1vflf3HcMrFYaNasP+ixc0EpzpDdsgQTlYaRZhZzWkJ81EIqmMkJIsNw869S/L+H+huTdsoe1MqFVrH4zYJnjnlUyfjE71bWSBDQDxOgL4+q1ET3q/oy5nJlS47QsSt2i9PpHH2u4DY3e6fuXsmyGgV39VtNDwQWzONRdILXDP/5a3BPf4gZDolAYD8xfKBDQAQ/g6h0kEYlJ2LN56DJG2pjWzV195qm9SQmKJwpxghiJxCQeWgj+6g+IFp1BD+6FUTM6mQLldL9AmnEcA0E/plpTJYxahl48d1//zu1yAmdKYReLfCbn2DVqLBiG1m+On9e0FjO/cSLMN16qUFe+PuA3+4+30c7ahr0Gl2f9gKGNaozoFOnjEYm1/MYw65BHPu+/curMrU+PsFK7KPmT9lmUw+SR+XMTW0IL+uRuVFuGAkXmeflC1cwlawQRupaTSStxadeWPI9YA3X6gSItKECQEcOaPII4p0bGoNbHiKYNtcOOFMGb4HO4WolF7IuqsGH7S8C5GK8ynVq8RT9/Qb+xe0wVI4fqIHjFSTW0ZyCcJnw5O5bXBwez/hrkmflM7g1G3G7Dm3bSH5EnnLXOsd1cWQfxb3sKA6nmnQGwnCE6HsTZUPhgsOuHfXAG8phyC/zMhyEONnOrBMhLNpSwHlLx86LtbB7OYk+eDn2gI61fjS8u1qh7wBRc/mJIq7W9JC5hh4l32VUWGg==">
<input type="hidden" name="vc" value="b1c5dfed78a36a966ad5eb7fc136a067">
<noscript id="cf-captcha-bookmark" class="cf-captcha-info">
<h1 data-translate="turn_on_js" style="color:#bd2426;">Please turn JavaScript on and reload the page.</h1>
</noscript>
<div id="no-cookie-warning" class="cookie-warning" data-translate="turn_on_cookies" style="display:none">
<p data-translate="turn_on_cookies" style="color:#bd2426;">Please enable Cookies and reload the page.</p>
</div>
<script>
//<![CDATA[
var a = function() {try{return !!window.addEventListener} catch(e) {return !1} },
b = function(b, c) {a() ? document.addEventListener("DOMContentLoaded", b, c) : document.attachEvent("onreadystatechange", b)};
b(function(){
var cookiesEnabled=(navigator.cookieEnabled)? true : false;
if(!cookiesEnabled){
var q = document.getElementById('no-cookie-warning');q.style.display = 'block';
}
});
//]]>
</script>
<div id="trk_captcha_js" style="background-image:url('/cdn-cgi/images/trace/captcha/nojs/transparent.gif?ray=7302b406cd228696')"></div>
</form>
<script>
//<![CDATA[
(function(){
var isIE = /(MSIE|Trident\/|Edge\/)/i.test(window.navigator.userAgent);
var trkjs = isIE ? new Image() : document.createElement('img');
trkjs.setAttribute("src", "/cdn-cgi/images/trace/managed/js/transparent.gif?ray=7302b406cd228696");
trkjs.id = "trk_managed_js";
trkjs.setAttribute("alt", "");
document.body.appendChild(trkjs);
var cpo=document.createElement('script');
cpo.type='text/javascript';
cpo.src = '/cdn-cgi/challenge-platform/h/b/orchestrate/managed/v1?ray=7302b406cd228696';
window._cf_chl_opt.cOgUHash = location.hash === '' && location.href.indexOf('#') !== -1 ? '#' : location.hash;
window._cf_chl_opt.cOgUQuery = location.search === '' && location.href.slice(0, -window._cf_chl_opt.cOgUHash.length).indexOf('?') !== -1 ? '?' : location.search;
if (window._cf_chl_opt.cUPMDTk && window.history && window.history.replaceState) {
var ogU = location.pathname + window._cf_chl_opt.cOgUQuery + window._cf_chl_opt.cOgUHash;
history.replaceState(null, null, "\/go?__cf_chl_rt_tk=kWYv_tRIC92k7YH4y2DjHk8cG6NxqiqrNFuOqRK8Fu8-1658729119-0-gaNycGzNBz0" + window._cf_chl_opt.cOgUHash);
cpo.onload = function() {
history.replaceState(null, null, ogU);
};
}
document.getElementsByTagName('head')[0].appendChild(cpo);
}());
//]]>
</script>
</div>
</div>
<div class="cf-column">
<div class="cf-screenshot-container">
<span class="cf-no-screenshot"></span>
</div>
</div>
</div>
</div>
</div>
<div class="cf-section cf-wrapper">
<div class="cf-columns two">
<div class="cf-column">
<h2 data-translate="why_captcha_headline">Why do I have to complete a CAPTCHA?</h2>
<p data-translate="why_captcha_detail">Completing the CAPTCHA proves you are a human and gives you temporary access to the web property.</p>
</div>
<div class="cf-column">
<h2 data-translate="resolve_captcha_headline">What can I do to prevent this in the future?</h2>
<p data-translate="resolve_captcha_antivirus">If you are on a personal connection, like at home, you can run an anti-virus scan on your device to make sure it is not infected with malware.</p>
<p data-translate="resolve_captcha_network">If you are at an office or shared network, you can ask the network administrator to run a scan across the network looking for misconfigured or infected devices.</p>
</div>
</div>
</div>
<div class="cf-error-footer cf-wrapper w-240 lg:w-full py-10 sm:py-4 sm:px-8 mx-auto text-center sm:text-left border-solid border-0 border-t border-gray-300">
<p class="text-13">
<span class="cf-footer-item sm:block sm:mb-1">Cloudflare Ray ID: <strong class="font-semibold">7302b406cd228696</strong></span>
<span class="cf-footer-separator sm:hidden">&bull;</span>
<span id="cf-footer-item-ip" class="cf-footer-item hidden sm:block sm:mb-1">
Your IP:
<button type="button" id="cf-footer-ip-reveal" class="cf-footer-ip-reveal-btn">Click to reveal</button>
<span class="hidden" id="cf-footer-ip">35.239.37.39</span>
<span class="cf-footer-separator sm:hidden">&bull;</span>
</span>
<span class="cf-footer-item sm:block sm:mb-1"><span>Performance &amp; security by</span> <a rel="noopener noreferrer" href="https://www.cloudflare.com/5xx-error-landing" id="brand_link" target="_blank">Cloudflare</a></span>
</p>
<script>(function(){function d(){var b=a.getElementById("cf-footer-item-ip"),c=a.getElementById("cf-footer-ip-reveal");b&&"classList"in b&&(b.classList.remove("hidden"),c.addEventListener("click",function(){c.classList.add("hidden");a.getElementById("cf-footer-ip").classList.remove("hidden")}))}var a=document;document.addEventListener&&a.addEventListener("DOMContentLoaded",d)})();</script>
</div><!-- /.error-footer -->
</div>
</div>
<script>
window._cf_translation = {};
</script>
</body>
</html>

View file

@ -97,10 +97,8 @@ so you don't need to do it by yourself now.
Before, you should save session manually:
```python
# ****
# This code is useless in new versions,
# because they do it automatically.
# ****
from python_aternos import Client
@ -121,13 +119,13 @@ myserv = at.list_servers()[0]
...
```
Function `save_session()` writes the session cookie and the cached servers list to `.aternos` file in your home directory.
`restore_session()` creates a Client object from the session cookie and restores the servers list.
Function `save_session()` writes session cookie and cached servers list to `.aternos` file in your home directory.
`restore_session()` creates Client object from session cookie and restores servers list.
This feature reduces the count of network requests and allows you to log in and request servers much faster.
If you have created a new server, but it doesn't appear in `list_servers` result, call it with `cache=False` argument.
If you created a new server, but it doesn't appear in `list_servers` result, call it with `cache=False` argument.
```python
# Refresh the list
# Refreshing list
servers = at.list_servers(cache=False)
```
@ -169,7 +167,7 @@ at.change_password_hashed(my_passwd, new_passwd)
## Two-Factor Authentication
2FA is a good idea if you think that the password
is not enough to protect your account.
It was recently added to python-aternos.
It has been recently added to python-aternos.
### Log in with code
Here's how to log in to an account:
@ -200,12 +198,12 @@ Also, the library allows to enable it.
>>> response
{'qrcode': '...', 'secret': '7HSM...'}
```
As you can see, Aternos responds with
a QR code picture encoded in base64
As you can see, Aternos responses with
QR code picture encoded in base64
and a plain secret code.
- Enter the secret code into your 2FA application
**OR** save the QR into a file:
- Enter this code into your 2FA application
**or** save the QR into a file:
```python
>>> qr = response.get('qrcode', '')
>>> at.save_qr(qr, 'test.png')

View file

@ -38,7 +38,7 @@ but I chose an easier name for the class.)
- `FileType.dir` and `FileType.directory`
- `size` - File size in bytes, float.
`0.0` for directories and
`-1.0` when an error occurs.
`-1.0` when error occures.
- `deleteable`, `downloadable` and `editable` are explained in the next section.
### File
@ -208,23 +208,18 @@ def read():
def write(content):
# set_text and set_content
# uses the same URLs,
# so there's no point in checking
# if the file is editable/downloadable
# uses the same URLs.
# I prefer set_content
# Code for set_text:
#ops.set_text(content)
# Code for set_content:
# Convert the str to bytes
# but we need to convert content to bytes
content = content.encode('utf-8')
# Edit
ops.set_content(content)
# ops.json contains an empty list [] by default
# It contains empty list [] by default
oper_raw = read()
# Convert it to a Python list
# Convert to Python list
oper_lst = json.loads(oper_raw)
# Add an operator

View file

@ -1,6 +1,6 @@
# How-To 3: Players lists
You can add a player to operators,
include into the whitelist or ban him
include in the whitelist or ban
using this feature.
## Common usage
@ -33,7 +33,7 @@ For example, I want to ban someone:
serv.players(Lists.ban).add('someone')
```
And give myself the operator rights:
And give myself operator rights:
```python
serv.players(Lists.ops).add('DarkCat09')
```

View file

@ -1,6 +1,6 @@
# How-To 2: Controlling Minecraft server
In the previous part we've logged into an account and have started a server.
In the previous part we logged into account and started a server.
But python-aternos can do much more.
## Basic methods
@ -32,7 +32,7 @@ serv.confirm()
`start()` can be called with arguments:
- headstart (bool): Start server in headstart mode
which allows you to skip all the queue.
which allows you to skip all queue.
- accepteula (bool): Automatically accept Mojang EULA.
If you want to launch your server instantly, use this code:
@ -45,18 +45,18 @@ serv.start(headstart=True)
This object contains an error code, on which depends an error message.
- EULA was not accepted (code: `eula`) -
remove `accepteula=False` or run `serv.eula()` before the server startup.
remove `accepteula=False` or run `serv.eula()` before startup.
- Server is already running (code: `already`) -
you don't need to start the server, it is online.
you don't need to start server, it is online.
- Incorrect software version installed (code: `wrongversion`) -
if you have *somehow* installed non-existent software version (e.g. `Vanilla 2.16.5`).
- File server is unavailable (code: `file`) -
problems on Aternos servers, view [https://status.aternos.gmbh](https://status.aternos.gmbh)
problems in Aternos servers, view [https://status.aternos.gmbh](https://status.aternos.gmbh)
- Available storage size limit has been reached (code: `size`) -
files on your Minecraft server have reached 4GB limit
(for exmaple, too much mods or loaded chunks).
Always wrap `start` into try-except.
Always wrap `start` into try-catch.
```python
from python_aternos import ServerStartError
@ -72,7 +72,7 @@ except ServerStartError as err:
## Cancellation
Server launching can be cancelled only when you are waiting in a queue.
After queue, when the server starts and writes something to the log,
you can just `stop()` it, **not** `cancel()`.
you can just `stop()` it, not `cancel()`.
## Server info
```python
@ -130,7 +130,7 @@ False
>>> serv.restart()
# Title on the web site: "Loading"
# Title on web site: "Loading"
>>> serv.css_class
'loading'
>>> serv.status
@ -144,7 +144,7 @@ False
>>> serv.status_num == Status.starting
False
# Title on the web site: "Preparing"
# Title on web site: "Preparing"
>>> serv.css_class
'loading'
>>> serv.status
@ -158,7 +158,7 @@ False
>>> serv.status_num == Status.on
False
# Title on the web site: "Starting"
# Title on web site: "Starting"
>>> serv.css_class
'loading starting'
>>> serv.status
@ -175,7 +175,7 @@ False
```
## Changing subdomain and MOTD
To change the server's subdomain or Message-of-the-Day,
To change server subdomain or Message-of-the-Day,
just assign a new value to the corresponding fields:
```python
serv.subdomain = 'new-test-server123'
@ -183,13 +183,13 @@ serv.motd = 'Welcome to the New Test Server!'
```
## Updating status
Python-Aternos don't refresh server information by default.
This can be done with [WebSockets API](/howto/websocket) automatically
python-aternos don't refresh server information by default.
This can be done with [WebSockets API](websocket) automatically
(but it will be explained later in the 6th part of how-to guide),
or with `fetch()` method manually (much easier).
`fetch()` is also called when an AternosServer object is created
to get this info about the server:
`fetch()` called also when an AternosServer object is created
to get info about the server:
- full address,
- MOTD,
@ -198,7 +198,7 @@ to get this info about the server:
- status,
- etc.
Use it if you want to see the new data *one time*:
Use it if you want to see new data one time:
```python
import time
from python_aternos import Client
@ -214,44 +214,6 @@ time.sleep(10)
serv.fetch()
print('Server is', serv.status) # Server is online
```
But this method is **not** a good choice if you want to get *real-time* updates.
Read [How-To 6: Real-time updates](/howto/websocket) about WebSockets API
But this method is **not** a good choice if you want to get real-time updates.
Read [How-To 6: Real-time updates](websocket) about WebSockets API
and use it instead of refreshing data in a while-loop.
## Countdown
Aternos stops a server when there are no players connected.
You can get the remained time in seconds using `serv.countdown`.
For example:
```python
# Start
serv.start()
# Get the countdown value
print(serv.countdown, 'seconds')
# -1 seconds
# means "null" in countdown field
# Wait for start up
time.sleep(10)
# Refresh info
serv.fetch()
# Get countdown value
print(serv.countdown, 'seconds')
# 377 seconds
# Check if countdown changes
time.sleep(10)
serv.fetch()
print(serv.countdown, 'seconds')
# 367 seconds
# ---
# Convert to minutes and seconds
mins, secs = divmod(serv.countdown, 60)
print(f'{mins}:{secs:02}') # 6:07
# OR
cd = serv.countdown
print(f'{cd // 60}:{cd % 60:02}') # 6:07
```

View file

@ -83,7 +83,7 @@ aternos = Client.from_credentials('example', 'test123')
# ----OR----
aternos = Client.from_hashed('example', 'cc03e747a6afbbcbf8be7668acfebee5')
# ----OR----
aternos = Client.from_session('ATERNOS_SESSION cookie')
aternos = Client.restore_session()
# Returns AternosServer list
servs = aternos.list_servers()
@ -103,7 +103,7 @@ for serv in servs:
testserv = serv
if testserv is not None:
# Prints the server software and its version
# Prints a server softaware and its version
# (for example, "Vanilla 1.12.2")
print(testserv.software, testserv.version)
# Starts server
@ -129,8 +129,8 @@ if testserv is not None:
|v1.1.x|Documentation, unit tests, pylint, bugfixes, changes in atwss.|
|**v1.1.2/v2.0.0**|Solution for [#25](https://github.com/DarkCat09/python-aternos/issues/25) (Cloudflare bypassing), bugfixes in JS parser.|
|v2.0.x|Documentation, automatically saving/restoring session, improvements in Files API.|
|v2.1.x|Fixes in websockets API, atconnect (including cookie refreshing fix). Supported captcha solving services (view [#52](https://github.com/DarkCat09/python-aternos/issues/52)).|
|**v2.2.x**|Node.JS interpreter support.|
|v2.1.x|Fixes in the implementation of websockets API.|
|**v2.2.x**|Using Node.js as a JS interpreter if it's installed.|
|v3.0.x|Full implementation of config and software API.|
|v3.1.x|Shared access API and Google Drive backups.|

View file

@ -6,10 +6,7 @@ from python_aternos import Client, atwss
user = input('Username: ')
pswd = getpass('Password: ')
resp = input('Show responses? ').upper() == 'Y'
atclient = Client()
aternos = atclient.account
atclient.login(user, pswd)
aternos = Client.from_credentials(user, pswd)
s = aternos.list_servers()[0]
socket = s.wss()

View file

@ -3,10 +3,7 @@ from python_aternos import Client
user = input('Username: ')
pswd = getpass('Password: ')
atclient = Client()
aternos = atclient.account
atclient.login(user, pswd)
aternos = Client.from_credentials(user, pswd)
s = aternos.list_servers()[0]
files = s.files()
@ -37,11 +34,6 @@ while True:
print('\t' + file.name)
if cmd == 'world':
file_w = files.get_file('/world')
if file_w is None:
print('Cannot create /world directory object')
continue
file = files.get_file('/world')
with open('world.zip', 'wb') as f:
f.write(file_w.get_content())
f.write(file.get_content())

View file

@ -3,18 +3,11 @@ from python_aternos import Client, atserver
user = input('Username: ')
pswd = getpass('Password: ')
atclient = Client()
aternos = atclient.account
atclient.login(user, pswd)
aternos = Client.from_credentials(user, pswd)
srvs = aternos.list_servers()
for srv in srvs:
print()
print('***', srv.servid, '***')
srv.fetch()
print(srv.domain)
print('*** ' + srv.domain + ' ***')
print(srv.motd)
print('*** Status:', srv.status)
print('*** Full address:', srv.address)
@ -23,5 +16,3 @@ for srv in srvs:
print('*** Minecraft:', srv.software, srv.version)
print('*** IsBedrock:', srv.edition == atserver.Edition.bedrock)
print('*** IsJava:', srv.edition == atserver.Edition.java)
print()

View file

@ -3,10 +3,7 @@ from python_aternos import Client
user = input('Username: ')
pswd = getpass('Password: ')
atclient = Client()
aternos = atclient.account
atclient.login(user, pswd)
aternos = Client.from_credentials(user, pswd)
srvs = aternos.list_servers()
print(srvs)

View file

@ -1,51 +1,34 @@
import asyncio
import logging
from getpass import getpass
from python_aternos import Client, atwss
from typing import Tuple, Dict, Any
from python_aternos import Client, Streams
# Request credentials
user = input('Username: ')
pswd = getpass('Password: ')
# Instantiate Client
atclient = Client()
aternos = atclient.account
# Enable debug logging
logs = input('Show detailed logs? (y/n) ').strip().lower() == 'y'
if logs:
atclient.debug = True
logging.basicConfig(level=logging.DEBUG)
# Authenticate
atclient.login(user, pswd)
aternos = Client.from_credentials(user, pswd)
server = aternos.list_servers()[0]
socket = server.wss()
s = aternos.list_servers()[0]
socket = s.wss()
# Handler for console messages
@socket.wssreceiver(Streams.console, ('Server 1',)) # type: ignore
async def console(
msg: Dict[Any, Any],
args: Tuple[str]) -> None:
@socket.wssreceiver(atwss.Streams.console, 'Server 1')
async def console(msg, args):
print(args[0], 'received', msg)
# Main function
async def main() -> None:
server.start()
async def main():
s.start()
await socket.connect()
await asyncio.create_task(loop())
# Keepalive
async def loop() -> None:
async def loop():
while True:
await asyncio.Future()
await asyncio.sleep(1)
asyncio.run(main())

View file

@ -4,10 +4,7 @@ from python_aternos import Client, atwss
user = input('Username: ')
pswd = getpass('Password: ')
atclient = Client()
aternos = atclient.account
atclient.login(user, pswd)
aternos = Client.from_credentials(user, pswd)
s = aternos.list_servers()[0]
socket = s.wss()

View file

@ -1,70 +0,0 @@
import asyncio
from getpass import getpass
from typing import Tuple, Dict, Any
from python_aternos import Client, Streams
# Request credentials
user = input('Username: ')
pswd = getpass('Password: ')
# Instantiate Client
atclient = Client()
aternos = atclient.account
# Enable debug logging
logs = input('Show detailed logs? (y/n) ').strip().lower() == 'y'
if logs:
atclient.debug = True
# Authenticate
atclient.login(user, pswd)
server = aternos.list_servers()[0]
socket = server.wss()
# Handler for server status
@socket.wssreceiver(Streams.status, ('Server 1',)) # type: ignore
async def state(
msg: Dict[Any, Any],
args: Tuple[str]) -> None:
# For debugging
print(args[0], 'received', len(msg), 'symbols')
# Write new info dictionary
server._info = msg
# Server 1 test is online
print(
args[0],
server.subdomain,
'is',
server.status
)
# Server 1 players: ['DarkCat09', 'someone']
print(
args[0],
'players:',
server.players_list
)
# Main function
async def main() -> None:
server.start()
await socket.connect()
await asyncio.create_task(loop())
# Keepalive
async def loop() -> None:
while True:
await asyncio.Future()
asyncio.run(main())

BIN
logo/aternos.xcf Normal file

Binary file not shown.

Binary file not shown.

Before

Width:  |  Height:  |  Size: 7.9 KiB

Binary file not shown.

Before

Width:  |  Height:  |  Size: 24 KiB

261
pylintrc
View file

@ -1,10 +1,9 @@
[MAIN]
analyse-fallback-blocks=no
clear-cache-post-run=no
extension-pkg-allow-list=
extension-pkg-whitelist=
fail-on=
fail-under=10
fail-under=10.0
ignore=CVS
ignore-paths=
ignore-patterns=^\.#
@ -18,97 +17,13 @@ recursive=no
suggestion-mode=yes
unsafe-load-any-extension=no
[BASIC]
argument-naming-style=snake_case
attr-naming-style=snake_case
bad-names=foo,
bar,
baz,
toto,
tutu,
tata
bad-names-rgxs=
class-attribute-naming-style=any
class-const-naming-style=any
class-naming-style=PascalCase
const-naming-style=UPPER_CASE
docstring-min-length=10
function-naming-style=snake_case
good-names=i,
j,
k,
f,
s,
js,
ex,
Run,
_
good-names-rgxs=
include-naming-hint=no
inlinevar-naming-style=any
method-naming-style=snake_case
module-naming-style=snake_case
name-group=
no-docstring-rgx=^_
property-classes=abc.abstractproperty
variable-naming-style=snake_case
[CLASSES]
check-protected-access-in-special-methods=no
defining-attr-methods=__init__,
__new__,
setUp,
__post_init__
exclude-protected=_asdict,
_fields,
_replace,
_source,
_make
valid-classmethod-first-arg=cls
valid-metaclass-classmethod-first-arg=mcs
[REPORTS]
evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10))
msg-template=
reports=no
score=yes
[DESIGN]
exclude-too-few-public-methods=
ignored-parents=
max-args=10
max-attributes=10
max-bool-expr=5
max-branches=12
max-locals=20
max-parents=7
max-public-methods=31
max-returns=6
max-statements=50
min-public-methods=2
[EXCEPTIONS]
overgeneral-exceptions=builtins.BaseException,builtins.Exception
[FORMAT]
expected-line-ending-format=
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
indent-after-paren=4
indent-string=' '
max-line-length=100
max-module-lines=1000
single-line-class-stmt=no
single-line-if-stmt=no
[IMPORTS]
allow-any-import-level=
allow-reexport-from-package=no
allow-wildcard-with-all=no
deprecated-modules=
ext-import-graph=
import-graph=
int-import-graph=
known-standard-library=
known-third-party=enchant
preferred-modules=
[LOGGING]
logging-format-style=old
logging-modules=logging
[MESSAGES CONTROL]
confidence=HIGH,
@ -124,27 +39,13 @@ disable=raw-checker-failed,
useless-suppression,
deprecated-pragma,
use-symbolic-message-instead,
no-member
no-member,
too-many-arguments,
too-many-public-methods,
too-many-instance-attributes,
too-many-locals
enable=c-extension-no-member
[METHOD_ARGS]
timeout-methods=requests.api.delete,requests.api.get,requests.api.head,requests.api.options,requests.api.patch,requests.api.post,requests.api.put,requests.api.request
[MISCELLANEOUS]
notes=FIXME,
XXX,
TODO
notes-rgx=
[REFACTORING]
max-nested-blocks=5
never-returning-functions=sys.exit,argparse.parse_error
[REPORTS]
evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10))
msg-template=
reports=no
score=yes
[SIMILARITIES]
ignore-comments=yes
@ -153,6 +54,129 @@ ignore-imports=yes
ignore-signatures=yes
min-similarity-lines=4
[MISCELLANEOUS]
notes=FIXME,
XXX,
TODO
notes-rgx=
[DESIGN]
exclude-too-few-public-methods=
ignored-parents=
max-args=5
max-attributes=7
max-bool-expr=5
max-branches=12
max-locals=15
max-parents=7
max-public-methods=20
max-returns=6
max-statements=50
min-public-methods=2
[STRING]
check-quote-consistency=no
check-str-concat-over-line-jumps=no
[CLASSES]
check-protected-access-in-special-methods=no
defining-attr-methods=__init__,
__new__,
setUp,
__post_init__
exclude-protected=_asdict,
_fields,
_replace,
_source,
_make
valid-classmethod-first-arg=cls
valid-metaclass-classmethod-first-arg=cls
[FORMAT]
expected-line-ending-format=
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
indent-after-paren=4
indent-string=' '
max-line-length=100
max-module-lines=1000
single-line-class-stmt=no
single-line-if-stmt=no
[IMPORTS]
allow-any-import-level=
allow-wildcard-with-all=no
deprecated-modules=
ext-import-graph=
import-graph=
int-import-graph=
known-standard-library=
known-third-party=enchant
preferred-modules=
[VARIABLES]
additional-builtins=
allow-global-unused-variables=yes
allowed-redefined-builtins=
callbacks=cb_,
_cb
dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_
ignored-argument-names=_.*|^ignored_|^unused_
init-import=no
redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io
[LOGGING]
logging-format-style=old
logging-modules=logging
[EXCEPTIONS]
overgeneral-exceptions=BaseException,
Exception
[BASIC]
argument-naming-style=snake_case
attr-naming-style=snake_case
bad-names=foo,
bar,
baz,
toto,
tutu,
tata
bad-names-rgxs=
class-attribute-naming-style=any
class-const-naming-style=any
class-naming-style=PascalCase
const-naming-style=UPPER_CASE
docstring-min-length=-1
function-naming-style=snake_case
good-names=i,
j,
k,
f,
s,
ex,
Run,
_
good-names-rgxs=
include-naming-hint=no
inlinevar-naming-style=any
method-naming-style=snake_case
module-naming-style=snake_case
name-group=
no-docstring-rgx=^_
property-classes=abc.abstractproperty
variable-naming-style=snake_case
[SPELLING]
max-spelling-suggestions=4
spelling-dict=
@ -161,9 +185,6 @@ spelling-ignore-words=
spelling-private-dict-file=
spelling-store-unknown-words=no
[STRING]
check-quote-consistency=no
check-str-concat-over-line-jumps=no
[TYPECHECK]
contextmanager-decorators=contextlib.contextmanager
@ -181,13 +202,7 @@ missing-member-max-choices=1
mixin-class-rgx=.*[Mm]ixin
signature-mutators=
[VARIABLES]
additional-builtins=
allow-global-unused-variables=yes
allowed-redefined-builtins=
callbacks=cb_,
_cb
dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_
ignored-argument-names=_.*|^ignored_|^unused_
init-import=no
redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io
[REFACTORING]
max-nested-blocks=5
never-returning-functions=sys.exit,argparse.parse_error

View file

@ -1,11 +1,50 @@
"""Init"""
"""
Unofficial Aternos API module written in Python.
It uses Aternos' private API and html parsing"""
from .atclient import Client
from .atserver import AternosServer
from .atserver import Edition
from .atserver import Status
from .atconnect import AternosConnect
from .atplayers import PlayersList
from .atplayers import Lists
from .atconf import AternosConfig
from .atconf import ServerOpts
from .atconf import WorldOpts
from .atconf import WorldRules
from .atconf import Gamemode
from .atconf import Difficulty
from .atwss import AternosWss
from .atwss import Streams
from .atjsparse import Js2PyInterpreter
from .atjsparse import NodeInterpreter
from .atfm import FileManager
from .atfile import AternosFile
from .atfile import FileType
from .aterrors import AternosError
from .aterrors import CloudflareError
from .aterrors import CredentialsError
from .aterrors import TokenError
from .aterrors import ServerError
from .aterrors import ServerStartError
from .aterrors import FileError
from .aterrors import AternosPermissionError
from .atjsparse import exec_js
__all__ = [
'atclient', 'atserver', 'atconnect',
'atplayers', 'atconf', 'atwss',
'atfm', 'atfile',
'aterrors', 'atjsparse',
'Client', 'AternosServer', 'AternosConnect',
'PlayersList', 'AternosConfig', 'AternosWss',
'FileManager', 'AternosFile', 'AternosError',
'CloudflareError', 'CredentialsError', 'TokenError',
'ServerError', 'ServerStartError', 'FileError',
'AternosPermissionError', 'exec_js',
'Edition', 'Status', 'Lists',
'ServerOpts', 'WorldOpts', 'WorldRules',
'Gamemode', 'Difficulty', 'Streams', 'FileType',
]

View file

@ -1,230 +0,0 @@
"""Methods related to an Aternos account
including servers page parsing"""
import re
import base64
from typing import List, Dict
from typing import TYPE_CHECKING
import lxml.html
from .atlog import log
from .atmd5 import md5encode
from .atconnect import AternosConnect
from .atconnect import BASE_URL, AJAX_URL
from .atserver import AternosServer
if TYPE_CHECKING:
from .atclient import Client
ACCOUNT_URL = f'{AJAX_URL}/account'
email_re = re.compile(
r'^[A-Za-z0-9\-_+.]+@[A-Za-z0-9\-_+.]+\.[A-Za-z0-9\-]+$|^$'
)
class AternosAccount:
"""Methods related to an Aternos account
including servers page parsing"""
def __init__(self, atclient: 'Client') -> None:
"""Should not be instantiated manually,
the entrypoint is `atclient.Client`
Args:
atconn (AternosConnect): AternosConnect object
"""
self.atclient = atclient
self.atconn: AternosConnect = atclient.atconn
self.parsed = False
self.servers: List[AternosServer] = []
def list_servers(self, cache: bool = True) -> List[AternosServer]:
"""Parses a servers list
Args:
cache (bool, optional): If the function should use
cached servers list (recommended)
Returns:
List of AternosServer objects
"""
if cache and self.parsed:
return self.servers
serverspage = self.atconn.request_cloudflare(
f'{BASE_URL}/servers/', 'GET'
)
serverstree = lxml.html.fromstring(serverspage.content)
servers = serverstree.xpath(
'//div[@class="server-body"]/@data-id'
)
self.refresh_servers(servers)
# Update session file (add servers)
try:
self.atclient.save_session(self.atclient.saved_session)
except OSError as err:
log.warning('Unable to save servers list to file: %s', err)
return self.servers
def refresh_servers(self, ids: List[str]) -> None:
"""Replaces the cached servers list
creating AternosServer objects by given IDs
Args:
ids (List[str]): Servers unique identifiers
"""
self.servers = []
for s in ids:
servid = s.strip()
if servid == '':
continue
log.debug('Adding server %s', servid)
srv = AternosServer(servid, self.atconn)
self.servers.append(srv)
self.parsed = True
def get_server(self, servid: str) -> AternosServer:
"""Creates a server object from the server ID.
Use this instead of `list_servers` if you know
the server IDentifier
Returns:
AternosServer object
"""
return AternosServer(servid, self.atconn)
def change_username(self, value: str) -> None:
"""Changes a username in your Aternos account
Args:
value (str): New username
"""
self.atconn.request_cloudflare(
f'{ACCOUNT_URL}/username',
'POST', data={'username': value},
sendtoken=True,
)
def change_email(self, value: str) -> None:
"""Changes an e-mail in your Aternos account
Args:
value (str): New e-mail
Raises:
ValueError: If an invalid e-mail address
was passed to the function
"""
if not email_re.match(value):
raise ValueError('Invalid e-mail')
self.atconn.request_cloudflare(
f'{ACCOUNT_URL}/email',
'POST', data={'email': value},
sendtoken=True,
)
def change_password(self, old: str, new: str) -> None:
"""Changes a password in your Aternos account
Args:
old (str): Old password
new (str): New password
"""
self.change_password_hashed(
md5encode(old),
md5encode(new),
)
def change_password_hashed(self, old: str, new: str) -> None:
"""Changes a password in your Aternos account.
Unlike `change_password`, this function
takes hashed passwords as the arguments
Args:
old (str): Old password hashed with MD5
new (str): New password hashed with MD5
"""
self.atconn.request_cloudflare(
f'{ACCOUNT_URL}/password',
'POST', data={
'oldpassword': old,
'newpassword': new,
},
sendtoken=True,
)
def qrcode_2fa(self) -> Dict[str, str]:
"""Requests a secret code and
a QR code for enabling 2FA"""
return self.atconn.request_cloudflare(
f'{ACCOUNT_URL}/secret',
'GET', sendtoken=True,
).json()
def save_qr(self, qrcode: str, filename: str) -> None:
"""Writes a 2FA QR code into a png-file
Args:
qrcode (str): Base64 encoded png image from `qrcode_2fa()`
filename (str): Where the QR code image must be saved.
Existing file will be rewritten.
"""
data = qrcode.removeprefix('data:image/png;base64,')
png = base64.b64decode(data)
with open(filename, 'wb') as f:
f.write(png)
def enable_2fa(self, code: int) -> None:
"""Enables Two-Factor Authentication
Args:
code (int): 2FA code
"""
self.atconn.request_cloudflare(
f'{ACCOUNT_URL}/twofactor',
'POST', data={'code': code},
sendtoken=True,
)
def disable_2fa(self, code: int) -> None:
"""Disables Two-Factor Authentication
Args:
code (int): 2FA code
"""
self.atconn.request_cloudflare(
f'{ACCOUNT_URL}/disbaleTwofactor',
'POST', data={'code': code},
sendtoken=True,
)
def logout(self) -> None:
"""The same as `atclient.Client.logout`"""
self.atclient.logout()

View file

@ -3,105 +3,99 @@ and allows to manage your account"""
import os
import re
from typing import Optional, Type
import hashlib
import logging
from .atlog import log, is_debug, set_debug
from .atmd5 import md5encode
import base64
from .ataccount import AternosAccount
from typing import List, Dict, Optional
import lxml.html
from .atserver import AternosServer
from .atconnect import AternosConnect
from .atconnect import AJAX_URL
from .aterrors import CredentialsError
from .aterrors import TwoFactorAuthError
from . import atjsparse
from .atjsparse import Interpreter
from .atjsparse import Js2PyInterpreter
class Client:
"""Aternos API Client class, object
of which contains user's auth data"""
def __init__(self) -> None:
# Config
self.sessions_dir = '~'
self.js: Type[Interpreter] = Js2PyInterpreter
# ###
self.saved_session = '~/.aternos' # will be rewritten by login()
self.atconn = AternosConnect()
self.account = AternosAccount(self)
def login(
def __init__(
self,
username: str,
password: str,
code: Optional[int] = None) -> None:
"""Log in to your Aternos account
with a username and a plain password
atconn: AternosConnect,
servers: Optional[List[str]] = None) -> None:
"""Aternos API Client class, object
of which contains user's auth data
Args:
username (str): Username
password (str): Plain-text password
code (Optional[int], optional): 2FA code
atconn (AternosConnect):
AternosConnect instance with initialized Aternos session
servers (Optional[List[str]], optional):
List with servers IDs
"""
self.login_hashed(
username,
md5encode(password),
code,
)
self.atconn = atconn
def login_hashed(
self,
self.saved_session = ''
self.parsed = False
self.servers: List[AternosServer] = []
if servers:
self.refresh_servers(servers)
@classmethod
def from_hashed(
cls,
username: str,
md5: str,
code: Optional[int] = None) -> None:
"""Log in to your Aternos account
with a username and a hashed password
code: Optional[int] = None,
sessions_dir: str = '~'):
"""Log in to an Aternos account with
a username and a hashed password
Args:
username (str): Username
md5 (str): Password hashed with MD5
code (int): 2FA code
username (str): Your username
md5 (str): Your password hashed with MD5
code (Optional[int]): 2FA code
sessions_dir (str): Path to the directory
where session will be automatically saved
Raises:
TwoFactorAuthError: If the 2FA is enabled,
but `code` argument was not passed or is incorrect
CredentialsError: If the Aternos backend
returned empty session cookie
(usually because of incorrect credentials)
ValueError: _description_
CredentialsError: If the API didn't
return a valid session cookie
"""
filename = self.session_filename(
username, self.sessions_dir
atconn = AternosConnect()
filename = cls.session_file(
username, sessions_dir
)
try:
self.restore_session(filename)
return cls.restore_session(filename)
except (OSError, CredentialsError):
pass
atjsparse.get_interpreter(create=self.js)
self.atconn.parse_token()
self.atconn.generate_sec()
atconn.parse_token()
atconn.generate_sec()
credentials = {
'username': username,
'user': username,
'password': md5,
}
if code is not None:
credentials['code'] = str(code)
loginreq = self.atconn.request_cloudflare(
f'{AJAX_URL}/account/login',
'POST', data=credentials, sendtoken=True,
loginreq = atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/login.php',
'POST', data=credentials, sendtoken=True
)
if b'"show2FA":true' in loginreq.content:
@ -112,48 +106,71 @@ class Client:
'Check your username and password'
)
self.saved_session = filename
obj = cls(atconn)
obj.saved_session = filename
try:
self.save_session(filename)
obj.save_session(filename)
except OSError:
pass
def login_with_session(self, session: str) -> None:
"""Log in using ATERNOS_SESSION cookie
return obj
@classmethod
def from_credentials(
cls,
username: str,
password: str,
code: Optional[int] = None,
sessions_dir: str = '~'):
"""Log in to Aternos with a username and a plain password
Args:
session (str): Session cookie value
username (str): Your username
password (str): Your password without any encryption
code (Optional[int]): 2FA code
sessions_dir (str): Path to the directory
where session will be automatically saved
"""
self.atconn.parse_token()
self.atconn.generate_sec()
self.atconn.session.cookies['ATERNOS_SESSION'] = session
def logout(self) -> None:
"""Log out from the Aternos account"""
self.atconn.request_cloudflare(
f'{AJAX_URL}/account/logout',
'GET', sendtoken=True,
md5 = Client.md5encode(password)
return cls.from_hashed(
username, md5, code,
sessions_dir
)
self.remove_session(self.saved_session)
@classmethod
def from_session(
cls,
session: str,
servers: Optional[List[str]] = None):
def restore_session(self, file: str = '~/.aternos') -> None:
"""Restores ATERNOS_SESSION cookie and,
if included, servers list, from a session file
"""Log in to Aternos using a session cookie value
Args:
file (str, optional): Filename
session (str): Value of ATERNOS_SESSION cookie
"""
Raises:
FileNotFoundError: If the file cannot be found
CredentialsError: If the session cookie
(or the file at all) has incorrect format
atconn = AternosConnect()
atconn.session.cookies['ATERNOS_SESSION'] = session
atconn.parse_token()
atconn.generate_sec()
return cls(atconn, servers)
@classmethod
def restore_session(cls, file: str = '~/.aternos'):
"""Log in to Aternos using
a saved ATERNOS_SESSION cookie
Args:
file (str, optional): File where a session cookie was saved
"""
file = os.path.expanduser(file)
log.debug('Restoring session from %s', file)
logging.debug('Restoring session from %s', file)
if not os.path.exists(file):
raise FileNotFoundError()
@ -165,63 +182,44 @@ class Client:
.split('\n')
session = saved[0].strip()
if session == '' or not session.isalnum():
if session == '':
raise CredentialsError(
'Session cookie is invalid or the file is empty'
'Unable to read session cookie, '
'the first line is empty'
)
if len(saved) > 1:
self.account.refresh_servers(saved[1:])
obj = cls.from_session(
session=session,
servers=saved[1:]
)
else:
obj = cls.from_session(session)
self.atconn.session.cookies['ATERNOS_SESSION'] = session
self.saved_session = file
obj.saved_session = file
def save_session(
self,
file: str = '~/.aternos',
incl_servers: bool = True) -> None:
"""Saves an ATERNOS_SESSION cookie to a file
Args:
file (str, optional): File where a session cookie must be saved
incl_servers (bool, optional): If the function
should include the servers IDs in this file
to reduce API requests count on the next restoration
(recommended)
"""
file = os.path.expanduser(file)
log.debug('Saving session to %s', file)
with open(file, 'wt', encoding='utf-8') as f:
f.write(self.atconn.atsession + '\n')
if not incl_servers:
return
for s in self.account.servers:
f.write(s.servid + '\n')
def remove_session(self, file: str = '~/.aternos') -> None:
"""Removes a file which contains
ATERNOS_SESSION cookie saved
with `save_session()`
Args:
file (str, optional): Filename
"""
file = os.path.expanduser(file)
log.debug('Removing session file: %s', file)
try:
os.remove(file)
except OSError as err:
log.warning('Unable to delete session file: %s', err)
return obj
@staticmethod
def session_filename(username: str, sessions_dir: str = '~') -> str:
"""Generates a session file name
def md5encode(passwd: str) -> str:
"""Encodes the given string with MD5
Args:
passwd (str): String to encode
Returns:
Hexdigest hash of the string in lowercase
"""
encoded = hashlib.md5(passwd.encode('utf-8'))
return encoded.hexdigest().lower()
@staticmethod
def session_file(username: str, sessions_dir: str = '~') -> str:
"""Generates session file name
for authenticated user
Args:
username (str): Authenticated user
@ -237,15 +235,254 @@ class Client:
secure = re.sub(
r'[^A-Za-z0-9_-]',
repl, username,
repl, username
)
return f'{sessions_dir}/.at_{secure}'
@property
def debug(self) -> bool:
return is_debug()
def save_session(
self,
file: str = '~/.aternos',
incl_servers: bool = True) -> None:
@debug.setter
def debug(self, state: bool) -> None:
return set_debug(state)
"""Saves an ATERNOS_SESSION cookie to a file
Args:
file (str, optional): File where a session cookie must be saved
incl_servers (bool, optional): If the function
should include the servers IDs to
reduce API requests count (recommended)
"""
file = os.path.expanduser(file)
logging.debug('Saving session to %s', file)
with open(file, 'wt', encoding='utf-8') as f:
f.write(self.atconn.atsession + '\n')
if not incl_servers:
return
for s in self.servers:
f.write(s.servid + '\n')
def remove_session(self, file: str = '~/.aternos') -> None:
"""Removes a file which contains
ATERNOS_SESSION cookie saved
with `save_session()`
Args:
file (str, optional): Filename
"""
file = os.path.expanduser(file)
logging.debug('Removing session file: %s', file)
try:
os.remove(file)
except OSError as err:
logging.warning('Unable to delete session file: %s', err)
def list_servers(self, cache: bool = True) -> List[AternosServer]:
"""Parses a list of your servers from Aternos website
Args:
cache (bool, optional): If the function should use
cached servers list (recommended)
Returns:
List of AternosServer objects
"""
if cache and self.parsed:
return self.servers
serverspage = self.atconn.request_cloudflare(
'https://aternos.org/servers/', 'GET'
)
serverstree = lxml.html.fromstring(serverspage.content)
servers = serverstree.xpath(
'//div[@class="server-body"]/@data-id'
)
self.refresh_servers(servers)
# Update session file (add servers)
try:
self.save_session(self.saved_session)
except OSError as err:
logging.warning('Unable to save servers list to file: %s', err)
return self.servers
def refresh_servers(self, ids: List[str]) -> None:
"""Replaces cached servers list creating
AternosServer objects by given IDs
Args:
ids (List[str]): Servers unique identifiers
"""
self.servers = []
for s in ids:
servid = s.strip()
if servid == '':
continue
logging.debug('Adding server %s', servid)
srv = AternosServer(servid, self.atconn)
self.servers.append(srv)
self.parsed = True
def get_server(self, servid: str) -> AternosServer:
"""Creates a server object from the server ID.
Use this instead of list_servers
if you know the ID to save some time.
Returns:
AternosServer object
"""
return AternosServer(servid, self.atconn)
def logout(self) -> None:
"""Log out from Aternos account"""
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/logout.php',
'GET', sendtoken=True
)
self.remove_session(self.saved_session)
def change_username(self, value: str) -> None:
"""Changes a username in your Aternos account
Args:
value (str): New username
"""
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/username.php',
'POST', data={'username': value}, sendtoken=True
)
def change_email(self, value: str) -> None:
"""Changes an e-mail in your Aternos account
Args:
value (str): New e-mail
Raises:
ValueError: If an invalid e-mail address
was passed to the function
"""
email = re.compile(
r'^[A-Za-z0-9\-_+.]+@[A-Za-z0-9\-_+.]+\.[A-Za-z0-9\-]+$|^$'
)
if not email.match(value):
raise ValueError('Invalid e-mail!')
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/email.php',
'POST', data={'email': value}, sendtoken=True
)
def change_password(self, old: str, new: str) -> None:
"""Changes a password in your Aternos account
Args:
old (str): Old password
new (str): New password
"""
self.change_password_hashed(
Client.md5encode(old),
Client.md5encode(new),
)
def change_password_hashed(self, old: str, new: str) -> None:
"""Changes a password in your Aternos account.
Unlike `change_password`, this function
takes hashed passwords as arguments
Args:
old (str): Old password hashed with MD5
new (str): New password hashed with MD5
"""
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/password.php',
'POST', data={
'oldpassword': old,
'newpassword': new,
}, sendtoken=True
)
def qrcode_2fa(self) -> Dict[str, str]:
"""Requests a secret code and
a QR code for enabling 2FA"""
return self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/secret.php',
'GET', sendtoken=True
).json()
def save_qr(self, qrcode: str, filename: str) -> None:
"""Writes a 2FA QR code into a png-file
Args:
qrcode (str): Base64 encoded png image from `qrcode_2fa()`
filename (str): Where the QR code image must be saved.
Existing file will be rewritten.
"""
data = qrcode.removeprefix('data:image/png;base64,')
png = base64.b64decode(data)
with open(filename, 'wb') as f:
f.write(png)
def enable_2fa(self, code: int) -> None:
"""Enables Two-Factor Authentication
Args:
code (int): 2FA code
"""
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/twofactor.php',
'POST', data={
'code': code
}, sendtoken=True
)
def disable_2fa(self, code: int) -> None:
"""Disables Two-Factor Authentication
Args:
code (int): 2FA code
"""
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/disbaleTwofactor.php',
'POST', data={
'code': code
}, sendtoken=True
)

View file

@ -1,7 +1,5 @@
"""Modifying server and world options"""
# TODO: Still needs refactoring
import enum
import re
@ -10,11 +8,9 @@ from typing import TYPE_CHECKING
import lxml.html
from .atconnect import BASE_URL, AJAX_URL
if TYPE_CHECKING:
from .atserver import AternosServer
DAT_PREFIX = 'Data:'
DAT_GR_PREFIX = 'Data:GameRules:'
@ -115,12 +111,11 @@ class Difficulty(enum.IntEnum):
# checking timezone format
tzcheck = re.compile(r'(^[A-Z]\w+\/[A-Z]\w+$)|^UTC$')
# options types converting
convert = {
'config-option-number': int,
'config-option-select': int,
'config-option-toggle': bool,
'config-option-toggle': bool
}
@ -129,6 +124,7 @@ class AternosConfig:
"""Class for editing server settings"""
def __init__(self, atserv: 'AternosServer') -> None:
"""Class for editing server settings
Args:
@ -139,6 +135,7 @@ class AternosConfig:
self.atserv = atserv
def get_timezone(self) -> str:
"""Parses timezone from options page
Returns:
@ -146,7 +143,7 @@ class AternosConfig:
"""
optreq = self.atserv.atserver_request(
f'{BASE_URL}/options', 'GET'
'https://aternos.org/options', 'GET'
)
opttree = lxml.html.fromstring(optreq)
@ -157,6 +154,7 @@ class AternosConfig:
return tztext.strip()
def set_timezone(self, value: str) -> None:
"""Sets new timezone
Args:
@ -174,12 +172,13 @@ class AternosConfig:
)
self.atserv.atserver_request(
f'{AJAX_URL}/timezone.php',
'https://aternos.org/panel/ajax/timezone.php',
'POST', data={'timezone': value},
sendtoken=True
)
def get_java(self) -> int:
"""Parses Java version from options page
Returns:
@ -187,7 +186,7 @@ class AternosConfig:
"""
optreq = self.atserv.atserver_request(
f'{BASE_URL}/options', 'GET'
'https://aternos.org/options', 'GET'
)
opttree = lxml.html.fromstring(optreq)
imgopt = opttree.xpath(
@ -201,6 +200,7 @@ class AternosConfig:
return int(jdkver)
def set_java(self, value: int) -> None:
"""Sets new Java version
Args:
@ -208,7 +208,7 @@ class AternosConfig:
"""
self.atserv.atserver_request(
f'{AJAX_URL}/image.php',
'https://aternos.org/panel/ajax/image.php',
'POST', data={'image': f'openjdk:{value}'},
sendtoken=True
)
@ -217,6 +217,7 @@ class AternosConfig:
# server.properties
#
def set_server_prop(self, option: str, value: Any) -> None:
"""Sets server.properties option
Args:
@ -230,6 +231,7 @@ class AternosConfig:
)
def get_server_props(self, proptyping: bool = True) -> Dict[str, Any]:
"""Parses all server.properties from options page
Args:
@ -243,9 +245,10 @@ class AternosConfig:
`server.properties` dictionary
"""
return self.__get_all_props(f'{BASE_URL}/options', proptyping)
return self.__get_all_props('https://aternos.org/options', proptyping)
def set_server_props(self, props: Dict[str, Any]) -> None:
"""Updates server.properties options with the given dict
Args:
@ -263,6 +266,7 @@ class AternosConfig:
self, option: Union[WorldOpts, WorldRules],
value: Any, gamerule: bool = False,
world: str = 'world') -> None:
"""Sets level.dat option for specified world
Args:
@ -286,6 +290,7 @@ class AternosConfig:
def get_world_props(
self, world: str = 'world',
proptyping: bool = True) -> Dict[str, Any]:
"""Parses level.dat from specified world's options page
Args:
@ -301,7 +306,7 @@ class AternosConfig:
"""
return self.__get_all_props(
f'{BASE_URL}/files/{world}/level.dat',
f'https://aternos.org/files/{world}/level.dat',
proptyping, [DAT_PREFIX, DAT_GR_PREFIX]
)
@ -309,6 +314,7 @@ class AternosConfig:
self,
props: Dict[Union[WorldOpts, WorldRules], Any],
world: str = 'world') -> None:
"""Sets level.dat options from
the dictionary for the specified world
@ -332,7 +338,7 @@ class AternosConfig:
def __set_prop(self, file: str, option: str, value: Any) -> None:
self.atserv.atserver_request(
f'{AJAX_URL}/config.php',
'https://aternos.org/panel/ajax/config.php',
'POST', data={
'file': file,
'option': option,

View file

@ -1,44 +1,30 @@
"""Stores API session and sends requests"""
"""Stores API connection session and sends requests"""
import re
import time
import string
import secrets
import random
import logging
from functools import partial
from typing import Optional
from typing import List, Dict, Any
from typing import Optional, Union
from typing import Dict, Any
import requests
from cloudscraper import CloudScraper
from .atlog import log, is_debug
from . import atjsparse
from .aterrors import TokenError
from .aterrors import CloudflareError
from .aterrors import AternosPermissionError
BASE_URL = 'https://aternos.org'
AJAX_URL = f'{BASE_URL}/ajax'
REQUA = \
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
'(KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36 OPR/85.0.4341.47'
ARROW_FN_REGEX = r'\(\(\).*?\)\(\);'
SCRIPT_TAG_REGEX = (
rb'<script type=([\'"]?)text/javascript\1>.+?</script>'
)
SEC_ALPHABET = string.ascii_lowercase + string.digits
class AternosConnect:
"""Class for sending API requests,
bypassing Cloudflare and parsing responses"""
@ -47,24 +33,15 @@ class AternosConnect:
self.session = CloudScraper()
self.sec = ''
self.token = ''
self.atcookie = ''
def refresh_session(self) -> None:
"""Creates a new CloudScraper
session object and copies all cookies.
Required for bypassing Cloudflare"""
old_cookies = self.session.cookies
captcha_kwarg = self.session.captcha
self.session = CloudScraper(captcha=captcha_kwarg)
self.session.cookies.update(old_cookies)
del old_cookies
def parse_token(self) -> str:
"""Parses Aternos ajax token that
is needed for most requests
Raises:
RuntimeWarning: If the parser can not
find `<head>` tag in HTML response
TokenError: If the parser is unable
to extract ajax token from HTML
@ -73,7 +50,7 @@ class AternosConnect:
"""
loginpage = self.request_cloudflare(
f'{BASE_URL}/go/', 'GET'
'https://aternos.org/go/', 'GET'
).content
# Using the standard string methods
@ -85,44 +62,26 @@ class AternosConnect:
# Some checks
if headtag < 0 or headend < 0:
pagehead = loginpage
log.warning(
raise RuntimeWarning(
'Unable to find <head> tag, parsing the whole page'
)
else:
# Extracting <head> content
headtag = headtag + len(head)
pagehead = loginpage[headtag:headend]
js_code: Optional[List[Any]] = None
# Extracting <head> content
headtag = headtag + len(head)
pagehead = loginpage[headtag:headend]
try:
text = pagehead.decode('utf-8', 'replace')
js_code = re.findall(ARROW_FN_REGEX, text)
js_code = re.findall(r'\(\(\)(.*?)\)\(\);', text)
token_func = js_code[0]
if len(js_code) > 1:
token_func = js_code[1]
js = atjsparse.get_interpreter()
js.exec_js(token_func)
self.token = js['AJAX_TOKEN']
ctx = atjsparse.exec_js(token_func)
self.token = ctx.window['AJAX_TOKEN']
except (IndexError, TypeError) as err:
log.warning('---')
log.warning('Unable to parse AJAX_TOKEN!')
log.warning('Please, insert the info below')
log.warning('to the GitHub issue description:')
log.warning('---')
log.warning('JavaScript: %s', js_code)
log.warning(
'All script tags: %s',
re.findall(SCRIPT_TAG_REGEX, pagehead)
)
log.warning('---')
raise TokenError(
'Unable to parse TOKEN from the page'
) from err
@ -130,6 +89,7 @@ class AternosConnect:
return self.token
def generate_sec(self) -> str:
"""Generates Aternos SEC token which
is also needed for most API requests
@ -137,8 +97,8 @@ class AternosConnect:
Random SEC `key:value` string
"""
randkey = self.generate_sec_part()
randval = self.generate_sec_part()
randkey = self.generate_aternos_rand()
randval = self.generate_aternos_rand()
self.sec = f'{randkey}:{randval}'
self.session.cookies.set(
f'ATERNOS_SEC_{randkey}', randval,
@ -147,13 +107,60 @@ class AternosConnect:
return self.sec
def generate_sec_part(self) -> str:
"""Generates a part for SEC token"""
def generate_aternos_rand(self, randlen: int = 16) -> str:
return ''.join(
secrets.choice(SEC_ALPHABET)
for _ in range(11)
) + ('0' * 5)
"""Generates a random string using
Aternos algorithm from main.js file
Args:
randlen (int, optional): Random string length
Returns:
Random string for SEC token
"""
# a list with randlen+1 empty strings:
# generate a string with spaces,
# then split it by space
rand_arr = (' ' * (randlen + 1)).split(' ')
rand = random.random()
rand_alphanum = self.convert_num(rand, 36) + ('0' * 17)
return rand_alphanum[:18].join(rand_arr)[:randlen]
def convert_num(
self, num: Union[int, float, str],
base: int, frombase: int = 10) -> str:
"""Converts an integer to specified base
Args:
num (Union[int,float,str]): Integer in any base to convert.
If it is a float starting with `0.`,
zero and point will be removed to get int
base (int): New base
frombase (int, optional): Given number base
Returns:
Number converted to a specified base
"""
if isinstance(num, str):
num = int(num, frombase)
if isinstance(num, float):
sliced = str(num)[2:]
num = int(sliced)
symbols = '0123456789abcdefghijklmnopqrstuvwxyz'
basesym = symbols[:base]
result = ''
while num > 0:
rem = num % base
result = str(basesym[rem]) + result
num //= base
return result
def request_cloudflare(
self, url: str, method: str,
@ -162,8 +169,8 @@ class AternosConnect:
headers: Optional[Dict[Any, Any]] = None,
reqcookies: Optional[Dict[Any, Any]] = None,
sendtoken: bool = False,
retries: int = 5,
timeout: int = 4) -> requests.Response:
retry: int = 5) -> requests.Response:
"""Sends a request to Aternos API bypass Cloudflare
Args:
@ -177,9 +184,8 @@ class AternosConnect:
Cookies only for this request
sendtoken (bool, optional): If the ajax and SEC token
should be sent
retries (int, optional): How many times parser must retry
retry (int, optional): How many times parser must retry
connection to API bypass Cloudflare
timeout (int, optional): Request timeout in seconds
Raises:
CloudflareError: When the parser has exceeded retries count
@ -189,15 +195,13 @@ class AternosConnect:
API response
"""
if retries <= 0:
if retry <= 0:
raise CloudflareError('Unable to bypass Cloudflare protection')
try:
self.atcookie = self.session.cookies['ATERNOS_SESSION']
except KeyError:
pass
self.refresh_session()
old_cookies = self.session.cookies
self.session = CloudScraper()
self.session.cookies.update(old_cookies)
del old_cookies
params = params or {}
data = data or {}
@ -215,45 +219,42 @@ class AternosConnect:
headers['X-Requested-With'] = 'XMLHttpRequest'
# requests.cookies.CookieConflictError bugfix
reqcookies['ATERNOS_SESSION'] = self.atcookie
reqcookies['ATERNOS_SESSION'] = self.atsession
del self.session.cookies['ATERNOS_SESSION']
if is_debug():
reqcookies_dbg = {
k: str(v or '')[:3]
for k, v in reqcookies.items()
}
reqcookies_dbg = {
k: str(v or '')[:3]
for k, v in reqcookies.items()
}
session_cookies_dbg = {
k: str(v or '')[:3]
for k, v in self.session.cookies.items()
}
session_cookies_dbg = {
k: str(v or '')[:3]
for k, v in self.session.cookies.items()
}
log.debug('Requesting(%s)%s', method, url)
log.debug('headers=%s', headers)
log.debug('params=%s', params)
log.debug('data=%s', data)
log.debug('req-cookies=%s', reqcookies_dbg)
log.debug('session-cookies=%s', session_cookies_dbg)
logging.debug('Requesting(%s)%s', method, url)
logging.debug('headers=%s', headers)
logging.debug('params=%s', params)
logging.debug('data=%s', data)
logging.debug('req-cookies=%s', reqcookies_dbg)
logging.debug('session-cookies=%s', session_cookies_dbg)
if method == 'POST':
sendreq = partial(
self.session.post,
params=params,
data=data,
data=data
)
else:
sendreq = partial(
self.session.get,
params={**params, **data},
params={**params, **data}
)
req = sendreq(
url,
headers=headers,
cookies=reqcookies,
timeout=timeout,
cookies=reqcookies
)
resp_type = req.headers.get('content-type', '')
@ -261,17 +262,17 @@ class AternosConnect:
cloudflare = req.status_code == 403
if html_type and cloudflare:
log.info('Retrying to bypass Cloudflare')
time.sleep(0.3)
logging.info('Retrying to bypass Cloudflare')
time.sleep(0.2)
return self.request_cloudflare(
url, method,
params, data,
headers, reqcookies,
sendtoken, retries - 1
sendtoken, retry - 1
)
log.debug('AternosConnect received: %s', req.text[:65])
log.info(
logging.debug('AternosConnect received: %s', req.text[:65])
logging.info(
'%s completed with %s status',
method, req.status_code
)
@ -284,6 +285,7 @@ class AternosConnect:
@property
def atsession(self) -> str:
"""Aternos session cookie,
empty string if not logged in

View file

@ -37,6 +37,7 @@ class ServerError(AternosError):
"""Common class for server errors"""
def __init__(self, reason: str, message: str = '') -> None:
"""Common class for server errors
Args:
@ -70,6 +71,7 @@ class ServerStartError(AternosError):
}
def __init__(self, reason: str) -> None:
"""Raised when Aternos
can not start Minecraft server

View file

@ -7,7 +7,6 @@ from typing import TYPE_CHECKING
import lxml.html
from .atconnect import BASE_URL, AJAX_URL
from .aterrors import FileError
if TYPE_CHECKING:
@ -35,6 +34,7 @@ class AternosFile:
dlable: bool, editable: bool,
ftype: FileType = FileType.file,
size: Union[int, float] = 0) -> None:
"""File class which contains info
about its path, type and size
@ -68,6 +68,7 @@ class AternosFile:
self,
name: str,
ftype: FileType = FileType.file) -> None:
"""Creates a file or a directory inside this one
Args:
@ -88,7 +89,7 @@ class AternosFile:
name = name.strip().replace('/', '_')
req = self.atserv.atserver_request(
f'{AJAX_URL}/files/create.php',
'https://aternos.org/panel/ajax/files/create.php',
'POST', data={
'file': f'{self._path}/{name}',
'type': 'file'
@ -101,6 +102,7 @@ class AternosFile:
raise FileError('Unable to create a file')
def delete(self) -> None:
"""Deletes the file
Raises:
@ -115,7 +117,7 @@ class AternosFile:
)
req = self.atserv.atserver_request(
f'{AJAX_URL}/delete.php',
'https://aternos.org/panel/ajax/delete.php',
'POST', data={'file': self._path},
sendtoken=True
)
@ -124,6 +126,7 @@ class AternosFile:
raise FileError('Unable to delete the file')
def get_content(self) -> bytes:
"""Requests file content in bytes (downloads it)
Raises:
@ -141,7 +144,7 @@ class AternosFile:
)
file = self.atserv.atserver_request(
f'{AJAX_URL}/files/download.php',
'https://aternos.org/panel/ajax/files/download.php',
'GET', params={
'file': self._path
}
@ -156,6 +159,7 @@ class AternosFile:
return file.content
def set_content(self, value: bytes) -> None:
"""Modifies file content
Args:
@ -166,7 +170,7 @@ class AternosFile:
"""
req = self.atserv.atserver_request(
f'{AJAX_URL}/save.php',
'https://aternos.org/panel/ajax/save.php',
'POST', data={
'file': self._path,
'content': value
@ -177,6 +181,7 @@ class AternosFile:
raise FileError('Unable to save the file')
def get_text(self) -> str:
"""Requests editing the file as a text
Raises:
@ -201,7 +206,7 @@ class AternosFile:
filepath = self._path.lstrip("/")
editor = self.atserv.atserver_request(
f'{BASE_URL}/files/{filepath}', 'GET'
f'https://aternos.org/files/{filepath}', 'GET'
)
edittree = lxml.html.fromstring(editor.content)
editblock = edittree.xpath('//div[@id="editor"]')
@ -215,6 +220,7 @@ class AternosFile:
return editblock[0].text_content()
def set_text(self, value: str) -> None:
"""Modifies the file content,
but unlike `set_content` takes
a string as an argument
@ -227,6 +233,7 @@ class AternosFile:
@property
def path(self) -> str:
"""Abslute path to the file
without leading slash
including filename
@ -239,6 +246,7 @@ class AternosFile:
@property
def name(self) -> str:
"""Filename with extension
Returns:
@ -249,6 +257,7 @@ class AternosFile:
@property
def dirname(self) -> str:
"""Full path to the directory
which contains the file
without leading slash.
@ -262,6 +271,7 @@ class AternosFile:
@property
def deleteable(self) -> bool:
"""True if the file can be deleted,
otherwise False
@ -273,6 +283,7 @@ class AternosFile:
@property
def downloadable(self) -> bool:
"""True if the file can be downloaded,
otherwise False
@ -284,6 +295,7 @@ class AternosFile:
@property
def editable(self) -> bool:
"""True if the file can be
opened in Aternos editor,
otherwise False
@ -296,6 +308,7 @@ class AternosFile:
@property
def ftype(self) -> FileType:
"""File object type: file or directory
Returns:
@ -306,6 +319,7 @@ class AternosFile:
@property
def is_dir(self) -> bool:
"""Check if the file object is a directory
Returns:
@ -316,6 +330,7 @@ class AternosFile:
@property
def is_file(self) -> bool:
"""Check if the file object is not a directory
Returns:
@ -326,6 +341,7 @@ class AternosFile:
@property
def size(self) -> float:
"""File size in bytes
Returns:

View file

@ -5,9 +5,7 @@ from typing import TYPE_CHECKING
import lxml.html
from .atconnect import BASE_URL, AJAX_URL
from .atfile import AternosFile, FileType
if TYPE_CHECKING:
from .atserver import AternosServer
@ -18,6 +16,7 @@ class FileManager:
for viewing files structure"""
def __init__(self, atserv: 'AternosServer') -> None:
"""Aternos file manager class
for viewing files structure
@ -29,6 +28,7 @@ class FileManager:
self.atserv = atserv
def list_dir(self, path: str = '') -> List[AternosFile]:
"""Requests a list of files
in the specified directory
@ -43,7 +43,7 @@ class FileManager:
path = path.lstrip('/')
filesreq = self.atserv.atserver_request(
f'{BASE_URL}/files/{path}', 'GET'
f'https://aternos.org/files/{path}', 'GET'
)
filestree = lxml.html.fromstring(filesreq.content)
@ -83,6 +83,7 @@ class FileManager:
return files
def extract_size(self, fsize_raw: List[Any]) -> float:
"""Parses file size from the LXML tree
Args:
@ -112,6 +113,7 @@ class FileManager:
self,
num: Union[int, float],
measure: str) -> float:
"""Converts "human" file size to size in bytes
Args:
@ -131,6 +133,7 @@ class FileManager:
return measure_match.get(measure, -1) * num
def get_file(self, path: str) -> Optional[AternosFile]:
"""Returns :class:`python_aternos.atfile.AternosFile`
instance by its path
@ -155,6 +158,7 @@ class FileManager:
}.get('file', None)
def dl_file(self, path: str) -> bytes:
"""Returns the file content in bytes (downloads it)
Args:
@ -165,7 +169,7 @@ class FileManager:
"""
file = self.atserv.atserver_request( # type: ignore
f'{AJAX_URL}/files/download.php'
'https://aternos.org/panel/ajax/files/download.php'
'GET', params={
'file': path.replace('/', '%2F')
}
@ -174,6 +178,7 @@ class FileManager:
return file.content
def dl_world(self, world: str = 'world') -> bytes:
"""Returns the world zip file content
by its name (downloads it)
@ -185,7 +190,7 @@ class FileManager:
"""
resp = self.atserv.atserver_request( # type: ignore
f'{AJAX_URL}/worlds/download.php'
'https://aternos.org/panel/ajax/worlds/download.php'
'GET', params={
'world': world.replace('/', '%2F')
}

View file

@ -1,199 +1,38 @@
"""Parsing and executing JavaScript code"""
import abc
import json
import base64
import subprocess
from pathlib import Path
from typing import Optional, Union
from typing import Type, Any
import regex
import js2py
import requests
from .atlog import log
# Thanks to http://regex.inginf.units.it/
arrowexp = regex.compile(r'\w[^\}]*+')
js: Optional['Interpreter'] = None
def to_ecma5_function(f: str) -> str:
"""Converts a ECMA6 function
to ECMA5 format (without arrow expressions)
class Interpreter(abc.ABC):
"""Base JS interpreter class"""
Args:
f (str): ECMA6 function
def __init__(self) -> None:
"""Base JS interpreter class"""
Returns:
ECMA5 function
"""
def __getitem__(self, name: str) -> Any:
"""Support for `js[name]` syntax
instead of `js.get_var(name)`
Args:
name (str): Variable name
Returns:
Variable value
"""
return self.get_var(name)
@abc.abstractmethod
def exec_js(self, func: str) -> None:
"""Executes JavaScript code
Args:
func (str): JS function
"""
@abc.abstractmethod
def get_var(self, name: str) -> Any:
"""Returns JS variable value
from the interpreter
Args:
name (str): Variable name
Returns:
Variable value
"""
class NodeInterpreter(Interpreter):
"""Node.JS interpreter wrapper,
starts a simple web server in background"""
def __init__(
self,
node: Union[str, Path] = 'node',
host: str = 'localhost',
port: int = 8001) -> None:
"""Node.JS interpreter wrapper,
starts a simple web server in background
Args:
node (Union[str, Path], optional): Path to `node` executable
host (str, optional): Hostname for the web server
port (int, optional): Port for the web server
"""
super().__init__()
file_dir = Path(__file__).absolute().parent
server_js = file_dir / 'data' / 'server.js'
self.url = f'http://{host}:{port}'
self.timeout = 2
# pylint: disable=consider-using-with
self.proc = subprocess.Popen(
args=[
node, server_js,
f'{port}', host,
],
stdout=subprocess.PIPE,
)
# pylint: enable=consider-using-with
assert self.proc.stdout is not None
ok_msg = self.proc.stdout.readline()
log.debug('Received from server.js: %s', ok_msg)
def exec_js(self, func: str) -> None:
resp = requests.post(self.url, data=func, timeout=self.timeout)
resp.raise_for_status()
def get_var(self, name: str) -> Any:
resp = requests.post(self.url, data=name, timeout=self.timeout)
resp.raise_for_status()
log.debug('NodeJS response: %s', resp.content)
return json.loads(resp.content)
def __del__(self) -> None:
try:
self.proc.terminate()
self.proc.communicate()
except AttributeError:
log.warning(
'NodeJS process was not initialized, '
'but __del__ was called'
)
class Js2PyInterpreter(Interpreter):
"""Js2Py interpreter,
uses js2py library to execute code"""
# Thanks to http://regex.inginf.units.it
arrowexp = regex.compile(r'\w[^\}]*+')
def __init__(self) -> None:
"""Js2Py interpreter,
uses js2py library to execute code"""
super().__init__()
ctx = js2py.EvalJs({'atob': atob})
ctx.execute('''
window.Map = function(_i){ };
window.setTimeout = function(_f,_t){ };
window.setInterval = function(_f,_t){ };
window.encodeURIComponent = window.Map;
window.document = { };
document.doctype = { };
document.currentScript = { };
document.getElementById = window.Map;
document.prepend = window.Map;
document.append = window.Map;
document.appendChild = window.Map;
''')
self.ctx = ctx
def exec_js(self, func: str) -> None:
self.ctx.execute(self.to_ecma5(func))
def get_var(self, name: str) -> Any:
return self.ctx[name]
def to_ecma5(self, func: str) -> str:
"""Converts from ECMA6 format to ECMA5
(replacing arrow expressions)
and removes comment blocks
Args:
func (str): ECMA6 function
Returns:
ECMA5 function
"""
# Delete anything between /* and */
func = regex.sub(r'/\*.+?\*/', '', func)
# Search for arrow expressions
match = self.arrowexp.search(func)
if match is None:
return func
# Convert the function
conv = '(function(){' + match[0] + '})()'
# Convert 1 more expression.
# It doesn't change,
# so it was hardcoded
# as a regexp
return regex.sub(
r'(?:s|\(s\)) => s.split\([\'"]{2}\).reverse\(\).join\([\'"]{2}\)',
'function(s){return s.split(\'\').reverse().join(\'\')}',
conv
)
f = regex.sub(r'/\*.+?\*/', '', f)
match = arrowexp.search(f)
conv = '(function(){' + match.group(0) + '})()'
return regex.sub(
r'(?:s|\(s\)) => s.split\([\'"]{2}\).reverse\(\).join\([\'"]{2}\)',
'function(s){return s.split(\'\').reverse().join(\'\')}',
conv
)
def atob(s: str) -> str:
"""Wrapper for the built-in library function.
Decodes a base64 string
"""Decodes base64 string
Args:
s (str): Encoded data
@ -205,27 +44,22 @@ def atob(s: str) -> str:
return base64.standard_b64decode(str(s)).decode('utf-8')
def get_interpreter(
*args,
create: Type[Interpreter] = Js2PyInterpreter,
**kwargs) -> 'Interpreter':
"""Get or create a JS interpreter.
`*args` and `**kwargs` will be passed
directly to JS interpreter `__init__`
(when creating it)
def exec_js(f: str) -> js2py.EvalJs:
"""Executes a JavaScript function
Args:
create (Type[Interpreter], optional): Preferred interpreter
f (str): ECMA6 function
Returns:
JS interpreter instance
JavaScript interpreter context
"""
global js # pylint: disable=global-statement
# create if none
if js is None:
js = create(*args, **kwargs)
# and return
return js
ctx = js2py.EvalJs({'atob': atob})
ctx.execute('window.document = { };')
ctx.execute('window.Map = function(_i){ };')
ctx.execute('window.setTimeout = function(_f,_t){ };')
ctx.execute('window.setInterval = function(_f,_t){ };')
ctx.execute('window.encodeURIComponent = function(_s){ };')
ctx.execute(to_ecma5_function(f))
return ctx

View file

@ -1,31 +0,0 @@
"""Creates a logger"""
import logging
log = logging.getLogger('aternos')
handler = logging.StreamHandler()
fmt = logging.Formatter('%(asctime)s %(levelname)-5s %(message)s')
handler.setFormatter(fmt)
log.addHandler(handler)
def is_debug() -> bool:
"""Is debug logging enabled"""
return log.level == logging.DEBUG
def set_debug(state: bool) -> None:
"""Enable debug logging"""
if state:
set_level(logging.DEBUG)
else:
set_level(logging.WARNING)
def set_level(level: int) -> None:
log.setLevel(level)
handler.setLevel(level)

View file

@ -1,17 +0,0 @@
"""Contains a function for hashing"""
import hashlib
def md5encode(passwd: str) -> str:
"""Encodes the given string with MD5
Args:
passwd (str): String to encode
Returns:
Hexdigest hash of the string in lowercase
"""
encoded = hashlib.md5(passwd.encode('utf-8'))
return encoded.hexdigest().lower()

View file

@ -7,7 +7,6 @@ from typing import TYPE_CHECKING
import lxml.html
from .atconnect import BASE_URL, AJAX_URL
if TYPE_CHECKING:
from .atserver import AternosServer
@ -33,6 +32,7 @@ class PlayersList:
self,
lst: Union[str, Lists],
atserv: 'AternosServer') -> None:
"""Class for managing operators,
whitelist and banned players lists
@ -50,8 +50,8 @@ class PlayersList:
# whl_je = whitelist for java
# whl_be = whitelist for bedrock
# whl = common whitelist
common_whl = self.lst == Lists.whl
bedrock = atserv.is_bedrock
common_whl = (self.lst == Lists.whl)
bedrock = (atserv.is_bedrock)
if common_whl and bedrock:
self.lst = Lists.whl_be
@ -60,6 +60,7 @@ class PlayersList:
self.parsed = False
def list_players(self, cache: bool = True) -> List[str]:
"""Parse a players list
Args:
@ -74,7 +75,7 @@ class PlayersList:
return self.players
listreq = self.atserv.atserver_request(
f'{BASE_URL}/players/{self.lst.value}',
f'https://aternos.org/players/{self.lst.value}',
'GET'
)
listtree = lxml.html.fromstring(listreq.content)
@ -92,6 +93,7 @@ class PlayersList:
return result
def add(self, name: str) -> None:
"""Appends a player to the list by the nickname
Args:
@ -99,7 +101,7 @@ class PlayersList:
"""
self.atserv.atserver_request(
f'{AJAX_URL}/server/players/lists/add',
'https://aternos.org/panel/ajax/players/add.php',
'POST', data={
'list': self.lst.value,
'name': name
@ -109,6 +111,7 @@ class PlayersList:
self.players.append(name)
def remove(self, name: str) -> None:
"""Removes a player from the list by the nickname
Args:
@ -116,7 +119,7 @@ class PlayersList:
"""
self.atserv.atserver_request(
f'{AJAX_URL}/server/players/lists/remove',
'https://aternos.org/panel/ajax/players/remove.php',
'POST', data={
'list': self.lst.value,
'name': name

View file

@ -1,30 +1,20 @@
"""Aternos Minecraft server"""
import re
import enum
import json
import enum
from typing import Any, Dict, List
from functools import partial
from typing import Optional
from typing import List, Dict, Any
import requests
from .atconnect import BASE_URL, AJAX_URL
from .atconnect import AternosConnect
from .atwss import AternosWss
from .atplayers import PlayersList
from .atplayers import Lists
from .aterrors import ServerStartError
from .atfm import FileManager
from .atconf import AternosConfig
from .aterrors import AternosError
from .aterrors import ServerStartError
SERVER_URL = f'{AJAX_URL}/server'
status_re = re.compile(
r'<script>\s*var lastStatus\s*?=\s*?(\{.+?\});?\s*<\/script>'
)
from .atplayers import PlayersList
from .atplayers import Lists
from .atwss import AternosWss
class Edition(enum.IntEnum):
@ -56,51 +46,41 @@ class Status(enum.IntEnum):
class AternosServer:
"""Class for controlling your Aternos Minecraft server"""
def __init__(
self, servid: str,
atconn: AternosConnect,
autofetch: bool = False) -> None:
reqinfo: bool = True) -> None:
"""Class for controlling your Aternos Minecraft server
Args:
servid (str): Unique server IDentifier
atconn (AternosConnect):
AternosConnect instance with initialized Aternos session
autofetch (bool, optional): Automatically call
reqinfo (bool, optional): Automatically call
`fetch()` to get all info
"""
self.servid = servid
self.atconn = atconn
self._info: Dict[str, Any] = {}
self.atserver_request = partial(
self.atconn.request_cloudflare,
reqcookies={
'ATERNOS_SERVER': self.servid,
}
)
if autofetch:
if reqinfo:
self.fetch()
def fetch(self) -> None:
"""Get all server info"""
page = self.atserver_request(
f'{BASE_URL}/server', 'GET'
"""Send a request to Aternos API to get all server info"""
servreq = self.atserver_request(
'https://aternos.org/panel/ajax/status.php',
'GET', sendtoken=True
)
match = status_re.search(page.text)
if match is None:
raise AternosError('Unable to parse lastStatus object')
self._info = json.loads(match[1])
self._info = json.loads(servreq.content)
def wss(self, autoconfirm: bool = False) -> AternosWss:
"""Returns AternosWss instance for
listening server streams in real-time
@ -119,17 +99,14 @@ class AternosServer:
def start(
self,
headstart: bool = False,
access_credits: bool = False,
accepteula: bool = True) -> None:
"""Starts a server
Args:
headstart (bool, optional): Start a server in
the headstart mode which allows
you to skip all queue
access_credits (bool, optional):
Some new parameter in Aternos API,
I don't know what it is
accepteula (bool, optional):
Automatically accept the Mojang EULA
@ -139,12 +116,9 @@ class AternosServer:
"""
startreq = self.atserver_request(
f'{SERVER_URL}/start',
'GET', params={
'headstart': int(headstart),
'access-credits': int(access_credits),
},
sendtoken=True,
'https://aternos.org/panel/ajax/start.php',
'GET', params={'headstart': int(headstart)},
sendtoken=True
)
startresult = startreq.json()
@ -161,46 +135,52 @@ class AternosServer:
raise ServerStartError(error)
def confirm(self) -> None:
"""Confirms server launching"""
self.atserver_request(
f'{SERVER_URL}/confirm',
'GET', sendtoken=True,
'https://aternos.org/panel/ajax/confirm.php',
'GET', sendtoken=True
)
def stop(self) -> None:
"""Stops the server"""
self.atserver_request(
f'{SERVER_URL}/stop',
'GET', sendtoken=True,
'https://aternos.org/panel/ajax/stop.php',
'GET', sendtoken=True
)
def cancel(self) -> None:
"""Cancels server launching"""
self.atserver_request(
f'{SERVER_URL}/cancel',
'GET', sendtoken=True,
'https://aternos.org/panel/ajax/cancel.php',
'GET', sendtoken=True
)
def restart(self) -> None:
"""Restarts the server"""
self.atserver_request(
f'{SERVER_URL}/restart',
'GET', sendtoken=True,
'https://aternos.org/panel/ajax/restart.php',
'GET', sendtoken=True
)
def eula(self) -> None:
"""Sends a request to accept the Mojang EULA"""
"""Accepts the Mojang EULA"""
self.atserver_request(
f'{SERVER_URL}/accept-eula',
'GET', sendtoken=True,
'https://aternos.org/panel/ajax/eula.php',
'GET', sendtoken=True
)
def files(self) -> FileManager:
"""Returns FileManager instance
for file operations
@ -211,6 +191,7 @@ class AternosServer:
return FileManager(self)
def config(self) -> AternosConfig:
"""Returns AternosConfig instance
for editing server settings
@ -221,6 +202,7 @@ class AternosServer:
return AternosConfig(self)
def players(self, lst: Lists) -> PlayersList:
"""Returns PlayersList instance
for managing operators, whitelist
and banned players lists
@ -235,40 +217,45 @@ class AternosServer:
return PlayersList(lst, self)
def set_subdomain(self, value: str) -> None:
"""Set a new subdomain for your server
(the part before `.aternos.me`)
def atserver_request(
self, url: str, method: str,
params: Optional[Dict[Any, Any]] = None,
data: Optional[Dict[Any, Any]] = None,
headers: Optional[Dict[Any, Any]] = None,
sendtoken: bool = False) -> requests.Response:
"""Sends a request to Aternos API
with server IDenitfier parameter
Args:
value (str): Subdomain
url (str): Request URL
method (str): Request method, must be GET or POST
params (Optional[Dict[Any, Any]], optional): URL parameters
data (Optional[Dict[Any, Any]], optional): POST request data,
if the method is GET, this dict
will be combined with params
headers (Optional[Dict[Any, Any]], optional): Custom headers
sendtoken (bool, optional): If the ajax and SEC token should be sent
Returns:
API response
"""
self.atserver_request(
f'{SERVER_URL}/options/set-subdomain',
'GET', params={'subdomain': value},
sendtoken=True,
)
def set_motd(self, value: str) -> None:
"""Set new Message of the Day
(shown below the name in the Minecraft servers list).
Formatting with "paragraph sign + code" is supported,
see https://minecraft.tools/color-code.php
Args:
value (str): MOTD
"""
self.atserver_request(
f'{SERVER_URL}/options/set-motd',
'POST', data={'motd': value},
sendtoken=True,
return self.atconn.request_cloudflare(
url=url, method=method,
params=params, data=data,
headers=headers,
reqcookies={
'ATERNOS_SERVER': self.servid
},
sendtoken=sendtoken
)
@property
def subdomain(self) -> str:
"""Get the server subdomain
(the part before `.aternos.me`)
"""Server subdomain
(the part of domain before `.aternos.me`)
Returns:
Subdomain
@ -277,10 +264,27 @@ class AternosServer:
atdomain = self.domain
return atdomain[:atdomain.find('.')]
@subdomain.setter
def subdomain(self, value: str) -> None:
"""Set a new subdomain for your server
Args:
value (str): Subdomain
"""
self.atserver_request(
'https://aternos.org/panel/ajax/options/subdomain.php',
'GET', params={'subdomain': value},
sendtoken=True
)
@property
def motd(self) -> str:
"""Get the server message of the day
(shown below its name in Minecraft servers list)
"""Server message of the day
which is shown below its name
in the Minecraft servers list
Returns:
MOTD
@ -288,8 +292,24 @@ class AternosServer:
return self._info['motd']
@motd.setter
def motd(self, value: str) -> None:
"""Set a new message of the day
Args:
value (str): New MOTD
"""
self.atserver_request(
'https://aternos.org/panel/ajax/options/motd.php',
'POST', data={'motd': value},
sendtoken=True
)
@property
def address(self) -> str:
"""Full server address
including domain and port
@ -297,10 +317,11 @@ class AternosServer:
Server address
"""
return f'{self.domain}:{self.port}'
return self._info['displayAddress']
@property
def domain(self) -> str:
"""Server domain (e.g. `test.aternos.me`).
In other words, address without port number
@ -312,6 +333,7 @@ class AternosServer:
@property
def port(self) -> int:
"""Server port number
Returns:
@ -322,6 +344,7 @@ class AternosServer:
@property
def edition(self) -> Edition:
"""Server software edition: Java or Bedrock
Returns:
@ -333,6 +356,7 @@ class AternosServer:
@property
def is_java(self) -> bool:
"""Check if server software is Java Edition
Returns:
@ -343,6 +367,7 @@ class AternosServer:
@property
def is_bedrock(self) -> bool:
"""Check if server software is Bedrock Edition
Returns:
@ -353,6 +378,7 @@ class AternosServer:
@property
def software(self) -> str:
"""Server software name (e.g. `Vanilla`)
Returns:
@ -363,6 +389,7 @@ class AternosServer:
@property
def version(self) -> str:
"""Server software version (1.16.5)
Returns:
@ -373,11 +400,12 @@ class AternosServer:
@property
def css_class(self) -> str:
"""CSS class for the server status element
on official web site: offline, online, loading, etc.
See https://aternos.dc09.ru/howto/server/#server-info
In most cases you need `AternosServer.status` instead of this
"""CSS class for
server status block
on official web site
(offline, loading,
loading starting, queueing)
Returns:
CSS class
@ -387,6 +415,7 @@ class AternosServer:
@property
def status(self) -> str:
"""Server status string
(offline, loading, preparing)
@ -398,6 +427,7 @@ class AternosServer:
@property
def status_num(self) -> Status:
"""Server numeric status.
It is highly recommended to use
status string instead of a number
@ -410,6 +440,7 @@ class AternosServer:
@property
def players_list(self) -> List[str]:
"""List of connected players' nicknames
Returns:
@ -420,6 +451,7 @@ class AternosServer:
@property
def players_count(self) -> int:
"""How many players are connected
Returns:
@ -430,6 +462,7 @@ class AternosServer:
@property
def slots(self) -> int:
"""Server slots, how many
players **can** connect
@ -441,6 +474,7 @@ class AternosServer:
@property
def ram(self) -> int:
"""Server used RAM in MB
Returns:
@ -448,15 +482,3 @@ class AternosServer:
"""
return int(self._info['ram'])
@property
def countdown(self) -> int:
"""Server stop countdown
in seconds
Returns:
Stop countdown
"""
value = self._info['countdown']
return int(value or -1)

View file

@ -4,25 +4,22 @@ for real-time information"""
import enum
import json
import asyncio
import logging
from typing import Iterable
from typing import Union, Any
from typing import Tuple, List, Dict
from typing import Tuple, Dict
from typing import Callable, Coroutine
from typing import TYPE_CHECKING
import websockets
from .atlog import log
from .atconnect import REQUA
if TYPE_CHECKING:
from .atserver import AternosServer
OneArgT = Callable[[Any], Coroutine[Any, Any, None]]
TwoArgT = Callable[[Any, Tuple[Any, ...]], Coroutine[Any, Any, None]]
FunctionT = Union[OneArgT, TwoArgT] # pylint: disable=invalid-name
FunctionT = Union[OneArgT, TwoArgT]
ArgsTuple = Tuple[FunctionT, Tuple[Any, ...]]
@ -51,6 +48,7 @@ class AternosWss:
self,
atserv: 'AternosServer',
autoconfirm: bool = False) -> None:
"""Class for managing websocket connection
Args:
@ -68,19 +66,12 @@ class AternosWss:
cookies = atserv.atconn.session.cookies
self.session = cookies['ATERNOS_SESSION']
self.recv: Dict[Streams, List[ArgsTuple]]
self.recv = {
Streams.status: [],
Streams.queue: [],
Streams.console: [],
Streams.ram: [],
Streams.tps: [],
}
recvtype = Dict[Streams, ArgsTuple]
self.recv: recvtype = {}
self.autoconfirm = autoconfirm
self.confirmed = False
self.socket: Any = None
self.socket: Any
self.keep: asyncio.Task
self.msgs: asyncio.Task
@ -95,33 +86,22 @@ class AternosWss:
def wssreceiver(
self,
stream: Streams,
arg: Tuple[Any, ...] = ()) -> Callable[[FunctionT], Any]:
*args: Any) -> Callable[[FunctionT], Any]:
"""Decorator that marks your function as a stream receiver.
When websocket receives message from the specified stream,
it calls all listeners created with this decorator.
Args:
stream (Streams): Stream that your function should listen
arg (Tuple[Any, ...], optional): Arguments which will be passed to your function
*args (tuple, optional): Arguments which will be passed to your function
Returns:
...
"""
def decorator(func: FunctionT) -> Callable[[Any, Any], Coroutine[Any, Any, Any]]:
handlers = self.recv.get(stream, None)
if handlers is None:
self.recv[stream] = [(func, arg)]
else:
handlers.append((func, arg))
async def wrapper(*args, **kwargs) -> Any:
return await func(*args, **kwargs)
return wrapper
def decorator(func: FunctionT) -> None:
self.recv[stream] = (func, args)
return decorator
async def connect(self) -> None:
@ -157,8 +137,8 @@ class AternosWss:
if not self.autoconfirm:
return
in_queue = msg['class'] == 'queueing'
pending = msg['queue']['pending'] == 'pending'
in_queue = (msg['class'] == 'queueing')
pending = (msg['queue']['pending'] == 'pending')
confirmation = in_queue and pending
if confirmation and not self.confirmed:
@ -193,12 +173,8 @@ class AternosWss:
if not isinstance(strm, Streams):
continue
# If the handlers list is empty
if not self.recv.get(strm):
continue
if strm.stream:
log.debug('Requesting %s stream', strm.stream)
logging.debug('Requesting %s stream', strm.stream)
await self.send({
'stream': strm.stream,
'type': 'start'
@ -224,35 +200,11 @@ class AternosWss:
Message, may be a string or a dict
"""
if self.socket is None:
log.warning('Did you forget to call socket.connect?')
await self.connect()
assert self.socket is not None
if isinstance(obj, dict):
obj = json.dumps(obj)
await self.socket.send(obj)
async def command(self, cmd: str) -> None:
"""Sends a Minecraft command
to the websocket server
Args:
cmd (str): Command, slash at
the beginning is not required
"""
await self.send(
{
'stream': 'console',
'type': 'command',
'data': cmd,
}
)
async def wssworker(self) -> None:
"""Starts async tasks in background
@ -264,8 +216,7 @@ class AternosWss:
async def keepalive(self) -> None:
"""Each 49 seconds sends keepalive ping
to the websocket server"""
"""Each 49 seconds sends keepalive ping to websocket server"""
try:
while True:
@ -305,24 +256,19 @@ class AternosWss:
if msgtype in self.recv:
# function info tuples
handlers: Iterable[ArgsTuple]
handlers = self.recv.get(msgtype, ())
# function info tuple
func: ArgsTuple = self.recv[msgtype]
for func in handlers:
# if arguments is not empty
if func[1]:
# call the function with args
coro = func[0](msg, func[1]) # type: ignore
else:
# mypy error: too few arguments
# looks like a bug, so it is ignored
coro = func[0](msg) # type: ignore
# run
asyncio.create_task(coro)
# if arguments is not empty
if func[1]:
# call the function with args
coro = func[0](msg, func[1]) # type: ignore
else:
# mypy error: Too few arguments
# looks like bug, so it is ignored
coro = func[0](msg) # type: ignore
# run
asyncio.create_task(coro)
except asyncio.CancelledError:
break

View file

@ -1,67 +0,0 @@
{
"name": "data",
"lockfileVersion": 2,
"requires": true,
"packages": {
"": {
"dependencies": {
"vm2": "^3.9.13"
}
},
"node_modules/acorn": {
"version": "8.8.1",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz",
"integrity": "sha512-7zFpHzhnqYKrkYdUjF1HI1bzd0VygEGX8lFk4k5zVMqHEoES+P+7TKI+EvLO9WVMJ8eekdO0aDEK044xTXwPPA==",
"bin": {
"acorn": "bin/acorn"
},
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/acorn-walk": {
"version": "8.2.0",
"resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz",
"integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA==",
"engines": {
"node": ">=0.4.0"
}
},
"node_modules/vm2": {
"version": "3.9.19",
"resolved": "https://registry.npmjs.org/vm2/-/vm2-3.9.19.tgz",
"integrity": "sha512-J637XF0DHDMV57R6JyVsTak7nIL8gy5KH4r1HiwWLf/4GBbb5MKL5y7LpmF4A8E2nR6XmzpmMFQ7V7ppPTmUQg==",
"dependencies": {
"acorn": "^8.7.0",
"acorn-walk": "^8.2.0"
},
"bin": {
"vm2": "bin/vm2"
},
"engines": {
"node": ">=6.0"
}
}
},
"dependencies": {
"acorn": {
"version": "8.8.1",
"resolved": "https://registry.npmjs.org/acorn/-/acorn-8.8.1.tgz",
"integrity": "sha512-7zFpHzhnqYKrkYdUjF1HI1bzd0VygEGX8lFk4k5zVMqHEoES+P+7TKI+EvLO9WVMJ8eekdO0aDEK044xTXwPPA=="
},
"acorn-walk": {
"version": "8.2.0",
"resolved": "https://registry.npmjs.org/acorn-walk/-/acorn-walk-8.2.0.tgz",
"integrity": "sha512-k+iyHEuPgSw6SbuDpGQM+06HQUa04DZ3o+F6CSzXMvvI5KMvnaEqXe+YVe555R9nn6GPt404fos4wcgpw12SDA=="
},
"vm2": {
"version": "3.9.19",
"resolved": "https://registry.npmjs.org/vm2/-/vm2-3.9.19.tgz",
"integrity": "sha512-J637XF0DHDMV57R6JyVsTak7nIL8gy5KH4r1HiwWLf/4GBbb5MKL5y7LpmF4A8E2nR6XmzpmMFQ7V7ppPTmUQg==",
"requires": {
"acorn": "^8.7.0",
"acorn-walk": "^8.2.0"
}
}
}
}

View file

@ -1,5 +0,0 @@
{
"dependencies": {
"vm2": "^3.9.13"
}
}

View file

@ -1,49 +0,0 @@
const http = require('http')
const process = require('process')
const { VM } = require('vm2')
const args = process.argv.slice(2)
const port = args[0] || 8000
const host = args[1] || 'localhost'
const stubFunc = (_i) => {}
const vm = new VM({
timeout: 2000,
allowAsync: false,
sandbox: {
atob: atob,
setTimeout: stubFunc,
setInterval: stubFunc,
document: {
getElementById: stubFunc,
prepend: stubFunc,
append: stubFunc,
appendChild: stubFunc,
doctype: {},
currentScript: {},
},
},
})
vm.run('var window = global')
const listener = (req, res) => {
if (req.method != 'POST')
res.writeHead(405) & res.end()
let body = ''
req.on('data', chunk => (body += chunk))
req.on('end', () => {
let resp
try { resp = JSON.stringify(vm.run(body)) }
catch (ex) { resp = ex.message }
res.writeHead(200)
res.end(resp)
})
}
const server = http.createServer(listener)
server.listen(port, host, () => console.log('OK'))

View file

@ -1,5 +1,5 @@
cloudscraper==1.2.71
Js2Py==0.74
lxml==4.9.2
regex==2023.6.3
websockets==11.0.3
lxml>=4.8.0
cloudscraper>=1.2.60
js2py>=0.71
websockets>=10.1
regex>=2022.3.15

View file

@ -1,6 +1,5 @@
[mypy]
ignore_missing_imports = True
check_untyped_defs = True
[pycodestyle]
ignore = E501

View file

@ -5,9 +5,9 @@ with open('README.md', 'rt') as readme:
setuptools.setup(
name='python-aternos',
version='3.0.6',
author='Andrey @DarkCat09',
author_email='py@dc09.ru',
version='2.0.1',
author='Chechkenev Andrey (@DarkCat09)',
author_email='aacd0709@mail.ru',
description='An unofficial Aternos API',
long_description=long_description,
long_description_content_type='text/markdown',
@ -24,7 +24,6 @@ setuptools.setup(
'Programming Language :: Python :: 3.8',
'Programming Language :: Python :: 3.9',
'Programming Language :: Python :: 3.10',
'Programming Language :: Python :: 3.11',
'License :: OSI Approved :: Apache Software License',
'Operating System :: OS Independent',
'Operating System :: Microsoft :: Windows',
@ -35,31 +34,12 @@ setuptools.setup(
'Typing :: Typed',
],
install_requires=[
'cloudscraper==1.2.71',
'Js2Py==0.74',
'lxml==4.9.2',
'regex==2023.6.3',
'websockets==11.0.3',
'lxml>=4.8.0',
'cloudscraper>=1.2.60',
'js2py>=0.71',
'websockets>=10.1',
'regex>=2022.3.15',
],
extras_require={
'dev': [
'autopep8==2.0.2',
'pycodestyle==2.10.0',
'mypy==1.4.1',
'pylint==2.17.4',
'requests-mock==1.11.0',
'types-requests==2.31.0.1',
],
'pypi': [
'build==0.10.0',
'twine==4.0.2',
],
'docs': [
'mkdocs==1.4.3',
'mkdocstrings[python]==0.22.0',
]
},
packages=['python_aternos'],
python_requires=">=3.7",
include_package_data=True,
)

View file

@ -1,5 +1,3 @@
#!/usr/bin/env bash
failed=''
title () {
@ -8,7 +6,7 @@ title () {
COLOR='\033[1;36m'
echo
echo -e "${COLOR}[#] $1$RESET"
echo -e "$COLOR[#] $1$RESET"
}
error_msg () {
@ -19,9 +17,9 @@ error_msg () {
if (( $1 )); then
failed+="$2, "
echo -e "${ERR}[X] Found errors$RESET"
echo -e "$ERR[X] Found errors$RESET"
else
echo -e "${OK}[V] Passed successfully$RESET"
echo -e "$OK[V] Passed successfully$RESET"
fi
}
@ -32,27 +30,30 @@ display_failed() {
SUCCESS='\033[1;32m'
if [[ $failed != '' ]]; then
joined=$(echo -n "$failed" | sed 's/, $//')
echo -e "${FAILED}[!] See output of: $joined$RESET"
joined=`echo -n "$failed" | sed 's/, $//'`
echo -e "$FAILED[!] View output of: $joined$RESET"
else
echo -e "${SUCCESS}[V] All checks were passed successfully$RESET"
echo -e "$SUCCESS[V] All checks are passed successfully$RESET"
fi
}
title 'Checking needed modules...'
pip install pycodestyle mypy pylint
title 'Running unit tests...'
python3 -m unittest discover -v ./tests
python -m unittest discover -v ./tests
error_msg $? 'unittest'
title 'Running pep8 checker...'
python3 -m pycodestyle ./python_aternos
python -m pycodestyle .
error_msg $? 'pep8'
title 'Running mypy checker...'
python3 -m mypy ./python_aternos
python -m mypy .
error_msg $? 'mypy'
title 'Running pylint checker...'
python3 -m pylint ./python_aternos
python -m pylint ./python_aternos
error_msg $? 'pylint'
display_failed

View file

View file

@ -1,32 +0,0 @@
from pathlib import Path
from typing import List
abs_dir = Path(__file__).absolute().parent
samples = abs_dir / 'samples'
htmls = samples / 'html'
def read_sample(name: str) -> List[str]:
path = samples / name
if not path.exists():
return []
with path.open('rt', encoding='utf-8') as file:
return file \
.read() \
.strip() \
.replace('\r\n', '\n') \
.split('\n')
def read_html(name: str) -> bytes:
path = samples / 'html' / name
if not path.exists():
return b''
with path.open('rb') as file:
return file.read()

View file

@ -1,102 +0,0 @@
#!/usr/bin/env python3
# How to use
# *******************************
# 1. Open DevTools at aternos.org
# 2. Get AJAX_TOKEN variable value (without quotes)
#
# 3. Pass it to this script as an argument, e.g.:
# python3 js_samples.py xKflIsKHxlv96fLc1tht
#
# 4. The script will request the token 100 times
# and check it with different built-in interpreters
# (now there are only js2py and nodejs)
# 5. Array "errored" which is printed at the end
# contains indexes of incorrectly executed JS functions
# 6. Enter this index in the opened console
# or enter "exit" to exit
import re
import sys
from python_aternos.atconnect import AternosConnect
from python_aternos.atconnect import BASE_URL
from python_aternos import Js2PyInterpreter
from python_aternos import NodeInterpreter
TIMES = 100
js = re.compile(r'\(\(\).*?\)\(\);')
conn = AternosConnect()
jsi1 = Js2PyInterpreter()
jsi2 = NodeInterpreter()
token = sys.argv[1]
samples = []
errored = []
def get_code() -> bool:
r = conn.request_cloudflare(
f'{BASE_URL}/go', 'GET'
)
if r.status_code != 200:
print(r.status_code)
code = js.search(r.text)
if code is None:
print('No match!')
return False
sample = code.group(0)
samples.append(sample)
print(sample)
print('***')
jsi1.exec_js(sample)
jsi2.exec_js(sample)
var1 = jsi1['AJAX_TOKEN']
var2 = jsi2['AJAX_TOKEN']
print(var1)
print(var2)
print('***')
print()
print()
return var1 == var2 == token
def main() -> None:
print()
for i in range(TIMES):
print(i)
if not get_code():
errored.append(i)
print('Errored:', errored)
print('Choose sample number:')
while True:
try:
print('>', end=' ')
cmd = input()
if cmd.strip().lower() in ('exit', 'quit'):
print('Quit')
break
print(samples[int(cmd)])
except KeyboardInterrupt:
print()
print('Quit')
break
except Exception as err:
print(err)
if __name__ == '__main__':
main()

View file

@ -1,46 +0,0 @@
from requests_mock import Mocker
from python_aternos.atconnect import BASE_URL, AJAX_URL
from tests import files
mock = Mocker()
with mock:
mock.get(
f'{BASE_URL}/go/',
content=files.read_html('aternos_go'),
)
mock.get(
f'{BASE_URL}/servers/',
content=files.read_html('aternos_servers'),
)
mock.get(
f'{BASE_URL}/server',
content=files.read_html('aternos_server1'),
)
mock.post(
f'{AJAX_URL}/account/login',
json={
'success': True,
'error': None,
'message': None,
'show2FA': False,
},
cookies={
'ATERNOS_SESSION': '0123abcd',
},
)
mock.get(
f'{BASE_URL}/players/',
content=files.read_html('aternos_players'),
)
mock.get(
f'{BASE_URL}/files/',
content=files.read_html('aternos_file_root'),
)

View file

@ -1 +0,0 @@
requests-mock>=1.10.0

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -1,16 +1,27 @@
(() => {window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();
(() => {window[["KEN","X_TO","JA","A"].reverse().join('')]="2rKOA1IFdBcHhEM616cb";})();
(() => {window[["AJA","X_T","O","KEN"].join('')]=["cb","BcHhEM616","KOA1IFd","2r"].reverse().join('');})();
(() => {window[["N","TOKE","_","AJAX"].reverse().join('')]=["2rKOA1I","FdBcHhEM","616c","b"].join('');})();
(() => {window[["N","_TOKE","X","AJA"].reverse().join('')]=["616cb","HhEM","1IFdBc","2rKOA"].reverse().join('');})();
(() => {window[["OKEN","T","AJAX_"].reverse().join('')]=["616cb","HhEM","Bc","Fd","2rKOA1I"].reverse().join('');})();
(() => {window[["A","JAX_","TOKEN"].join('')]=atob('MnJLT0ExSUZkQmNIaEVNNjE2Y2I=');})();
(() => {window["AJAX_TOKEN"]=["2rKOA1IFdB","cHhEM61","6cb"].join('');})();
(() => {window[atob('QUpBWF9UT0tFTg==')]=("2rKOA1IFdB" + "cHhEM616c" + "b");})();
(() => {window[atob('QUpBWF9UT0tFTg==')]=atob('MmlYaDVXNXVFWXE1ZldKSWF6UTY=');})();
(() => {window[["_XAJA","NEKOT"].map(s => s.split('').reverse().join('')).join('')]=!window[("encodeURI" + "Componen" + "t")] || atob('Q3VVY21aMjdGYjhiVkJOdzEyVmo=');})();
(() => {window[["N","_TOKE","AJAX"].reverse().join('')]=!window[("en" + "co" + "deURICo" + "mpone" + "nt")] || ["zv7hP8ePPY","FP9ZaY","PQo9"].map(s => s.split('').reverse().join('')).join('');})();
(() => {window[["XAJA","OT_","EK","N"].map(s => s.split('').reverse().join('')).join('')]=["fU","61EEKvmelL","Zh0ktl","MN"].map(s => s.split('').reverse().join('')).join('');})();
(() => {window[["AJA","X_T","OKEN"].join('')]=window['document']&&window[("Map")]&&window[("se" + "tTi" + "meo" + "u" + "t")]?["Ew9q","VIepR","GRX","S1Oban9U"].reverse().join(''):"q6pYdP6r7xiVHhbotvlN";})();
(() => {window["AJAX_TOKEN"]=window['document']&&window["Map"]&&window[["out","e","Tim","et","s"].reverse().join('')]?["pREw9q","XVIe","UGR","S1Oban9"].reverse().join(''):["dYp6q","Vix7r6P","tobhH","Nlv"].map(s => s.split('').reverse().join('')).join('');})();
(() => {window[["OKEN", "T", "_", "AJAX"].reverse().join("")] = window["document"] && window["Map"] && window["set" + "T" + "im" + "e" + "o" + "u" + "t"] ? ["DYK", "OWD1TyD", "TJ", "JtNpZ", "MhW"].map((s) => s.split("").reverse().join("")).join("") : "XAIbksgkVX9JYboMDI7D";})();
(() => {window[["XAJA","T_","NEKO"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&window[["ap","M"].reverse().join('')]&&window[["es","iTt","oem","u","t"].map(s => s.split('').reverse().join('')).join('')]?["Kk1LG02","If8J","lZPFwRqIG"].reverse().join(''):("sBI" + "mgV" + "g6RL98W1" + "khPY" + "Ml");})();
(() => {window[["N","KE","_TO","X","JA","A"].reverse().join('')]=window['document']&&!window[["p","Ma"].reverse().join('')]||!window[["ut","meo","i","etT","s"].reverse().join('')]?("1UY5" + "1inS" + "kzlSO" + "QmKU0mK"):"KbxzYCJUrFjWzbeZcAmE";})();
(() => {window[["EN", "TOK", "AJAX_"].reverse().join('')] = window['document'] && window["Map"] && window[("s" + "et" + "Tim" + "e" + "o" + "ut")] ? "KbxzYCJUrFjWzbeZcAmE" : ["mK", "SOQmKU0", "zl", "1inSk", "1UY5"].reverse().join('');})();
(() => /*window["AJAX_TOKEN"]="qKiXyEASIaPjSeM1LQw3"}*/{window[["XAJA","OT_","NEK"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&!window[["Map"].join('')][["e","typ","o","ot","r","p"].reverse().join('')]||!window[("s" + "et" + "T" + "i" + "m" + "eo" + "ut")]?("qKiX" + "yE" + "ASIa" + "PjSeM1LQ" + "w3"):["hd00vpq3","IU5W","s8SvaVLB"].reverse().join('');})();
(() => /*window["AJAX_TOKEN"]=["iKq","aISAEyX","MeSjP","3wQL1"].map(s => s.split('').reverse().join('')).join('')}*/{window[["XAJA","EKOT_","N"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&!window[["Map"].join('')][["orp","tot","epy"].map(s => s.split('').reverse().join('')).join('')]||!window[["s","e","t","Tim","eou","t"].join('')]?["3","jSeM1LQw","ASIaP","qKiXyE"].reverse().join(''):"s8SvaVLBIU5Whd00vpq3";})();
(() => /*window["AJAX_TOKEN"]=["w3","1LQ","PjSeM","qKiXyEASIa"].reverse().join('')}*/{window["AJAX_TOKEN"]=window['document']&&!window[["p","Ma"].reverse().join('')]["prototype"]||!window[("s" + "et" + "Ti" + "me" + "o" + "u" + "t")]?["SAEyXiKq","eSjPaI","wQL1M","3"].map(s => s.split('').reverse().join('')).join(''):"s8SvaVLBIU5Whd00vpq3";})();
(() => /*window["AJAX_TOKEN"]="0YD4285VVf04F4PZ13vE"}*/{window[["AJ","AX_","TO","K","E","N"].join('')]=window['document']&&window[["paM"].map(s => s.split('').reverse().join('')).join('')][["p","ro","t","ot","ype"].join('')]&&window[["s","e","tTi","meo","u","t"].join('')]?"Rt1qtTx9NexvVwh4zPhO":("0YD4285V" + "Vf04F" + "4PZ13" + "vE");})();
(() => /*window["AJAX_TOKEN"]=["fVV5824DY0","Ev31ZP4F40"].map(s => s.split('').reverse().join('')).join('')}*/{window["AJAX_TOKEN"]=window['document']&&window[("Map")][["pr","ot","ot","yp","e"].join('')]&&window[["ut","meo","tTi","se"].reverse().join('')]?("Rt" + "1qtTx9Ne" + "xvVwh4" + "zPhO"):["V5824DY0","ZP4F40fV","Ev31"].map(s => s.split('').reverse().join('')).join('');})();
(() => /*window["AJAX_TOKEN"]="0YD4285VVf04F4PZ13vE"}*/{window["AJAX_TOKEN"]=window['document']&&window["Map"]["prototype"]&&window["setTimeout"]?["Rt1qt","Tx9Nex","vVwh4z","PhO"].join(''):["0YD4285VV","f04F4P","Z13vE"].join('');})();
(() => /*window["AJAX_TOKEN"]="0YD4285VVf04F4PZ13vE"}*/{window[["AJA","_X","T","KO","NE"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&window["Map"][["pe","oty","t","pro"].reverse().join('')]&&window[["t","eou","Tim","et","s"].reverse().join('')]?"Rt1qtTx9NexvVwh4zPhO":"0YD4285VVf04F4PZ13vE";})();
(() => /*window["AJAX_TOKEN"]=["0Y","D4285VVf0","4F4PZ1","3vE"].join('')}*/{window[["_XAJA","OT","NEK"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&window[["Map"].reverse().join('')][["e","p","ty","to","pro"].reverse().join('')]&&window[["ut","meo","i","T","set"].reverse().join('')]?("Rt" + "1qtTx9Nexv" + "Vwh4" + "zPhO"):["DY0","F40fVV5824","Ev31ZP4"].map(s => s.split('').reverse().join('')).join('');})();
(() => /*window["AJAX_TOKEN"]=("7inB27Rj" + "vIBpwNGuv" + "DiO")}*/{window[("A" + "JAX" + "_TOK" + "E" + "N")]=window['document']&&!window[("Map")]["prototype"]||!window[("set" + "Tim" + "eo" + "ut")]?"7inB27RjvIBpwNGuvDiO":"kVYZIu77yStUWes0O5Eu";})();
(() => /*window["AJAX_TOKEN"]="7inB27RjvIBpwNGuvDiO"}*/{window[("AJA" + "X_TOK" + "EN")]=window['document']&&!window["Map"][("p" + "rot" + "oty" + "p" + "e")]||!window[["ut","meo","Ti","set"].reverse().join('')]?("7inB2" + "7RjvIBpw" + "NGuvDiO"):["Vk","uIZY","WUtSy77","uE5O0se"].map(s => s.split('').reverse().join('')).join('');})();
(() => /*window["AJAX_TOKEN"]=["2Bni7","R7","pBIvj","OiDvuGNw"].map(s => s.split('').reverse().join('')).join('')}*/{window["AJAX_TOKEN"]=window['document']&&!window[("Ma" + "p")]["prototype"]||!window[("set" + "Ti" + "me" + "ou" + "t")]?["O","NGuvDi","jvIBpw","7inB27R"].reverse().join(''):("kVYZIu77yS" + "tUWes0O5" + "Eu");})();
(() => /*window["AJAX_TOKEN"]=["Nj3BQl6gT","BSsoGLzxx","Ha"].map(s => s.split('').reverse().join('')).join('')}*/{window[["KEN","X_TO","A","AJ"].reverse().join('')]=window['document']&&window["Map"]["prototype"]&&window[["se","tT","ime","o","u","t"].join('')]?["uuW","7FDg6","btJvriBP","lOh3"].map(s => s.split('').reverse().join('')).join(''):["Tg6l","QB","3jNxxzLG","osS","BaH"].join('');})();
(() => /*window["AJAX_TOKEN"]=("Tg6lQB3j" + "NxxzLG" + "osSBaH")}*/{window[("AJ" + "AX_TO" + "KE" + "N")]=window['document']&&window[("Ma" + "p")][["p","tor","to","epy"].map(s => s.split('').reverse().join('')).join('')]&&window[("set" + "T" + "ime" + "ou" + "t")]?["6uuW","iBP7FDg","tJvr","3b","lOh"].map(s => s.split('').reverse().join('')).join(''):["Tg","6lQB3j","Nx","xzLGosSBaH"].join('');})();
(() => /*window["AJAX_TOKEN"]=["aH","SB","zLGos","jNxx","lQB3","Tg6"].reverse().join('')}*/{window[["KEN","TO","AX_","AJ"].reverse().join('')]=window['document']&&window["Map"][["pr","o","to","typ","e"].join('')]&&window[["tes","iT","em","o","u","t"].map(s => s.split('').reverse().join('')).join('')]?"Wuu6gDF7PBirvJtb3hOl":["aH","NxxzLGosSB","Tg6lQB3j"].reverse().join('');})();
(() => /*window["AJAX_TOKEN"]="Tg6lQB3jNxxzLGosSBaH"}*/{window[["A","JA","X_","TO","K","EN"].join('')]=window['document']&&window["Map"][["rp","o","ot","pyt","e"].map(s => s.split('').reverse().join('')).join('')]&&window["setTimeout"]?["Wuu6g","DF7PBir","vJtb3","hOl"].join(''):["BaH","LGosS","jNxxz","Tg6lQB3"].reverse().join('');})();

View file

@ -1,16 +1,27 @@
2rKOA1IFdBcHhEM616cb
2rKOA1IFdBcHhEM616cb
2rKOA1IFdBcHhEM616cb
2rKOA1IFdBcHhEM616cb
2rKOA1IFdBcHhEM616cb
2rKOA1IFdBcHhEM616cb
2rKOA1IFdBcHhEM616cb
2rKOA1IFdBcHhEM616cb
2rKOA1IFdBcHhEM616cb
2iXh5W5uEYq5fWJIazQ6
CuUcmZ27Fb8bVBNw12Vj
YPPe8Ph7vzYaZ9PF9oQP
UfLlemvKEE16ltk0hZNM
S1Oban9UGRXVIepREw9q
S1Oban9UGRXVIepREw9q
KYDDyT1DWOJTZpNtJWhM
lZPFwRqIGIf8JKk1LG02
KbxzYCJUrFjWzbeZcAmE
KbxzYCJUrFjWzbeZcAmE
s8SvaVLBIU5Whd00vpq3
s8SvaVLBIU5Whd00vpq3
s8SvaVLBIU5Whd00vpq3
Rt1qtTx9NexvVwh4zPhO
Rt1qtTx9NexvVwh4zPhO
kVYZIu77yStUWes0O5Eu
kVYZIu77yStUWes0O5Eu
kVYZIu77yStUWes0O5Eu
Wuu6gDF7PBirvJtb3hOl
Wuu6gDF7PBirvJtb3hOl
Wuu6gDF7PBirvJtb3hOl
Wuu6gDF7PBirvJtb3hOl
Rt1qtTx9NexvVwh4zPhO
Rt1qtTx9NexvVwh4zPhO
Rt1qtTx9NexvVwh4zPhO

View file

@ -1,40 +0,0 @@
#!/usr/bin/env python3
import unittest
from python_aternos import Client
from tests import mock
class TestHttp(unittest.TestCase):
def test_basic(self) -> None:
with mock.mock:
Client().login('test', '')
# no exception = ok
def test_servers(self) -> None:
with mock.mock:
at = Client()
at.login('test', '')
srvs = at.account.list_servers(cache=False)
self.assertTrue(srvs)
def test_status(self) -> None:
with mock.mock:
at = Client()
at.login('test', '')
srv = at.account.list_servers(cache=False)[0]
srv.fetch()
self.assertEqual(
srv.subdomain,
'world35v',
)
self.assertEqual(
srv.is_java,
True,
)
if __name__ == '__main__':
unittest.main()

74
tests/test_js.py Normal file
View file

@ -0,0 +1,74 @@
import os
import unittest
from typing import List
from python_aternos import atjsparse
CONV_TOKEN_ARROW = '''(() => {/*AJAX_TOKEN=123}*/window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();'''
CONV_TOKEN_FUNC = '''(function(){window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})()'''
class TestJs2Py(unittest.TestCase):
def setUp(self) -> None:
self.path = os.path.abspath(os.path.dirname(__file__))
self.samples = os.path.join(self.path, 'samples')
self.input = os.path.join(self.samples, 'token_input.txt')
self.output = os.path.join(self.samples, 'token_output.txt')
def read_sample(file: str) -> List[str]:
with open(file, 'rt', encoding='utf-8') as f:
return f \
.read() \
.strip() \
.replace('\r\n', '\n') \
.split('\n')
self.tests = read_sample(self.input)
self.results = read_sample(self.output)
def test_base64(self) -> None:
encoded = 'QEhlbGxvIFdvcmxkIQ=='
decoded = atjsparse.atob(encoded)
self.assertEqual(decoded, '@Hello World!')
def test_conv(self) -> None:
token = CONV_TOKEN_ARROW
f = atjsparse.to_ecma5_function(token)
self.assertEqual(f, CONV_TOKEN_FUNC)
def test_ecma6parse(self) -> None:
code = '''
window.t0 =
window['document']&&
!window[["p","Ma"].reverse().join('')]||
!window[["ut","meo","i","etT","s"].reverse().join('')];'''
part1 = '''window.t1 = Boolean(window['document']);'''
part2 = '''window.t2 = Boolean(!window[["p","Ma"].reverse().join('')]);'''
part3 = '''window.t3 = Boolean(!window[["ut","meo","i","etT","s"].reverse().join('')]);'''
ctx0 = atjsparse.exec_js(code)
ctx1 = atjsparse.exec_js(part1)
ctx2 = atjsparse.exec_js(part2)
ctx3 = atjsparse.exec_js(part3)
self.assertEqual(ctx0.window['t0'], False)
self.assertEqual(ctx1.window['t1'], True)
self.assertEqual(ctx2.window['t2'], False)
self.assertEqual(ctx3.window['t3'], False)
def test_exec(self) -> None:
for i, f in enumerate(self.tests):
ctx = atjsparse.exec_js(f)
res = ctx.window['AJAX_TOKEN']
self.assertEqual(res, self.results[i])
if __name__ == '__main__':
unittest.main()

View file

@ -1,63 +0,0 @@
#!/usr/bin/env python3
import unittest
from python_aternos import atjsparse
from tests import files
CONV_TOKEN_ARROW = '''(() => {/*AJAX_TOKEN=123}*/window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();'''
CONV_TOKEN_FUNC = '''(function(){window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})()'''
class TestJs2Py(unittest.TestCase):
def setUp(self) -> None:
self.tests = files.read_sample('token_input.txt')
self.results = files.read_sample('token_output.txt')
self.js = atjsparse.Js2PyInterpreter()
def test_base64(self) -> None:
encoded = 'QEhlbGxvIFdvcmxkIQ=='
decoded = atjsparse.atob(encoded)
self.assertEqual(decoded, '@Hello World!')
def test_conv(self) -> None:
token = CONV_TOKEN_ARROW
f = self.js.to_ecma5(token)
self.assertEqual(f, CONV_TOKEN_FUNC)
def test_ecma6parse(self) -> None:
code = '''
window.t0 =
window['document']&&
!window[["p","Ma"].reverse().join('')]||
!window[["ut","meo","i","etT","s"].reverse().join('')];'''
part1 = '''window.t1 = Boolean(window['document']);'''
part2 = '''window.t2 = Boolean(!window[["p","Ma"].reverse().join('')]);'''
part3 = '''window.t3 = Boolean(!window[["ut","meo","i","etT","s"].reverse().join('')]);'''
self.js.exec_js(code)
self.js.exec_js(part1)
self.js.exec_js(part2)
self.js.exec_js(part3)
self.assertEqual(self.js['t0'], False)
self.assertEqual(self.js['t1'], True)
self.assertEqual(self.js['t2'], False)
self.assertEqual(self.js['t3'], False)
def test_exec(self) -> None:
for func, exp in zip(self.tests, self.results):
self.js.exec_js(func)
res = self.js['AJAX_TOKEN']
self.assertEqual(res, exp)
if __name__ == '__main__':
unittest.main()

View file

@ -1,32 +0,0 @@
#!/usr/bin/env python3
import unittest
from python_aternos import atjsparse
from tests import files
class TestJsNode(unittest.TestCase):
def setUp(self) -> None:
self.tests = files.read_sample('token_input.txt')
self.results = files.read_sample('token_output.txt')
try:
self.js = atjsparse.NodeInterpreter()
except OSError as err:
self.skipTest(
f'Unable to start NodeJS interpreter: {err}'
)
def test_exec(self) -> None:
for func, exp in zip(self.tests, self.results):
self.js.exec_js(func)
res = self.js['AJAX_TOKEN']
self.assertEqual(res, exp)
if __name__ == '__main__':
unittest.main()

42
tests/test_login.py Executable file → Normal file
View file

@ -1,43 +1,40 @@
#!/usr/bin/env python3
import unittest
from typing import Optional
from python_aternos import Client
from tests import files
AUTH_USER = 'world35g'
AUTH_PSWD = 'world35g'
AUTH_MD5 = '0efdb2cd6b36d5e54d0e3c161e567a4e'
class TestLogin(unittest.TestCase):
def setUp(self) -> None:
credentials = files.read_sample('login_pswd.txt')
if len(credentials) < 2:
self.skipTest(
'File "login_pswd.txt" '
'has incorrect format!'
)
self.user = credentials[0]
self.pswd = credentials[1]
self.at: Optional[Client] = None
def test_md5(self) -> None:
self.assertEqual(
Client.md5encode(AUTH_PSWD),
AUTH_MD5
)
def test_auth(self) -> None:
self.at = Client()
self.at.login(self.user, self.pswd)
self.assertTrue(self.at.atconn.atcookie)
self.at = Client.from_hashed(AUTH_USER, AUTH_MD5)
self.assertIsNotNone(self.at)
def test_servers(self) -> None:
if self.at is None:
self.at = Client()
self.at.login(self.user, self.pswd)
self.at = Client.from_hashed(
AUTH_USER, AUTH_MD5
)
srvs = len(
self.at.account.list_servers(
self.at.list_servers(
cache=False
)
)
@ -46,8 +43,9 @@ class TestLogin(unittest.TestCase):
def test_logout(self) -> None:
if self.at is None:
self.at = Client()
self.at.login(self.user, self.pswd)
self.at = Client.from_hashed(
AUTH_USER, AUTH_MD5
)
self.at.logout()