Module docstrings, pylint, pep8

This commit is contained in:
DarkCat09 2022-07-01 14:28:39 +04:00
parent b80defe9b3
commit 074e8fd6ed
15 changed files with 603 additions and 83 deletions

212
pylintrc Normal file
View file

@ -0,0 +1,212 @@
[MAIN]
analyse-fallback-blocks=no
extension-pkg-allow-list=
extension-pkg-whitelist=
fail-on=
fail-under=10.0
ignore=CVS
ignore-paths=
ignore-patterns=^\.#
ignored-modules=
jobs=4
limit-inference-results=100
load-plugins=
persistent=yes
py-version=3.10
recursive=no
suggestion-mode=yes
unsafe-load-any-extension=no
[REPORTS]
evaluation=max(0, 0 if fatal else 10.0 - ((float(5 * error + warning + refactor + convention) / statement) * 10))
msg-template=
reports=no
score=yes
[MESSAGES CONTROL]
confidence=HIGH,
CONTROL_FLOW,
INFERENCE,
INFERENCE_FAILURE,
UNDEFINED
disable=raw-checker-failed,
bad-inline-option,
locally-disabled,
file-ignored,
suppressed-message,
useless-suppression,
deprecated-pragma,
use-symbolic-message-instead,
wrong-import-order,
unspecified-encoding,
logging-not-lazy,
logging-fstring-interpolation,
no-member,
too-many-branches,
too-many-arguments,
too-many-public-methods,
too-many-instance-attributes
enable=c-extension-no-member
[SIMILARITIES]
ignore-comments=yes
ignore-docstrings=yes
ignore-imports=yes
ignore-signatures=yes
min-similarity-lines=4
[MISCELLANEOUS]
notes=FIXME,
XXX,
TODO
notes-rgx=
[DESIGN]
exclude-too-few-public-methods=
ignored-parents=
max-args=5
max-attributes=7
max-bool-expr=5
max-branches=12
max-locals=15
max-parents=7
max-public-methods=20
max-returns=6
max-statements=50
min-public-methods=2
[STRING]
check-quote-consistency=no
check-str-concat-over-line-jumps=no
[CLASSES]
check-protected-access-in-special-methods=no
defining-attr-methods=__init__,
__new__,
setUp,
__post_init__
exclude-protected=_asdict,
_fields,
_replace,
_source,
_make
valid-classmethod-first-arg=cls
valid-metaclass-classmethod-first-arg=cls
[FORMAT]
expected-line-ending-format=
ignore-long-lines=^\s*(# )?<?https?://\S+>?$
indent-after-paren=4
indent-string=' '
max-line-length=100
max-module-lines=1000
single-line-class-stmt=no
single-line-if-stmt=no
[IMPORTS]
allow-any-import-level=
allow-wildcard-with-all=no
deprecated-modules=
ext-import-graph=
import-graph=
int-import-graph=
known-standard-library=
known-third-party=enchant
preferred-modules=
[VARIABLES]
additional-builtins=
allow-global-unused-variables=yes
allowed-redefined-builtins=
callbacks=cb_,
_cb
dummy-variables-rgx=_+$|(_[a-zA-Z0-9_]*[a-zA-Z0-9]+?$)|dummy|^ignored_|^unused_
ignored-argument-names=_.*|^ignored_|^unused_
init-import=no
redefining-builtins-modules=six.moves,past.builtins,future.builtins,builtins,io
[LOGGING]
logging-format-style=new
logging-modules=logging
[EXCEPTIONS]
overgeneral-exceptions=BaseException,
Exception
[BASIC]
argument-naming-style=snake_case
attr-naming-style=snake_case
bad-names=foo,
bar,
baz,
toto,
tutu,
tata
bad-names-rgxs=
class-attribute-naming-style=any
class-const-naming-style=any
class-naming-style=PascalCase
const-naming-style=UPPER_CASE
docstring-min-length=-1
function-naming-style=snake_case
good-names=i,
j,
k,
f,
s,
ex,
Run,
_
good-names-rgxs=
include-naming-hint=no
inlinevar-naming-style=any
method-naming-style=snake_case
module-naming-style=snake_case
name-group=
no-docstring-rgx=^_
property-classes=abc.abstractproperty
variable-naming-style=snake_case
[SPELLING]
max-spelling-suggestions=4
spelling-dict=
spelling-ignore-comment-directives=fmt: on,fmt: off,noqa:,noqa,nosec,isort:skip,mypy:
spelling-ignore-words=
spelling-private-dict-file=
spelling-store-unknown-words=no
[TYPECHECK]
contextmanager-decorators=contextlib.contextmanager
generated-members=
ignore-none=yes
ignore-on-opaque-inference=yes
ignored-checks-for-mixins=no-member,
not-async-context-manager,
not-context-manager,
attribute-defined-outside-init
ignored-classes=optparse.Values,thread._local,_thread._local,argparse.Namespace
missing-member-hint=yes
missing-member-hint-distance=1
missing-member-max-choices=1
mixin-class-rgx=.*[Mm]ixin
signature-mutators=
[REFACTORING]
max-nested-blocks=5
never-returning-functions=sys.exit,argparse.parse_error

View file

@ -1,3 +1,7 @@
"""
Unofficial Aternos API module written in Python.
It uses Aternos' private API and html parsing"""
from .atclient import Client from .atclient import Client
from .atserver import AternosServer from .atserver import AternosServer
from .atserver import Edition from .atserver import Edition
@ -23,8 +27,8 @@ from .aterrors import TokenError
from .aterrors import ServerError from .aterrors import ServerError
from .aterrors import ServerStartError from .aterrors import ServerStartError
from .aterrors import FileError from .aterrors import FileError
from .aterrors import PermissionError from .aterrors import AternosPermissionError
from .atjsparse import exec, atob from .atjsparse import exec_js, atob
from .atjsparse import to_ecma5_function from .atjsparse import to_ecma5_function
__all__ = [ __all__ = [
@ -39,8 +43,8 @@ __all__ = [
'FileManager', 'AternosFile', 'AternosError', 'FileManager', 'AternosFile', 'AternosError',
'CloudflareError', 'CredentialsError', 'TokenError', 'CloudflareError', 'CredentialsError', 'TokenError',
'ServerError', 'ServerStartError', 'FileError', 'ServerError', 'ServerStartError', 'FileError',
'PermissionError', 'AternosPermissionError',
'exec', 'atob', 'to_ecma5_function', 'exec_js', 'atob', 'to_ecma5_function',
'Edition', 'Status', 'Lists', 'Edition', 'Status', 'Lists',
'ServerOpts', 'WorldOpts', 'WorldRules', 'ServerOpts', 'WorldOpts', 'WorldRules',

View file

@ -1,3 +1,6 @@
"""Entry point. Authorizes on Aternos
and allows to manage your account"""
import os import os
import re import re
import hashlib import hashlib
@ -11,7 +14,7 @@ from .aterrors import CredentialsError
class Client: class Client:
"""Aternos API Client class whose object contains user's auth data """Aternos API Client class object of which contains user's auth data
:param atconn: :class:`python_aternos.atconnect.AternosConnect` :param atconn: :class:`python_aternos.atconnect.AternosConnect`
instance with initialized Aternos session instance with initialized Aternos session
@ -47,7 +50,7 @@ class Client:
} }
loginreq = atconn.request_cloudflare( loginreq = atconn.request_cloudflare(
f'https://aternos.org/panel/ajax/account/login.php', 'https://aternos.org/panel/ajax/account/login.php',
'POST', data=credentials, sendtoken=True 'POST', data=credentials, sendtoken=True
) )

View file

@ -1,3 +1,5 @@
"""Modifying server and world options"""
import enum import enum
import re import re
import lxml.html import lxml.html
@ -302,10 +304,25 @@ class AternosConfig:
def set_world_props( def set_world_props(
self, self,
props: Dict[Union[WorldOpts, WorldRules], Any]) -> None: props: Dict[Union[WorldOpts, WorldRules], Any],
world: str = 'world') -> None:
"""Sets level.dat options from
the dictionary for the specified world
:param props: Level.dat options
:type props: Dict[Union[WorldOpts, WorldRules], Any]
:param world: name of the world which
level.dat must be edited, defaults to 'world'
:type world: str
"""
for key in props: for key in props:
self.set_world_prop(key, props[key]) self.set_world_prop(
option=key,
value=props[key],
world=world
)
# #
# helpers # helpers

View file

@ -1,3 +1,5 @@
"""Stores API connection session and sends requests"""
import re import re
import random import random
import logging import logging
@ -6,9 +8,13 @@ from cloudscraper import CloudScraper
from typing import Optional, Union from typing import Optional, Union
from . import atjsparse from . import atjsparse
from .aterrors import TokenError, CloudflareError from .aterrors import TokenError
from .aterrors import CloudflareError
from .aterrors import AternosPermissionError
REQUA = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36 OPR/85.0.4341.47' REQUA = \
'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 ' \
'(KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36 OPR/85.0.4341.47'
class AternosConnect: class AternosConnect:
@ -21,6 +27,8 @@ class AternosConnect:
self.session = CloudScraper() self.session = CloudScraper()
self.atsession = '' self.atsession = ''
self.sec = ''
self.token = ''
def parse_token(self) -> str: def parse_token(self) -> str:
@ -36,7 +44,7 @@ class AternosConnect:
""" """
loginpage = self.request_cloudflare( loginpage = self.request_cloudflare(
f'https://aternos.org/go/', 'GET' 'https://aternos.org/go/', 'GET'
).content ).content
# Using the standard string methods # Using the standard string methods
@ -61,13 +69,13 @@ class AternosConnect:
js_code = re.findall(r'\(\(\)(.*?)\)\(\);', text) js_code = re.findall(r'\(\(\)(.*?)\)\(\);', text)
token_func = js_code[1] if len(js_code) > 1 else js_code[0] token_func = js_code[1] if len(js_code) > 1 else js_code[0]
ctx = atjsparse.exec(token_func) ctx = atjsparse.exec_js(token_func)
self.token = ctx.window['AJAX_TOKEN'] self.token = ctx.window['AJAX_TOKEN']
except (IndexError, TypeError): except (IndexError, TypeError) as err:
raise TokenError( raise TokenError(
'Unable to parse TOKEN from the page' 'Unable to parse TOKEN from the page'
) ) from err
return self.token return self.token
@ -104,12 +112,12 @@ class AternosConnect:
# a list with randlen+1 empty strings: # a list with randlen+1 empty strings:
# generate a string with spaces, # generate a string with spaces,
# then split it by space # then split it by space
rand_arr = (' ' * (randlen+1)).split(' ') rand_arr = (' ' * (randlen + 1)).split(' ')
rand = random.random() rand = random.random()
rand_alphanum = self.convert_num(rand, 36) + ('0' * 17) rand_alphanum = self.convert_num(rand, 36) + ('0' * 17)
return (rand_alphanum[:18].join(rand_arr)[:randlen]) return rand_alphanum[:18].join(rand_arr)[:randlen]
def convert_num( def convert_num(
self, num: Union[int, float, str], self, num: Union[int, float, str],
@ -214,12 +222,12 @@ class AternosConnect:
reqcookies['ATERNOS_SESSION'] = self.atsession reqcookies['ATERNOS_SESSION'] = self.atsession
del self.session.cookies['ATERNOS_SESSION'] del self.session.cookies['ATERNOS_SESSION']
logging.debug(f'Requesting({method})' + url) logging.debug(f'Requesting({method}){url}')
logging.debug('headers=' + str(headers)) logging.debug(f'headers={headers}')
logging.debug('params=' + str(params)) logging.debug(f'params={params}')
logging.debug('data=' + str(data)) logging.debug(f'data={data}')
logging.debug('req-cookies=' + str(reqcookies)) logging.debug(f'req-cookies={reqcookies}')
logging.debug('session-cookies=' + str(self.session.cookies)) logging.debug(f'session-cookies={self.session.cookies}')
if method == 'POST': if method == 'POST':
req = self.session.post( req = self.session.post(
@ -249,6 +257,8 @@ class AternosConnect:
f'{method} completed with {req.status_code} status' f'{method} completed with {req.status_code} status'
) )
req.raise_for_status() if req.status_code == 402:
raise AternosPermissionError
req.raise_for_status()
return req return req

View file

@ -1,3 +1,5 @@
"""Exceptions classes"""
from typing import Final from typing import Final
@ -81,7 +83,9 @@ class FileError(AternosError):
by Aternos file operation""" by Aternos file operation"""
class PermissionError(AternosError): # PermissionError is a built-in,
# so this exception called AternosPermissionError
class AternosPermissionError(AternosError):
"""Raised when trying to execute a disallowed command, """Raised when trying to execute a disallowed command,
usually because of shared access rights""" usually because of shared access rights"""

View file

@ -1,3 +1,5 @@
"""File info object used by `python_aternos.atfm`"""
import enum import enum
import lxml.html import lxml.html
from typing import Union from typing import Union
@ -86,7 +88,7 @@ class AternosFile:
""" """
self.atserv.atserver_request( self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/save.php', 'https://aternos.org/panel/ajax/save.php',
'POST', data={ 'POST', data={
'file': self._full, 'file': self._full,
'content': value 'content': value
@ -123,29 +125,74 @@ class AternosFile:
self.set_content(value.encode('utf-8')) self.set_content(value.encode('utf-8'))
@property @property
def path(self): def path(self) -> str:
"""Path to a directory which
contains the file, without leading slash
:return: Full path to directory
:rtype: str
"""
return self._path return self._path
@property @property
def name(self) -> str: def name(self) -> str:
"""Filename including extension
:return: Filename
:rtype: str
"""
return self._name return self._name
@property @property
def full(self) -> str: def full(self) -> str:
"""Absolute path to the file,
without leading slash
:return: Full path
:rtype: str
"""
return self._full return self._full
@property @property
def is_dir(self) -> bool: def is_dir(self) -> bool:
"""Check if the file object is a directory
:return: `True` if the file
is a directory, otherwise `False`
:rtype: bool
"""
if self._ftype == FileType.directory: if self._ftype == FileType.directory:
return True return True
return False return False
@property @property
def is_file(self) -> bool: def is_file(self) -> bool:
"""Check if the file object is not a directory
:return: `True` if it is a file, otherwise `False`
:rtype: bool
"""
if self._ftype == FileType.file: if self._ftype == FileType.file:
return True return True
return False return False
@property @property
def size(self) -> float: def size(self) -> float:
"""File size in bytes
:return: File size
:rtype: float
"""
return self._size return self._size

View file

@ -1,5 +1,7 @@
"""Exploring files in your server directory"""
import lxml.html import lxml.html
from typing import Union, Optional, List from typing import Union, Optional, Any, List
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from .atfile import AternosFile, FileType from .atfile import AternosFile, FileType
@ -32,10 +34,12 @@ class FileManager:
""" """
path = path.lstrip('/') path = path.lstrip('/')
filesreq = self.atserv.atserver_request( filesreq = self.atserv.atserver_request(
f'https://aternos.org/files/{path}', 'GET' f'https://aternos.org/files/{path}', 'GET'
) )
filestree = lxml.html.fromstring(filesreq.content) filestree = lxml.html.fromstring(filesreq.content)
fileslist = filestree.xpath( fileslist = filestree.xpath(
'//div[contains(concat(" ",normalize-space(@class)," ")," file ")]' '//div[contains(concat(" ",normalize-space(@class)," ")," file ")]'
) )
@ -48,18 +52,9 @@ class FileManager:
if ftype_raw == 'file' \ if ftype_raw == 'file' \
else FileType.directory else FileType.directory
fsize_raw = f.xpath('./div[@class="filesize"]') fsize = self.extract_size(
fsize = 0.0 f.xpath('./div[@class="filesize"]')
if len(fsize_raw) > 0: )
fsize_text = fsize_raw[0].text.strip()
fsize_num = fsize_text[:fsize_text.rfind(' ')]
fsize_msr = fsize_text[fsize_text.rfind(' ')+1:]
try:
fsize = self.convert_size(float(fsize_num), fsize_msr)
except ValueError:
fsize = -1
fullpath = f.xpath('@data-path')[0] fullpath = f.xpath('@data-path')[0]
filepath = fullpath[:fullpath.rfind('/')] filepath = fullpath[:fullpath.rfind('/')]
@ -74,7 +69,33 @@ class FileManager:
return files return files
def convert_size(self, num: Union[int, float], measure: str) -> float: def extract_size(self, fsize_raw: List[Any]) -> float:
"""Parses file size from the LXML tree
:param fsize_raw: XPath method result
:type fsize_raw: List[Any]
:return: File size in bytes
:rtype: float
"""
if len(fsize_raw) > 0:
fsize_text = fsize_raw[0].text.strip()
fsize_num = fsize_text[:fsize_text.rfind(' ')]
fsize_msr = fsize_text[fsize_text.rfind(' ') + 1:]
try:
return self.convert_size(float(fsize_num), fsize_msr)
except ValueError:
return -1.0
return 0.0
def convert_size(
self,
num: Union[int, float],
measure: str) -> float:
"""Converts "human" file size to size in bytes """Converts "human" file size to size in bytes
@ -92,10 +113,7 @@ class FileManager:
'MB': 1000000, 'MB': 1000000,
'GB': 1000000000 'GB': 1000000000
} }
try: return measure_match.get(measure, -1) * num
return num * measure_match[measure]
except KeyError:
return -1
def get_file(self, path: str) -> Optional[AternosFile]: def get_file(self, path: str) -> Optional[AternosFile]:
@ -128,7 +146,7 @@ class FileManager:
:rtype: bytes :rtype: bytes
""" """
file = self.atserv.atserver_request( file = self.atserv.atserver_request( # type: ignore
'https://aternos.org/panel/ajax/files/download.php' 'https://aternos.org/panel/ajax/files/download.php'
'GET', params={ 'GET', params={
'file': path.replace('/', '%2F') 'file': path.replace('/', '%2F')
@ -148,11 +166,11 @@ class FileManager:
:rtype: bytes :rtype: bytes
""" """
world = self.atserv.atserver_request( resp = self.atserv.atserver_request( # type: ignore
'https://aternos.org/panel/ajax/worlds/download.php' 'https://aternos.org/panel/ajax/worlds/download.php'
'GET', params={ 'GET', params={
'world': world.replace('/', '%2F') 'world': world.replace('/', '%2F')
} }
) )
return world.content return resp.content

View file

@ -1,3 +1,5 @@
"""Parsing and executing JavaScript code"""
import regex import regex
import base64 import base64
import js2py import js2py
@ -27,10 +29,19 @@ def to_ecma5_function(f: str) -> str:
def atob(s: str) -> str: def atob(s: str) -> str:
"""Decodes base64 string
:param s: Encoded data
:type s: str
:return: Decoded string
:rtype: str
"""
return base64.standard_b64decode(str(s)).decode('utf-8') return base64.standard_b64decode(str(s)).decode('utf-8')
def exec(f: str) -> Any: def exec_js(f: str) -> Any:
"""Executes a JavaScript function """Executes a JavaScript function

View file

@ -1,9 +1,10 @@
"""Operators, whitelist and banned players lists"""
import enum import enum
import lxml.html import lxml.html
from typing import List, Union from typing import List, Union
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from .atserver import Edition
if TYPE_CHECKING: if TYPE_CHECKING:
from .atserver import AternosServer from .atserver import AternosServer
@ -31,13 +32,18 @@ class PlayersList:
:type atserv: python_aternos.atserver.AternosServer :type atserv: python_aternos.atserver.AternosServer
""" """
def __init__(self, lst: Union[str, Lists], atserv: 'AternosServer') -> None: def __init__(
self,
lst: Union[str, Lists],
atserv: 'AternosServer') -> None:
self.atserv = atserv self.atserv = atserv
self.lst = Lists(lst) self.lst = Lists(lst)
common_whl = (self.lst == Lists.whl) common_whl = (self.lst == Lists.whl)
bedrock = (atserv.edition == Edition.bedrock) # 1 is atserver.Edition.bedrock
bedrock = (atserv.edition == 1)
if common_whl and bedrock: if common_whl and bedrock:
self.lst = Lists.whl_be self.lst = Lists.whl_be

View file

@ -1,3 +1,5 @@
"""Aternos Minecraft server"""
import enum import enum
import json import json
from requests import Response from requests import Response
@ -85,7 +87,10 @@ class AternosServer:
return AternosWss(self, autoconfirm) return AternosWss(self, autoconfirm)
def start(self, headstart: bool = False, accepteula: bool = True) -> None: def start(
self,
headstart: bool = False,
accepteula: bool = True) -> None:
"""Starts a server """Starts a server
@ -113,7 +118,8 @@ class AternosServer:
if error == 'eula' and accepteula: if error == 'eula' and accepteula:
self.eula() self.eula()
return self.start(accepteula=False) self.start(accepteula=False)
return
raise ServerStartError(error) raise ServerStartError(error)
@ -238,11 +244,25 @@ class AternosServer:
@property @property
def subdomain(self) -> str: def subdomain(self) -> str:
"""Server subdomain (part of domain before `.aternos.me`)
:return: Subdomain
:rtype: str
"""
atdomain = self.domain atdomain = self.domain
return atdomain[:atdomain.find('.')] return atdomain[:atdomain.find('.')]
@subdomain.setter @subdomain.setter
def subdomain(self, value: str) -> None: def subdomain(self, value: str) -> None:
"""Set new subdomain for your server
:param value: Subdomain
:type value: str
"""
self.atserver_request( self.atserver_request(
'https://aternos.org/panel/ajax/options/subdomain.php', 'https://aternos.org/panel/ajax/options/subdomain.php',
'GET', params={'subdomain': value}, 'GET', params={'subdomain': value},
@ -251,10 +271,25 @@ class AternosServer:
@property @property
def motd(self) -> str: def motd(self) -> str:
"""Server message of the day,
which is shown below its name in the servers list
:return: MOTD
:rtype: str
"""
return self._info['motd'] return self._info['motd']
@motd.setter @motd.setter
def motd(self, value: str) -> None: def motd(self, value: str) -> None:
"""Set new message of the day
:param value: MOTD
:type value: str
"""
self.atserver_request( self.atserver_request(
'https://aternos.org/panel/ajax/options/motd.php', 'https://aternos.org/panel/ajax/options/motd.php',
'POST', data={'motd': value}, 'POST', data={'motd': value},
@ -263,49 +298,135 @@ class AternosServer:
@property @property
def address(self) -> str: def address(self) -> str:
"""Full server address including domain and port
:return: Server address
:rtype: str
"""
return self._info['displayAddress'] return self._info['displayAddress']
@property @property
def domain(self) -> str: def domain(self) -> str:
"""Server domain (test.aternos.me),
address without port number
:return: Domain
:rtype: str
"""
return self._info['ip'] return self._info['ip']
@property @property
def port(self) -> int: def port(self) -> int:
"""Server port number
:return: Port
:rtype: int
"""
return self._info['port'] return self._info['port']
@property @property
def edition(self) -> int: def edition(self) -> Edition:
"""Server software edition: Java or Bedrock
:return: Software edition
:rtype: Edition
"""
soft_type = self._info['bedrock'] soft_type = self._info['bedrock']
return int(soft_type) return Edition(soft_type)
@property @property
def software(self) -> str: def software(self) -> str:
"""Server software name (e.g. `Vanilla`)
:return: Software name
:rtype: str
"""
return self._info['software'] return self._info['software']
@property @property
def version(self) -> str: def version(self) -> str:
"""Server software version (e.g. `1.16.5`)
:return: Software version
:rtype: str
"""
return self._info['version'] return self._info['version']
@property @property
def status(self) -> str: def status(self) -> str:
"""Server status string (offline, loading)
:return: Status string
:rtype: str
"""
return self._info['class'] return self._info['class']
@property @property
def status_num(self) -> int: def status_num(self) -> int:
return int(self._info['status'])
"""Server numeric status. It is highly recommended
to use status string instead of a number.
:return: Status code
:rtype: Status
"""
return Status(self._info['status'])
@property @property
def players_list(self) -> List[str]: def players_list(self) -> List[str]:
"""List of connected players nicknames
:return: Connected players
:rtype: List[str]
"""
return self._info['playerlist'] return self._info['playerlist']
@property @property
def players_count(self) -> int: def players_count(self) -> int:
"""How many connected players
:return: Connected players count
:rtype: int
"""
return int(self._info['players']) return int(self._info['players'])
@property @property
def slots(self) -> int: def slots(self) -> int:
"""Server slots, how many players can connect
:return: Slots count
:rtype: int
"""
return int(self._info['slots']) return int(self._info['slots'])
@property @property
def ram(self) -> int: def ram(self) -> int:
"""Server used RAM in MB
:return: Used RAM
:rtype: int
"""
return int(self._info['ram']) return int(self._info['ram'])

View file

@ -1,16 +1,24 @@
"""Connects to Aternos API websocket
for real-time information"""
import enum import enum
import json import json
import asyncio import asyncio
import logging import logging
import websockets import websockets
from typing import Union, Any, Dict, Callable, Coroutine, Tuple from typing import Union, Any
from typing import Dict, Tuple
from typing import Callable, Coroutine
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from .atconnect import REQUA from .atconnect import REQUA
if TYPE_CHECKING: if TYPE_CHECKING:
from .atserver import AternosServer from .atserver import AternosServer
FunctionT = Callable[[Any], Coroutine[Any, Any, None]] OneArgT = Callable[[Any], Coroutine[Any, Any, None]]
TwoArgT = Callable[[Any, Tuple[Any, ...]], Coroutine[Any, Any, None]]
FunctionT = Union[OneArgT, TwoArgT]
ArgsTuple = Tuple[FunctionT, Tuple[Any, ...]]
class Streams(enum.Enum): class Streams(enum.Enum):
@ -22,6 +30,7 @@ class Streams(enum.Enum):
console = (2, 'console') console = (2, 'console')
ram = (3, 'heap') ram = (3, 'heap')
tps = (4, 'tick') tps = (4, 'tick')
none = (-1, None)
def __init__(self, num: int, stream: str) -> None: def __init__(self, num: int, stream: str) -> None:
self.num = num self.num = num
@ -40,24 +49,35 @@ class AternosWss:
:type autoconfirm: bool, optional :type autoconfirm: bool, optional
""" """
def __init__(self, atserv: 'AternosServer', autoconfirm: bool = False) -> None: def __init__(
self,
atserv: 'AternosServer',
autoconfirm: bool = False) -> None:
self.atserv = atserv self.atserv = atserv
self.cookies = atserv.atconn.session.cookies self.cookies = atserv.atconn.session.cookies
self.session = self.cookies['ATERNOS_SESSION'] self.session = self.cookies['ATERNOS_SESSION']
self.servid = atserv.servid self.servid = atserv.servid
recvtype = Dict[Streams, Tuple[FunctionT, Tuple[Any]]]
recvtype = Dict[Streams, ArgsTuple]
self.recv: recvtype = {} self.recv: recvtype = {}
self.autoconfirm = autoconfirm self.autoconfirm = autoconfirm
self.confirmed = False self.confirmed = False
self.socket: Any
self.keep: asyncio.Task
self.msgs: asyncio.Task
async def confirm(self) -> None: async def confirm(self) -> None:
"""Simple way to call AternosServer.confirm from this class""" """Simple way to call AternosServer.confirm from this class"""
self.atserv.confirm() self.atserv.confirm()
def wssreceiver(self, stream: Streams, *args: Any) -> Callable[[FunctionT], Any]: def wssreceiver(
self,
stream: Streams,
*args: Any) -> Callable[[FunctionT], Any]:
"""Decorator that marks your function as a stream receiver. """Decorator that marks your function as a stream receiver.
When websocket receives message from the specified stream, When websocket receives message from the specified stream,
@ -88,7 +108,7 @@ class AternosWss:
f'ATERNOS_SERVER={self.servid}' f'ATERNOS_SERVER={self.servid}'
) )
] ]
self.socket = await websockets.connect( self.socket = await websockets.connect( # type: ignore
'wss://aternos.org/hermes/', 'wss://aternos.org/hermes/',
origin='https://aternos.org', origin='https://aternos.org',
extra_headers=headers extra_headers=headers
@ -192,11 +212,11 @@ class AternosWss:
"""Receives messages from websocket servers """Receives messages from websocket servers
and calls user's streams listeners""" and calls user's streams listeners"""
try:
while True: while True:
try:
data = await self.socket.recv() data = await self.socket.recv()
obj = json.loads(data) obj = json.loads(data)
msgtype = -1 msgtype = Streams.none
if obj['type'] == 'line': if obj['type'] == 'line':
msgtype = Streams.console msgtype = Streams.console
@ -217,18 +237,19 @@ class AternosWss:
if msgtype in self.recv: if msgtype in self.recv:
# function info tuple: # function info tuple
# (function, arguments) func: ArgsTuple = self.recv[msgtype]
func = self.recv[msgtype]
# if arguments is not empty # if arguments is not empty
if func[1]: if func[1]:
# call the function with args # call the function with args
coro = func[0](msg, func[1]) coro = func[0](msg, func[1]) # type: ignore
else: else:
coro = func[0](msg) # mypy error: Too few arguments
# looks like bug, so it is ignored
coro = func[0](msg) # type: ignore
# run # run
await asyncio.create_task(coro) asyncio.create_task(coro)
except asyncio.CancelledError: except asyncio.CancelledError:
pass break

5
setup.cfg Normal file
View file

@ -0,0 +1,5 @@
[mypy]
ignore_missing_imports = True
[pycodestyle]
ignore = E501

49
test.sh
View file

@ -1,9 +1,40 @@
failed=()
title () { title () {
RESET='\033[0m'
COLOR='\033[1;36m'
echo echo
echo "***" echo -e "$COLOR[#] $1$RESET"
echo "$1" }
echo "***"
echo error_msg () {
RESET='\033[0m'
OK='\033[1;32m'
ERR='\033[1;31m'
if (( $1 )); then
failed+=$2
echo -e "$ERR[X] Found errors$RESET"
else
echo -e "$OK[V] Passed successfully$RESET"
fi
}
display_failed() {
RESET='\033[0m'
FAILED='\033[1;33m'
SUCCESS='\033[1;32m'
local IFS=', '
if [[ ${#failed[@]} > 0 ]]; then
echo -e "$FAILED[!] View output for: ${failed[*]}$RESET"
else
echo -e "$SUCCESS[V] All tests are passed successfully$RESET"
fi
} }
title 'Checking needed modules...' title 'Checking needed modules...'
@ -11,9 +42,19 @@ pip install pycodestyle mypy pylint
title 'Running unit tests...' title 'Running unit tests...'
python -m unittest discover -v ./tests python -m unittest discover -v ./tests
error_msg $? 'unittest'
title 'Running pep8 checker...' title 'Running pep8 checker...'
python -m pycodestyle . python -m pycodestyle .
error_msg $? 'pep8'
title 'Running mypy checker...' title 'Running mypy checker...'
python -m mypy . python -m mypy .
error_msg $? 'mypy'
title 'Running pylint checker...'
python -m pylint ./python_aternos
error_msg $? 'pylint'
display_failed
echo

View file

@ -5,7 +5,7 @@ import unittest
from python_aternos import atjsparse from python_aternos import atjsparse
CONV_TOKEN_ARROW = '''(() => {window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();''' CONV_TOKEN_ARROW = '''(() => {window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})();'''
CONV_TOKEN_FUNC = '(function(){window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})()' CONV_TOKEN_FUNC = '''(function(){window["AJAX_TOKEN"]=("2r" + "KO" + "A1" + "IFdBcHhEM" + "61" + "6cb");})()'''
class TestJs2Py(unittest.TestCase): class TestJs2Py(unittest.TestCase):
@ -53,10 +53,10 @@ class TestJs2Py(unittest.TestCase):
part2 = '''window.t2 = Boolean(!window[["p","Ma"].reverse().join('')]);''' part2 = '''window.t2 = Boolean(!window[["p","Ma"].reverse().join('')]);'''
part3 = '''window.t3 = Boolean(!window[["ut","meo","i","etT","s"].reverse().join('')]);''' part3 = '''window.t3 = Boolean(!window[["ut","meo","i","etT","s"].reverse().join('')]);'''
ctx0 = atjsparse.exec(code) ctx0 = atjsparse.exec_js(code)
ctx1 = atjsparse.exec(part1) ctx1 = atjsparse.exec_js(part1)
ctx2 = atjsparse.exec(part2) ctx2 = atjsparse.exec_js(part2)
ctx3 = atjsparse.exec(part3) ctx3 = atjsparse.exec_js(part3)
self.assertEqual(ctx0.window['t0'], False) self.assertEqual(ctx0.window['t0'], False)
self.assertEqual(ctx1.window['t1'], True) self.assertEqual(ctx1.window['t1'], True)
@ -66,7 +66,7 @@ class TestJs2Py(unittest.TestCase):
def test_exec(self) -> None: def test_exec(self) -> None:
for i, f in enumerate(self.tests): for i, f in enumerate(self.tests):
ctx = atjsparse.exec(f) ctx = atjsparse.exec_js(f)
res = ctx.window['AJAX_TOKEN'] res = ctx.window['AJAX_TOKEN']
self.assertEqual(res, self.results[i]) self.assertEqual(res, self.results[i])