reverted CRLF to LF changes

This commit is contained in:
ArabCoders 2023-11-03 14:16:11 +03:00
parent 25c44b4da9
commit fead1f99c7

View file

@ -1,353 +1,353 @@
import os import os
import yt_dlp import yt_dlp
from collections import OrderedDict from collections import OrderedDict
import shelve import shelve
import time import time
import asyncio import asyncio
import multiprocessing import multiprocessing
import logging import logging
import re import re
from dl_formats import get_format, get_opts, AUDIO_FORMATS from dl_formats import get_format, get_opts, AUDIO_FORMATS
log = logging.getLogger('ytdl') log = logging.getLogger('ytdl')
class DownloadQueueNotifier: class DownloadQueueNotifier:
async def added(self, dl): async def added(self, dl):
raise NotImplementedError raise NotImplementedError
async def updated(self, dl): async def updated(self, dl):
raise NotImplementedError raise NotImplementedError
async def completed(self, dl): async def completed(self, dl):
raise NotImplementedError raise NotImplementedError
async def canceled(self, id): async def canceled(self, id):
raise NotImplementedError raise NotImplementedError
async def cleared(self, id): async def cleared(self, id):
raise NotImplementedError raise NotImplementedError
class DownloadInfo: class DownloadInfo:
def __init__(self, id, title, url, quality, format, folder, custom_name_prefix): def __init__(self, id, title, url, quality, format, folder, custom_name_prefix):
self.id = id if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{id}' self.id = id if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{id}'
self.title = title if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{title}' self.title = title if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{title}'
self.url = url self.url = url
self.quality = quality self.quality = quality
self.format = format self.format = format
self.folder = folder self.folder = folder
self.custom_name_prefix = custom_name_prefix self.custom_name_prefix = custom_name_prefix
self.status = self.msg = self.percent = self.speed = self.eta = None self.status = self.msg = self.percent = self.speed = self.eta = None
self.timestamp = time.time_ns() self.timestamp = time.time_ns()
class Download: class Download:
manager = None manager = None
def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info): def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info):
self.download_dir = download_dir self.download_dir = download_dir
self.temp_dir = temp_dir self.temp_dir = temp_dir
self.output_template = output_template self.output_template = output_template
self.output_template_chapter = output_template_chapter self.output_template_chapter = output_template_chapter
self.format = get_format(format, quality) self.format = get_format(format, quality)
self.ytdl_opts = get_opts(format, quality, ytdl_opts) self.ytdl_opts = get_opts(format, quality, ytdl_opts)
self.info = info self.info = info
self.canceled = False self.canceled = False
self.tmpfilename = None self.tmpfilename = None
self.status_queue = None self.status_queue = None
self.proc = None self.proc = None
self.loop = None self.loop = None
self.notifier = None self.notifier = None
def _download(self): def _download(self):
try: try:
def put_status(st): def put_status(st):
self.status_queue.put({k: v for k, v in st.items() if k in ( self.status_queue.put({k: v for k, v in st.items() if k in (
'tmpfilename', 'tmpfilename',
'filename', 'filename',
'status', 'status',
'msg', 'msg',
'total_bytes', 'total_bytes',
'total_bytes_estimate', 'total_bytes_estimate',
'downloaded_bytes', 'downloaded_bytes',
'speed', 'speed',
'eta', 'eta',
)}) )})
def put_status_postprocessor(d): def put_status_postprocessor(d):
if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished': if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished':
if '__finaldir' in d['info_dict']: if '__finaldir' in d['info_dict']:
filename = os.path.join(d['info_dict']['__finaldir'], os.path.basename(d['info_dict']['filepath'])) filename = os.path.join(d['info_dict']['__finaldir'], os.path.basename(d['info_dict']['filepath']))
else: else:
filename = d['info_dict']['filepath'] filename = d['info_dict']['filepath']
self.status_queue.put({'status': 'finished', 'filename': filename}) self.status_queue.put({'status': 'finished', 'filename': filename})
ret = yt_dlp.YoutubeDL(params={ ret = yt_dlp.YoutubeDL(params={
'quiet': True, 'quiet': True,
'no_color': True, 'no_color': True,
#'skip_download': True, #'skip_download': True,
'paths': {"home": self.download_dir, "temp": self.temp_dir}, 'paths': {"home": self.download_dir, "temp": self.temp_dir},
'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter }, 'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter },
'format': self.format, 'format': self.format,
'socket_timeout': 30, 'socket_timeout': 30,
'progress_hooks': [put_status], 'progress_hooks': [put_status],
'postprocessor_hooks': [put_status_postprocessor], 'postprocessor_hooks': [put_status_postprocessor],
**self.ytdl_opts, **self.ytdl_opts,
}).download([self.info.url]) }).download([self.info.url])
self.status_queue.put({'status': 'finished' if ret == 0 else 'error'}) self.status_queue.put({'status': 'finished' if ret == 0 else 'error'})
except yt_dlp.utils.YoutubeDLError as exc: except yt_dlp.utils.YoutubeDLError as exc:
self.status_queue.put({'status': 'error', 'msg': str(exc)}) self.status_queue.put({'status': 'error', 'msg': str(exc)})
async def start(self, notifier): async def start(self, notifier):
if Download.manager is None: if Download.manager is None:
Download.manager = multiprocessing.Manager() Download.manager = multiprocessing.Manager()
self.status_queue = Download.manager.Queue() self.status_queue = Download.manager.Queue()
self.proc = multiprocessing.Process(target=self._download) self.proc = multiprocessing.Process(target=self._download)
self.proc.start() self.proc.start()
self.loop = asyncio.get_running_loop() self.loop = asyncio.get_running_loop()
self.notifier = notifier self.notifier = notifier
self.info.status = 'preparing' self.info.status = 'preparing'
await self.notifier.updated(self.info) await self.notifier.updated(self.info)
asyncio.create_task(self.update_status()) asyncio.create_task(self.update_status())
return await self.loop.run_in_executor(None, self.proc.join) return await self.loop.run_in_executor(None, self.proc.join)
def cancel(self): def cancel(self):
if self.running(): if self.running():
self.proc.kill() self.proc.kill()
self.canceled = True self.canceled = True
def close(self): def close(self):
if self.started(): if self.started():
self.proc.close() self.proc.close()
self.status_queue.put(None) self.status_queue.put(None)
def running(self): def running(self):
try: try:
return self.proc is not None and self.proc.is_alive() return self.proc is not None and self.proc.is_alive()
except ValueError: except ValueError:
return False return False
def started(self): def started(self):
return self.proc is not None return self.proc is not None
async def update_status(self): async def update_status(self):
while True: while True:
status = await self.loop.run_in_executor(None, self.status_queue.get) status = await self.loop.run_in_executor(None, self.status_queue.get)
if status is None: if status is None:
return return
self.tmpfilename = status.get('tmpfilename') self.tmpfilename = status.get('tmpfilename')
if 'filename' in status: if 'filename' in status:
self.info.filename = os.path.relpath(status.get('filename'), self.download_dir) self.info.filename = os.path.relpath(status.get('filename'), self.download_dir)
# Set correct file extension for thumbnails # Set correct file extension for thumbnails
if(self.info.format == 'thumbnail'): if(self.info.format == 'thumbnail'):
self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename) self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename)
self.info.status = status['status'] self.info.status = status['status']
self.info.msg = status.get('msg') self.info.msg = status.get('msg')
if 'downloaded_bytes' in status: if 'downloaded_bytes' in status:
total = status.get('total_bytes') or status.get('total_bytes_estimate') total = status.get('total_bytes') or status.get('total_bytes_estimate')
if total: if total:
self.info.percent = status['downloaded_bytes'] / total * 100 self.info.percent = status['downloaded_bytes'] / total * 100
self.info.speed = status.get('speed') self.info.speed = status.get('speed')
self.info.eta = status.get('eta') self.info.eta = status.get('eta')
await self.notifier.updated(self.info) await self.notifier.updated(self.info)
class PersistentQueue: class PersistentQueue:
def __init__(self, path): def __init__(self, path):
pdir = os.path.dirname(path) pdir = os.path.dirname(path)
if not os.path.isdir(pdir): if not os.path.isdir(pdir):
os.mkdir(pdir) os.mkdir(pdir)
with shelve.open(path, 'c'): with shelve.open(path, 'c'):
pass pass
self.path = path self.path = path
self.dict = OrderedDict() self.dict = OrderedDict()
def load(self): def load(self):
for k, v in self.saved_items(): for k, v in self.saved_items():
self.dict[k] = Download(None, None, None, None, None, None, {}, v) self.dict[k] = Download(None, None, None, None, None, None, {}, v)
def exists(self, key): def exists(self, key):
return key in self.dict return key in self.dict
def get(self, key): def get(self, key):
return self.dict[key] return self.dict[key]
def items(self): def items(self):
return self.dict.items() return self.dict.items()
def saved_items(self): def saved_items(self):
with shelve.open(self.path, 'r') as shelf: with shelve.open(self.path, 'r') as shelf:
return sorted(shelf.items(), key=lambda item: item[1].timestamp) return sorted(shelf.items(), key=lambda item: item[1].timestamp)
def put(self, value): def put(self, value):
key = value.info.url key = value.info.url
self.dict[key] = value self.dict[key] = value
with shelve.open(self.path, 'w') as shelf: with shelve.open(self.path, 'w') as shelf:
shelf[key] = value.info shelf[key] = value.info
def delete(self, key): def delete(self, key):
del self.dict[key] del self.dict[key]
with shelve.open(self.path, 'w') as shelf: with shelve.open(self.path, 'w') as shelf:
shelf.pop(key) shelf.pop(key)
def next(self): def next(self):
k, v = next(iter(self.dict.items())) k, v = next(iter(self.dict.items()))
return k, v return k, v
def empty(self): def empty(self):
return not bool(self.dict) return not bool(self.dict)
class DownloadQueue: class DownloadQueue:
def __init__(self, config, notifier): def __init__(self, config, notifier):
self.config = config self.config = config
self.notifier = notifier self.notifier = notifier
self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') self.queue = PersistentQueue(self.config.STATE_DIR + '/queue')
self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.done = PersistentQueue(self.config.STATE_DIR + '/completed')
self.done.load() self.done.load()
async def __import_queue(self): async def __import_queue(self):
for k, v in self.queue.saved_items(): for k, v in self.queue.saved_items():
await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix) await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix)
async def initialize(self): async def initialize(self):
self.event = asyncio.Event() self.event = asyncio.Event()
asyncio.create_task(self.__download()) asyncio.create_task(self.__download())
asyncio.create_task(self.__import_queue()) asyncio.create_task(self.__import_queue())
def __extract_info(self, url): def __extract_info(self, url):
return yt_dlp.YoutubeDL(params={ return yt_dlp.YoutubeDL(params={
'quiet': True, 'quiet': True,
'no_color': True, 'no_color': True,
'extract_flat': True, 'extract_flat': True,
**self.config.YTDL_OPTIONS, **self.config.YTDL_OPTIONS,
}).extract_info(url, download=False) }).extract_info(url, download=False)
def __calc_download_path(self, quality, format, folder): def __calc_download_path(self, quality, format, folder):
"""Calculates download path from quality, format and folder attributes. """Calculates download path from quality, format and folder attributes.
Returns: Returns:
Tuple dldirectory, error_message both of which might be None (but not at the same time) Tuple dldirectory, error_message both of which might be None (but not at the same time)
""" """
# Keep consistent with frontend # Keep consistent with frontend
base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR
if folder: if folder:
if not self.config.CUSTOM_DIRS: if not self.config.CUSTOM_DIRS:
return None, {'status': 'error', 'msg': f'A folder for the download was specified but CUSTOM_DIRS is not true in the configuration.'} return None, {'status': 'error', 'msg': f'A folder for the download was specified but CUSTOM_DIRS is not true in the configuration.'}
dldirectory = os.path.realpath(os.path.join(base_directory, folder)) dldirectory = os.path.realpath(os.path.join(base_directory, folder))
real_base_directory = os.path.realpath(base_directory) real_base_directory = os.path.realpath(base_directory)
if not dldirectory.startswith(real_base_directory): if not dldirectory.startswith(real_base_directory):
return None, {'status': 'error', 'msg': f'Folder "{folder}" must resolve inside the base download directory "{real_base_directory}"'} return None, {'status': 'error', 'msg': f'Folder "{folder}" must resolve inside the base download directory "{real_base_directory}"'}
if not os.path.isdir(dldirectory): if not os.path.isdir(dldirectory):
if not self.config.CREATE_CUSTOM_DIRS: if not self.config.CREATE_CUSTOM_DIRS:
return None, {'status': 'error', 'msg': f'Folder "{folder}" for download does not exist inside base directory "{real_base_directory}", and CREATE_CUSTOM_DIRS is not true in the configuration.'} return None, {'status': 'error', 'msg': f'Folder "{folder}" for download does not exist inside base directory "{real_base_directory}", and CREATE_CUSTOM_DIRS is not true in the configuration.'}
os.makedirs(dldirectory, exist_ok=True) os.makedirs(dldirectory, exist_ok=True)
else: else:
dldirectory = base_directory dldirectory = base_directory
return dldirectory, None return dldirectory, None
async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, already): async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, already):
if not entry: if not entry:
return {'status': 'error', 'msg': "Invalid/empty data was given."} return {'status': 'error', 'msg': "Invalid/empty data was given."}
etype = entry.get('_type') or 'video' etype = entry.get('_type') or 'video'
if etype == 'playlist': if etype == 'playlist':
entries = entry['entries'] entries = entry['entries']
log.info(f'playlist detected with {len(entries)} entries') log.info(f'playlist detected with {len(entries)} entries')
playlist_index_digits = len(str(len(entries))) playlist_index_digits = len(str(len(entries)))
results = [] results = []
for index, etr in enumerate(entries, start=1): for index, etr in enumerate(entries, start=1):
etr["playlist"] = entry["id"] etr["playlist"] = entry["id"]
etr["playlist_index"] = '{{0:0{0:d}d}}'.format(playlist_index_digits).format(index) etr["playlist_index"] = '{{0:0{0:d}d}}'.format(playlist_index_digits).format(index)
for property in ("id", "title", "uploader", "uploader_id"): for property in ("id", "title", "uploader", "uploader_id"):
if property in entry: if property in entry:
etr[f"playlist_{property}"] = entry[property] etr[f"playlist_{property}"] = entry[property]
results.append(await self.__add_entry(etr, quality, format, folder, custom_name_prefix, already)) results.append(await self.__add_entry(etr, quality, format, folder, custom_name_prefix, already))
if any(res['status'] == 'error' for res in results): if any(res['status'] == 'error' for res in results):
return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)} return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)}
return {'status': 'ok'} return {'status': 'ok'}
elif etype == 'video' or etype.startswith('url') and 'id' in entry and 'title' in entry: elif etype == 'video' or etype.startswith('url') and 'id' in entry and 'title' in entry:
if not self.queue.exists(entry['id']): if not self.queue.exists(entry['id']):
dl = DownloadInfo(entry['id'], entry['title'], entry.get('webpage_url') or entry['url'], quality, format, folder, custom_name_prefix) dl = DownloadInfo(entry['id'], entry['title'], entry.get('webpage_url') or entry['url'], quality, format, folder, custom_name_prefix)
dldirectory, error_message = self.__calc_download_path(quality, format, folder) dldirectory, error_message = self.__calc_download_path(quality, format, folder)
if error_message is not None: if error_message is not None:
return error_message return error_message
output = self.config.OUTPUT_TEMPLATE if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{self.config.OUTPUT_TEMPLATE}' output = self.config.OUTPUT_TEMPLATE if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{self.config.OUTPUT_TEMPLATE}'
output_chapter = self.config.OUTPUT_TEMPLATE_CHAPTER output_chapter = self.config.OUTPUT_TEMPLATE_CHAPTER
for property, value in entry.items(): for property, value in entry.items():
if property.startswith("playlist"): if property.startswith("playlist"):
output = output.replace(f"%({property})s", str(value)) output = output.replace(f"%({property})s", str(value))
self.queue.put(Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, self.config.YTDL_OPTIONS, dl)) self.queue.put(Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, self.config.YTDL_OPTIONS, dl))
self.event.set() self.event.set()
await self.notifier.added(dl) await self.notifier.added(dl)
return {'status': 'ok'} return {'status': 'ok'}
elif etype.startswith('url'): elif etype.startswith('url'):
return await self.add(entry['url'], quality, format, folder, custom_name_prefix, already) return await self.add(entry['url'], quality, format, folder, custom_name_prefix, already)
return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'} return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'}
async def add(self, url, quality, format, folder, custom_name_prefix, already=None): async def add(self, url, quality, format, folder, custom_name_prefix, already=None):
log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=}') log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=}')
already = set() if already is None else already already = set() if already is None else already
if url in already: if url in already:
log.info('recursion detected, skipping') log.info('recursion detected, skipping')
return {'status': 'ok'} return {'status': 'ok'}
else: else:
already.add(url) already.add(url)
try: try:
entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url) entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url)
except yt_dlp.utils.YoutubeDLError as exc: except yt_dlp.utils.YoutubeDLError as exc:
return {'status': 'error', 'msg': str(exc)} return {'status': 'error', 'msg': str(exc)}
return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, already) return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, already)
async def cancel(self, ids): async def cancel(self, ids):
for id in ids: for id in ids:
if not self.queue.exists(id): if not self.queue.exists(id):
log.warn(f'requested cancel for non-existent download {id}') log.warn(f'requested cancel for non-existent download {id}')
continue continue
if self.queue.get(id).started(): if self.queue.get(id).started():
self.queue.get(id).cancel() self.queue.get(id).cancel()
else: else:
self.queue.delete(id) self.queue.delete(id)
await self.notifier.canceled(id) await self.notifier.canceled(id)
return {'status': 'ok'} return {'status': 'ok'}
async def clear(self, ids): async def clear(self, ids):
for id in ids: for id in ids:
if not self.done.exists(id): if not self.done.exists(id):
log.warn(f'requested delete for non-existent download {id}') log.warn(f'requested delete for non-existent download {id}')
continue continue
if self.config.DELETE_FILE_ON_TRASHCAN: if self.config.DELETE_FILE_ON_TRASHCAN:
dl = self.done.get(id) dl = self.done.get(id)
try: try:
dldirectory, _ = self.__calc_download_path(dl.info.quality, dl.info.format, dl.info.folder) dldirectory, _ = self.__calc_download_path(dl.info.quality, dl.info.format, dl.info.folder)
os.remove(os.path.join(dldirectory, dl.info.filename)) os.remove(os.path.join(dldirectory, dl.info.filename))
except Exception as e: except Exception as e:
log.warn(f'deleting file for download {id} failed with error message {e!r}') log.warn(f'deleting file for download {id} failed with error message {e!r}')
self.done.delete(id) self.done.delete(id)
await self.notifier.cleared(id) await self.notifier.cleared(id)
return {'status': 'ok'} return {'status': 'ok'}
def get(self): def get(self):
return(list((k, v.info) for k, v in self.queue.items()), return(list((k, v.info) for k, v in self.queue.items()),
list((k, v.info) for k, v in self.done.items())) list((k, v.info) for k, v in self.done.items()))
async def __download(self): async def __download(self):
while True: while True:
while self.queue.empty(): while self.queue.empty():
log.info('waiting for item to download') log.info('waiting for item to download')
await self.event.wait() await self.event.wait()
self.event.clear() self.event.clear()
id, entry = self.queue.next() id, entry = self.queue.next()
log.info(f'downloading {entry.info.title}') log.info(f'downloading {entry.info.title}')
await entry.start(self.notifier) await entry.start(self.notifier)
if entry.info.status != 'finished': if entry.info.status != 'finished':
if entry.tmpfilename and os.path.isfile(entry.tmpfilename): if entry.tmpfilename and os.path.isfile(entry.tmpfilename):
try: try:
os.remove(entry.tmpfilename) os.remove(entry.tmpfilename)
except: except:
pass pass
entry.info.status = 'error' entry.info.status = 'error'
entry.close() entry.close()
if self.queue.exists(id): if self.queue.exists(id):
self.queue.delete(id) self.queue.delete(id)
if entry.canceled: if entry.canceled:
await self.notifier.canceled(id) await self.notifier.canceled(id)
else: else:
self.done.put(entry) self.done.put(entry)
await self.notifier.completed(entry.info) await self.notifier.completed(entry.info)