Expose done/queue list as json endpoint

This commit is contained in:
ArabCoders 2023-11-03 14:02:37 +03:00
parent 0985f97b36
commit 25c44b4da9
3 changed files with 368 additions and 350 deletions

1
.gitignore vendored
View file

@ -46,3 +46,4 @@ Thumbs.db
__pycache__ __pycache__
.venv

View file

@ -132,6 +132,20 @@ async def delete(request):
status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids)) status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids))
return web.Response(text=serializer.encode(status)) return web.Response(text=serializer.encode(status))
@routes.get(config.URL_PREFIX + 'history')
async def list_queue(_):
    """Return every persisted queued and completed download as one JSON array.

    Each element is the attribute dictionary of a stored item plus a
    ``list_type`` key that is ``'queue'`` for pending items and ``'done'``
    for completed ones. Queued items come first, then completed items, each
    group in the order returned by ``saved_items()``.

    The request object is not used.
    """
    history = []
    for list_type, store in (('queue', dqueue.queue), ('done', dqueue.done)):
        for _key, info in store.saved_items():
            # vars() hands back the object's own __dict__; build a new dict so
            # tagging the entry never modifies the stored object itself.
            history.append({**vars(info), 'list_type': list_type})
    # The body is JSON, so advertise it as such instead of the text/plain default.
    return web.Response(text=json.dumps(history), content_type='application/json')
@sio.event @sio.event
async def connect(sid, environ): async def connect(sid, environ):
await sio.emit('all', serializer.encode(dqueue.get()), to=sid) await sio.emit('all', serializer.encode(dqueue.get()), to=sid)

View file

@ -1,350 +1,353 @@
import os import os
import yt_dlp import yt_dlp
from collections import OrderedDict from collections import OrderedDict
import shelve import shelve
import time import time
import asyncio import asyncio
import multiprocessing import multiprocessing
import logging import logging
import re import re
from dl_formats import get_format, get_opts, AUDIO_FORMATS from dl_formats import get_format, get_opts, AUDIO_FORMATS
log = logging.getLogger('ytdl') log = logging.getLogger('ytdl')
class DownloadQueueNotifier:
    """Interface for objects that are told about download queue events.

    Every hook is a coroutine that raises ``NotImplementedError`` here;
    concrete notifiers are expected to override all of them.
    """

    async def added(self, dl):
        """Called with the download info of an item that was queued."""
        raise NotImplementedError()

    async def updated(self, dl):
        """Called with the download info of an item whose state changed."""
        raise NotImplementedError()

    async def completed(self, dl):
        """Called with the download info of an item that left the queue as done."""
        raise NotImplementedError()

    async def canceled(self, id):
        """Called with the key of an item that was canceled."""
        raise NotImplementedError()

    async def cleared(self, id):
        """Called with the key of an item removed from the completed list."""
        raise NotImplementedError()
class DownloadInfo:
    """Plain record describing one download and its current progress.

    When ``custom_name_prefix`` is non-empty, both ``id`` and ``title`` are
    stored as ``'<prefix>.<value>'``; otherwise they are stored unchanged.
    The progress fields start out as ``None`` and ``timestamp`` records the
    creation time in nanoseconds.
    """

    def __init__(self, id, title, url, quality, format, folder, custom_name_prefix):
        has_prefix = len(custom_name_prefix) != 0
        self.id = f'{custom_name_prefix}.{id}' if has_prefix else id
        self.title = f'{custom_name_prefix}.{title}' if has_prefix else title
        self.url = url
        self.quality = quality
        self.format = format
        self.folder = folder
        self.custom_name_prefix = custom_name_prefix
        # Progress fields, created in this exact order so that vars() keeps
        # the same key order as before.
        for field in ('status', 'msg', 'percent', 'speed', 'eta'):
            setattr(self, field, None)
        self.timestamp = time.time_ns()
