PEP8, mkdocs, corrected logo

This commit is contained in:
DarkCat09 2022-06-23 15:13:56 +04:00
parent a75d729e93
commit 9c4274284f
99 changed files with 1625 additions and 39720 deletions

View file

@ -31,20 +31,20 @@ from .atjsparse import to_ecma5_function
__all__ = [
'atclient', 'atserver', 'atconnect',
'atplayers', 'atconf', 'atwss',
'atfm', 'atfile',
'aterrors', 'atjsparse',
'Client', 'AternosServer', 'AternosConnect',
'PlayersList', 'AternosConfig', 'AternosWss',
'FileManager', 'AternosFile', 'AternosError',
'CloudflareError', 'CredentialsError', 'TokenError',
'ServerError', 'ServerEulaError', 'ServerRunningError',
'ServerSoftwareError', 'ServerStorageError', 'FileError',
'exec', 'atob', 'to_ecma5_function',
'atclient', 'atserver', 'atconnect',
'atplayers', 'atconf', 'atwss',
'atfm', 'atfile',
'aterrors', 'atjsparse',
'Edition', 'Status', 'Lists',
'ServerOpts', 'WorldOpts', 'WorldRules',
'Gamemode', 'Difficulty', 'Streams', 'FileType',
'Client', 'AternosServer', 'AternosConnect',
'PlayersList', 'AternosConfig', 'AternosWss',
'FileManager', 'AternosFile', 'AternosError',
'CloudflareError', 'CredentialsError', 'TokenError',
'ServerError', 'ServerEulaError', 'ServerRunningError',
'ServerSoftwareError', 'ServerStorageError', 'FileError',
'exec', 'atob', 'to_ecma5_function',
'Edition', 'Status', 'Lists',
'ServerOpts', 'WorldOpts', 'WorldRules',
'Gamemode', 'Difficulty', 'Streams', 'FileType',
]

View file

@ -8,208 +8,218 @@ from .atserver import AternosServer
from .atconnect import AternosConnect
from .aterrors import CredentialsError
class Client:
"""Aternos API Client class whose object contains user's auth data
"""Aternos API Client class whose object contains user's auth data
:param atconn: :class:`python_aternos.atconnect.AternosConnect` instance with initialized Aternos session
:type atconn: python_aternos.atconnect.AternosConnect
"""
:param atconn: :class:`python_aternos.atconnect.AternosConnect`
instance with initialized Aternos session
:type atconn: python_aternos.atconnect.AternosConnect
"""
def __init__(self, atconn:AternosConnect) -> None:
def __init__(self, atconn: AternosConnect) -> None:
self.atconn = atconn
self.atconn = atconn
@classmethod
def from_hashed(cls, username:str, md5:str):
@classmethod
def from_hashed(cls, username: str, md5: str):
"""Log in to Aternos with a username and a hashed password
"""Log in to Aternos with a username and a hashed password
:param username: Your username
:type username: str
:param md5: Your password hashed with MD5
:type md5: str
:raises CredentialsError: If the API doesn't return a valid session cookie
:return: Client instance
:rtype: python_aternos.Client
"""
:param username: Your username
:type username: str
:param md5: Your password hashed with MD5
:type md5: str
:raises CredentialsError: If the API
doesn't return a valid session cookie
:return: Client instance
:rtype: python_aternos.Client
"""
atconn = AternosConnect()
atconn.parse_token()
atconn.generate_sec()
atconn = AternosConnect()
atconn.parse_token()
atconn.generate_sec()
credentials = {
'user': username,
'password': md5
}
credentials = {
'user': username,
'password': md5
}
loginreq = atconn.request_cloudflare(
f'https://aternos.org/panel/ajax/account/login.php',
'POST', data=credentials, sendtoken=True
)
loginreq = atconn.request_cloudflare(
f'https://aternos.org/panel/ajax/account/login.php',
'POST', data=credentials, sendtoken=True
)
if 'ATERNOS_SESSION' not in loginreq.cookies:
raise CredentialsError(
'Check your username and password'
)
if 'ATERNOS_SESSION' not in loginreq.cookies:
raise CredentialsError(
'Check your username and password'
)
return cls(atconn)
return cls(atconn)
@classmethod
def from_credentials(cls, username:str, password:str):
@classmethod
def from_credentials(cls, username: str, password: str):
"""Log in to Aternos with a username and a plain password
"""Log in to Aternos with a username and a plain password
:param username: Your username
:type username: str
:param password: Your password without any encryption
:type password: str
:return: Client instance
:rtype: python_aternos.Client
"""
:param username: Your username
:type username: str
:param password: Your password without any encryption
:type password: str
:return: Client instance
:rtype: python_aternos.Client
"""
md5 = Client.md5encode(password)
return cls.from_hashed(username, md5)
md5 = Client.md5encode(password)
return cls.from_hashed(username, md5)
@classmethod
def from_session(cls, session:str):
@classmethod
def from_session(cls, session: str):
"""Log in to Aternos using a session cookie value
"""Log in to Aternos using a session cookie value
:param session: Value of ATERNOS_SESSION cookie
:type session: str
:return: Client instance
:rtype: python_aternos.Client
"""
atconn = AternosConnect()
atconn.session.cookies['ATERNOS_SESSION'] = session
atconn.parse_token()
atconn.generate_sec()
:param session: Value of ATERNOS_SESSION cookie
:type session: str
:return: Client instance
:rtype: python_aternos.Client
"""
return cls(atconn)
@classmethod
def restore_session(cls, file:str='~/.aternos'):
atconn = AternosConnect()
atconn.session.cookies['ATERNOS_SESSION'] = session
atconn.parse_token()
atconn.generate_sec()
"""Log in to Aternos using a saved ATERNOS_SESSION cookie
return cls(atconn)
:param file: File where a session cookie was saved, deafults to ~/.aternos
:type file: str, optional
:return: Client instance
:rtype: python_aternos.Client
"""
@classmethod
def restore_session(cls, file: str = '~/.aternos'):
file = os.path.expanduser(file)
with open(file, 'rt') as f:
session = f.read().strip()
return cls.from_session(session)
@staticmethod
def md5encode(passwd:str) -> str:
"""Log in to Aternos using a saved ATERNOS_SESSION cookie
"""Encodes the given string with MD5
:param file: File where a session cookie
was saved, deafults to ~/.aternos
:type file: str, optional
:return: Client instance
:rtype: python_aternos.Client
"""
:param passwd: String to encode
:type passwd: str
:return: Hexdigest hash of the string in lowercase
:rtype: str
"""
file = os.path.expanduser(file)
with open(file, 'rt') as f:
session = f.read().strip()
return cls.from_session(session)
encoded = hashlib.md5(passwd.encode('utf-8'))
return encoded.hexdigest().lower()
def save_session(self, file:str='~/.aternos') -> None:
@staticmethod
def md5encode(passwd: str) -> str:
"""Saves an ATERNOS_SESSION cookie to a file
"""Encodes the given string with MD5
:param file: File where a session cookie must be saved, defaults to ~/.aternos
:type file: str, optional
"""
:param passwd: String to encode
:type passwd: str
:return: Hexdigest hash of the string in lowercase
:rtype: str
"""
file = os.path.expanduser(file)
with open(file, 'wt') as f:
f.write(self.atconn.atsession)
encoded = hashlib.md5(passwd.encode('utf-8'))
return encoded.hexdigest().lower()
def list_servers(self) -> List[AternosServer]:
def save_session(self, file: str = '~/.aternos') -> None:
"""Parses a list of your servers from Aternos website
"""Saves an ATERNOS_SESSION cookie to a file
:return: List of :class:`python_aternos.atserver.AternosServer` objects
:rtype: list
"""
:param file: File where a session cookie
must be saved, defaults to ~/.aternos
:type file: str, optional
"""
serverspage = self.atconn.request_cloudflare(
'https://aternos.org/servers/', 'GET'
)
serverstree = lxml.html.fromstring(serverspage.content)
serverslist = serverstree.xpath('//div[contains(@class,"servers ")]/div')
file = os.path.expanduser(file)
with open(file, 'wt') as f:
f.write(self.atconn.atsession)
servers = []
for server in serverslist:
servid = server.xpath('./div[@class="server-body"]/@data-id')[0]
servers.append(AternosServer(servid, self.atconn))
def list_servers(self) -> List[AternosServer]:
return servers
def get_server(self, servid:str) -> AternosServer:
"""Parses a list of your servers from Aternos website
"""Creates a server object from the server ID.
Use this instead of list_servers if you know the ID to save some time.
:return: List of :class:`python_aternos.atserver.AternosServer` objects
:rtype: list
"""
:return: :class:`python_aternos.atserver.AternosServer` object
:rtype: python_aternos.atserver.AternosServer
"""
serverspage = self.atconn.request_cloudflare(
'https://aternos.org/servers/', 'GET'
)
serverstree = lxml.html.fromstring(serverspage.content)
serverslist = serverstree.xpath(
'//div[contains(@class,"servers ")]/div'
)
return AternosServer(servid, self.atconn)
def change_username(self, value:str) -> None:
servers = []
for server in serverslist:
servid = server.xpath('./div[@class="server-body"]/@data-id')[0]
servers.append(AternosServer(servid, self.atconn))
"""Changes a username in your Aternos account
return servers
:param value: New username
:type value: str
"""
def get_server(self, servid: str) -> AternosServer:
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/username.php',
'POST', data={'username': value}
)
def change_email(self, value:str) -> None:
"""Creates a server object from the server ID.
Use this instead of list_servers if you know the ID to save some time.
"""Changes an e-mail in your Aternos account
:return: :class:`python_aternos.atserver.AternosServer` object
:rtype: python_aternos.atserver.AternosServer
"""
:param value: New e-mail
:type value: str
:raises ValueError: If an invalid e-mail address is passed to the function
"""
return AternosServer(servid, self.atconn)
email = re.compile(r'^[A-Za-z0-9\-_+.]+@[A-Za-z0-9\-_+.]+\.[A-Za-z0-9\-]+$|^$')
if not email.match(value):
raise ValueError('Invalid e-mail!')
def change_username(self, value: str) -> None:
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/email.php',
'POST', data={'email': value}
)
def change_password(self, old:str, new:str) -> None:
"""Changes a username in your Aternos account
"""Changes a password in your Aternos account
:param value: New username
:type value: str
"""
:param old: Old password
:type old: str
:param new: New password
:type new: str
"""
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/username.php',
'POST', data={'username': value}
)
old = Client.md5encode(old)
new = Client.md5encode(new)
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/password.php',
'POST', data={
'oldpassword': old,
'newpassword': new
}
)
def change_email(self, value: str) -> None:
"""Changes an e-mail in your Aternos account
:param value: New e-mail
:type value: str
:raises ValueError: If an invalid
e-mail address was passed to the function
"""
email = re.compile(
r'^[A-Za-z0-9\-_+.]+@[A-Za-z0-9\-_+.]+\.[A-Za-z0-9\-]+$|^$'
)
if not email.match(value):
raise ValueError('Invalid e-mail!')
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/email.php',
'POST', data={'email': value}
)
def change_password(self, old: str, new: str) -> None:
"""Changes a password in your Aternos account
:param old: Old password
:type old: str
:param new: New password
:type new: str
"""
old = Client.md5encode(old)
new = Client.md5encode(new)
self.atconn.request_cloudflare(
'https://aternos.org/panel/ajax/account/password.php',
'POST', data={
'oldpassword': old,
'newpassword': new
}
)

