Bugfixes: Cloudflare bypassing, JS parser; tests, pylint, makefile

This commit is contained in:
DarkCat09 2022-08-22 09:55:08 +04:00
parent 1055628241
commit dc52f92985
19 changed files with 142 additions and 72 deletions

View file

@ -1,8 +1,13 @@
build:
sudo python -m build
python -m build
upload:
python -m twine upload dist/*
clean:
sudo rm -rf dist/ python_aternos.egg-info/
rm -rf dist/ python_aternos.egg-info/
rm -rf .mypy_cache/ python_aternos/__pycache__/
check:
chmod +x test.sh
bash test.sh

View file

@ -39,12 +39,9 @@ disable=raw-checker-failed,
useless-suppression,
deprecated-pragma,
use-symbolic-message-instead,
wrong-import-order,
unspecified-encoding,
logging-not-lazy,
logging-fstring-interpolation,
no-member,
too-many-branches,
too-many-arguments,
too-many-public-methods,
too-many-instance-attributes

View file

@ -28,8 +28,7 @@ from .aterrors import ServerError
from .aterrors import ServerStartError
from .aterrors import FileError
from .aterrors import AternosPermissionError
from .atjsparse import exec_js, atob
from .atjsparse import to_ecma5_function
from .atjsparse import exec_js
__all__ = [
@ -43,8 +42,7 @@ __all__ = [
'FileManager', 'AternosFile', 'AternosError',
'CloudflareError', 'CredentialsError', 'TokenError',
'ServerError', 'ServerStartError', 'FileError',
'AternosPermissionError',
'exec_js', 'atob', 'to_ecma5_function',
'AternosPermissionError', 'exec_js',
'Edition', 'Status', 'Lists',
'ServerOpts', 'WorldOpts', 'WorldRules',

View file

@ -4,9 +4,11 @@ and allows to manage your account"""
import os
import re
import hashlib
import lxml.html
from typing import List, Optional
import lxml.html
from .atserver import AternosServer
from .atconnect import AternosConnect
from .aterrors import CredentialsError
@ -119,7 +121,7 @@ class Client:
"""
file = os.path.expanduser(file)
with open(file, 'rt') as f:
with open(file, 'rt', encoding='utf-8') as f:
saved = f.read().replace('\r\n', '\n').split('\n')
session = saved[0].strip()
@ -164,7 +166,7 @@ class Client:
"""
file = os.path.expanduser(file)
with open(file, 'wt') as f:
with open(file, 'wt', encoding='utf-8') as f:
f.write(self.atconn.atsession + '\n')
if not incl_servers:

View file

@ -2,10 +2,12 @@
import enum
import re
import lxml.html
from typing import Any, Dict, List, Union, Optional
from typing import TYPE_CHECKING
import lxml.html
if TYPE_CHECKING:
from .atserver import AternosServer

View file

@ -1,11 +1,15 @@
"""Stores API connection session and sends requests"""
import re
import time
import random
import logging
from requests import Response
from cloudscraper import CloudScraper
from functools import partial
from typing import Optional, Union
from requests import Response
from cloudscraper import CloudScraper
from . import atjsparse
from .aterrors import TokenError
@ -160,8 +164,7 @@ class AternosConnect:
headers: Optional[dict] = None,
reqcookies: Optional[dict] = None,
sendtoken: bool = False,
redirect: bool = True,
retry: int = 3) -> Response:
retry: int = 5) -> Response:
"""Sends a request to Aternos API bypass Cloudflare
@ -181,11 +184,8 @@ class AternosConnect:
:param sendtoken: If the ajax and SEC token
should be sent, defaults to False
:type sendtoken: bool, optional
:param redirect: If requests lib should follow
Location header in 3xx responses, defaults to True
:type redirect: bool, optional
:param retry: How many times parser must retry
connection to API bypass Cloudflare, defaults to 3
connection to API bypass Cloudflare, defaults to 5
:type retry: int, optional
:raises CloudflareError:
When the parser has exceeded retries count
@ -198,21 +198,26 @@ class AternosConnect:
if retry <= 0:
raise CloudflareError('Unable to bypass Cloudflare protection')
old_cookies = self.session.cookies
self.session = CloudScraper()
self.session.cookies.update(old_cookies)
try:
self.atsession = self.session.cookies['ATERNOS_SESSION']
except KeyError:
# don't rewrite atsession value
pass
params = params or {}
data = data or {}
headers = headers or {}
reqcookies = reqcookies or {}
method = method or 'GET'
method = method.upper().strip()
if method not in ('GET', 'POST'):
raise NotImplementedError('Only GET and POST are available')
headers = headers or {}
params = params or {}
data = data or {}
reqcookies = reqcookies or {}
if sendtoken:
params['TOKEN'] = self.token
params['SEC'] = self.sec
@ -230,26 +235,35 @@ class AternosConnect:
logging.debug(f'session-cookies={self.session.cookies}')
if method == 'POST':
req = self.session.post(
url, data=data, params=params,
headers=headers, cookies=reqcookies,
allow_redirects=redirect
sendreq = partial(
self.session.post,
params=params,
data=data
)
else:
req = self.session.get(
url, params={**params, **data},
headers=headers, cookies=reqcookies,
allow_redirects=redirect
sendreq = partial(
self.session.get,
params={**params, **data}
)
if '<title>Please Wait... | Cloudflare</title>' in req.text:
req = sendreq(
url,
headers=headers,
cookies=reqcookies
)
resp_type = req.headers.get('content-type', '')
html_type = resp_type.find('text/html') != -1
cloudflare = req.status_code == 403
if html_type and cloudflare:
logging.info('Retrying to bypass Cloudflare')
time.sleep(0.2)
return self.request_cloudflare(
url, method,
params, data,
headers, reqcookies,
sendtoken, redirect,
retry - 1
sendtoken, retry - 1
)
logging.debug('AternosConnect received: ' + req.text[:65])

View file

@ -1,10 +1,12 @@
"""File info object used by `python_aternos.atfm`"""
import enum
import lxml.html
from typing import Union
from typing import TYPE_CHECKING
import lxml.html
from .aterrors import FileError
if TYPE_CHECKING:

View file

@ -1,9 +1,10 @@
"""Exploring files in your server directory"""
import lxml.html
from typing import Union, Optional, Any, List
from typing import TYPE_CHECKING
import lxml.html
from .atfile import AternosFile, FileType
if TYPE_CHECKING:
from .atserver import AternosServer

View file

@ -1,10 +1,12 @@
"""Parsing and executing JavaScript code"""
import regex
import base64
import js2py
from typing import Any
import regex
import js2py
# Thanks to http://regex.inginf.units.it/
arrowexp = regex.compile(r'\w[^\}]*+')
@ -19,6 +21,7 @@ def to_ecma5_function(f: str) -> str:
:rtype: str
"""
f = regex.sub(r'/\*.+?\*/', '', f)
match = arrowexp.search(f)
conv = '(function(){' + match.group(0) + '})()'
return regex.sub(

View file

@ -1,10 +1,12 @@
"""Operators, whitelist and banned players lists"""
import enum
import lxml.html
from typing import List, Union
from typing import TYPE_CHECKING
import lxml.html
if TYPE_CHECKING:
from .atserver import AternosServer

View file

@ -2,8 +2,9 @@
import enum
import json
from requests import Response
from typing import Optional, List
from requests import Response
from .atconnect import AternosConnect
from .aterrors import ServerStartError

View file

@ -5,12 +5,14 @@ import enum
import json
import asyncio
import logging
import websockets
from typing import Union, Any
from typing import Dict, Tuple
from typing import Tuple, Dict
from typing import Callable, Coroutine
from typing import TYPE_CHECKING
import websockets
from .atconnect import REQUA
if TYPE_CHECKING:
from .atserver import AternosServer
@ -55,10 +57,11 @@ class AternosWss:
autoconfirm: bool = False) -> None:
self.atserv = atserv
self.cookies = atserv.atconn.session.cookies
self.session = self.cookies['ATERNOS_SESSION']
self.servid = atserv.servid
cookies = atserv.atconn.session.cookies
self.session = cookies['ATERNOS_SESSION']
recvtype = Dict[Streams, ArgsTuple]
self.recv: recvtype = {}
self.autoconfirm = autoconfirm

