deluge/deluge/ui/common.py
Calum Lind 0d72195281
[Lint] Format code with ruff
`pre-commit run --all-files`
2024-09-08 17:51:43 +01:00

728 lines
24 KiB
Python

#
# Copyright (C) Damien Churchill 2008-2009 <damoxc@gmail.com>
# Copyright (C) Andrew Resch 2009 <andrewresch@gmail.com>
#
# This file is part of Deluge and is licensed under GNU General Public License 3.0, or later, with
# the additional special exception to link portions of this program with the OpenSSL library.
# See LICENSE for more details.
#
"""
The ui common module contains methods and classes that are deemed useful for all the interfaces.
"""
import logging
import os
from hashlib import sha1 as sha
from typing import Tuple
from deluge import bencode
from deluge.common import decode_bytes
log = logging.getLogger(__name__)
# Dummy translation dicts so the text is available for Translators.
#
# All entries in deluge.common.TORRENT_STATE should be added here.
#
# No need to import these, just simply use the `_()` function around a status variable.
def _(message):
return message
STATE_TRANSLATION = {
'All': _('All'),
'Active': _('Active'),
'Allocating': _('Allocating'),
'Checking': _('Checking'),
'Downloading': _('Downloading'),
'Seeding': _('Seeding'),
'Paused': _('Paused'),
'Queued': _('Queued'),
'Error': _('Error'),
}
TORRENT_DATA_FIELD = {
'queue': {'name': '#', 'status': ['queue']},
'name': {'name': _('Name'), 'status': ['state', 'name']},
'progress_state': {'name': _('Progress'), 'status': ['progress', 'state']},
'state': {'name': _('State'), 'status': ['state']},
'progress': {'name': _('Progress'), 'status': ['progress']},
'size': {'name': _('Size'), 'status': ['total_wanted']},
'downloaded': {'name': _('Downloaded'), 'status': ['all_time_download']},
'uploaded': {'name': _('Uploaded'), 'status': ['total_uploaded']},
'remaining': {'name': _('Remaining'), 'status': ['total_remaining']},
'ratio': {'name': _('Ratio'), 'status': ['ratio']},
'download_speed': {'name': _('Down Speed'), 'status': ['download_payload_rate']},
'upload_speed': {'name': _('Up Speed'), 'status': ['upload_payload_rate']},
'max_download_speed': {'name': _('Down Limit'), 'status': ['max_download_speed']},
'max_upload_speed': {'name': _('Up Limit'), 'status': ['max_upload_speed']},
'max_connections': {'name': _('Max Connections'), 'status': ['max_connections']},
'max_upload_slots': {'name': _('Max Upload Slots'), 'status': ['max_upload_slots']},
'peers': {'name': _('Peers'), 'status': ['num_peers', 'total_peers']},
'seeds': {'name': _('Seeds'), 'status': ['num_seeds', 'total_seeds']},
'avail': {'name': _('Avail'), 'status': ['distributed_copies']},
'seeds_peers_ratio': {'name': _('Seeds:Peers'), 'status': ['seeds_peers_ratio']},
'time_added': {'name': _('Added'), 'status': ['time_added']},
'tracker': {'name': _('Tracker'), 'status': ['tracker_host']},
'download_location': {
'name': _('Download Folder'),
'status': ['download_location'],
},
'seeding_time': {'name': _('Seeding Time'), 'status': ['seeding_time']},
'active_time': {'name': _('Active Time'), 'status': ['active_time']},
'time_since_transfer': {
'name': _('Last Activity'),
'status': ['time_since_transfer'],
},
'finished_time': {'name': _('Finished Time'), 'status': ['finished_time']},
'last_seen_complete': {
'name': _('Complete Seen'),
'status': ['last_seen_complete'],
},
'completed_time': {'name': _('Completed'), 'status': ['completed_time']},
'eta': {'name': _('ETA'), 'status': ['eta']},
'shared': {'name': _('Shared'), 'status': ['shared']},
'prioritize_first_last': {
'name': _('Prioritize First/Last'),
'status': ['prioritize_first_last'],
},
'sequential_download': {
'name': _('Sequential Download'),
'status': ['sequential_download'],
},
'is_auto_managed': {'name': _('Auto Managed'), 'status': ['is_auto_managed']},
'auto_managed': {'name': _('Auto Managed'), 'status': ['auto_managed']},
'stop_at_ratio': {'name': _('Stop At Ratio'), 'status': ['stop_at_ratio']},
'stop_ratio': {'name': _('Stop Ratio'), 'status': ['stop_ratio']},
'remove_at_ratio': {'name': _('Remove At Ratio'), 'status': ['remove_at_ratio']},
'move_completed': {'name': _('Move On Completed'), 'status': ['move_completed']},
'move_completed_path': {
'name': _('Move Completed Path'),
'status': ['move_completed_path'],
},
'move_on_completed': {
'name': _('Move On Completed'),
'status': ['move_on_completed'],
},
'move_on_completed_path': {
'name': _('Move On Completed Path'),
'status': ['move_on_completed_path'],
},
'owner': {'name': _('Owner'), 'status': ['owner']},
'pieces': {'name': _('Pieces'), 'status': ['num_pieces', 'piece_length']},
'seed_rank': {'name': _('Seed Rank'), 'status': ['seed_rank']},
'super_seeding': {'name': _('Super Seeding'), 'status': ['super_seeding']},
}
TRACKER_STATUS_TRANSLATION = [
_('Error'),
_('Warning'),
_('Announce OK'),
_('Announce Sent'),
]
PREFS_CATOG_TRANS = {
'interface': _('Interface'),
'downloads': _('Downloads'),
'bandwidth': _('Bandwidth'),
'queue': _('Queue'),
'network': _('Network'),
'proxy': _('Proxy'),
'cache': _('Cache'),
'other': _('Other'),
'daemon': _('Daemon'),
'plugins': _('Plugins'),
}
FILE_PRIORITY = {
0: 'Skip',
1: 'Low',
2: 'Low',
3: 'Low',
4: 'Normal',
5: 'High',
6: 'High',
7: 'High',
_('Skip'): 0,
_('Low'): 1,
_('Normal'): 4,
_('High'): 7,
}
del _
# The keys from session statistics for cache status.
DISK_CACHE_KEYS = [
'disk.num_blocks_read',
'disk.num_blocks_written',
'disk.num_read_ops',
'disk.num_write_ops',
'read_hit_ratio',
'write_hit_ratio',
'disk.disk_blocks_in_use',
'disk.read_cache_blocks',
]
class TorrentInfo:
"""Collects information about a torrent file.
Args:
filename (str, optional): The path to the .torrent file.
filetree (int, optional): The version of filetree to create (defaults to 1).
torrent_file (dict, optional): A bdecoded .torrent file contents.
force_bt_version (int, optional): The BitTorrent spec to use for parsing (defaults to 1).
"""
def __init__(self, filename='', filetree=1, torrent_file=None, force_bt_version=1):
self._filedata = None
if torrent_file:
self._metainfo = torrent_file
elif filename:
log.debug('Attempting to open %s.', filename)
try:
with open(filename, 'rb') as _file:
self._filedata = _file.read()
except OSError as ex:
log.warning('Unable to open %s: %s', filename, ex)
return
try:
self._metainfo = bencode.bdecode(self._filedata)
except bencode.BTFailure as ex:
log.warning('Failed to decode %s: %s', filename, ex)
return
else:
log.warning('Requires valid arguments.')
return
# info_dict with keys decoded to unicode.
info_dict = {k.decode(): v for k, v in self._metainfo[b'info'].items()}
self._info_hash = sha(bencode.bencode(info_dict)).hexdigest()
# Get encoding from torrent file if available
encoding = info_dict.get(
'encoding', info_dict.get('codepage', b'UTF-8')
).decode()
# Decode 'name' with encoding unless 'name.utf-8' found.
if 'name.utf-8' in info_dict:
self._name = decode_bytes(info_dict['name.utf-8'])
else:
self._name = decode_bytes(info_dict['name'], encoding)
meta_version = info_dict['meta version'] if 'meta version' in info_dict else -1
is_hybrid = 'files' in info_dict and meta_version == 2
parse_v1 = False
parse_v2 = False
if is_hybrid:
if force_bt_version == 1:
parse_v1 = True
elif force_bt_version == 2:
parse_v2 = True
elif 'files' in info_dict:
parse_v1 = True
elif meta_version == 2 and 'file tree' in info_dict:
parse_v2 = True
# Get list of files from torrent info
self._files = []
if parse_v1:
paths = {}
dirs = {}
prefix = self._name
for index, f in enumerate(info_dict['files']):
f = {k.decode(): v for k, v in f.items()}
if 'path.utf-8' in f:
path = decode_bytes(os.path.join(*f['path.utf-8']))
else:
path = decode_bytes(os.path.join(*f['path']), encoding)
if prefix:
path = os.path.join(prefix, path)
# Ensure agnostic path separator
path = path.replace('\\', '/')
self._files.append(
{'path': path, 'size': f['length'], 'download': True}
)
paths[path] = {'path': path, 'index': index, 'length': f['length']}
dirname = os.path.dirname(path)
while dirname:
dirinfo = dirs.setdefault(dirname, {})
dirinfo['length'] = dirinfo.get('length', 0) + f['length']
dirname = os.path.dirname(dirname)
if filetree == 2:
def walk(full_path, item):
if item['type'] == 'dir':
item.update(dirs[full_path])
else:
item.update(paths[full_path])
item['download'] = True
file_tree = FileTree2(list(paths))
file_tree.walk(walk)
else:
def walk(full_path, item):
if isinstance(item, dict):
return item
return [paths[full_path]['index'], paths[full_path]['length'], True]
file_tree = FileTree(paths)
file_tree.walk(walk)
self._files_tree = file_tree.get_tree()
elif parse_v2:
def single_file_torrent(inner_info_dict):
if len(inner_info_dict['file tree']) > 1:
return False
file_name = [key for key in inner_info_dict['file tree']][0]
return inner_info_dict['name'] == file_name
if not single_file_torrent(info_dict):
info_dict['file tree'] = {info_dict['name']: info_dict['file tree']}
if filetree == 2:
def walk(full_path, item):
if item['type'] == 'file':
item['path'] = full_path
self._files.append(
{
'path': full_path,
'size': item['length'],
'download': True,
}
)
item['download'] = True
file_tree = FileTree2BTv2(info_dict['file tree'])
file_tree.walk(walk)
else:
def walk(full_path, item):
if isinstance(item, dict):
return item
self._files.append(
{'path': full_path, 'size': item[1], 'download': True}
)
return [item[0], item[1], True]
file_tree = FiletreeBTv2(info_dict['file tree'])
file_tree.walk(walk)
self._files_tree = file_tree.get_tree()
else:
self._files.append(
{'path': self._name, 'size': info_dict['length'], 'download': True}
)
if filetree == 2:
self._files_tree = {
'contents': {
self._name: {
'type': 'file',
'index': 0,
'length': info_dict['length'],
'download': True,
}
}
}
else:
self._files_tree = {self._name: (0, info_dict['length'], True)}
@classmethod
def from_metadata(cls, metadata, trackers=None):
"""Create a TorrentInfo from metadata and trackers
Args:
metadata (dict): A bdecoded info section of torrent file.
trackers (list of lists, optional): The trackers to include.
"""
if not isinstance(metadata, dict):
return
metainfo = {b'info': metadata}
if trackers:
metainfo[b'announce'] = trackers[0][0].encode('utf-8')
trackers_utf8 = [[t.encode('utf-8') for t in tier] for tier in trackers]
metainfo[b'announce-list'] = trackers_utf8
return cls(torrent_file=metainfo)
def as_dict(self, *keys):
"""The torrent info as a dictionary, filtered by keys.
Args:
keys (str): A space-separated string of keys.
Returns:
dict: The torrent info dict with specified keys.
"""
return {key: getattr(self, key) for key in keys}
@property
def name(self):
"""The name of the torrent.
Returns:
str: The torrent name.
"""
return self._name
@property
def info_hash(self):
"""The calculated torrent info_hash.
Returns:
str: The torrent info_hash.
"""
return self._info_hash
@property
def files(self):
"""The files that the torrent contains.
Returns:
list: The list of torrent files.
"""
return self._files
@property
def files_tree(self):
"""A tree of the files the torrent contains.
::
{
"some_directory": {
"some_file": (index, size, download)
}
}
Returns:
dict: The tree of files.
"""
return self._files_tree
@property
def metainfo(self):
"""Returns the torrent metainfo dictionary.
This is the bdecoded torrent file contents.
Returns:
dict: The metainfo dictionary.
"""
return self._metainfo
@property
def filedata(self):
"""The contents of the .torrent file.
Returns:
bytes: The bencoded metainfo.
"""
if not self._filedata:
self._filedata = bencode.bencode(self._metainfo)
return self._filedata
class FileTree2:
"""
Converts a list of paths, from a V1 torrent, into a file tree.
Each file will have the dictionary structure of:
{ file_name: {type, path, index, length, download} }
where:
type (str): will always be "file"
path (str): the absolute file path from the root the torrent
index (int): the index of file in the torrent
length (int): the size of the file, in bytes
download (bool): marks the file to download
Folder will be dictionaries of files:
{ dir1: type, contents: {file_name1: {...}, file_name2: {...}}, dir2: ... }
where:
type (str): will always be "dir"
contents (dict): a dictionary of inner files and folders
The entire tree will start with a root dictionary:
{ contents: {dirs...}, type: "dir" }
Args:
paths (list): The paths to be converted.
"""
def __init__(self, paths: list):
self.tree = {'contents': {}, 'type': 'dir'}
def get_parent(path):
parent = self.tree
while '/' in path:
directory, path = path.split('/', 1)
child = parent['contents'].get(directory)
if child is None:
parent['contents'][directory] = {'type': 'dir', 'contents': {}}
parent = parent['contents'][directory]
return parent, path
for path in paths:
if path[-1] == '/':
path = path[:-1]
parent, path = get_parent(path)
parent['contents'][path] = {'type': 'dir', 'contents': {}}
else:
parent, path = get_parent(path)
parent['contents'][path] = {'type': 'file'}
def get_tree(self):
"""
Return the tree.
:returns: the file tree.
:rtype: dictionary
"""
return self.tree
def walk(self, callback):
"""
Walk through the file tree calling the callback function on each item
contained.
:param callback: The function to be used as a callback, it should have
the signature func(item, path) where item is a `tuple` for a file
and `dict` for a directory.
:type callback: function
"""
def walk(directory, parent_path):
for path in list(directory['contents']):
full_path = os.path.join(parent_path, path).replace('\\', '/')
if directory['contents'][path]['type'] == 'dir':
directory['contents'][path] = (
callback(full_path, directory['contents'][path])
or directory['contents'][path]
)
walk(directory['contents'][path], full_path)
else:
directory['contents'][path] = (
callback(full_path, directory['contents'][path])
or directory['contents'][path]
)
walk(self.tree, '')
def __str__(self):
lines = []
def write(path, item):
depth = path.count('/')
path = os.path.basename(path)
path = path + '/' if item['type'] == 'dir' else path
lines.append(' ' * depth + path)
self.walk(write)
return '\n'.join(lines)
class FileTree:
"""
Converts a dict of paths, from a V1 torrent, into a file tree.
Each file will have the dictionary structure of:
{ file_name: [index, length, download] }
Where:
index (int): the index of file in the torrent
length (int): the size of the file, in bytes
download (bool): marks the file to download
Folder will be dictionaries of files:
{ dir1: {file_name1: [...], file_name2: [...]}, dir2: ... }
Args:
paths (dict): The paths to be converted.
"""
def __init__(self, paths: dict):
self.tree = {}
def get_parent(path):
parent = self.tree
while '/' in path:
directory, path = path.split('/', 1)
child = parent.get(directory)
if child is None:
parent[directory] = {}
parent = parent[directory]
return parent, path
for path in paths:
if path[-1] == '/':
path = path[:-1]
parent, path = get_parent(path)
parent[path] = {}
else:
parent, path = get_parent(path)
parent[path] = []
def get_tree(self):
"""
Return the tree, after first converting all file lists to a tuple.
Returns:
dict: the file tree.
"""
def to_tuple(path, item):
if isinstance(item, dict):
return item
return tuple(item)
self.walk(to_tuple)
return self.tree
def walk(self, callback):
"""
Walk through the file tree calling the callback function on each item
contained.
Args:
callback (function): The function to be used as a callback, it should have
the signature func(item, path) where item is a `tuple` for a file
and `dict` for a directory.
"""
def walk(directory, parent_path):
for path in list(directory):
full_path = os.path.join(parent_path, path).replace('\\', '/')
if isinstance(directory[path], dict):
directory[path] = (
callback(full_path, directory[path]) or directory[path]
)
walk(directory[path], full_path)
else:
directory[path] = (
callback(full_path, directory[path]) or directory[path]
)
walk(self.tree, '')
def __str__(self):
lines = []
def write(path, item):
depth = path.count('/')
path = os.path.basename(path)
path = isinstance(item, dict) and path + '/' or path
lines.append(' ' * depth + path)
self.walk(write)
return '\n'.join(lines)
class FiletreeBTv2(FileTree):
"""
Converts a dict of paths, from a V2 torrent, into a file tree.
Each file will have the dictionary structure of:
{ file_name: [index, length, download] }
Where:
index (int): the index of file in the torrent
length (int): the size of the file, in bytes
download (bool): marks the file to download
Folder will be dictionaries of files:
{ dir1: {file_name1: [...], file_name2: [...]}, dir2: ... }
Args:
file_tree (dict): The paths to be converted.
"""
def __init__(self, file_tree):
self.tree = {}
def get_parent(curr_tree_dict, index, parent) -> int:
for key, item in curr_tree_dict.items():
key = decode_bytes(key)
if b'' in item:
parent[key] = [index, item[b''][b'length']]
index += 1
else:
parent[key] = {}
index = get_parent(item, index, parent[key])
return index
get_parent(file_tree, 0, self.tree)
class FileTree2BTv2(FileTree2):
"""
Converts a dict of paths, from a V2 torrent, into a file tree.
Each file will have the dictionary structure of:
{ file_name: {type, path, index, length, download} }
where:
type (str): will always be "file"
path (str): the absolute file path from the root the torrent
index (int): the index of file in the torrent
length (int): the size of the file, in bytes
download (bool): marks the file to download
Folder will be dictionaries of files:
{ dir1: type, contents: {file_name1: {...}, file_name2: {...}}, dir2: ... }
where:
type (str): will always be "dir"
contents (dict): a dictionary of inner files and folders
The entire tree will start with a root dictionary:
{ contents: {dirs...}, type: "dir" }
Args:
file_tree (dict): The paths to be converted.
"""
def __init__(self, file_tree):
self.tree = {'contents': {}, 'type': 'dir'}
def get_parent(curr_tree_dict, index, parent) -> Tuple[int, int]:
total_length = 0
for key, item in curr_tree_dict.items():
key = decode_bytes(key)
if b'' in item:
length = item[b''][b'length']
total_length += length
parent['contents'][key] = {
'index': index,
'length': length,
'type': 'file',
}
index += 1
else:
parent['contents'][key] = {
'contents': {},
'type': 'dir',
'length': 0,
}
index, length = get_parent(item, index, parent['contents'][key])
parent['contents'][key]['length'] = length
total_length += length
return index, total_length
get_parent(file_tree, 0, self.tree)