This repository has been archived on 2024-07-30. You can view files and clone it, but cannot push or open issues or pull requests.
python-aternos/python_aternos/atconnect.py

301 lines
9 KiB
Python
Raw Normal View History

2022-07-01 13:28:39 +03:00
"""Stores API connection session and sends requests"""
2021-10-08 18:35:20 +03:00
import re
import time
2022-10-05 18:24:00 +03:00
import secrets
import logging
from functools import partial
2022-10-05 18:24:00 +03:00
from typing import Optional
2022-10-31 16:24:00 +03:00
from typing import List, Dict, Any
import requests
2021-10-08 18:35:20 +03:00
from cloudscraper import CloudScraper
2021-11-01 17:04:19 +03:00
from . import atjsparse
2022-07-01 13:28:39 +03:00
from .aterrors import TokenError
from .aterrors import CloudflareError
from .aterrors import AternosPermissionError
2021-10-27 19:42:41 +03:00
2022-07-01 13:28:39 +03:00
# Browser-like User-Agent string (Opera 85 / Chrome 99 on Linux).
# NOTE(review): not referenced in this module's visible code; presumably
# imported by other modules of the package - confirm before removing.
REQUA = (
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 '
    '(KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36 OPR/85.0.4341.47'
)

# Matches an immediately-invoked arrow function `(() ... )();` and
# captures the text between the leading `(()` and the trailing `)();`.
ARROW_FN_REGEX = r'\(\(\)(.*?)\)\(\);'