class Download:
    """Runs one yt-dlp download in a child process and relays its progress.

    The child process pushes status dictionaries onto a manager queue; the
    parent consumes them in ``update_status`` and copies the values onto
    ``self.info`` before notifying the notifier.
    """

    # Shared multiprocessing manager, created by the first call to start().
    manager = None

    def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info):
        self.download_dir = download_dir
        self.temp_dir = temp_dir
        self.output_template = output_template
        self.output_template_chapter = output_template_chapter
        self.format = get_format(format, quality)
        self.ytdl_opts = get_opts(format, quality, ytdl_opts)
        self.info = info
        self.canceled = False
        self.tmpfilename = None
        # The remaining attributes are filled in by start().
        self.status_queue = None
        self.proc = None
        self.loop = None
        self.notifier = None

    def _download(self):
        """Process target: run yt-dlp and report progress through the status queue."""
        forwarded_keys = (
            'tmpfilename',
            'filename',
            'status',
            'msg',
            'total_bytes',
            'total_bytes_estimate',
            'downloaded_bytes',
            'speed',
            'eta',
        )

        def put_status(st):
            # Forward only the whitelisted keys of the progress dictionary.
            self.status_queue.put({k: v for k, v in st.items() if k in forwarded_keys})

        def put_status_postprocessor(d):
            # Only the completed MoveFiles step is reported; it carries the final path.
            if d['postprocessor'] != 'MoveFiles' or d['status'] != 'finished':
                return
            info_dict = d['info_dict']
            if '__finaldir' in info_dict:
                filename = os.path.join(info_dict['__finaldir'], os.path.basename(info_dict['filepath']))
            else:
                filename = info_dict['filepath']
            self.status_queue.put({'status': 'finished', 'filename': filename})

        try:
            params = {
                'quiet': True,
                'no_color': True,
                #'skip_download': True,
                'paths': {"home": self.download_dir, "temp": self.temp_dir},
                'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter },
                'format': self.format,
                'socket_timeout': 30,
                'progress_hooks': [put_status],
                'postprocessor_hooks': [put_status_postprocessor],
                # Spread last so these options override the defaults above.
                **self.ytdl_opts,
            }
            ret = yt_dlp.YoutubeDL(params=params).download([self.info.url])
            self.status_queue.put({'status': 'finished' if ret == 0 else 'error'})
        except yt_dlp.utils.YoutubeDLError as exc:
            self.status_queue.put({'status': 'error', 'msg': str(exc)})

    async def start(self, notifier):
        """Spawn the download process and wait until it has exited.

        Marks the item as ``'preparing'``, notifies ``notifier``, schedules
        ``update_status`` as a background task, and returns the result of
        joining the process in the default executor.
        """
        if Download.manager is None:
            Download.manager = multiprocessing.Manager()
        self.status_queue = Download.manager.Queue()
        self.proc = multiprocessing.Process(target=self._download)
        self.proc.start()
        # loop and notifier are assigned only after the process has been started.
        self.loop = asyncio.get_running_loop()
        self.notifier = notifier
        self.info.status = 'preparing'
        await self.notifier.updated(self.info)
        asyncio.create_task(self.update_status())
        return await self.loop.run_in_executor(None, self.proc.join)

    def cancel(self):
        """Kill the process if it is still alive and flag the download as canceled."""
        if self.running():
            self.proc.kill()
        self.canceled = True

    def close(self):
        """Release the process object and tell ``update_status`` to stop."""
        if not self.started():
            return
        self.proc.close()
        # None is the sentinel that ends the update_status loop.
        self.status_queue.put(None)

    def running(self):
        """Return True while a process exists and reports itself alive."""
        if self.proc is None:
            return False
        try:
            return self.proc.is_alive()
        except ValueError:
            # A ValueError from is_alive() is treated as "not running".
            return False

    def started(self):
        """Return True once a process object has been created."""
        return self.proc is not None

    async def update_status(self):
        """Consume status messages until the ``None`` sentinel arrives.

        Each message updates ``self.tmpfilename`` and the fields of
        ``self.info``, then ``notifier.updated`` is awaited.
        """
        while True:
            status = await self.loop.run_in_executor(None, self.status_queue.get)
            if status is None:
                return
            self.tmpfilename = status.get('tmpfilename')
            if 'filename' in status:
                self.info.filename = os.path.relpath(status.get('filename'), self.download_dir)
                # Set correct file extension for thumbnails
                if self.info.format == 'thumbnail':
                    self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename)
            self.info.status = status['status']
            self.info.msg = status.get('msg')
            if 'downloaded_bytes' in status:
                total = status.get('total_bytes') or status.get('total_bytes_estimate')
                if total:
                    self.info.percent = status['downloaded_bytes'] / total * 100
            self.info.speed = status.get('speed')
            self.info.eta = status.get('eta')
            await self.notifier.updated(self.info)