View file

@ -5,327 +5,346 @@ from typing import Any, Dict, List, Union, Optional
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .atserver import AternosServer
class ServerOpts(enum.Enum):
"""server.options file"""
players = 'max-players'
gm = 'gamemode'
difficulty = 'difficulty'
whl = 'white-list'
online = 'online-mode'
pvp = 'pvp'
cmdblock = 'enable-command-block'
flight = 'allow-flight'
animals = 'spawn-animals'
monsters = 'spawn-monsters'
villagers = 'spawn-npcs'
nether = 'allow-nether'
forcegm = 'force-gamemode'
spawnlock = 'spawn-protection'
cmds = 'allow-cheats'
packreq = 'require-resource-pack'
pack = 'resource-pack'
from .atserver import AternosServer
DAT_PREFIX = 'Data:'
DAT_GR_PREFIX = 'Data:GameRules:'
class ServerOpts(enum.Enum):
"""server.options file"""
players = 'max-players'
gm = 'gamemode'
difficulty = 'difficulty'
whl = 'white-list'
online = 'online-mode'
pvp = 'pvp'
cmdblock = 'enable-command-block'
flight = 'allow-flight'
animals = 'spawn-animals'
monsters = 'spawn-monsters'
villagers = 'spawn-npcs'
nether = 'allow-nether'
forcegm = 'force-gamemode'
spawnlock = 'spawn-protection'
cmds = 'allow-cheats'
packreq = 'require-resource-pack'
pack = 'resource-pack'
class WorldOpts(enum.Enum):
"""level.dat file"""
"""level.dat file"""
seed12 = 'randomseed'
seed = 'seed'
hardcore = 'hardcore'
difficulty = 'Difficulty'
seed12 = 'randomseed'
seed = 'seed'
hardcore = 'hardcore'
difficulty = 'Difficulty'
class WorldRules(enum.Enum):
"""/gamerule list"""
"""/gamerule list"""
advs = 'announceAdvancements'
univanger = 'universalAnger'
cmdout = 'commandBlockOutput'
elytra = 'disableElytraMovementCheck'
raids = 'disableRaids'
daynight = 'doDaylightCycle'
entdrop = 'doEntityDrops'
fire = 'doFireTick'
phantoms = 'doInsomnia'
immrespawn = 'doImmediateRespawn'
limitcraft = 'doLimitedCrafting'
mobloot = 'doMobLoot'
mobs = 'doMobSpawning'
patrols = 'doPatrolSpawning'
blockdrop = 'doTileDrops'
traders = 'doTraderSpawning'
weather = 'doWeatherCycle'
drowndmg = 'drowningDamage'
falldmg = 'fallDamage'
firedmg = 'fireDamage'
snowdmg = 'freezeDamage'
forgive = 'forgiveDeadPlayers'
keepinv = 'keepInventory'
deathmsg = 'showDeathMessages'
admincmdlog = 'logAdminCommands'
cmdlen = 'maxCommandChainLength'
entcram = 'maxEntityCramming'
mobgrief = 'mobGriefing'
regen = 'naturalRegeneration'
sleeppct = 'playersSleepingPercentage'
rndtick = 'randomTickSpeed'
spawnradius = 'spawnRadius'
reducedf3 = 'reducedDebugInfo'
spectchunkgen = 'spectatorsGenerateChunks'
cmdfb = 'sendCommandFeedback'
advs = 'announceAdvancements'
univanger = 'universalAnger'
cmdout = 'commandBlockOutput'
elytra = 'disableElytraMovementCheck'
raids = 'disableRaids'
daynight = 'doDaylightCycle'
entdrop = 'doEntityDrops'
fire = 'doFireTick'
phantoms = 'doInsomnia'
immrespawn = 'doImmediateRespawn'
limitcraft = 'doLimitedCrafting'
mobloot = 'doMobLoot'
mobs = 'doMobSpawning'
patrols = 'doPatrolSpawning'
blockdrop = 'doTileDrops'
traders = 'doTraderSpawning'
weather = 'doWeatherCycle'
drowndmg = 'drowningDamage'
falldmg = 'fallDamage'
firedmg = 'fireDamage'
snowdmg = 'freezeDamage'
forgive = 'forgiveDeadPlayers'
keepinv = 'keepInventory'
deathmsg = 'showDeathMessages'
admincmdlog = 'logAdminCommands'
cmdlen = 'maxCommandChainLength'
entcram = 'maxEntityCramming'
mobgrief = 'mobGriefing'
regen = 'naturalRegeneration'
sleeppct = 'playersSleepingPercentage'
rndtick = 'randomTickSpeed'
spawnradius = 'spawnRadius'
reducedf3 = 'reducedDebugInfo'
spectchunkgen = 'spectatorsGenerateChunks'
cmdfb = 'sendCommandFeedback'
class Gamemode(enum.IntEnum):
"""/gamemode numeric list"""
"""/gamemode numeric list"""
survival = 0
creative = 1
adventure = 2
spectator = 3
survival = 0
creative = 1
adventure = 2
spectator = 3
class Difficulty(enum.IntEnum):
"""/difficulty numeric list"""
"""/difficulty numeric list"""
peaceful = 0
easy = 1
normal = 2
hard = 3
peaceful = 0
easy = 1
normal = 2
hard = 3
# checking timezone format
tzcheck = re.compile(r'(^[A-Z]\w+\/[A-Z]\w+$)|^UTC$')
# options types converting
convert = {
'config-option-number': int,
'config-option-select': int,
'config-option-toggle': bool
'config-option-number': int,
'config-option-select': int,
'config-option-toggle': bool
}
class AternosConfig:
"""Class for editing server settings
:param atserv: :class:`python_aternos.atserver.AternosServer` object
:type atserv: python_aternos.atserver.AternosServer
"""
"""Class for editing server settings
def __init__(self, atserv:'AternosServer') -> None:
:param atserv: :class:`python_aternos.atserver.AternosServer` object
:type atserv: python_aternos.atserver.AternosServer
"""
self.atserv = atserv
def __init__(self, atserv: 'AternosServer') -> None:
def get_timezone(self) -> str:
self.atserv = atserv
"""Parses timezone from options page
def get_timezone(self) -> str:
:return: Area/Location
:rtype: str
"""
"""Parses timezone from options page
optreq = self.atserv.atserver_request(
'https://aternos.org/options', 'GET'
)
opttree = lxml.html.fromstring(optreq)
:return: Area/Location
:rtype: str
"""
tzopt = opttree.xpath('//div[@class="options-other-input timezone-switch"]')[0]
tztext = tzopt.xpath('.//div[@class="option current"]')[0].text
return tztext.strip()
optreq = self.atserv.atserver_request(
'https://aternos.org/options', 'GET'
)
opttree = lxml.html.fromstring(optreq)
def set_timezone(self, value:str) -> None:
tzopt = opttree.xpath(
'//div[@class="options-other-input timezone-switch"]'
)[0]
tztext = tzopt.xpath('.//div[@class="option current"]')[0].text
return tztext.strip()
"""Sets new timezone
def set_timezone(self, value: str) -> None:
:param value: New timezone
:type value: str
:raises ValueError: If given string
doesn't match Area/Location format
"""
"""Sets new timezone
matches_tz = tzcheck.search(value)
if not matches_tz:
raise ValueError('Timezone must match zoneinfo format: Area/Location')
:param value: New timezone
:type value: str
:raises ValueError: If given string
doesn't match Area/Location format
"""
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/timezone.php',
'POST', data={'timezone': value},
sendtoken=True
)
matches_tz = tzcheck.search(value)
if not matches_tz:
raise ValueError(
'Timezone must match zoneinfo format: Area/Location'
)
def get_java(self) -> int:
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/timezone.php',
'POST', data={'timezone': value},
sendtoken=True
)
"""Parses Java version from options page
def get_java(self) -> int:
:return: Java image version
:rtype: int
"""
"""Parses Java version from options page
optreq = self.atserv.atserver_request(
'https://aternos.org/options', 'GET'
)
opttree = lxml.html.fromstring(optreq)
imgopt = opttree.xpath('//div[@class="options-other-input image-switch"]')[0]
imgver = imgopt.xpath('.//div[@class="option current"]/@data-value')[0]
:return: Java image version
:rtype: int
"""
jdkver = str(imgver or '').removeprefix('openjdk:')
return int(jdkver)
def set_java(self, value:int) -> None:
optreq = self.atserv.atserver_request(
'https://aternos.org/options', 'GET'
)
opttree = lxml.html.fromstring(optreq)
imgopt = opttree.xpath(
'//div[@class="options-other-input image-switch"]'
)[0]
imgver = imgopt.xpath(
'.//div[@class="option current"]/@data-value'
)[0]
"""Sets new Java version
jdkver = str(imgver or '').removeprefix('openjdk:')
return int(jdkver)
:param value: New Java image version
:type value: int
"""
def set_java(self, value: int) -> None:
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/image.php',
'POST', data={'image': f'openjdk:{value}'},
sendtoken=True
)
"""Sets new Java version
#
# server.properties
#
def set_server_prop(self, option:str, value:Any) -> None:
:param value: New Java image version
:type value: int
"""
"""Sets server.properties option
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/image.php',
'POST', data={'image': f'openjdk:{value}'},
sendtoken=True
)
:param option: Option name
:type option: str
:param value: New value
:type value: Any
"""
#
# server.properties
#
def set_server_prop(self, option: str, value: Any) -> None:
self.__set_prop(
'/server.properties',
option, value
)
"""Sets server.properties option
def get_server_props(self, proptyping:bool=True) -> Dict[str,Any]:
:param option: Option name
:type option: str
:param value: New value
:type value: Any
"""
"""Parses all server.properties from options page
self.__set_prop(
'/server.properties',
option, value
)
:param proptyping: If the returned dict should contain value
that matches property type (e.g. max-players will be int)
instead of string, defaults to True
:type proptyping: bool, optional
:return: Server.properties dict
:rtype: Dict[str,Any]
"""
def get_server_props(self, proptyping: bool = True) -> Dict[str, Any]:
return self.__get_all_props('https://aternos.org/options', proptyping)
"""Parses all server.properties from options page
def set_server_props(self, props:Dict[str,Any]) -> None:
:param proptyping: If the returned dict should contain value
that matches property type (e.g. max-players will be int)
instead of string, defaults to True
:type proptyping: bool, optional
:return: Server.properties dict
:rtype: Dict[str,Any]
"""
"""Updates server.properties options with the given dict
return self.__get_all_props('https://aternos.org/options', proptyping)
:param props: Dict with properties `{key:value}`
:type props: Dict[str,Any]
"""
def set_server_props(self, props: Dict[str, Any]) -> None:
for key in props:
self.set_server_prop(key, props[key])
"""Updates server.properties options with the given dict
#
# level.dat
#
def set_world_prop(
self, option:Union[WorldOpts,WorldRules],
value:Any, gamerule:bool=False,
world:str='world') -> None:
:param props: Dict with properties `{key:value}`
:type props: Dict[str,Any]
"""
"""Sets level.dat option for specified world
for key in props:
self.set_server_prop(key, props[key])
:param option: Option name
:type option: Union[WorldOpts,WorldRules]
:param value: New value
:type value: Any
:param gamerule: If the option
is a gamerule, defaults to False
:type gamerule: bool, optional
:param world: Name of the world which
level.dat must be edited, defaults to 'world'
:type world: str, optional
"""
#
# level.dat
#
def set_world_prop(
self, option: Union[WorldOpts, WorldRules],
value: Any, gamerule: bool = False,
world: str = 'world') -> None:
prefix = DAT_PREFIX
if gamerule:
prefix = DAT_GR_PREFIX
"""Sets level.dat option for specified world
self.__set_prop(
f'/{world}/level.dat',
f'{prefix}{option}',
value
)
:param option: Option name
:type option: Union[WorldOpts,WorldRules]
:param value: New value
:type value: Any
:param gamerule: If the option
is a gamerule, defaults to False
:type gamerule: bool, optional
:param world: Name of the world which
level.dat must be edited, defaults to 'world'
:type world: str, optional
"""
def get_world_props(
self, world:str='world',
proptyping:bool=True) -> Dict[str,Any]:
prefix = DAT_PREFIX
if gamerule:
prefix = DAT_GR_PREFIX
"""Parses level.dat from specified world's options page
self.__set_prop(
f'/{world}/level.dat',
f'{prefix}{option}',
value
)
:param world: Name of the world, defaults to 'world'
:type world: str, optional
:param proptyping: If the returned dict should contain the value
that matches property type (e.g. randomTickSpeed will be bool)
instead of string, defaults to True
:type proptyping: bool, optional
:return: Level.dat dict
:rtype: Dict[str,Any]
"""
def get_world_props(
self, world: str = 'world',
proptyping: bool = True) -> Dict[str, Any]:
self.__get_all_props(
f'https://aternos.org/files/{world}/level.dat',
proptyping, [DAT_PREFIX, DAT_GR_PREFIX]
)
"""Parses level.dat from specified world's options page
def set_world_props(self, props:Dict[str,Any]) -> None:
for key in props:
self.set_world_prop(key, props[key])
:param world: Name of the world, defaults to 'world'
:type world: str, optional
:param proptyping: If the returned dict should contain the value
that matches property type (e.g. randomTickSpeed will be bool)
instead of string, defaults to True
:type proptyping: bool, optional
:return: Level.dat dict
:rtype: Dict[str,Any]
"""
#
# helpers
#
def __set_prop(self, file:str, option:str, value:Any) -> None:
self.__get_all_props(
f'https://aternos.org/files/{world}/level.dat',
proptyping, [DAT_PREFIX, DAT_GR_PREFIX]
)
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/config.php',
'POST', data={
'file': file,
'option': option,
'value': value
}, sendtoken=True
)
def set_world_props(self, props: Dict[str, Any]) -> None:
for key in props:
self.set_world_prop(key, props[key])
def __get_all_props(
self, url:str, proptyping:bool=True,
prefixes:Optional[List[str]]=None) -> Dict[str,Any]:
#
# helpers
#
def __set_prop(self, file: str, option: str, value: Any) -> None:
optreq = self.atserv.atserver_request(url, 'GET')
opttree = lxml.html.fromstring(optreq.content)
configs = opttree.xpath('//div[@class="config-options"]')
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/config.php',
'POST', data={
'file': file,
'option': option,
'value': value
}, sendtoken=True
)
for i, conf in enumerate(configs):
opts = conf.xpath('/div[contains(@class,"config-option ")]')
result = {}
def __get_all_props(
self, url: str, proptyping: bool = True,
prefixes: Optional[List[str]] = None) -> Dict[str, Any]:
for opt in opts:
key = opt.xpath('.//span[@class="config-option-output-key"]')[0].text
value = opt.xpath('.//span[@class="config-option-output-value"]')[0].text
optreq = self.atserv.atserver_request(url, 'GET')
opttree = lxml.html.fromstring(optreq.content)
configs = opttree.xpath('//div[@class="config-options"]')
if prefixes != None:
key = f'{prefixes[i]}{key}'
for i, conf in enumerate(configs):
opts = conf.xpath('/div[contains(@class,"config-option ")]')
result = {}
opttype = opt.xpath('/@class').split(' ')[1]
if proptyping and opttype in convert:
value = convert[opttype](value)
for opt in opts:
key = opt.xpath(
'.//span[@class="config-option-output-key"]'
)[0].text
value = opt.xpath(
'.//span[@class="config-option-output-value"]'
)[0].text
result[key] = value
if prefixes is not None:
key = f'{prefixes[i]}{key}'
return result
opttype = opt.xpath('/@class').split(' ')[1]
if proptyping and opttype in convert:
value = convert[opttype](value)
result[key] = value
return result