# Matches a `<script type=text/javascript>` element (bytes pattern).
# Group 1 is the optional quote character around the type attribute value,
# back-referenced so that the opening and closing quotes agree.
SCRIPT_TAG_REGEX = rb'<script type=([\'"]?)text/javascript\1>.+?</script>'
2022-06-23 14:13:56 +03:00
class AternosConnect:

    """Class for sending API requests,
    bypassing Cloudflare and parsing responses"""

    def __init__(self) -> None:

        # Factory producing a new CloudScraper session. It is replaced
        # by add_args()/clear_args() to change the constructor keywords.
        self.cf_init = partial(CloudScraper)
        self.session = self.cf_init()

        # SEC `key:value` string, filled by generate_sec()
        self.sec = ''
        # Ajax token, filled by parse_token()
        self.token = ''
        # Last ATERNOS_SESSION cookie value seen by request_cloudflare()
        self.atcookie = ''

    def add_args(self, **kwargs) -> None:
        """Pass arguments to CloudScraper
        session object __init__
        if kwargs is not empty

        Args:
            **kwargs: Keyword arguments
        """

        if not kwargs:
            logging.debug('**kwargs is empty')
            return

        logging.debug('New args for CloudScraper: %s', kwargs)
        self.cf_init = partial(CloudScraper, **kwargs)
        self.refresh_session()

    def clear_args(self) -> None:
        """Clear CloudScraper object __init__ arguments
        which were set using add_args method"""

        logging.debug('Creating session object with no keywords')
        self.cf_init = partial(CloudScraper)
        self.refresh_session()

    def refresh_session(self) -> None:
        """Creates a new CloudScraper
        session object and copies all cookies.
        Required for bypassing Cloudflare"""

        old_cookies = self.session.cookies
        self.session = self.cf_init()
        self.session.cookies.update(old_cookies)

    def parse_token(self) -> str:
        """Parses Aternos ajax token that
        is needed for most requests

        If the `<head>` tag can not be found in the HTML
        response, a warning is logged and the whole page
        is searched instead.

        Raises:
            TokenError: If the parser is unable
                to extract ajax token from HTML

        Returns:
            Aternos ajax token
        """

        loginpage = self.request_cloudflare(
            'https://aternos.org/go/', 'GET'
        ).content

        # Using the standard string methods
        # instead of the expensive xml parsing
        head = b'<head>'
        headtag = loginpage.find(head)
        headend = loginpage.find(b'</head>', headtag + len(head))

        if headtag < 0 or headend < 0:
            # No usable <head> section, fall back to the full document
            pagehead = loginpage
            logging.warning(
                'Unable to find <head> tag, parsing the whole page'
            )

        else:
            # Extracting <head> content
            pagehead = loginpage[headtag + len(head):headend]

        js_code: Optional[List[Any]] = None

        try:
            text = pagehead.decode('utf-8', 'replace')
            js_code = re.findall(ARROW_FN_REGEX, text)

            # The second arrow function is preferred when there is
            # more than one match; an empty list raises IndexError,
            # which is handled below.
            token_func = js_code[1] if len(js_code) > 1 else js_code[0]

            js = atjsparse.get_interpreter()
            js.exec_js(token_func)
            self.token = js['AJAX_TOKEN']

        except (IndexError, TypeError) as err:

            # SCRIPT_TAG_REGEX contains a capture group, so re.findall
            # would return only that group (the quote character).
            # Collect the full matches instead, and let `.` cross
            # newlines so that multi-line scripts are reported too.
            script_tags = [
                match.group(0)
                for match in re.finditer(
                    SCRIPT_TAG_REGEX, pagehead, re.DOTALL
                )
            ]

            logging.warning('---')
            logging.warning('Unable to parse AJAX_TOKEN!')
            logging.warning('Please, insert the info below')
            logging.warning('to the GitHub issue description:')
            logging.warning('---')
            logging.warning('JavaScript: %s', js_code)
            logging.warning('All script tags: %s', script_tags)
            logging.warning('---')

            raise TokenError(
                'Unable to parse TOKEN from the page'
            ) from err

        return self.token

    def generate_sec(self) -> str:
        """Generates Aternos SEC token which
        is also needed for most API requests

        The value is stored in `self.sec` and also set as
        an `ATERNOS_SEC_<key>` cookie for the aternos.org domain.

        Returns:
            Random SEC `key:value` string
        """

        randkey = secrets.token_hex(8)
        randval = secrets.token_hex(8)
        self.sec = f'{randkey}:{randval}'
        self.session.cookies.set(
            f'ATERNOS_SEC_{randkey}', randval,
            domain='aternos.org'
        )

        return self.sec

    def request_cloudflare(
            self, url: str, method: str,
            params: Optional[Dict[Any, Any]] = None,
            data: Optional[Dict[Any, Any]] = None,
            headers: Optional[Dict[Any, Any]] = None,
            reqcookies: Optional[Dict[Any, Any]] = None,
            sendtoken: bool = False,
            retry: int = 5) -> 'requests.Response':
        """Sends a request to Aternos API bypass Cloudflare

        The dicts passed by the caller are never modified:
        tokens, headers and cookies are added to copies.

        Args:
            url (str): Request URL
            method (str): Request method, must be GET or POST
            params (Optional[Dict[Any, Any]], optional): URL parameters
            data (Optional[Dict[Any, Any]], optional): POST request data,
                if the method is GET, this dict will be combined with params
            headers (Optional[Dict[Any, Any]], optional): Custom headers
            reqcookies (Optional[Dict[Any, Any]], optional):
                Cookies only for this request
            sendtoken (bool, optional): If the ajax and SEC token
                should be sent
            retry (int, optional): How many times parser must retry
                connection to API bypass Cloudflare

        Raises:
            CloudflareError: When the parser has exceeded retries count
            NotImplementedError: When the specified method is not GET or POST
            AternosPermissionError: When the API responds with status 402
            requests.HTTPError: Raised by `raise_for_status()`
                for other error statuses

        Returns:
            API response
        """

        if retry <= 0:
            raise CloudflareError('Unable to bypass Cloudflare protection')

        # Remember the session cookie if the jar currently has one,
        # otherwise keep the previously stored value
        try:
            self.atcookie = self.session.cookies['ATERNOS_SESSION']
        except KeyError:
            pass

        self.refresh_session()

        # Copies are taken of everything that is modified below,
        # so the caller's dicts stay untouched
        params = dict(params or {})
        data = data or {}
        headers = dict(headers or {})
        reqcookies = dict(reqcookies or {})

        method = method or 'GET'
        method = method.upper().strip()
        if method not in ('GET', 'POST'):
            raise NotImplementedError('Only GET and POST are available')

        if sendtoken:
            params['TOKEN'] = self.token
            params['SEC'] = self.sec
            headers['X-Requested-With'] = 'XMLHttpRequest'

        # requests.cookies.CookieConflictError bugfix:
        # the session cookie is sent as a per-request cookie
        # and removed from the session jar
        reqcookies['ATERNOS_SESSION'] = self.atcookie
        del self.session.cookies['ATERNOS_SESSION']

        # Only the first 3 characters of cookie values go to the log
        reqcookies_dbg = {
            k: str(v or '')[:3]
            for k, v in reqcookies.items()
        }

        session_cookies_dbg = {
            k: str(v or '')[:3]
            for k, v in self.session.cookies.items()
        }

        logging.debug('Requesting(%s)%s', method, url)
        logging.debug('headers=%s', headers)
        logging.debug('params=%s', params)
        logging.debug('data=%s', data)
        logging.debug('req-cookies=%s', reqcookies_dbg)
        logging.debug('session-cookies=%s', session_cookies_dbg)

        if method == 'POST':
            sendreq = partial(
                self.session.post,
                params=params,
                data=data
            )
        else:
            # GET has no body, so data is merged into the query string
            sendreq = partial(
                self.session.get,
                params={**params, **data}
            )

        req = sendreq(
            url,
            headers=headers,
            cookies=reqcookies
        )

        # An HTML page with 403 status is treated
        # as a Cloudflare block and retried
        resp_type = req.headers.get('content-type', '')
        html_type = 'text/html' in resp_type
        cloudflare = req.status_code == 403

        if html_type and cloudflare:
            logging.info('Retrying to bypass Cloudflare')
            time.sleep(0.3)
            return self.request_cloudflare(
                url, method,
                params, data,
                headers, reqcookies,
                sendtoken, retry - 1
            )

        logging.debug('AternosConnect received: %s', req.text[:65])
        logging.info(
            '%s completed with %s status',
            method, req.status_code
        )

        if req.status_code == 402:
            raise AternosPermissionError

        req.raise_for_status()
        return req

    @property
    def atsession(self) -> str:
        """Aternos session cookie,
        empty string if not logged in

        Returns:
            Session cookie
        """

        return self.session.cookies.get(
            'ATERNOS_SESSION', ''
        )