class PersistentQueue:
    """Insertion-ordered mapping of downloads mirrored into a shelve file.

    The in-memory ``dict`` holds the live download objects; the shelf at
    ``path`` stores only each item's ``info`` object, keyed by its URL.
    """

    def __init__(self, path):
        """Create the shelf at ``path``, including any missing parent directories."""
        pdir = os.path.dirname(path)
        # dirname is '' for a bare file name, and the state directory may be
        # nested below directories that do not exist yet.
        if pdir:
            os.makedirs(pdir, exist_ok=True)
        # Open once in create mode so later 'r'/'w' opens find an existing shelf.
        with shelve.open(path, 'c'):
            pass
        self.path = path
        self.dict = OrderedDict()

    def load(self):
        """Fill the in-memory mapping from the shelf, oldest timestamp first."""
        for k, v in self.saved_items():
            self.dict[k] = Download(None, None, None, None, None, None, {}, v)

    def exists(self, key):
        """Return True if ``key`` is in the in-memory mapping."""
        return key in self.dict

    def get(self, key):
        """Return the in-memory item for ``key``; raises ``KeyError`` if absent."""
        return self.dict[key]

    def items(self):
        """Return the ``(key, item)`` view of the in-memory mapping."""
        return self.dict.items()

    def saved_items(self):
        """Return the shelf's ``(key, info)`` pairs sorted by ``info.timestamp``."""
        with shelve.open(self.path, 'r') as shelf:
            return sorted(shelf.items(), key=lambda item: item[1].timestamp)

    def put(self, value):
        """Store ``value`` in memory and its ``info`` in the shelf, keyed by ``value.info.url``."""
        key = value.info.url
        self.dict[key] = value
        with shelve.open(self.path, 'w') as shelf:
            shelf[key] = value.info

    def delete(self, key):
        """Remove ``key`` from both the in-memory mapping and the shelf."""
        del self.dict[key]
        with shelve.open(self.path, 'w') as shelf:
            shelf.pop(key)

    def next(self):
        """Return the first ``(key, item)`` pair of the in-memory mapping.

        Callers must check ``empty()`` first; an empty mapping raises
        ``StopIteration``.
        """
        k, v = next(iter(self.dict.items()))
        return k, v

    def empty(self):
        """Return True when the in-memory mapping has no items."""
        return not bool(self.dict)