View file

@ -10,235 +10,242 @@ from .aterrors import TokenError, CloudflareError
REQUA = 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/99.0.4844.84 Safari/537.36 OPR/85.0.4341.47'
class AternosConnect:
"""
Class for sending API requests bypass Cloudflare
and parsing responses"""
"""
Class for sending API requests bypass Cloudflare
and parsing responses"""
def __init__(self) -> None:
def __init__(self) -> None:
self.session = CloudScraper()
self.atsession = ''
self.session = CloudScraper()
self.atsession = ''
def parse_token(self) -> str:
def parse_token(self) -> str:
"""Parses Aternos ajax token that
is needed for most requests
"""Parses Aternos ajax token that
is needed for most requests
:raises RuntimeWarning: If the parser
can not find <head> tag in HTML response
:raises CredentialsError: If the parser
is unable to extract ajax token in HTML
:return: Aternos ajax token
:rtype: str
"""
:raises RuntimeWarning: If the parser
can not find <head> tag in HTML response
:raises CredentialsError: If the parser
is unable to extract ajax token in HTML
:return: Aternos ajax token
:rtype: str
"""
loginpage = self.request_cloudflare(
f'https://aternos.org/go/', 'GET'
).content
loginpage = self.request_cloudflare(
f'https://aternos.org/go/', 'GET'
).content
# Using the standard string methods
# instead of the expensive xml parsing
head = b'<head>'
headtag = loginpage.find(head)
headend = loginpage.find(b'</head>', headtag + len(head))
# Using the standard string methods
# instead of the expensive xml parsing
head = b'<head>'
headtag = loginpage.find(head)
headend = loginpage.find(b'</head>', headtag + len(head))
# Some checks
if headtag < 0 or headend < 0:
pagehead = loginpage
raise RuntimeWarning('Unable to find <head> tag, parsing the whole page')
# Some checks
if headtag < 0 or headend < 0:
pagehead = loginpage
raise RuntimeWarning(
'Unable to find <head> tag, parsing the whole page'
)
# Extracting <head> content
headtag = headtag + len(head)
pagehead = loginpage[headtag:headend]
# Extracting <head> content
headtag = headtag + len(head)
pagehead = loginpage[headtag:headend]
try:
text = pagehead.decode('utf-8', 'replace')
js_code = re.findall(r'\(\(\)(.*?)\)\(\);', text)
token_func = js_code[1] if len(js_code) > 1 else js_code[0]
try:
text = pagehead.decode('utf-8', 'replace')
js_code = re.findall(r'\(\(\)(.*?)\)\(\);', text)
token_func = js_code[1] if len(js_code) > 1 else js_code[0]
ctx = atjsparse.exec(token_func)
self.token = ctx.window['AJAX_TOKEN']
ctx = atjsparse.exec(token_func)
self.token = ctx.window['AJAX_TOKEN']
except (IndexError, TypeError):
raise TokenError(
'Unable to parse TOKEN from the page'
)
except (IndexError, TypeError):
raise TokenError(
'Unable to parse TOKEN from the page'
)
return self.token
return self.token
def generate_sec(self) -> str:
def generate_sec(self) -> str:
"""Generates Aternos SEC token which
is also needed for most API requests
"""Generates Aternos SEC token which
is also needed for most API requests
:return: Random SEC key:value string
:rtype: str
"""
:return: Random SEC key:value string
:rtype: str
"""
randkey = self.generate_aternos_rand()
randval = self.generate_aternos_rand()
self.sec = f'{randkey}:{randval}'
self.session.cookies.set(
f'ATERNOS_SEC_{randkey}', randval,
domain='aternos.org'
)
randkey = self.generate_aternos_rand()
randval = self.generate_aternos_rand()
self.sec = f'{randkey}:{randval}'
self.session.cookies.set(
f'ATERNOS_SEC_{randkey}', randval,
domain='aternos.org'
)
return self.sec
return self.sec
def generate_aternos_rand(self, randlen:int=16) -> str:
def generate_aternos_rand(self, randlen: int = 16) -> str:
"""Generates a random string using
Aternos algorithm from main.js file
"""Generates a random string using
Aternos algorithm from main.js file
:param randlen: Random string length, defaults to 16
:type randlen: int, optional
:return: Random string for SEC token
:rtype: str
"""
:param randlen: Random string length, defaults to 16
:type randlen: int, optional
:return: Random string for SEC token
:rtype: str
"""
# a list with randlen+1 empty strings:
# generate a string with spaces,
# then split it by space
rand_arr = (' ' * (randlen+1)).split(' ')
# a list with randlen+1 empty strings:
# generate a string with spaces,
# then split it by space
rand_arr = (' ' * (randlen+1)).split(' ')
rand = random.random()
rand_alphanum = self.convert_num(rand, 36) + ('0' * 17)
rand = random.random()
rand_alphanum = self.convert_num(rand, 36) + ('0' * 17)
return (rand_alphanum[:18].join(rand_arr)[:randlen])
return (rand_alphanum[:18].join(rand_arr)[:randlen])
def convert_num(
self, num:Union[int,float,str],
base:int, frombase:int=10) -> str:
def convert_num(
self, num: Union[int, float, str],
base: int, frombase: int = 10) -> str:
"""Converts an integer to specified base
"""Converts an integer to specified base
:param num: Integer in any base to convert.
If it is a float started with `0,`,
zero and comma will be removed to get int
:type num: Union[int,float,str]
:param base: New base
:type base: int
:param frombase: Given number base, defaults to 10
:type frombase: int, optional
:return: Number converted to a specified base
:rtype: str
"""
:param num: Integer in any base to convert.
If it is a float started with `0,`,
zero and comma will be removed to get int
:type num: Union[int,float,str]
:param base: New base
:type base: int
:param frombase: Given number base, defaults to 10
:type frombase: int, optional
:return: Number converted to a specified base
:rtype: str
"""
if isinstance(num, str):
num = int(num, frombase)
if isinstance(num, str):
num = int(num, frombase)
if isinstance(num, float):
sliced = str(num)[2:]
num = int(sliced)
if isinstance(num, float):
sliced = str(num)[2:]
num = int(sliced)
symbols = '0123456789abcdefghijklmnopqrstuvwxyz'
basesym = symbols[:base]
result = ''
while num > 0:
rem = num % base
result = str(basesym[rem]) + result
num //= base
return result
symbols = '0123456789abcdefghijklmnopqrstuvwxyz'
basesym = symbols[:base]
result = ''
while num > 0:
rem = num % base
result = str(basesym[rem]) + result
num //= base
return result
def request_cloudflare(
self, url:str, method:str,
params:Optional[dict]=None, data:Optional[dict]=None,
headers:Optional[dict]=None, reqcookies:Optional[dict]=None,
sendtoken:bool=False, redirect:bool=True, retry:int=3) -> Response:
def request_cloudflare(
self, url: str, method: str,
params: Optional[dict] = None,
data: Optional[dict] = None,
headers: Optional[dict] = None,
reqcookies: Optional[dict] = None,
sendtoken: bool = False,
redirect: bool = True,
retry: int = 3) -> Response:
"""Sends a request to Aternos API bypass Cloudflare
"""Sends a request to Aternos API bypass Cloudflare
:param url: Request URL
:type url: str
:param method: Request method, must be GET or POST
:type method: str
:param params: URL parameters, defaults to None
:type params: Optional[dict], optional
:param data: POST request data, if the method is GET,
this dict will be combined with params, defaults to None
:type data: Optional[dict], optional
:param headers: Custom headers, defaults to None
:type headers: Optional[dict], optional
:param reqcookies: Cookies only for this request, defaults to None
:type reqcookies: Optional[dict], optional
:param sendtoken: If the ajax and SEC token
should be sent, defaults to False
:type sendtoken: bool, optional
:param redirect: If requests lib should follow
Location header in 3xx responses, defaults to True
:type redirect: bool, optional
:param retry: How many times parser must retry
connection to API bypass Cloudflare, defaults to 3
:type retry: int, optional
:raises CloudflareError:
When the parser has exceeded retries count
:raises NotImplementedError:
When the specified method is not GET or POST
:return: API response
:rtype: requests.Response
"""
:param url: Request URL
:type url: str
:param method: Request method, must be GET or POST
:type method: str
:param params: URL parameters, defaults to None
:type params: Optional[dict], optional
:param data: POST request data, if the method is GET,
this dict will be combined with params, defaults to None
:type data: Optional[dict], optional
:param headers: Custom headers, defaults to None
:type headers: Optional[dict], optional
:param reqcookies: Cookies only for this request, defaults to None
:type reqcookies: Optional[dict], optional
:param sendtoken: If the ajax and SEC token
should be sent, defaults to False
:type sendtoken: bool, optional
:param redirect: If requests lib should follow
Location header in 3xx responses, defaults to True
:type redirect: bool, optional
:param retry: How many times parser must retry
connection to API bypass Cloudflare, defaults to 3
:type retry: int, optional
:raises CloudflareError:
When the parser has exceeded retries count
:raises NotImplementedError:
When the specified method is not GET or POST
:return: API response
:rtype: requests.Response
"""
if retry <= 0:
raise CloudflareError('Unable to bypass Cloudflare protection')
if retry <= 0:
raise CloudflareError('Unable to bypass Cloudflare protection')
try:
self.atsession = self.session.cookies['ATERNOS_SESSION']
except KeyError:
pass
try:
self.atsession = self.session.cookies['ATERNOS_SESSION']
except KeyError:
pass
method = method or 'GET'
method = method.upper().strip()
if method not in ('GET', 'POST'):
raise NotImplementedError('Only GET and POST are available')
method = method or 'GET'
method = method.upper().strip()
if method not in ('GET', 'POST'):
raise NotImplementedError('Only GET and POST are available')
headers = headers or {}
params = params or {}
data = data or {}
reqcookies = reqcookies or {}
headers = headers or {}
params = params or {}
data = data or {}
reqcookies = reqcookies or {}
if sendtoken:
params['TOKEN'] = self.token
params['SEC'] = self.sec
headers['X-Requested-With'] = 'XMLHttpRequest'
if sendtoken:
params['TOKEN'] = self.token
params['SEC'] = self.sec
headers['X-Requested-With'] = 'XMLHttpRequest'
# requests.cookies.CookieConflictError bugfix
reqcookies['ATERNOS_SESSION'] = self.atsession
del self.session.cookies['ATERNOS_SESSION']
# requests.cookies.CookieConflictError bugfix
reqcookies['ATERNOS_SESSION'] = self.atsession
del self.session.cookies['ATERNOS_SESSION']
logging.debug(f'Requesting({method})' + url)
logging.debug('headers=' + str(headers))
logging.debug('params=' + str(params))
logging.debug('data=' + str(data))
logging.debug('req-cookies=' + str(reqcookies))
logging.debug('session-cookies=' + str(self.session.cookies))
if method == 'POST':
req = self.session.post(
url, data=data, params=params,
headers=headers, cookies=reqcookies,
allow_redirects=redirect
)
else:
req = self.session.get(
url, params={**params, **data},
headers=headers, cookies=reqcookies,
allow_redirects=redirect
)
if '<title>Please Wait... | Cloudflare</title>' in req.text:
logging.info('Retrying to bypass Cloudflare')
self.request_cloudflare(
url, method,
params, data,
headers, reqcookies,
sendtoken, redirect,
retry - 1
)
logging.info(
f'{method} completed with {req.status_code} status'
)
logging.debug(f'Requesting({method})' + url)
logging.debug('headers=' + str(headers))
logging.debug('params=' + str(params))
logging.debug('data=' + str(data))
logging.debug('req-cookies=' + str(reqcookies))
logging.debug('session-cookies=' + str(self.session.cookies))
return req
if method == 'POST':
req = self.session.post(
url, data=data, params=params,
headers=headers, cookies=reqcookies,
allow_redirects=redirect
)
else:
req = self.session.get(
url, params={**params, **data},
headers=headers, cookies=reqcookies,
allow_redirects=redirect
)
if '<title>Please Wait... | Cloudflare</title>' in req.text:
logging.info('Retrying to bypass Cloudflare')
self.request_cloudflare(
url, method,
params, data,
headers, reqcookies,
sendtoken, redirect,
retry - 1
)
logging.info(
f'{method} completed with {req.status_code} status'
)
return req

