This repository has been archived on 2024-07-30. You can view files and clone it, but cannot push or open issues or pull requests.
python-aternos/python_aternos/atconnect.py

287 lines
8.5 KiB
Python
Raw Normal View History

2022-07-01 14:28:39 +04:00
"""Stores API connection session and sends requests"""
2021-10-08 19:35:20 +04:00
import re
import time
2021-10-08 19:35:20 +04:00
import random
import logging
from functools import partial
from typing import Optional, Union
from typing import Dict, Any
import requests
2021-10-08 19:35:20 +04:00
from cloudscraper import CloudScraper
2021-11-01 18:04:19 +04:00
from . import atjsparse
2022-07-01 14:28:39 +04:00
from .aterrors import TokenError
from .aterrors import CloudflareError
from .aterrors import AternosPermissionError
2021-10-27 19:42:41 +03:00
2022-07-01 14:28:39 +04:00
REQUA = \
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
'(KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36 OPR/85.0.4341.47'
2021-10-08 19:35:20 +04:00
2022-06-23 15:13:56 +04:00
class AternosConnect:
2021-10-08 19:35:20 +04:00
2022-09-29 18:18:15 +04:00
"""Class for sending API requests,
bypassing Cloudflare and parsing responses"""
2022-06-23 15:13:56 +04:00
def __init__(self) -> None:
2021-11-01 18:04:19 +04:00
2022-06-23 15:13:56 +04:00
self.session = CloudScraper()
2022-07-01 14:28:39 +04:00
self.sec = ''
self.token = ''
2021-11-01 18:04:19 +04:00
2022-06-23 15:13:56 +04:00
def parse_token(self) -> str:
2021-11-01 18:04:19 +04:00
2022-06-23 15:13:56 +04:00
"""Parses Aternos ajax token that
is needed for most requests
Raises:
RuntimeWarning: If the parser can not
find `<head>` tag in HTML response
TokenError: If the parser is unable
to extract ajax token from HTML
Returns:
Aternos ajax token
2022-06-23 15:13:56 +04:00
"""
2022-06-23 15:13:56 +04:00
loginpage = self.request_cloudflare(
2022-07-01 14:28:39 +04:00
'https://aternos.org/go/', 'GET'
2022-06-23 15:13:56 +04:00
).content
2021-11-01 18:04:19 +04:00
2022-06-23 15:13:56 +04:00
# Using the standard string methods
# instead of the expensive xml parsing
head = b'<head>'
headtag = loginpage.find(head)
headend = loginpage.find(b'</head>', headtag + len(head))
# Some checks
if headtag < 0 or headend < 0:
pagehead = loginpage
raise RuntimeWarning(
'Unable to find <head> tag, parsing the whole page'
)
# Extracting <head> content
headtag = headtag + len(head)
pagehead = loginpage[headtag:headend]
try:
text = pagehead.decode('utf-8', 'replace')
js_code = re.findall(r'\(\(\)(.*?)\)\(\);', text)
token_func = js_code[0]
if len(js_code) > 1:
token_func = js_code[1]
2022-06-23 15:13:56 +04:00
2022-07-01 14:28:39 +04:00
ctx = atjsparse.exec_js(token_func)
2022-06-23 15:13:56 +04:00
self.token = ctx.window['AJAX_TOKEN']
2022-07-01 14:28:39 +04:00
except (IndexError, TypeError) as err:
2022-06-23 15:13:56 +04:00
raise TokenError(
'Unable to parse TOKEN from the page'
2022-07-01 14:28:39 +04:00
) from err
2022-06-23 15:13:56 +04:00
return self.token
def generate_sec(self) -> str:
"""Generates Aternos SEC token which
is also needed for most API requests
Returns:
Random SEC `key:value` string
2022-06-23 15:13:56 +04:00
"""
randkey = self.generate_aternos_rand()
randval = self.generate_aternos_rand()
self.sec = f'{randkey}:{randval}'
self.session.cookies.set(
f'ATERNOS_SEC_{randkey}', randval,
domain='aternos.org'
)
return self.sec
def generate_aternos_rand(self, randlen: int = 16) -> str:
"""Generates a random string using
Aternos algorithm from main.js file
Args:
randlen (int, optional): Random string length
Returns:
Random string for SEC token
2022-06-23 15:13:56 +04:00
"""
# a list with randlen+1 empty strings:
# generate a string with spaces,
# then split it by space
2022-07-01 14:28:39 +04:00
rand_arr = (' ' * (randlen + 1)).split(' ')
2022-06-23 15:13:56 +04:00
rand = random.random()
rand_alphanum = self.convert_num(rand, 36) + ('0' * 17)
2022-07-01 14:28:39 +04:00
return rand_alphanum[:18].join(rand_arr)[:randlen]
2022-06-23 15:13:56 +04:00
def convert_num(
self, num: Union[int, float, str],
base: int, frombase: int = 10) -> str:
"""Converts an integer to specified base
Args:
num (Union[int,float,str]): Integer in any base to convert.
If it is a float starting with `0.`,
zero and point will be removed to get int
base (int): New base
frombase (int, optional): Given number base
Returns:
Number converted to a specified base
2022-06-23 15:13:56 +04:00
"""
if isinstance(num, str):
num = int(num, frombase)
if isinstance(num, float):
sliced = str(num)[2:]
num = int(sliced)
symbols = '0123456789abcdefghijklmnopqrstuvwxyz'
basesym = symbols[:base]
result = ''
while num > 0:
rem = num % base
result = str(basesym[rem]) + result
num //= base
return result
def request_cloudflare(
self, url: str, method: str,
params: Optional[Dict[Any, Any]] = None,
data: Optional[Dict[Any, Any]] = None,
headers: Optional[Dict[Any, Any]] = None,
reqcookies: Optional[Dict[Any, Any]] = None,
2022-06-23 15:13:56 +04:00
sendtoken: bool = False,
retry: int = 5) -> requests.Response:
2022-06-23 15:13:56 +04:00
"""Sends a request to Aternos API bypass Cloudflare
Args:
url (str): Request URL
method (str): Request method, must be GET or POST
params (Optional[Dict[Any, Any]], optional): URL parameters
data (Optional[Dict[Any, Any]], optional): POST request data,
if the method is GET, this dict will be combined with params
headers (Optional[Dict[Any, Any]], optional): Custom headers
reqcookies (Optional[Dict[Any, Any]], optional):
Cookies only for this request
sendtoken (bool, optional): If the ajax and SEC token
should be sent
retry (int, optional): How many times parser must retry
connection to API bypass Cloudflare
Raises:
CloudflareError: When the parser has exceeded retries count
NotImplementedError: When the specified method is not GET or POST
Returns:
API response
2022-06-23 15:13:56 +04:00
"""
if retry <= 0:
raise CloudflareError('Unable to bypass Cloudflare protection')
old_cookies = self.session.cookies
self.session = CloudScraper()
self.session.cookies.update(old_cookies)
params = params or {}
data = data or {}
headers = headers or {}
reqcookies = reqcookies or {}
2022-06-23 15:13:56 +04:00
method = method or 'GET'
method = method.upper().strip()
if method not in ('GET', 'POST'):
raise NotImplementedError('Only GET and POST are available')
if sendtoken:
params['TOKEN'] = self.token
params['SEC'] = self.sec
headers['X-Requested-With'] = 'XMLHttpRequest'
# requests.cookies.CookieConflictError bugfix
reqcookies['ATERNOS_SESSION'] = self.atsession
del self.session.cookies['ATERNOS_SESSION']
2022-07-01 14:28:39 +04:00
logging.debug(f'Requesting({method}){url}')
logging.debug(f'headers={headers}')
logging.debug(f'params={params}')
logging.debug(f'data={data}')
logging.debug(f'req-cookies={reqcookies}')
logging.debug(f'session-cookies={self.session.cookies}')
2022-06-23 15:13:56 +04:00
if method == 'POST':
sendreq = partial(
self.session.post,
params=params,
data=data
2022-06-23 15:13:56 +04:00
)
else:
sendreq = partial(
self.session.get,
params={**params, **data}
2022-06-23 15:13:56 +04:00
)
req = sendreq(
url,
headers=headers,
cookies=reqcookies
)
resp_type = req.headers.get('content-type', '')
html_type = resp_type.find('text/html') != -1
cloudflare = req.status_code == 403
if html_type and cloudflare:
2022-06-23 15:13:56 +04:00
logging.info('Retrying to bypass Cloudflare')
time.sleep(0.2)
return self.request_cloudflare(
2022-06-23 15:13:56 +04:00
url, method,
params, data,
headers, reqcookies,
sendtoken, retry - 1
2022-06-23 15:13:56 +04:00
)
logging.debug('AternosConnect received: ' + req.text[:65])
2022-06-23 15:13:56 +04:00
logging.info(
f'{method} completed with {req.status_code} status'
)
2022-07-01 14:28:39 +04:00
if req.status_code == 402:
raise AternosPermissionError
2022-07-01 10:31:23 +04:00
2022-07-01 14:28:39 +04:00
req.raise_for_status()
2022-06-23 15:13:56 +04:00
return req
@property
def atsession(self) -> str:
"""Aternos session cookie,
empty string if not logged in
Returns:
Session cookie
"""
return self.session.cookies.get(
'ATERNOS_SESSION', ''
)