class DownloadQueue:
    """Coordinates pending and completed downloads.

    Pending items live in ``self.queue`` and finished ones in ``self.done``
    (both ``PersistentQueue`` instances under ``config.STATE_DIR``). A single
    background task processes the pending queue one item at a time and
    reports every change to ``notifier``.
    """

    def __init__(self, config, notifier):
        self.config = config
        self.notifier = notifier
        self.queue = PersistentQueue(self.config.STATE_DIR + '/queue')
        self.done = PersistentQueue(self.config.STATE_DIR + '/completed')
        self.done.load()

    async def __import_queue(self):
        """Re-add every item persisted in the pending shelf."""
        for k, v in self.queue.saved_items():
            await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix)

    async def initialize(self):
        """Create the wake-up event and schedule the worker and import tasks."""
        self.event = asyncio.Event()
        asyncio.create_task(self.__download())
        asyncio.create_task(self.__import_queue())

    def __extract_info(self, url):
        """Return yt-dlp's flat metadata for ``url`` without downloading anything."""
        return yt_dlp.YoutubeDL(params={
            'quiet': True,
            'no_color': True,
            'extract_flat': True,
            **self.config.YTDL_OPTIONS,
        }).extract_info(url, download=False)

    def __calc_download_path(self, quality, format, folder):
        """Calculates download path from quality, format and folder attributes.

        Returns:
            Tuple dldirectory, error_message both of which might be None (but not at the same time)
        """
        # Keep consistent with frontend
        base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR
        if not folder:
            return base_directory, None
        if not self.config.CUSTOM_DIRS:
            return None, {'status': 'error', 'msg': 'A folder for the download was specified but CUSTOM_DIRS is not true in the configuration.'}
        dldirectory = os.path.realpath(os.path.join(base_directory, folder))
        real_base_directory = os.path.realpath(base_directory)
        # Compare whole path components: a plain string-prefix test would
        # accept a sibling such as "<base>-other" as being inside "<base>".
        try:
            inside_base = os.path.commonpath([real_base_directory, dldirectory]) == real_base_directory
        except ValueError:
            # commonpath refuses paths without a common root (e.g. different drives).
            inside_base = False
        if not inside_base:
            return None, {'status': 'error', 'msg': f'Folder "{folder}" must resolve inside the base download directory "{real_base_directory}"'}
        if not os.path.isdir(dldirectory):
            if not self.config.CREATE_CUSTOM_DIRS:
                return None, {'status': 'error', 'msg': f'Folder "{folder}" for download does not exist inside base directory "{real_base_directory}", and CREATE_CUSTOM_DIRS is not true in the configuration.'}
            os.makedirs(dldirectory, exist_ok=True)
        return dldirectory, None

    async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, already):
        """Queue one extracted entry; playlists and URL redirects are expanded recursively.

        Returns a dict with ``'status'`` set to ``'ok'`` or ``'error'`` (the
        latter with a ``'msg'``).
        """
        if not entry:
            return {'status': 'error', 'msg': "Invalid/empty data was given."}

        etype = entry.get('_type') or 'video'
        if etype == 'playlist':
            entries = entry['entries']
            log.info(f'playlist detected with {len(entries)} entries')
            playlist_index_digits = len(str(len(entries)))
            results = []
            for index, etr in enumerate(entries, start=1):
                etr["playlist"] = entry["id"]
                # Zero-pad the index to the width of the playlist length.
                etr["playlist_index"] = '{{0:0{0:d}d}}'.format(playlist_index_digits).format(index)
                for prop in ("id", "title", "uploader", "uploader_id"):
                    if prop in entry:
                        etr[f"playlist_{prop}"] = entry[prop]
                results.append(await self.__add_entry(etr, quality, format, folder, custom_name_prefix, already))
            if any(res['status'] == 'error' for res in results):
                return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)}
            return {'status': 'ok'}
        elif etype == 'video' or etype.startswith('url') and 'id' in entry and 'title' in entry:
            # The pending queue is keyed by URL (see PersistentQueue.put), so the
            # duplicate check has to use the URL as well, not the video id.
            key = entry.get('webpage_url') or entry['url']
            if not self.queue.exists(key):
                dl = DownloadInfo(entry['id'], entry['title'], key, quality, format, folder, custom_name_prefix)
                dldirectory, error_message = self.__calc_download_path(quality, format, folder)
                if error_message is not None:
                    return error_message
                output = self.config.OUTPUT_TEMPLATE if len(custom_name_prefix) == 0 else f'{custom_name_prefix}.{self.config.OUTPUT_TEMPLATE}'
                output_chapter = self.config.OUTPUT_TEMPLATE_CHAPTER
                # Substitute the playlist_* fields into the output template here.
                for prop, value in entry.items():
                    if prop.startswith("playlist"):
                        output = output.replace(f"%({prop})s", str(value))
                self.queue.put(Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, self.config.YTDL_OPTIONS, dl))
                self.event.set()
                await self.notifier.added(dl)
            return {'status': 'ok'}
        elif etype.startswith('url'):
            return await self.add(entry['url'], quality, format, folder, custom_name_prefix, already)
        return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'}

    async def add(self, url, quality, format, folder, custom_name_prefix, already=None):
        """Extract metadata for ``url`` and queue the resulting download(s).

        ``already`` is the set of URLs visited during the current recursive
        expansion; a URL seen twice is skipped. Returns a status dict.
        """
        log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=}')
        already = set() if already is None else already
        if url in already:
            log.info('recursion detected, skipping')
            return {'status': 'ok'}
        already.add(url)
        try:
            # Extraction blocks, so run it in the default executor.
            entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url)
        except yt_dlp.utils.YoutubeDLError as exc:
            return {'status': 'error', 'msg': str(exc)}
        return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, already)

    async def cancel(self, ids):
        """Cancel the pending downloads with the given keys.

        A started download is only flagged (the worker loop finishes the
        bookkeeping); one that has not started is removed immediately.
        """
        for id in ids:
            if not self.queue.exists(id):
                log.warning(f'requested cancel for non-existent download {id}')
                continue
            if self.queue.get(id).started():
                self.queue.get(id).cancel()
            else:
                self.queue.delete(id)
                await self.notifier.canceled(id)
        return {'status': 'ok'}

    async def clear(self, ids):
        """Remove completed downloads, deleting their files if configured to."""
        for id in ids:
            if not self.done.exists(id):
                log.warning(f'requested delete for non-existent download {id}')
                continue
            if self.config.DELETE_FILE_ON_TRASHCAN:
                dl = self.done.get(id)
                try:
                    dldirectory, _ = self.__calc_download_path(dl.info.quality, dl.info.format, dl.info.folder)
                    os.remove(os.path.join(dldirectory, dl.info.filename))
                except Exception as e:
                    # Best effort: the entry is cleared even when the file cannot be removed.
                    log.warning(f'deleting file for download {id} failed with error message {e!r}')
            self.done.delete(id)
            await self.notifier.cleared(id)
        return {'status': 'ok'}

    def get(self):
        """Return ``(pending, done)``, each a list of ``(key, info)`` pairs."""
        return ([(k, v.info) for k, v in self.queue.items()],
                [(k, v.info) for k, v in self.done.items()])

    async def __download(self):
        """Worker loop: run queued downloads one at a time, forever."""
        while True:
            while self.queue.empty():
                log.info('waiting for item to download')
                await self.event.wait()
                self.event.clear()
            id, entry = self.queue.next()
            log.info(f'downloading {entry.info.title}')
            await entry.start(self.notifier)
            if entry.info.status != 'finished':
                if entry.tmpfilename and os.path.isfile(entry.tmpfilename):
                    try:
                        os.remove(entry.tmpfilename)
                    except OSError as exc:
                        # Cleanup is best effort; keep processing the queue.
                        log.warning(f'could not remove temporary file {entry.tmpfilename}: {exc!r}')
                entry.info.status = 'error'
            entry.close()
            if self.queue.exists(id):
                self.queue.delete(id)
                if entry.canceled:
                    await self.notifier.canceled(id)
                else:
                    self.done.put(entry)
                    await self.notifier.completed(entry.info)