View file

@ -1,39 +1,56 @@
class AternosError(Exception):
"""Common error class"""
"""Common error class"""
class CloudflareError(AternosError):
"""Raised when the parser is unable to bypass Cloudflare protection"""
"""Raised when the parser is unable
to bypass Cloudflare protection"""
class CredentialsError(AternosError):
"""Raised when a session cookie is empty which means incorrect credentials"""
"""Raised when a session cookie is empty
which means incorrect credentials"""
class TokenError(AternosError):
"""Raised when the parser is unable to extract Aternos ajax token"""
"""Raised when the parser is unable
to extract Aternos ajax token"""
class ServerError(AternosError):
"""Common class for server errors"""
"""Common class for server errors"""
class ServerEulaError(ServerError):
"""Raised when trying to start without confirming Mojang EULA"""
"""Raised when trying to start without
confirming Mojang EULA"""
class ServerRunningError(ServerError):
"""Raised when trying to start already running server"""
"""Raised when trying to start
already running server"""
class ServerSoftwareError(ServerError):
"""Raised when Aternos notifies about incorrect software version"""
"""Raised when Aternos notifies about
incorrect software version"""
class ServerStorageError(ServerError):
"""Raised when Aternos notifies about violation of storage limits (4 GB for now)"""
"""Raised when Aternos notifies about
violation of storage limits (4 GB for now)"""
class FileError(AternosError):
"""Raised when trying to execute a disallowed by Aternos file operation"""
"""Raised when trying to execute a disallowed
by Aternos file operation"""

View file