View file

@ -1,5 +1,5 @@
lxml>=4.8.0
cloudscraper>=1.2.58
cloudscraper>=1.2.60
js2py>=0.71
websockets>=10.1
regex>=2022.3.15

View file

@ -5,7 +5,7 @@ with open('README.md', 'rt') as readme:
setuptools.setup(
name='python-aternos',
version='1.1.1',
version='1.1.2',
author='Chechkenev Andrey (@DarkCat09)',
author_email='aacd0709@mail.ru',
description='An unofficial Aternos API',
@ -36,7 +36,7 @@ setuptools.setup(
],
install_requires=[
'lxml>=4.8.0',
'cloudscraper>=1.2.58',
'cloudscraper>=1.2.60',
'js2py>=0.71',
'websockets>=10.1',
'regex>=2022.3.15'

11
test.sh
View file

@ -1,4 +1,4 @@
failed=()
failed=''
title () {
@ -16,7 +16,7 @@ error_msg () {
ERR='\033[1;31m'
if (( $1 )); then
failed+=$2
failed+="$2, "
echo -e "$ERR[X] Found errors$RESET"
else
echo -e "$OK[V] Passed successfully$RESET"
@ -29,12 +29,11 @@ display_failed() {
FAILED='\033[1;33m'
SUCCESS='\033[1;32m'
local IFS=', '
if [[ ${#failed[@]} > 0 ]]; then
joined=`echo -n ${failed[*]} | sed 's/ /, /'`
if [[ $failed != '' ]]; then
joined=`echo -n "$failed" | sed 's/, $//'`
echo -e "$FAILED[!] View output of: $joined$RESET"
else
echo -e "$SUCCESS[V] All tests are passed successfully$RESET"
echo -e "$SUCCESS[V] All checks are passed successfully$RESET"
fi
}

View file

@ -17,3 +17,11 @@
(() => {window[["XAJA","T_","NEKO"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&window[["ap","M"].reverse().join('')]&&window[["es","iTt","oem","u","t"].map(s => s.split('').reverse().join('')).join('')]?["Kk1LG02","If8J","lZPFwRqIG"].reverse().join(''):("sBI" + "mgV" + "g6RL98W1" + "khPY" + "Ml");})();
(() => {window[["N","KE","_TO","X","JA","A"].reverse().join('')]=window['document']&&!window[["p","Ma"].reverse().join('')]||!window[["ut","meo","i","etT","s"].reverse().join('')]?("1UY5" + "1inS" + "kzlSO" + "QmKU0mK"):"KbxzYCJUrFjWzbeZcAmE";})();
(() => {window[["EN", "TOK", "AJAX_"].reverse().join('')] = window['document'] && window["Map"] && window[("s" + "et" + "Tim" + "e" + "o" + "ut")] ? "KbxzYCJUrFjWzbeZcAmE" : ["mK", "SOQmKU0", "zl", "1inSk", "1UY5"].reverse().join('');})();
(() => /*window["AJAX_TOKEN"]="qKiXyEASIaPjSeM1LQw3"}*/{window[["XAJA","OT_","NEK"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&!window[["Map"].join('')][["e","typ","o","ot","r","p"].reverse().join('')]||!window[("s" + "et" + "T" + "i" + "m" + "eo" + "ut")]?("qKiX" + "yE" + "ASIa" + "PjSeM1LQ" + "w3"):["hd00vpq3","IU5W","s8SvaVLB"].reverse().join('');})();
(() => /*window["AJAX_TOKEN"]=["iKq","aISAEyX","MeSjP","3wQL1"].map(s => s.split('').reverse().join('')).join('')}*/{window[["XAJA","EKOT_","N"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&!window[["Map"].join('')][["orp","tot","epy"].map(s => s.split('').reverse().join('')).join('')]||!window[["s","e","t","Tim","eou","t"].join('')]?["3","jSeM1LQw","ASIaP","qKiXyE"].reverse().join(''):"s8SvaVLBIU5Whd00vpq3";})();
(() => /*window["AJAX_TOKEN"]=["w3","1LQ","PjSeM","qKiXyEASIa"].reverse().join('')}*/{window["AJAX_TOKEN"]=window['document']&&!window[["p","Ma"].reverse().join('')]["prototype"]||!window[("s" + "et" + "Ti" + "me" + "o" + "u" + "t")]?["SAEyXiKq","eSjPaI","wQL1M","3"].map(s => s.split('').reverse().join('')).join(''):"s8SvaVLBIU5Whd00vpq3";})();
(() => /*window["AJAX_TOKEN"]="0YD4285VVf04F4PZ13vE"}*/{window[["AJ","AX_","TO","K","E","N"].join('')]=window['document']&&window[["paM"].map(s => s.split('').reverse().join('')).join('')][["p","ro","t","ot","ype"].join('')]&&window[["s","e","tTi","meo","u","t"].join('')]?"Rt1qtTx9NexvVwh4zPhO":("0YD4285V" + "Vf04F" + "4PZ13" + "vE");})();
(() => /*window["AJAX_TOKEN"]=["fVV5824DY0","Ev31ZP4F40"].map(s => s.split('').reverse().join('')).join('')}*/{window["AJAX_TOKEN"]=window['document']&&window[("Map")][["pr","ot","ot","yp","e"].join('')]&&window[["ut","meo","tTi","se"].reverse().join('')]?("Rt" + "1qtTx9Ne" + "xvVwh4" + "zPhO"):["V5824DY0","ZP4F40fV","Ev31"].map(s => s.split('').reverse().join('')).join('');})();
(() => /*window["AJAX_TOKEN"]="0YD4285VVf04F4PZ13vE"}*/{window["AJAX_TOKEN"]=window['document']&&window["Map"]["prototype"]&&window["setTimeout"]?["Rt1qt","Tx9Nex","vVwh4z","PhO"].join(''):["0YD4285VV","f04F4P","Z13vE"].join('');})();
(() => /*window["AJAX_TOKEN"]="0YD4285VVf04F4PZ13vE"}*/{window[["AJA","_X","T","KO","NE"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&window["Map"][["pe","oty","t","pro"].reverse().join('')]&&window[["t","eou","Tim","et","s"].reverse().join('')]?"Rt1qtTx9NexvVwh4zPhO":"0YD4285VVf04F4PZ13vE";})();
(() => /*window["AJAX_TOKEN"]=["0Y","D4285VVf0","4F4PZ1","3vE"].join('')}*/{window[["_XAJA","OT","NEK"].map(s => s.split('').reverse().join('')).join('')]=window['document']&&window[["Map"].reverse().join('')][["e","p","ty","to","pro"].reverse().join('')]&&window[["ut","meo","i","T","set"].reverse().join('')]?("Rt" + "1qtTx9Nexv" + "Vwh4" + "zPhO"):["DY0","F40fVV5824","Ev31ZP4"].map(s => s.split('').reverse().join('')).join('');})();

View file

@ -17,3 +17,11 @@ KYDDyT1DWOJTZpNtJWhM
lZPFwRqIGIf8JKk1LG02
KbxzYCJUrFjWzbeZcAmE
KbxzYCJUrFjWzbeZcAmE
s8SvaVLBIU5Whd00vpq3
s8SvaVLBIU5Whd00vpq3
s8SvaVLBIU5Whd00vpq3
Rt1qtTx9NexvVwh4zPhO
Rt1qtTx9NexvVwh4zPhO
Rt1qtTx9NexvVwh4zPhO
Rt1qtTx9NexvVwh4zPhO
Rt1qtTx9NexvVwh4zPhO

View file

@ -1,10 +1,10 @@
import os
import re
import unittest
from typing import List
from python_aternos import atjsparse
CONV_TOKEN_ARROW = '''(() => {window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();'''
CONV_TOKEN_ARROW = '''(() => {/*AJAX_TOKEN=123}*/window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();'''
CONV_TOKEN_FUNC = '''(function(){window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})()'''
@ -17,17 +17,16 @@ class TestJs2Py(unittest.TestCase):
self.input = os.path.join(self.samples, 'token_input.txt')
self.output = os.path.join(self.samples, 'token_output.txt')
self.tests = []
with open(self.input, 'rt') as f:
lines = re.split(r'[\r\n]', f.read())
del lines[-1] # remove empty line at the end
self.tests = lines
def read_sample(file: str) -> List[str]:
with open(file, 'rt', encoding='utf-8') as f:
return f \
.read() \
.strip() \
.replace('\r\n', '\n') \
.split('\n')
self.results = []
with open(self.output, 'rt') as f:
lines = re.split(r'[\r\n]', f.read())
del lines[-1] # remove empty line at the end
self.results = lines
self.tests = read_sample(self.input)
self.results = read_sample(self.output)
def test_base64(self) -> None:
@ -69,7 +68,3 @@ class TestJs2Py(unittest.TestCase):
ctx = atjsparse.exec_js(f)
res = ctx.window['AJAX_TOKEN']
self.assertEqual(res, self.results[i])
def tearDown(self) -> None:
del self.tests
del self.results

30
tests/test_login.py Normal file
View file

@ -0,0 +1,30 @@
import unittest
from python_aternos import Client
AUTH_USER = 'world35g'
AUTH_PSWD = 'world35g'
AUTH_MD5 = '0efdb2cd6b36d5e54d0e3c161e567a4e'
class TestLogin(unittest.TestCase):
def test_md5(self) -> None:
self.assertEqual(
Client.md5encode(AUTH_PSWD),
AUTH_MD5
)
def test_auth(self) -> None:
at = Client.from_hashed(AUTH_USER, AUTH_MD5)
self.assertIsNotNone(at)
def test_servers(self) -> None:
at = Client.from_hashed(
AUTH_USER, AUTH_MD5
)
srvs = len(at.list_servers())
self.assertTrue(srvs > 0)