@ -6,142 +6,146 @@ from typing import TYPE_CHECKING
from .aterrors import FileError
if TYPE_CHECKING:
from .atserver import AternosServer
from .atserver import AternosServer
class FileType(enum.IntEnum):
"""File or dierctory"""
"""File or dierctory"""
file = 0
directory = 1
file = 0
directory = 1
class AternosFile:
"""File class which contains info about its path, type and size
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
:param path: Path to the file
:type path: str
:param name: Filename
:type name: str
:param ftype: File or directory
:type ftype: python_aternos.atfile.FileType
:param size: File size, defaults to 0
:type size: Union[int,float], optional
"""
"""File class which contains info about its path, type and size
def __init__(
self, atserv:'AternosServer',
path:str, name:str, ftype:FileType=FileType.file,
size:Union[int,float]=0) -> None:
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
:param path: Path to the file
:type path: str
:param name: Filename
:type name: str
:param ftype: File or directory
:type ftype: python_aternos.atfile.FileType
:param size: File size, defaults to 0
:type size: Union[int,float], optional
"""
self.atserv = atserv
self._path = path.lstrip('/')
self._name = name
self._full = path + name
self._ftype = ftype
self._size = float(size)
def __init__(
self,
atserv: 'AternosServer',
path: str, name: str,
ftype: FileType = FileType.file,
size: Union[int, float] = 0) -> None:
def delete(self) -> None:
self.atserv = atserv
self._path = path.lstrip('/')
self._name = name
self._full = path + name
self._ftype = ftype
self._size = float(size)
"""Deletes the file"""
def delete(self) -> None:
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/delete.php',
'POST', data={'file': self._full},
sendtoken=True
)
"""Deletes the file"""
def get_content(self) -> bytes:
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/delete.php',
'POST', data={'file': self._full},
sendtoken=True
)
"""Requests file content in bytes (downloads it)
def get_content(self) -> bytes:
:raises FileError: If downloading
the file is not allowed by Aternos
:return: File content
:rtype: bytes
"""
"""Requests file content in bytes (downloads it)
file = self.atserv.atserver_request(
'https://aternos.org/panel/ajax/files/download.php',
'GET', params={
'file': self._full
}
)
if file.content == b'{"success":false}':
raise FileError('Unable to download the file. Try to get text')
return file.content
:raises FileError: If downloading
the file is not allowed by Aternos
:return: File content
:rtype: bytes
"""
def set_content(self, value:bytes) -> None:
file = self.atserv.atserver_request(
'https://aternos.org/panel/ajax/files/download.php',
'GET', params={
'file': self._full
}
)
if file.content == b'{"success":false}':
raise FileError('Unable to download the file. Try to get text')
return file.content
"""Modifies the file content
def set_content(self, value: bytes) -> None:
:param value: New content
:type value: bytes
"""
"""Modifies the file content
self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/save.php',
'POST', data={
'file': self._full,
'content': value
}, sendtoken=True
)
:param value: New content
:type value: bytes
"""
def get_text(self) -> str:
self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/save.php',
'POST', data={
'file': self._full,
'content': value
}, sendtoken=True
)
"""Requests editing the file as a text
(try it if downloading is disallowed)
def get_text(self) -> str:
:return: File text content
:rtype: str
"""
"""Requests editing the file as a text
(try it if downloading is disallowed)
editor = self.atserv.atserver_request(
f'https://aternos.org/files/{self._full.lstrip("/")}', 'GET'
)
edittree = lxml.html.fromstring(editor.content)
editblock = edittree.xpath('//div[@id="editor"]')[0]
return editblock.text_content()
:return: File text content
:rtype: str
"""
def set_text(self, value:str) -> None:
"""Modifies the file content,
but unlike set_content takes
a string as a new value
editor = self.atserv.atserver_request(
f'https://aternos.org/files/{self._full.lstrip("/")}', 'GET'
)
edittree = lxml.html.fromstring(editor.content)
:param value: New content
:type value: str
"""
self.set_content(value.encode('utf-8'))
editblock = edittree.xpath('//div[@id="editor"]')[0]
return editblock.text_content()
@property
def path(self):
return self._path
def set_text(self, value: str) -> None:
@property
def name(self) -> str:
return self._name
@property
def full(self) -> str:
return self._full
"""Modifies the file content,
but unlike set_content takes
a string as a new value
@property
def is_dir(self) -> bool:
if self._ftype == FileType.directory:
return True
return False
:param value: New content
:type value: str
"""
@property
def is_file(self) -> bool:
if self._ftype == FileType.file:
return True
return False
@property
def size(self) -> float:
return self._size
self.set_content(value.encode('utf-8'))
@property
def path(self):
return self._path
@property
def name(self) -> str:
return self._name
@property
def full(self) -> str:
return self._full
@property
def is_dir(self) -> bool:
if self._ftype == FileType.directory:
return True
return False
@property
def is_file(self) -> bool:
if self._ftype == FileType.file:
return True
return False
@property
def size(self) -> float:
return self._size

View file

@ -4,150 +4,155 @@ from typing import TYPE_CHECKING
from .atfile import AternosFile, FileType
if TYPE_CHECKING:
from .atserver import AternosServer
from .atserver import AternosServer
class FileManager:
"""Aternos file manager class for viewing files structure
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
"""
"""Aternos file manager class for viewing files structure
def __init__(self, atserv:'AternosServer') -> None:
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
"""
self.atserv = atserv
def __init__(self, atserv: 'AternosServer') -> None:
def listdir(self, path:str='') -> List[AternosFile]:
self.atserv = atserv
"""Requests a list of files
in the specified directory
def listdir(self, path: str = '') -> List[AternosFile]:
:param path: Directory
(an empty string means root), defaults to ''
:type path: str, optional
:return: List of :class:`python_aternos.atfile.AternosFile`
:rtype: List[AternosFile]
"""
"""Requests a list of files
in the specified directory
path = path.lstrip('/')
filesreq = self.atserv.atserver_request(
f'https://aternos.org/files/{path}', 'GET'
)
filestree = lxml.html.fromstring(filesreq.content)
fileslist = filestree.xpath('//div[contains(concat(" ",normalize-space(@class)," ")," file ")]')
:param path: Directory
(an empty string means root), defaults to ''
:type path: str, optional
:return: List of :class:`python_aternos.atfile.AternosFile`
:rtype: List[AternosFile]
"""
files = []
for f in fileslist:
path = path.lstrip('/')
filesreq = self.atserv.atserver_request(
f'https://aternos.org/files/{path}', 'GET'
)
filestree = lxml.html.fromstring(filesreq.content)
fileslist = filestree.xpath(
'//div[contains(concat(" ",normalize-space(@class)," ")," file ")]'
)
ftype_raw = f.xpath('@data-type')[0]
ftype = FileType.file \
if ftype_raw == 'file' \
else FileType.directory
files = []
for f in fileslist:
fsize_raw = f.xpath('./div[@class="filesize"]')
fsize = 0
if len(fsize_raw) > 0:
ftype_raw = f.xpath('@data-type')[0]
ftype = FileType.file \
if ftype_raw == 'file' \
else FileType.directory
fsize_text = fsize_raw[0].text.strip()
fsize_num = fsize_text[:fsize_text.rfind(' ')]
fsize_msr = fsize_text[fsize_text.rfind(' ')+1:]
fsize_raw = f.xpath('./div[@class="filesize"]')
fsize = 0
if len(fsize_raw) > 0:
try:
fsize = self.convert_size(float(fsize_num), fsize_msr)
except ValueError:
fsize = -1
fsize_text = fsize_raw[0].text.strip()
fsize_num = fsize_text[:fsize_text.rfind(' ')]
fsize_msr = fsize_text[fsize_text.rfind(' ')+1:]
fullpath = f.xpath('@data-path')[0]
filepath = fullpath[:fullpath.rfind('/')]
filename = fullpath[fullpath.rfind('/'):]
files.append(
AternosFile(
self.atserv,
filepath, filename,
ftype, fsize
)
)
try:
fsize = self.convert_size(float(fsize_num), fsize_msr)
except ValueError:
fsize = -1
return files
fullpath = f.xpath('@data-path')[0]
filepath = fullpath[:fullpath.rfind('/')]
filename = fullpath[fullpath.rfind('/'):]
files.append(
AternosFile(
self.atserv,
filepath, filename,
ftype, fsize
)
)
def convert_size(self, num:Union[int,float], measure:str) -> float:
return files
"""Converts "human" file size to size in bytes
def convert_size(self, num: Union[int, float], measure: str) -> float:
:param num: Size
:type num: Union[int,float]
:param measure: Units (B, kB, MB, GB)
:type measure: str
:return: Size in bytes
:rtype: float
"""
"""Converts "human" file size to size in bytes
measure_match = {
'B': 1,
'kB': 1000,
'MB': 1000000,
'GB': 1000000000
}
try:
return num * measure_match[measure]
except KeyError:
return -1
:param num: Size
:type num: Union[int,float]
:param measure: Units (B, kB, MB, GB)
:type measure: str
:return: Size in bytes
:rtype: float
"""
def get_file(self, path:str) -> Optional[AternosFile]:
measure_match = {
'B': 1,
'kB': 1000,
'MB': 1000000,
'GB': 1000000000
}
try:
return num * measure_match[measure]
except KeyError:
return -1
"""Returns :class:`python_aternos.atfile.AternosFile`
instance by its path
def get_file(self, path: str) -> Optional[AternosFile]:
:param path: Path to file including its filename
:type path: str
:return: _description_
:rtype: Optional[AternosFile]
"""
"""Returns :class:`python_aternos.atfile.AternosFile`
instance by its path
filepath = path[:path.rfind('/')]
filename = path[path.rfind('/'):]
:param path: Path to file including its filename
:type path: str
:return: _description_
:rtype: Optional[AternosFile]
"""
filedir = self.listdir(filepath)
for file in filedir:
if file.name == filename:
return file
filepath = path[:path.rfind('/')]
filename = path[path.rfind('/'):]
return None
filedir = self.listdir(filepath)
for file in filedir:
if file.name == filename:
return file
def dl_file(self, path:str) -> bytes:
return None
"""Returns the file content in bytes (downloads it)
def dl_file(self, path: str) -> bytes:
:param path: Path to file including its filename
:type path: str
:return: File content
:rtype: bytes
"""
"""Returns the file content in bytes (downloads it)
file = self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/files/download.php?' + \
f'file={path.replace("/","%2F")}',
'GET'
)
:param path: Path to file including its filename
:type path: str
:return: File content
:rtype: bytes
"""
return file.content
file = self.atserv.atserver_request(
'https://aternos.org/panel/ajax/files/download.php'
'GET', params={
'file': path.replace('/', '%2F')
}
)
def dl_world(self, world:str='world') -> bytes:
return file.content
"""Returns the world zip file content
by its name (downloads it)
def dl_world(self, world: str = 'world') -> bytes:
:param world: Name of world, defaults to 'world'
:type world: str, optional
:return: Zip file content
:rtype: bytes
"""
"""Returns the world zip file content
by its name (downloads it)
world = self.atserv.atserver_request(
f'https://aternos.org/panel/ajax/worlds/download.php?' + \
f'world={world.replace("/","%2F")}',
'GET'
)
:param world: Name of world, defaults to 'world'
:type world: str, optional
:return: Zip file content
:rtype: bytes
"""
return world.content
world = self.atserv.atserver_request(
'https://aternos.org/panel/ajax/worlds/download.php'
'GET', params={
'world': world.replace('/', '%2F')
}
)
return world.content

View file

@ -6,42 +6,45 @@ from typing import Any
# Thanks to http://regex.inginf.units.it/
arrowexp = regex.compile(r'\w[^\}]*+')
def to_ecma5_function(f:str) -> str:
"""Converts a ECMA6 function to ECMA5 format (without arrow expressions)
def to_ecma5_function(f: str) -> str:
:param f: ECMA6 function
:type f: str
:return: ECMA5 function
:rtype: str
"""
match = arrowexp.search(f)
conv = '(function(){' + match.group(0) + '})()'
return regex.sub(
r'(?:s|\(s\)) => s.split\([\'"]{2}\).reverse\(\).join\([\'"]{2}\)',
'function(s){return s.split(\'\').reverse().join(\'\')}',
conv
)
"""Converts a ECMA6 function to ECMA5 format (without arrow expressions)
def atob(s:str) -> str:
return base64.standard_b64decode(str(s)).decode('utf-8')
:param f: ECMA6 function
:type f: str
:return: ECMA5 function
:rtype: str
"""
def exec(f:str) -> Any:
match = arrowexp.search(f)
conv = '(function(){' + match.group(0) + '})()'
return regex.sub(
r'(?:s|\(s\)) => s.split\([\'"]{2}\).reverse\(\).join\([\'"]{2}\)',
'function(s){return s.split(\'\').reverse().join(\'\')}',
conv
)
"""Executes a JavaScript function
:param f: ECMA6 function
:type f: str
:return: JavaScript interpreter context
:rtype: Any
"""
def atob(s: str) -> str:
return base64.standard_b64decode(str(s)).decode('utf-8')
ctx = js2py.EvalJs({'atob': atob})
ctx.execute('window.document = { };')
ctx.execute('window.Map = function(_i){ };')
ctx.execute('window.setTimeout = function(_f,_t){ };')
ctx.execute('window.setInterval = function(_f,_t){ };')
ctx.execute('window.encodeURIComponent = function(_s){ };')
ctx.execute(to_ecma5_function(f))
return ctx
def exec(f: str) -> Any:
"""Executes a JavaScript function
:param f: ECMA6 function
:type f: str
:return: JavaScript interpreter context
:rtype: Any
"""
ctx = js2py.EvalJs({'atob': atob})
ctx.execute('window.document = { };')
ctx.execute('window.Map = function(_i){ };')
ctx.execute('window.setTimeout = function(_f,_t){ };')
ctx.execute('window.setInterval = function(_f,_t){ };')
ctx.execute('window.encodeURIComponent = function(_s){ };')
ctx.execute(to_ecma5_function(f))
return ctx

View file

@ -4,98 +4,103 @@ from typing import List, Union
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .atserver import AternosServer
from .atserver import AternosServer
class Lists(enum.Enum):
whl = 'whitelist'
ops = 'ops'
ban = 'banned-players'
ips = 'banned-ips'
"""Players list type enum"""
whl = 'whitelist'
ops = 'ops'
ban = 'banned-players'
ips = 'banned-ips'
class PlayersList:
"""Class for managing operators, whitelist and banned players lists
:param lst: Players list type, must be
:class:`python_aternos.atplayers.Lists` enum value
:type lst: Union[str,Lists]
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
"""
"""Class for managing operators, whitelist and banned players lists
def __init__(self, lst:Union[str,Lists], atserv:'AternosServer') -> None:
:param lst: Players list type, must be
:class:`python_aternos.atplayers.Lists` enum value
:type lst: Union[str,Lists]
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
"""
self.atserv = atserv
self.lst = Lists(lst)
self.players = []
self.parsed = False
def __init__(self, lst: Union[str, Lists], atserv: 'AternosServer') -> None:
def list_players(self, cache:bool=True) -> List[str]:
self.atserv = atserv
self.lst = Lists(lst)
self.players = []
self.parsed = False
"""Parse a players list
def list_players(self, cache: bool = True) -> List[str]:
:param cache: If the function can return cached list (highly recommended), defaults to True
:type cache: bool, optional
:return: List of players nicknames
:rtype: List[str]
"""
"""Parse a players list
if cache and self.parsed:
return self.players
:param cache: If the function can return
cached list (highly recommended), defaults to True
:type cache: bool, optional
:return: List of players nicknames
:rtype: List[str]
"""
listreq = self.atserv.atserver_request(
f'https://aternos.org/players/{self.lst.value}',
'GET'
)
listtree = lxml.html.fromstring(listreq.content)
items = listtree.xpath(
'//div[@class="list-item"]'
)
if cache and self.parsed:
return self.players
result = []
for i in items:
name = i.xpath('./div[@class="list-name"]')
result.append(name[0].text.strip())
self.players = result
self.parsed = True
return result
listreq = self.atserv.atserver_request(
f'https://aternos.org/players/{self.lst.value}',
'GET'
)
listtree = lxml.html.fromstring(listreq.content)
items = listtree.xpath(
'//div[@class="list-item"]'
)
def add(self, name:str) -> None:
result = []
for i in items:
name = i.xpath('./div[@class="list-name"]')
result.append(name[0].text.strip())
"""Appends a player to the list by the nickname
self.players = result
self.parsed = True
return result
:param name: Player's nickname
:type name: str
"""
def add(self, name: str) -> None:
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/players/add.php',
'POST', data={
'list': self.lst.value,
'name': name
}, sendtoken=True
)
"""Appends a player to the list by the nickname
self.players.append(name)
:param name: Player's nickname
:type name: str
"""
def remove(self, name:str) -> None:
"""Removes a player from the list by the nickname
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/players/add.php',
'POST', data={
'list': self.lst.value,
'name': name
}, sendtoken=True
)
:param name: Player's nickname
:type name: str
"""
self.players.append(name)
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/players/remove.php',
'POST', data={
'list': self.lst.value,
'name': name
}, sendtoken=True
)
def remove(self, name: str) -> None:
for i, j in enumerate(self.players):
if j == name:
del self.players[i]
"""Removes a player from the list by the nickname
:param name: Player's nickname
:type name: str
"""
self.atserv.atserver_request(
'https://aternos.org/panel/ajax/players/remove.php',
'POST', data={
'list': self.lst.value,
'name': name
}, sendtoken=True
)
for i, j in enumerate(self.players):
if j == name:
del self.players[i]

View file

@ -15,329 +15,337 @@ from .atplayers import PlayersList
from .atplayers import Lists
from .atwss import AternosWss
class Edition(enum.IntEnum):
"""Server edition type enum"""
"""Server edition type enum"""
java = 0
bedrock = 1
java = 0
bedrock = 1
class Status(enum.IntEnum):
"""Server numeric status enum.
It is highly recommended to use
`AternosServer.status` instead of
`AternosServer.status_num`"""
"""Server numeric status enum.
It is highly recommended to use
`AternosServer.status` instead of
`AternosServer.status_num`"""
off = 0
on = 1
starting = 2
shutdown = 3
unknown = 6
error = 7
confirm = 10
off = 0
on = 1
starting = 2
shutdown = 3
unknown = 6
error = 7
confirm = 10
class AternosServer:
"""Class for controlling your Aternos Minecraft server
"""Class for controlling your Aternos Minecraft server
:param servid: Unique server IDentifier
:type servid: str
:param atconn: :class:`python_aternos.atconnect.AternosConnect`
instance with initialized Aternos session
:type atconn: python_aternos.atconnect.AternosConnect
:param reqinfo: Automatically call AternosServer.fetch() to get all info, defaults to `True`
:type reqinfo: bool, optional
"""
:param servid: Unique server IDentifier
:type servid: str
:param atconn: :class:`python_aternos.atconnect.AternosConnect`
instance with initialized Aternos session
:type atconn: python_aternos.atconnect.AternosConnect
:param reqinfo: Automatically call AternosServer.fetch()
to get all info, defaults to `True`
:type reqinfo: bool, optional
"""
def __init__(
self, servid:str,
atconn:AternosConnect,
reqinfo:bool=True) -> None:
def __init__(
self, servid: str,
atconn: AternosConnect,
reqinfo: bool = True) -> None:
self.servid = servid
self.atconn = atconn
if reqinfo:
self.fetch()
def fetch(self) -> None:
self.servid = servid
self.atconn = atconn
if reqinfo:
self.fetch()
"""Send a request to Aternos API to get all server info"""
def fetch(self) -> None:
servreq = self.atserver_request(
'https://aternos.org/panel/ajax/status.php',
'GET', sendtoken=True
)
self._info = json.loads(servreq.content)
"""Send a request to Aternos API to get all server info"""
def wss(self, autoconfirm:bool=False) -> AternosWss:
servreq = self.atserver_request(
'https://aternos.org/panel/ajax/status.php',
'GET', sendtoken=True
)
self._info = json.loads(servreq.content)
"""Returns :class:`python_aternos.atwss.AternosWss` instance for listening server streams in real-time
def wss(self, autoconfirm: bool = False) -> AternosWss:
:param autoconfirm: Automatically start server status listener
when AternosWss connects to API to confirm server launching, defaults to `False`
:type autoconfirm: bool, optional
:return: :class:`python_aternos.atwss.AternosWss` object
:rtype: python_aternos.atwss.AternosWss
"""
"""Returns :class:`python_aternos.atwss.AternosWss`
instance for listening server streams in real-time
return AternosWss(self, autoconfirm)
:param autoconfirm: Automatically start server status listener
when AternosWss connects to API to confirm
server launching, defaults to `False`
:type autoconfirm: bool, optional
:return: :class:`python_aternos.atwss.AternosWss` object
:rtype: python_aternos.atwss.AternosWss
"""
def start(self, headstart:bool=False, accepteula:bool=True) -> None:
return AternosWss(self, autoconfirm)
"""Starts a server
def start(self, headstart: bool = False, accepteula: bool = True) -> None:
:param headstart: Start a server in the headstart mode which allows you to skip all queue, defaults to `False`
:type headstart: bool, optional
:param accepteula: Automatically accept the Mojang EULA, defaults to `True`
:type accepteula: bool, optional
:raises ServerEulaError: When trying to start a server
without accepting the Mojang EULA
:raises ServerRunningError: When trying to start a server
which is alreday running
:raises ServerSoftwareError: When Aternos notifies about
incorrect software version
:raises ServerStorageError: When Aternos notifies about
voilation of storage limits (4 GB for now)
:raises ServerError: When API is unable to start a Minecraft server
due to unavailability of Aternos' file servers or other problems
"""
"""Starts a server
startreq = self.atserver_request(
'https://aternos.org/panel/ajax/start.php',
'GET', params={'headstart': int(headstart)},
sendtoken=True
)
startresult = startreq.json()
:param headstart: Start a server in the headstart mode
which allows you to skip all queue, defaults to `False`
:type headstart: bool, optional
:param accepteula: Automatically accept
the Mojang EULA, defaults to `True`
:type accepteula: bool, optional
:raises ServerEulaError: When trying to start a server
without accepting the Mojang EULA
:raises ServerRunningError: When trying to start a server
which is alreday running
:raises ServerSoftwareError: When Aternos notifies about
incorrect software version
:raises ServerStorageError: When Aternos notifies about
voilation of storage limits (4 GB for now)
:raises ServerError: When API is unable to start a Minecraft server
due to unavailability of Aternos' file servers or other problems
"""
if startresult['success']:
return
error = startresult['error']
startreq = self.atserver_request(
'https://aternos.org/panel/ajax/start.php',
'GET', params={'headstart': int(headstart)},
sendtoken=True
)
startresult = startreq.json()
if error == 'eula' and accepteula:
self.eula()
self.start(accepteula=False)
if startresult['success']:
return
error = startresult['error']
elif error == 'eula':
raise ServerEulaError(
'EULA was not accepted. Use start(accepteula=True)'
)
if error == 'eula' and accepteula:
self.eula()
self.start(accepteula=False)
elif error == 'already':
raise ServerRunningError(
'Server is already running'
)
elif error == 'eula':
raise ServerEulaError(
'EULA was not accepted. Use start(accepteula=True)'
)
elif error == 'wrongversion':
raise ServerSoftwareError(
'Incorrect software version installed'
)
elif error == 'already':
raise ServerRunningError(
'Server is already running'
)
elif error == 'file':
raise ServerError(
'File server is unavailbale, view https://status.aternos.gmbh'
)
elif error == 'wrongversion':
raise ServerSoftwareError(
'Incorrect software version installed'
)
elif error == 'size':
raise ServerStorageError(
f'Available storage size is 4GB, ' + \
f'your server used: {startresult["size"]}'
)
elif error == 'file':
raise ServerError(
'File server is unavailbale, view https://status.aternos.gmbh'
)
else:
raise ServerError(
f'Unable to start server, code: {error}'
)
elif error == 'size':
raise ServerStorageError(
f'Available storage size is 4GB, '
f'your server used: {startresult["size"]}'
)
def confirm(self) -> None:
else:
raise ServerError(
f'Unable to start server, code: {error}'
)
"""Confirms server launching"""
def confirm(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/confirm.php',
'GET', sendtoken=True
)
"""Confirms server launching"""
def stop(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/confirm.php',
'GET', sendtoken=True
)
"""Stops the server"""
def stop(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/stop.php',
'GET', sendtoken=True
)
"""Stops the server"""
def cancel(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/stop.php',
'GET', sendtoken=True
)
"""Cancels server launching"""
def cancel(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/cancel.php',
'GET', sendtoken=True
)
"""Cancels server launching"""
def restart(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/cancel.php',
'GET', sendtoken=True
)
"""Restarts the server"""
def restart(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/restart.php',
'GET', sendtoken=True
)
"""Restarts the server"""
def eula(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/restart.php',
'GET', sendtoken=True
)
"""Accepts the Mojang EULA"""
def eula(self) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/eula.php',
'GET', sendtoken=True
)
"""Accepts the Mojang EULA"""
def files(self) -> FileManager:
self.atserver_request(
'https://aternos.org/panel/ajax/eula.php',
'GET', sendtoken=True
)
"""Returns :class:`python_aternos.atfm.FileManager`
instance for file operations
def files(self) -> FileManager:
:return: :class:`python_aternos.atfm.FileManager` object
:rtype: python_aternos.atfm.FileManager
"""
"""Returns :class:`python_aternos.atfm.FileManager`
instance for file operations
return FileManager(self)
:return: :class:`python_aternos.atfm.FileManager` object
:rtype: python_aternos.atfm.FileManager
"""
def config(self) -> AternosConfig:
return FileManager(self)
"""Returns :class:`python_aternos.atconf.AternosConfig`
instance for editing server settings
def config(self) -> AternosConfig:
:return: :class:`python_aternos.atconf.AternosConfig` object
:rtype: python_aternos.atconf.AternosConfig
"""
"""Returns :class:`python_aternos.atconf.AternosConfig`
instance for editing server settings
return AternosConfig(self)
:return: :class:`python_aternos.atconf.AternosConfig` object
:rtype: python_aternos.atconf.AternosConfig
"""
def players(self, lst:Lists) -> PlayersList:
return AternosConfig(self)
"""Returns :class:`python_aternos.atplayers.PlayersList`
instance for managing operators, whitelist and banned players lists
def players(self, lst: Lists) -> PlayersList:
:param lst: Players list type, must be
the :class:`python_aternos.atplayers.Lists` enum value
:type lst: python_aternos.atplayers.Lists
:return: :class:`python_aternos.atplayers.PlayersList`
:rtype: python_aternos.atplayers.PlayersList
"""
"""Returns :class:`python_aternos.atplayers.PlayersList`
instance for managing operators, whitelist and banned players lists
return PlayersList(lst, self)
:param lst: Players list type, must be
the :class:`python_aternos.atplayers.Lists` enum value
:type lst: python_aternos.atplayers.Lists
:return: :class:`python_aternos.atplayers.PlayersList`
:rtype: python_aternos.atplayers.PlayersList
"""
def atserver_request(
self, url:str, method:str,
params:Optional[dict]=None,
data:Optional[dict]=None,
headers:Optional[dict]=None,
sendtoken:bool=False) -> Response:
return PlayersList(lst, self)
"""Sends a request to Aternos API
with server IDenitfier parameter
def atserver_request(
self, url: str, method: str,
params: Optional[dict] = None,
data: Optional[dict] = None,
headers: Optional[dict] = None,
sendtoken: bool = False) -> Response:
:param url: Request URL
:type url: str
:param method: Request method, must be GET or POST
:type method: str
:param params: URL parameters, defaults to None
:type params: Optional[dict], optional
:param data: POST request data, if the method is GET,
this dict will be combined with params, defaults to None
:type data: Optional[dict], optional
:param headers: Custom headers, defaults to None
:type headers: Optional[dict], optional
:param sendtoken: If the ajax and SEC token
should be sent, defaults to False
:type sendtoken: bool, optional
:return: API response
:rtype: requests.Response
"""
"""Sends a request to Aternos API
with server IDenitfier parameter
return self.atconn.request_cloudflare(
url=url, method=method,
params=params, data=data,
headers=headers,
reqcookies={
'ATERNOS_SERVER': self.servid
},
sendtoken=sendtoken
)
:param url: Request URL
:type url: str
:param method: Request method, must be GET or POST
:type method: str
:param params: URL parameters, defaults to None
:type params: Optional[dict], optional
:param data: POST request data, if the method is GET,
this dict will be combined with params, defaults to None
:type data: Optional[dict], optional
:param headers: Custom headers, defaults to None
:type headers: Optional[dict], optional
:param sendtoken: If the ajax and SEC token
should be sent, defaults to False
:type sendtoken: bool, optional
:return: API response
:rtype: requests.Response
"""
@property
def subdomain(self) -> str:
atdomain = self.domain
return atdomain[:atdomain.find('.')]
return self.atconn.request_cloudflare(
url=url, method=method,
params=params, data=data,
headers=headers,
reqcookies={
'ATERNOS_SERVER': self.servid
},
sendtoken=sendtoken
)
@subdomain.setter
def subdomain(self, value:str) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/options/subdomain.php',
'GET', params={'subdomain': value},
sendtoken=True
)
@property
def subdomain(self) -> str:
atdomain = self.domain
return atdomain[:atdomain.find('.')]
@property
def motd(self) -> str:
return self._info['motd']
@subdomain.setter
def subdomain(self, value: str) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/options/subdomain.php',
'GET', params={'subdomain': value},
sendtoken=True
)
@motd.setter
def motd(self, value:str) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/options/motd.php',
'POST', data={'motd': value},
sendtoken=True
)
@property
def motd(self) -> str:
return self._info['motd']
@property
def address(self) -> str:
return self._info['displayAddress']
@property
def domain(self) -> str:
return self._info['ip']
@motd.setter
def motd(self, value: str) -> None:
self.atserver_request(
'https://aternos.org/panel/ajax/options/motd.php',
'POST', data={'motd': value},
sendtoken=True
)
@property
def port(self) -> int:
return self._info['port']
@property
def address(self) -> str:
return self._info['displayAddress']
@property
def edition(self) -> int:
soft_type = self._info['bedrock']
return int(soft_type)
@property
def domain(self) -> str:
return self._info['ip']
@property
def software(self) -> str:
return self._info['software']
@property
def version(self) -> str:
return self._info['version']
@property
def status(self) -> str:
return self._info['class']
@property
def port(self) -> int:
return self._info['port']
@property
def status_num(self) -> int:
return int(self._info['status'])
@property
def players_list(self) -> List[str]:
return self._info['playerlist']
@property
def players_count(self) -> int:
return int(self._info['players'])
@property
def slots(self) -> int:
return int(self._info['slots'])
@property
def edition(self) -> int:
soft_type = self._info['bedrock']
return int(soft_type)
@property
def ram(self) -> int:
return int(self._info['ram'])
@property
def software(self) -> str:
return self._info['software']
@property
def version(self) -> str:
return self._info['version']
@property
def status(self) -> str:
return self._info['class']
@property
def status_num(self) -> int:
return int(self._info['status'])
@property
def players_list(self) -> List[str]:
return self._info['playerlist']
@property
def players_count(self) -> int:
return int(self._info['players'])
@property
def slots(self) -> int:
return int(self._info['slots'])
@property
def ram(self) -> int:
return int(self._info['ram'])

View file

@ -8,215 +8,224 @@ from typing import TYPE_CHECKING
from .atconnect import REQUA
if TYPE_CHECKING:
from .atserver import AternosServer
from .atserver import AternosServer
class Streams(enum.Enum):
"""WebSocket streams types"""
"""WebSocket streams types"""
status = (0,None)
queue = (1,None)
console = (2,'console')
ram = (3,'heap')
tps = (4,'tick')
status = (0, None)
queue = (1, None)
console = (2, 'console')
ram = (3, 'heap')
tps = (4, 'tick')
def __init__(self, num: int, stream: str) -> None:
self.num = num
self.stream = stream
def __init__(self, num:int, stream:str):
self.num = num
self.stream = stream
class AternosWss:
"""Class for managing websocket connection
"""Class for managing websocket connection
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
:param autoconfirm: Automatically start server status listener
when AternosWss connects to API to confirm server launching, defaults to `False`
:type autoconfirm: bool, optional
"""
:param atserv: :class:`python_aternos.atserver.AternosServer` instance
:type atserv: python_aternos.atserver.AternosServer
:param autoconfirm: Automatically start server status listener
when AternosWss connects to API to confirm
server launching, defaults to `False`
:type autoconfirm: bool, optional
"""
def __init__(self, atserv:'AternosServer', autoconfirm:bool=False) -> None:
self.atserv = atserv
self.cookies = atserv.atconn.session.cookies
self.session = self.cookies['ATERNOS_SESSION']
self.servid = atserv.servid
self.recv = {}
self.autoconfirm = autoconfirm
self.confirmed = False
async def confirm(self) -> None:
def __init__(self, atserv: 'AternosServer', autoconfirm: bool = False) -> None:
"""Simple way to call AternosServer.confirm from this class"""
self.atserv = atserv
self.cookies = atserv.atconn.session.cookies
self.session = self.cookies['ATERNOS_SESSION']
self.servid = atserv.servid
self.recv = {}
self.autoconfirm = autoconfirm
self.confirmed = False
self.atserv.confirm()
async def confirm(self) -> None:
def wssreceiver(self, stream:Streams, *args:Any) -> Callable[[Callable[[Any],Coroutine[Any,Any,None]]],Any]:
"""Simple way to call AternosServer.confirm from this class"""
"""Decorator that marks your function as a stream receiver.
When websocket receives message from the specified stream,
it calls all listeners created with this decorator.
self.atserv.confirm()
:param stream: Stream that your function should listen
:type stream: python_aternos.atwss.Streams
:param args: Arguments which will be passed to your function
:type args: tuple, optional
:return: ...
:rtype: Callable[[Callable[[Any],Coroutine[Any,Any,None]]],Any]
"""
def wssreceiver(self, stream: Streams, *args: Any) -> Callable[[Callable[[Any], Coroutine[Any, Any, None]]], Any]:
def decorator(func:Callable[[Any],Coroutine[Any,Any,None]]) -> None:
self.recv[stream] = (func, args)
return decorator
"""Decorator that marks your function as a stream receiver.
When websocket receives message from the specified stream,
it calls all listeners created with this decorator.
async def connect(self) -> None:
:param stream: Stream that your function should listen
:type stream: python_aternos.atwss.Streams
:param args: Arguments which will be passed to your function
:type args: tuple, optional
:return: ...
:rtype: Callable[[Callable[[Any], Coroutine[Any, Any, None]]], Any]
"""
"""Connect to the websocket server and start all stream listeners"""
headers = [
('Host', 'aternos.org'),
('User-Agent', REQUA),
(
'Cookie',
f'ATERNOS_SESSION={self.session}; ' + \
f'ATERNOS_SERVER={self.servid}'
)
]
self.socket = await websockets.connect(
'wss://aternos.org/hermes/',
origin='https://aternos.org',
extra_headers=headers
)
def decorator(func: Callable[[Any], Coroutine[Any, Any, None]]) -> None:
self.recv[stream] = (func, args)
return decorator
@self.wssreceiver(Streams.status)
async def confirmfunc(msg):
async def connect(self) -> None:
"""Automatically confirm Minecraft server launching"""
"""Connect to the websocket server and start all stream listeners"""
if not self.autoconfirm:
return
if msg['class'] == 'queueing' \
and msg['queue']['pending'] == 'pending'\
and not self.confirmed:
self.confirm()
@self.wssreceiver(Streams.status)
async def streamsfunc(msg):
headers = [
('Host', 'aternos.org'),
('User-Agent', REQUA),
(
'Cookie',
f'ATERNOS_SESSION={self.session}; '
f'ATERNOS_SERVER={self.servid}'
)
]
self.socket = await websockets.connect(
'wss://aternos.org/hermes/',
origin='https://aternos.org',
extra_headers=headers
)
"""Automatically starts streams. Detailed description:
@self.wssreceiver(Streams.status)
async def confirmfunc(msg):
According to the websocket messages from the web site,
Aternos can't receive any data from a stream (e.g. console) until
it requests this stream via the special message to the websocket server:
`{"stream":"console","type":"start"}`
on which the server responses with: `{"type":"connected"}`
Also, there are RAM (used heap) and TPS (ticks per second)
streams that must be enabled before trying to get information.
Enabling the stream for listening the server status is not needed,
these data is sent from API by default, so there's None value in
the second item of its stream type tuple (`<Streams.status: (0, None)>`).
https://github.com/DarkCat09/python-aternos/issues/22#issuecomment-1146788496
"""
"""Automatically confirm Minecraft server launching"""
if msg['status'] == 2:
# Automatically start streams
for strm in self.recv:
if not isinstance(strm,Streams):
continue
if strm.stream:
logging.debug(f'Enabling {strm.stream} stream')
await self.send({
'stream': strm.stream,
'type': 'start'
})
await self.wssworker()
if not self.autoconfirm:
return
async def close(self) -> None:
in_queue = (msg['class'] == 'queueing')
pending = (msg['queue']['pending'] == 'pending')
confirmation = in_queue and pending
"""Closes websocket connection and stops all listeners"""
self.keep.cancel()
self.msgs.cancel()
await self.socket.close()
del self.socket
if confirmation and not self.confirmed:
self.confirm()
async def send(self, obj:Union[Dict[str, Any],str]) -> None:
@self.wssreceiver(Streams.status)
async def streamsfunc(msg):
"""Sends a message to websocket server
"""Automatically starts streams. Detailed description:
:param obj: Message, may be a string or a dict
:type obj: Union[Dict[str, Any],str]
"""
According to the websocket messages from the web site,
Aternos can't receive any data from a stream (e.g. console) until
it requests this stream via the special message
to the websocket server: `{"stream":"console","type":"start"}`
on which the server responses with: `{"type":"connected"}`
Also, there are RAM (used heap) and TPS (ticks per second)
streams that must be enabled before trying to get information.
Enabling the stream for listening the server status is not needed,
these data is sent from API by default, so there's None value in
the second item of its stream type tuple
(`<Streams.status: (0, None)>`).
https://github.com/DarkCat09/python-aternos/issues/22#issuecomment-1146788496
"""
if isinstance(obj, dict):
obj = json.dumps(obj)
if msg['status'] == 2:
# Automatically start streams
for strm in self.recv:
await self.socket.send(obj)
if not isinstance(strm, Streams):
continue
async def wssworker(self) -> None:
if strm.stream:
logging.debug(f'Enabling {strm.stream} stream')
await self.send({
'stream': strm.stream,
'type': 'start'
})
"""Starts async tasks in background
for receiving websocket messages
and sending keepalive ping"""
await self.wssworker()
self.keep = asyncio.create_task(self.keepalive())
self.msgs = asyncio.create_task(self.receiver())
async def close(self) -> None:
async def keepalive(self) -> None:
"""Closes websocket connection and stops all listeners"""
"""Each 49 seconds sends keepalive ping to websocket server"""
self.keep.cancel()
self.msgs.cancel()
await self.socket.close()
del self.socket
try:
while True:
await asyncio.sleep(49)
await self.socket.send('{"type":"\u2764"}')
async def send(self, obj: Union[Dict[str, Any], str]) -> None:
except asyncio.CancelledError:
pass
"""Sends a message to websocket server
async def receiver(self) -> None:
:param obj: Message, may be a string or a dict
:type obj: Union[Dict[str, Any],str]
"""
"""Receives messages from websocket servers
and calls user's streams listeners"""
if isinstance(obj, dict):
obj = json.dumps(obj)
try:
while True:
data = await self.socket.recv()
obj = json.loads(data)
msgtype = -1
if obj['type'] == 'line':
msgtype = Streams.console
msg = obj['data'].strip('\r\n ')
await self.socket.send(obj)
elif obj['type'] == 'heap':
msgtype = Streams.ram
msg = int(obj['data']['usage'])
async def wssworker(self) -> None:
elif obj['type'] == 'tick':
msgtype = Streams.tps
ticks = 1000 / obj['data']['averageTickTime']
msg = 20 if ticks > 20 else ticks
"""Starts async tasks in background
for receiving websocket messages
and sending keepalive ping"""
elif obj['type'] == 'status':
msgtype = Streams.status
msg = json.loads(obj['message'])
self.keep = asyncio.create_task(self.keepalive())
self.msgs = asyncio.create_task(self.receiver())
if msgtype in self.recv:
async def keepalive(self) -> None:
# function info tuple:
# (function, arguments)
func = self.recv[msgtype]
"""Each 49 seconds sends keepalive ping to websocket server"""
# if arguments is not empty
if func[1]:
# call the function with args
coro = func[0](msg, func[1])
else:
coro = func[0](msg)
# run
await asyncio.create_task(coro)
except asyncio.CancelledError:
pass
try:
while True:
await asyncio.sleep(49)
await self.socket.send('{"type":"\u2764"}')
except asyncio.CancelledError:
pass
async def receiver(self) -> None:
"""Receives messages from websocket servers
and calls user's streams listeners"""
try:
while True:
data = await self.socket.recv()
obj = json.loads(data)
msgtype = -1
if obj['type'] == 'line':
msgtype = Streams.console
msg = obj['data'].strip('\r\n ')
elif obj['type'] == 'heap':
msgtype = Streams.ram
msg = int(obj['data']['usage'])
elif obj['type'] == 'tick':
msgtype = Streams.tps
ticks = 1000 / obj['data']['averageTickTime']
msg = 20 if ticks > 20 else ticks
elif obj['type'] == 'status':
msgtype = Streams.status
msg = json.loads(obj['message'])
if msgtype in self.recv:
# function info tuple:
# (function, arguments)
func = self.recv[msgtype]
# if arguments is not empty
if func[1]:
# call the function with args
coro = func[0](msg, func[1])
else:
coro = func[0](msg)
# run
await asyncio.create_task(coro)
except asyncio.CancelledError:
pass