From 07a2315703443d8e52122041f5a8afc4415c57e8 Mon Sep 17 00:00:00 2001 From: evilmonkeydiaz Date: Sat, 17 Aug 2024 12:25:12 -0700 Subject: [PATCH 01/12] Updated to cocurrent downloads attempt 1 --- app/main.py | 7 +++++-- app/ytdl.py | 45 ++++++++++++++++++++++++++++++++++++++++----- docker-compose.yml | 10 ++++++++++ 3 files changed, 55 insertions(+), 7 deletions(-) create mode 100644 docker-compose.yml diff --git a/app/main.py b/app/main.py index 3daed83..2caf1fb 100644 --- a/app/main.py +++ b/app/main.py @@ -8,7 +8,7 @@ import socketio import logging import json import pathlib - +import sys from ytdl import DownloadQueueNotifier, DownloadQueue log = logging.getLogger('main') @@ -238,4 +238,7 @@ app.on_response_prepare.append(on_prepare) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) log.info(f"Listening on {config.HOST}:{config.PORT}") - web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) + if sys.platform.startswith('win'): + web.run_app(app, host=config.HOST, port=int(config.PORT)) + else: + web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) diff --git a/app/ytdl.py b/app/ytdl.py index 086d7c9..da28631 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -5,6 +5,7 @@ import shelve import time import asyncio import multiprocessing +from concurrent.futures import ThreadPoolExecutor import logging import re from dl_formats import get_format, get_opts, AUDIO_FORMATS @@ -104,14 +105,13 @@ class Download: if Download.manager is None: Download.manager = multiprocessing.Manager() self.status_queue = Download.manager.Queue() - self.proc = multiprocessing.Process(target=self._download) - self.proc.start() + self.proc = await asyncio.get_running_loop().run_in_executor(None, self._download) self.loop = asyncio.get_running_loop() self.notifier = notifier self.info.status = 'preparing' await self.notifier.updated(self.info) asyncio.create_task(self.update_status()) - return await self.loop.run_in_executor(None, self.proc.join) + return self.proc def cancel(self): if self.running(): @@ -208,15 +208,50 @@ class DownloadQueue: self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') + self.max_concurrent_downloads = 5 # Adjust this number as needed + self.semaphore = asyncio.Semaphore(self.max_concurrent_downloads) + self.executor = ThreadPoolExecutor(max_workers=self.max_concurrent_downloads) self.done.load() - + + async def __download_entry(self, id, entry): + async with self.semaphore: + await entry.start(self.notifier) + if entry.info.status != 'finished': + if entry.tmpfilename and os.path.isfile(entry.tmpfilename): + try: + os.remove(entry.tmpfilename) + except: + pass + entry.info.status = 'error' + entry.close() + if self.queue.exists(id): + self.queue.delete(id) + if entry.canceled: + await self.notifier.canceled(id) + else: + self.done.put(entry) + await self.notifier.completed(entry.info) + + async def __process_queue(self): + while True: + while self.queue.empty(): + await self.event.wait() + self.event.clear() + + tasks = [] + while not self.queue.empty() and len(tasks) < self.max_concurrent_downloads: + id, entry = self.queue.next() + task = asyncio.create_task(self.__download_entry(id, entry)) + tasks.append(task) + + await asyncio.gather(*tasks) async def __import_queue(self): for k, v in self.queue.saved_items(): await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix) async def initialize(self): self.event = asyncio.Event() - asyncio.create_task(self.__download()) + asyncio.create_task(self.__process_queue()) asyncio.create_task(self.__import_queue()) def __extract_info(self, url): diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..bb5f68c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,10 @@ +version: "3" +services: + metube: + image: ghcr.io/alexta69/metube + container_name: metube + restart: unless-stopped + ports: + - "8081:8081" + volumes: + - c:/Users/Roger/Downloads:/downloads \ No newline at end of file From 50e67e1459219db96db41f031435938c124abeef Mon Sep 17 00:00:00 2001 From: evilmonkeydiaz Date: Sat, 17 Aug 2024 13:27:35 -0700 Subject: [PATCH 02/12] Update ytdl.py --- app/ytdl.py | 87 ++++++++++++++++++++++++++--------------------------- 1 file changed, 43 insertions(+), 44 deletions(-) diff --git a/app/ytdl.py b/app/ytdl.py index da28631..e9b4ee6 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -5,7 +5,6 @@ import shelve import time import asyncio import multiprocessing -from concurrent.futures import ThreadPoolExecutor import logging import re from dl_formats import get_format, get_opts, AUDIO_FORMATS @@ -105,13 +104,14 @@ class Download: if Download.manager is None: Download.manager = multiprocessing.Manager() self.status_queue = Download.manager.Queue() - self.proc = await asyncio.get_running_loop().run_in_executor(None, self._download) + self.proc = multiprocessing.Process(target=self._download) + self.proc.start() self.loop = asyncio.get_running_loop() self.notifier = notifier self.info.status = 'preparing' await self.notifier.updated(self.info) asyncio.create_task(self.update_status()) - return self.proc + return await self.loop.run_in_executor(None, self.proc.join) def cancel(self): if self.running(): @@ -208,51 +208,19 @@ class DownloadQueue: self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') - self.max_concurrent_downloads = 5 # Adjust this number as needed - self.semaphore = asyncio.Semaphore(self.max_concurrent_downloads) - self.executor = ThreadPoolExecutor(max_workers=self.max_concurrent_downloads) self.done.load() - - async def __download_entry(self, id, entry): - async with self.semaphore: - await entry.start(self.notifier) - if entry.info.status != 'finished': - if entry.tmpfilename and os.path.isfile(entry.tmpfilename): - try: - os.remove(entry.tmpfilename) - except: - pass - entry.info.status = 'error' - entry.close() - if self.queue.exists(id): - self.queue.delete(id) - if entry.canceled: - await self.notifier.canceled(id) - else: - self.done.put(entry) - await self.notifier.completed(entry.info) - - async def __process_queue(self): - while True: - while self.queue.empty(): - await self.event.wait() - self.event.clear() - - tasks = [] - while not self.queue.empty() and len(tasks) < self.max_concurrent_downloads: - id, entry = self.queue.next() - task = asyncio.create_task(self.__download_entry(id, entry)) - tasks.append(task) - - await asyncio.gather(*tasks) + self.max_concurrent_downloads = self.config.MAX_CONCURRENT_DOWNLOADS # New configuration option + self.active_downloads = set() + self.download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads) + async def __import_queue(self): for k, v in self.queue.saved_items(): await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix) async def initialize(self): self.event = asyncio.Event() - asyncio.create_task(self.__process_queue()) - asyncio.create_task(self.__import_queue()) + asyncio.create_task(self.__manage_downloads()) + await self.__import_queue() def __extract_info(self, url): return yt_dlp.YoutubeDL(params={ @@ -286,7 +254,17 @@ class DownloadQueue: else: dldirectory = base_directory return dldirectory, None - + async def __manage_downloads(self): + while True: + while self.queue.empty(): + log.info('waiting for item to download') + await self.event.wait() + self.event.clear() + + async with self.download_semaphore: + id, entry = self.queue.next() + self.active_downloads.add(id) + asyncio.create_task(self.__process_download(id, entry)) async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, auto_start, already): if not entry: return {'status': 'error', 'msg': "Invalid/empty data was given."} @@ -397,7 +375,28 @@ class DownloadQueue: def get(self): return(list((k, v.info) for k, v in self.queue.items()) + list((k, v.info) for k, v in self.pending.items()), list((k, v.info) for k, v in self.done.items())) - + async def __process_download(self, id, entry): + try: + log.info(f'downloading {entry.info.title}') + await entry.start(self.notifier) + if entry.info.status != 'finished': + if entry.tmpfilename and os.path.isfile(entry.tmpfilename): + try: + os.remove(entry.tmpfilename) + except: + pass + entry.info.status = 'error' + entry.close() + finally: + if self.queue.exists(id): + self.queue.delete(id) + if entry.canceled: + await self.notifier.canceled(id) + else: + self.done.put(entry) + await self.notifier.completed(entry.info) + self.active_downloads.remove(id) + self.download_semaphore.release() async def __download(self): while True: while self.queue.empty(): @@ -421,4 +420,4 @@ class DownloadQueue: await self.notifier.canceled(id) else: self.done.put(entry) - await self.notifier.completed(entry.info) + await self.notifier.completed(entry.info) \ No newline at end of file From d2bf4bd38587705dad18aca6c0e2288d77b13169 Mon Sep 17 00:00:00 2001 From: evilmonkeydiaz Date: Sat, 17 Aug 2024 14:22:03 -0700 Subject: [PATCH 03/12] update class --- app/main.py | 226 +++++++++++++++++++++++++++----------------------- app/ytdl.py | 233 +++++++++++++++++----------------------------------- 2 files changed, 199 insertions(+), 260 deletions(-) diff --git a/app/main.py b/app/main.py index 2caf1fb..c00231f 100644 --- a/app/main.py +++ b/app/main.py @@ -3,19 +3,20 @@ import os import sys +import asyncio from aiohttp import web import socketio import logging import json import pathlib -import sys + from ytdl import DownloadQueueNotifier, DownloadQueue log = logging.getLogger('main') class Config: _DEFAULTS = { - 'DOWNLOAD_DIR': '.', + 'DOWNLOAD_DIR': 'C:/Users/Roger/Desktop/MeTube', 'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR', 'TEMP_DIR': '%%DOWNLOAD_DIR', 'DOWNLOAD_DIRS_INDEXABLE': 'false', @@ -40,10 +41,10 @@ class Config: def __init__(self): for k, v in self._DEFAULTS.items(): - setattr(self, k, os.environ[k] if k in os.environ else v) + setattr(self, k, os.environ.get(k, v)) for k, v in self.__dict__.items(): - if v.startswith('%%'): + if isinstance(v, str) and v.startswith('%%'): setattr(self, k, getattr(self, v[2:])) if k in self._BOOLEAN: if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'): @@ -85,10 +86,6 @@ class ObjectSerializer(json.JSONEncoder): return json.JSONEncoder.default(self, obj) serializer = ObjectSerializer() -app = web.Application() -sio = socketio.AsyncServer(cors_allowed_origins='*') -sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io') -routes = web.RouteTableDef() class Notifier(DownloadQueueNotifier): async def added(self, dl): @@ -106,80 +103,124 @@ class Notifier(DownloadQueueNotifier): async def cleared(self, id): await sio.emit('cleared', serializer.encode(id)) -dqueue = DownloadQueue(config, Notifier()) -app.on_startup.append(lambda app: dqueue.initialize()) +async def init_app(): + app = web.Application() + global sio + sio = socketio.AsyncServer(cors_allowed_origins='*') + sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io') + routes = web.RouteTableDef() -@routes.post(config.URL_PREFIX + 'add') -async def add(request): - post = await request.json() - url = post.get('url') - quality = post.get('quality') - if not url or not quality: - raise web.HTTPBadRequest() - format = post.get('format') - folder = post.get('folder') - custom_name_prefix = post.get('custom_name_prefix') - auto_start = post.get('auto_start') - if custom_name_prefix is None: - custom_name_prefix = '' - if auto_start is None: - auto_start = True - status = await dqueue.add(url, quality, format, folder, custom_name_prefix, auto_start) - return web.Response(text=serializer.encode(status)) + @routes.post(config.URL_PREFIX + 'add') + async def add(request): + post = await request.json() + url = post.get('url') + quality = post.get('quality') + if not url or not quality: + raise web.HTTPBadRequest() + format = post.get('format') + folder = post.get('folder') + custom_name_prefix = post.get('custom_name_prefix') + auto_start = post.get('auto_start') + if custom_name_prefix is None: + custom_name_prefix = '' + if auto_start is None: + auto_start = True + status = await request.app['dqueue'].add(url, quality, format, folder, custom_name_prefix, auto_start) + return web.Response(text=serializer.encode(status)) -@routes.post(config.URL_PREFIX + 'delete') -async def delete(request): - post = await request.json() - ids = post.get('ids') - where = post.get('where') - if not ids or where not in ['queue', 'done']: - raise web.HTTPBadRequest() - status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids)) - return web.Response(text=serializer.encode(status)) + @routes.post(config.URL_PREFIX + 'delete') + async def delete(request): + post = await request.json() + ids = post.get('ids') + where = post.get('where') + if not ids or where not in ['queue', 'done']: + raise web.HTTPBadRequest() + status = await (request.app['dqueue'].cancel(ids) if where == 'queue' else request.app['dqueue'].clear(ids)) + return web.Response(text=serializer.encode(status)) -@routes.post(config.URL_PREFIX + 'start') -async def start(request): - post = await request.json() - ids = post.get('ids') - status = await dqueue.start_pending(ids) - return web.Response(text=serializer.encode(status)) + @routes.post(config.URL_PREFIX + 'start') + async def start(request): + post = await request.json() + ids = post.get('ids') + status = await request.app['dqueue'].start_pending(ids) + return web.Response(text=serializer.encode(status)) -@routes.get(config.URL_PREFIX + 'history') -async def history(request): - history = { 'done': [], 'queue': []} + @routes.get(config.URL_PREFIX + 'history') + async def history(request): + history = { 'done': [], 'queue': []} - for _ ,v in dqueue.queue.saved_items(): - history['queue'].append(v) - for _ ,v in dqueue.done.saved_items(): - history['done'].append(v) + for _ ,v in request.app['dqueue'].queue.saved_items(): + history['queue'].append(v) + for _ ,v in request.app['dqueue'].done.saved_items(): + history['done'].append(v) - return web.Response(text=serializer.encode(history)) + return web.Response(text=serializer.encode(history)) -@sio.event -async def connect(sid, environ): - await sio.emit('all', serializer.encode(dqueue.get()), to=sid) - await sio.emit('configuration', serializer.encode(config), to=sid) - if config.CUSTOM_DIRS: - await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid) + @sio.event + async def connect(sid, environ): + await sio.emit('all', serializer.encode(request.app['dqueue'].get()), to=sid) + await sio.emit('configuration', serializer.encode(config), to=sid) + if config.CUSTOM_DIRS: + await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid) + + @routes.get(config.URL_PREFIX) + def index(request): + response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/index.html')) + if 'metube_theme' not in request.cookies: + response.set_cookie('metube_theme', config.DEFAULT_THEME) + return response + + if config.URL_PREFIX != '/': + @routes.get('/') + def index_redirect_root(request): + return web.HTTPFound(config.URL_PREFIX) + + @routes.get(config.URL_PREFIX[:-1]) + def index_redirect_dir(request): + return web.HTTPFound(config.URL_PREFIX) + + routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) + routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) + routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube')) + + try: + app.add_routes(routes) + except ValueError as e: + if 'ui/dist/metube' in str(e): + raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e + raise e + + async def add_cors(request): + return web.Response(text=serializer.encode({"status": "ok"})) + + app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) + + async def on_prepare(request, response): + if 'Origin' in request.headers: + response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] + response.headers['Access-Control-Allow-Headers'] = 'Content-Type' + + app.on_response_prepare.append(on_prepare) + + dqueue = DownloadQueue(config, Notifier()) + await dqueue.initialize() + app['dqueue'] = dqueue + + return app def get_custom_dirs(): def recursive_dirs(base): path = pathlib.Path(base) - # Converts PosixPath object to string, and remove base/ prefix def convert(p): s = str(p) if s.startswith(base): s = s[len(base):] - if s.startswith('/'): s = s[1:] - return s - # Recursively lists all subdirectories of DOWNLOAD_DIR dirs = list(filter(None, map(convert, path.glob('**')))) - return dirs download_dir = recursive_dirs(config.DOWNLOAD_DIR) @@ -193,52 +234,33 @@ def get_custom_dirs(): "audio_download_dir": audio_download_dir } -@routes.get(config.URL_PREFIX) -def index(request): - response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/index.html')) - if 'metube_theme' not in request.cookies: - response.set_cookie('metube_theme', config.DEFAULT_THEME) - return response +async def start_background_tasks(app): + app['dqueue_task'] = asyncio.create_task(app['dqueue'].run()) -if config.URL_PREFIX != '/': - @routes.get('/') - def index_redirect_root(request): - return web.HTTPFound(config.URL_PREFIX) +async def cleanup_background_tasks(app): + app['dqueue_task'].cancel() + await app['dqueue_task'] - @routes.get(config.URL_PREFIX[:-1]) - def index_redirect_dir(request): - return web.HTTPFound(config.URL_PREFIX) +def main(): + logging.basicConfig(level=logging.DEBUG) + log.info(f"Initializing application on {config.HOST}:{config.PORT}") -routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) -routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) -routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube')) -try: - app.add_routes(routes) -except ValueError as e: - if 'ui/dist/metube' in str(e): - raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e - raise e + loop = asyncio.get_event_loop() + app = loop.run_until_complete(init_app()) -# https://github.com/aio-libs/aiohttp/pull/4615 waiting for release -# @routes.options(config.URL_PREFIX + 'add') -async def add_cors(request): - return web.Response(text=serializer.encode({"status": "ok"})) - -app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) - - -async def on_prepare(request, response): - if 'Origin' in request.headers: - response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] - response.headers['Access-Control-Allow-Headers'] = 'Content-Type' - -app.on_response_prepare.append(on_prepare) + app.on_startup.append(start_background_tasks) + app.on_cleanup.append(cleanup_background_tasks) + try: + if sys.platform.startswith('win'): + web.run_app(app, host=config.HOST, port=int(config.PORT)) + else: + web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) + except Exception as e: + log.error(f"Failed to start the server: {str(e)}") + sys.exit(1) if __name__ == '__main__': - logging.basicConfig(level=logging.DEBUG) - log.info(f"Listening on {config.HOST}:{config.PORT}") if sys.platform.startswith('win'): - web.run_app(app, host=config.HOST, port=int(config.PORT)) - else: - web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) + asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) + main() \ No newline at end of file diff --git a/app/ytdl.py b/app/ytdl.py index e9b4ee6..fe8eba4 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -4,7 +4,6 @@ from collections import OrderedDict import shelve import time import asyncio -import multiprocessing import logging import re from dl_formats import get_format, get_opts, AUDIO_FORMATS @@ -44,8 +43,6 @@ class DownloadInfo: self.error = error class Download: - manager = None - def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info): self.download_dir = download_dir self.temp_dir = temp_dir @@ -56,106 +53,48 @@ class Download: self.info = info self.canceled = False self.tmpfilename = None - self.status_queue = None - self.proc = None - self.loop = None - self.notifier = None - - - def _download(self): - try: - def put_status(st): - self.status_queue.put({k: v for k, v in st.items() if k in ( - 'tmpfilename', - 'filename', - 'status', - 'msg', - 'total_bytes', - 'total_bytes_estimate', - 'downloaded_bytes', - 'speed', - 'eta', - )}) - def put_status_postprocessor(d): - if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished': - if '__finaldir' in d['info_dict']: - filename = os.path.join(d['info_dict']['__finaldir'], os.path.basename(d['info_dict']['filepath'])) - else: - filename = d['info_dict']['filepath'] - self.status_queue.put({'status': 'finished', 'filename': filename}) - ret = yt_dlp.YoutubeDL(params={ - 'quiet': True, - 'no_color': True, - #'skip_download': True, - 'paths': {"home": self.download_dir, "temp": self.temp_dir}, - 'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter }, - 'format': self.format, - 'socket_timeout': 30, - 'ignore_no_formats_error': True, - 'progress_hooks': [put_status], - 'postprocessor_hooks': [put_status_postprocessor], - **self.ytdl_opts, - }).download([self.info.url]) - self.status_queue.put({'status': 'finished' if ret == 0 else 'error'}) - except yt_dlp.utils.YoutubeDLError as exc: - self.status_queue.put({'status': 'error', 'msg': str(exc)}) async def start(self, notifier): - if Download.manager is None: - Download.manager = multiprocessing.Manager() - self.status_queue = Download.manager.Queue() - self.proc = multiprocessing.Process(target=self._download) - self.proc.start() - self.loop = asyncio.get_running_loop() - self.notifier = notifier self.info.status = 'preparing' - await self.notifier.updated(self.info) - asyncio.create_task(self.update_status()) - return await self.loop.run_in_executor(None, self.proc.join) + await notifier.updated(self.info) + + try: + result = await asyncio.get_event_loop().run_in_executor(None, self._download) + if result['status'] == 'finished': + self.info.status = 'finished' + self.info.filename = result.get('filename') + self.info.size = os.path.getsize(result['filename']) if os.path.exists(result['filename']) else None + else: + self.info.status = 'error' + self.info.msg = result.get('msg', 'Unknown error occurred') + except Exception as e: + self.info.status = 'error' + self.info.msg = str(e) + + await notifier.updated(self.info) + + def _download(self): + ydl_opts = { + 'quiet': True, + 'no_color': True, + 'paths': {"home": self.download_dir, "temp": self.temp_dir}, + 'outtmpl': {"default": self.output_template, "chapter": self.output_template_chapter}, + 'format': self.format, + 'socket_timeout': 30, + 'ignore_no_formats_error': True, + **self.ytdl_opts, + } + + with yt_dlp.YoutubeDL(ydl_opts) as ydl: + try: + info = ydl.extract_info(self.info.url, download=True) + return {'status': 'finished', 'filename': ydl.prepare_filename(info)} + except yt_dlp.utils.DownloadError as e: + return {'status': 'error', 'msg': str(e)} def cancel(self): - if self.running(): - self.proc.kill() self.canceled = True - def close(self): - if self.started(): - self.proc.close() - self.status_queue.put(None) - - def running(self): - try: - return self.proc is not None and self.proc.is_alive() - except ValueError: - return False - - def started(self): - return self.proc is not None - - async def update_status(self): - while True: - status = await self.loop.run_in_executor(None, self.status_queue.get) - if status is None: - return - self.tmpfilename = status.get('tmpfilename') - if 'filename' in status: - fileName = status.get('filename') - self.info.filename = os.path.relpath(fileName, self.download_dir) - self.info.size = os.path.getsize(fileName) if os.path.exists(fileName) else None - - # Set correct file extension for thumbnails - if(self.info.format == 'thumbnail'): - self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename) - self.info.status = status['status'] - self.info.msg = status.get('msg') - if 'downloaded_bytes' in status: - total = status.get('total_bytes') or status.get('total_bytes_estimate') - if total: - self.info.percent = status['downloaded_bytes'] / total * 100 - self.info.speed = status.get('speed') - self.info.eta = status.get('eta') - await self.notifier.updated(self.info) - class PersistentQueue: def __init__(self, path): pdir = os.path.dirname(path) @@ -209,19 +148,25 @@ class DownloadQueue: self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') self.done.load() - self.max_concurrent_downloads = self.config.MAX_CONCURRENT_DOWNLOADS # New configuration option self.active_downloads = set() - self.download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads) + self.max_concurrent_downloads = 3 # Adjust this value as needed + self.event = asyncio.Event() + + async def initialize(self): + await self.__import_queue() + + async def run(self): + while True: + try: + await self.__manage_downloads() + except Exception as e: + log.error(f"Error in download queue: {str(e)}") + await asyncio.sleep(5) # Wait a bit before retrying async def __import_queue(self): for k, v in self.queue.saved_items(): await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix) - async def initialize(self): - self.event = asyncio.Event() - asyncio.create_task(self.__manage_downloads()) - await self.__import_queue() - def __extract_info(self, url): return yt_dlp.YoutubeDL(params={ 'quiet': True, @@ -233,12 +178,6 @@ class DownloadQueue: }).extract_info(url, download=False) def __calc_download_path(self, quality, format, folder): - """Calculates download path from quality, format and folder attributes. - - Returns: - Tuple dldirectory, error_message both of which might be None (but not at the same time) - """ - # Keep consistent with frontend base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR if folder: if not self.config.CUSTOM_DIRS: @@ -254,17 +193,7 @@ class DownloadQueue: else: dldirectory = base_directory return dldirectory, None - async def __manage_downloads(self): - while True: - while self.queue.empty(): - log.info('waiting for item to download') - await self.event.wait() - self.event.clear() - - async with self.download_semaphore: - id, entry = self.queue.next() - self.active_downloads.add(id) - asyncio.create_task(self.__process_download(id, entry)) + async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, auto_start, already): if not entry: return {'status': 'error', 'msg': "Invalid/empty data was given."} @@ -324,10 +253,15 @@ class DownloadQueue: else: already.add(url) try: - entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url) + entry = await asyncio.get_event_loop().run_in_executor(None, self.__extract_info, url) except yt_dlp.utils.YoutubeDLError as exc: return {'status': 'error', 'msg': str(exc)} - return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already) + result = await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already) + + if result['status'] == 'ok' and auto_start: + self.event.set() # Signal that new items are available for download + + return result async def start_pending(self, ids): for id in ids: @@ -349,11 +283,11 @@ class DownloadQueue: if not self.queue.exists(id): log.warn(f'requested cancel for non-existent download {id}') continue - if self.queue.get(id).started(): - self.queue.get(id).cancel() - else: - self.queue.delete(id) - await self.notifier.canceled(id) + dl = self.queue.get(id) + if isinstance(dl, Download): + dl.cancel() + self.queue.delete(id) + await self.notifier.canceled(id) return {'status': 'ok'} async def clear(self, ids): @@ -375,35 +309,18 @@ class DownloadQueue: def get(self): return(list((k, v.info) for k, v in self.queue.items()) + list((k, v.info) for k, v in self.pending.items()), list((k, v.info) for k, v in self.done.items())) - async def __process_download(self, id, entry): - try: - log.info(f'downloading {entry.info.title}') - await entry.start(self.notifier) - if entry.info.status != 'finished': - if entry.tmpfilename and os.path.isfile(entry.tmpfilename): - try: - os.remove(entry.tmpfilename) - except: - pass - entry.info.status = 'error' - entry.close() - finally: - if self.queue.exists(id): - self.queue.delete(id) - if entry.canceled: - await self.notifier.canceled(id) - else: - self.done.put(entry) - await self.notifier.completed(entry.info) - self.active_downloads.remove(id) - self.download_semaphore.release() - async def __download(self): + + async def __manage_downloads(self): while True: - while self.queue.empty(): - log.info('waiting for item to download') - await self.event.wait() - self.event.clear() - id, entry = self.queue.next() + while not self.queue.empty() and len(self.active_downloads) < self.max_concurrent_downloads: + id, entry = self.queue.next() + if id not in self.active_downloads: + self.active_downloads.add(id) + asyncio.create_task(self.__download(id, entry)) + await asyncio.sleep(1) # Add a small delay to prevent busy waiting + + async def __download(self, id, entry): + try: log.info(f'downloading {entry.info.title}') await entry.start(self.notifier) if entry.info.status != 'finished': @@ -412,12 +329,12 @@ class DownloadQueue: os.remove(entry.tmpfilename) except: pass - entry.info.status = 'error' - entry.close() if self.queue.exists(id): self.queue.delete(id) if entry.canceled: await self.notifier.canceled(id) else: self.done.put(entry) - await self.notifier.completed(entry.info) \ No newline at end of file + await self.notifier.completed(entry.info) + finally: + self.active_downloads.remove(id) \ No newline at end of file From 8552faf9c5c40f0427641988148dda848daad81c Mon Sep 17 00:00:00 2001 From: evilmonkeydiaz Date: Sat, 17 Aug 2024 15:15:53 -0700 Subject: [PATCH 04/12] Concurrent downloads --- app/main.py | 236 +++++++++++++++++++++++++--------------------------- app/ytdl.py | 235 ++++++++++++++++++++++++++++++--------------------- 2 files changed, 255 insertions(+), 216 deletions(-) diff --git a/app/main.py b/app/main.py index c00231f..7e1fefc 100644 --- a/app/main.py +++ b/app/main.py @@ -3,7 +3,6 @@ import os import sys -import asyncio from aiohttp import web import socketio import logging @@ -16,7 +15,7 @@ log = logging.getLogger('main') class Config: _DEFAULTS = { - 'DOWNLOAD_DIR': 'C:/Users/Roger/Desktop/MeTube', + 'DOWNLOAD_DIR': '.', 'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR', 'TEMP_DIR': '%%DOWNLOAD_DIR', 'DOWNLOAD_DIRS_INDEXABLE': 'false', @@ -41,10 +40,10 @@ class Config: def __init__(self): for k, v in self._DEFAULTS.items(): - setattr(self, k, os.environ.get(k, v)) + setattr(self, k, os.environ[k] if k in os.environ else v) for k, v in self.__dict__.items(): - if isinstance(v, str) and v.startswith('%%'): + if v.startswith('%%'): setattr(self, k, getattr(self, v[2:])) if k in self._BOOLEAN: if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'): @@ -86,141 +85,115 @@ class ObjectSerializer(json.JSONEncoder): return json.JSONEncoder.default(self, obj) serializer = ObjectSerializer() +app = web.Application() +sio = socketio.AsyncServer(cors_allowed_origins='*') +sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io') +routes = web.RouteTableDef() class Notifier(DownloadQueueNotifier): async def added(self, dl): + log.info(f"Notifier: Download added - {dl.title}") await sio.emit('added', serializer.encode(dl)) async def updated(self, dl): + log.info(f"Notifier: Download updated - {dl.title}") await sio.emit('updated', serializer.encode(dl)) async def completed(self, dl): + log.info(f"Notifier: Download completed - {dl.title}") await sio.emit('completed', serializer.encode(dl)) async def canceled(self, id): + log.info(f"Notifier: Download canceled - {id}") await sio.emit('canceled', serializer.encode(id)) async def cleared(self, id): + log.info(f"Notifier: Download cleared - {id}") await sio.emit('cleared', serializer.encode(id)) -async def init_app(): - app = web.Application() - global sio - sio = socketio.AsyncServer(cors_allowed_origins='*') - sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io') - routes = web.RouteTableDef() +dqueue = DownloadQueue(config, Notifier()) +app.on_startup.append(lambda app: dqueue.initialize()) - @routes.post(config.URL_PREFIX + 'add') - async def add(request): - post = await request.json() - url = post.get('url') - quality = post.get('quality') - if not url or not quality: - raise web.HTTPBadRequest() - format = post.get('format') - folder = post.get('folder') - custom_name_prefix = post.get('custom_name_prefix') - auto_start = post.get('auto_start') - if custom_name_prefix is None: - custom_name_prefix = '' - if auto_start is None: - auto_start = True - status = await request.app['dqueue'].add(url, quality, format, folder, custom_name_prefix, auto_start) - return web.Response(text=serializer.encode(status)) +@routes.post(config.URL_PREFIX + 'add') +async def add(request): + log.info("Received request to add download") + post = await request.json() + log.info(f"Request data: {post}") + url = post.get('url') + quality = post.get('quality') + if not url or not quality: + log.error("Bad request: missing 'url' or 'quality'") + raise web.HTTPBadRequest() + format = post.get('format') + folder = post.get('folder') + custom_name_prefix = post.get('custom_name_prefix') + auto_start = post.get('auto_start') + if custom_name_prefix is None: + custom_name_prefix = '' + if auto_start is None: + auto_start = True + status = await dqueue.add(url, quality, format, folder, custom_name_prefix, auto_start) + log.info(f"Download added to queue: {url}") + return web.Response(text=serializer.encode(status)) - @routes.post(config.URL_PREFIX + 'delete') - async def delete(request): - post = await request.json() - ids = post.get('ids') - where = post.get('where') - if not ids or where not in ['queue', 'done']: - raise web.HTTPBadRequest() - status = await (request.app['dqueue'].cancel(ids) if where == 'queue' else request.app['dqueue'].clear(ids)) - return web.Response(text=serializer.encode(status)) +@routes.post(config.URL_PREFIX + 'delete') +async def delete(request): + post = await request.json() + ids = post.get('ids') + where = post.get('where') + if not ids or where not in ['queue', 'done']: + log.error("Bad request: missing 'ids' or incorrect 'where' value") + raise web.HTTPBadRequest() + status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids)) + log.info(f"Download delete request processed for ids: {ids}, where: {where}") + return web.Response(text=serializer.encode(status)) - @routes.post(config.URL_PREFIX + 'start') - async def start(request): - post = await request.json() - ids = post.get('ids') - status = await request.app['dqueue'].start_pending(ids) - return web.Response(text=serializer.encode(status)) +@routes.post(config.URL_PREFIX + 'start') +async def start(request): + post = await request.json() + ids = post.get('ids') + log.info(f"Received request to start pending downloads for ids: {ids}") + status = await dqueue.start_pending(ids) + return web.Response(text=serializer.encode(status)) - @routes.get(config.URL_PREFIX + 'history') - async def history(request): - history = { 'done': [], 'queue': []} +@routes.get(config.URL_PREFIX + 'history') +async def history(request): + history = { 'done': [], 'queue': []} - for _ ,v in request.app['dqueue'].queue.saved_items(): - history['queue'].append(v) - for _ ,v in request.app['dqueue'].done.saved_items(): - history['done'].append(v) + for _ ,v in dqueue.queue.saved_items(): + history['queue'].append(v) + for _ ,v in dqueue.done.saved_items(): + history['done'].append(v) - return web.Response(text=serializer.encode(history)) + log.info("Sending download history") + return web.Response(text=serializer.encode(history)) - @sio.event - async def connect(sid, environ): - await sio.emit('all', serializer.encode(request.app['dqueue'].get()), to=sid) - await sio.emit('configuration', serializer.encode(config), to=sid) - if config.CUSTOM_DIRS: - await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid) - - @routes.get(config.URL_PREFIX) - def index(request): - response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/index.html')) - if 'metube_theme' not in request.cookies: - response.set_cookie('metube_theme', config.DEFAULT_THEME) - return response - - if config.URL_PREFIX != '/': - @routes.get('/') - def index_redirect_root(request): - return web.HTTPFound(config.URL_PREFIX) - - @routes.get(config.URL_PREFIX[:-1]) - def index_redirect_dir(request): - return web.HTTPFound(config.URL_PREFIX) - - routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) - routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) - routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube')) - - try: - app.add_routes(routes) - except ValueError as e: - if 'ui/dist/metube' in str(e): - raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e - raise e - - async def add_cors(request): - return web.Response(text=serializer.encode({"status": "ok"})) - - app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) - - async def on_prepare(request, response): - if 'Origin' in request.headers: - response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] - response.headers['Access-Control-Allow-Headers'] = 'Content-Type' - - app.on_response_prepare.append(on_prepare) - - dqueue = DownloadQueue(config, Notifier()) - await dqueue.initialize() - app['dqueue'] = dqueue - - return app +@sio.event +async def connect(sid, environ): + log.info(f"Client connected: {sid}") + await sio.emit('all', serializer.encode(dqueue.get()), to=sid) + await sio.emit('configuration', serializer.encode(config), to=sid) + if config.CUSTOM_DIRS: + await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid) def get_custom_dirs(): def recursive_dirs(base): path = pathlib.Path(base) + # Converts PosixPath object to string, and remove base/ prefix def convert(p): s = str(p) if s.startswith(base): s = s[len(base):] + if s.startswith('/'): s = s[1:] + return s + # Recursively lists all subdirectories of DOWNLOAD_DIR dirs = list(filter(None, map(convert, path.glob('**')))) + return dirs download_dir = recursive_dirs(config.DOWNLOAD_DIR) @@ -234,23 +207,49 @@ def get_custom_dirs(): "audio_download_dir": audio_download_dir } -async def start_background_tasks(app): - app['dqueue_task'] = asyncio.create_task(app['dqueue'].run()) +@routes.get(config.URL_PREFIX) +def index(request): + response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/index.html')) + if 'metube_theme' not in request.cookies: + response.set_cookie('metube_theme', config.DEFAULT_THEME) + return response -async def cleanup_background_tasks(app): - app['dqueue_task'].cancel() - await app['dqueue_task'] +if config.URL_PREFIX != '/': + @routes.get('/') + def index_redirect_root(request): + return web.HTTPFound(config.URL_PREFIX) -def main(): + @routes.get(config.URL_PREFIX[:-1]) + def index_redirect_dir(request): + return web.HTTPFound(config.URL_PREFIX) + +routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) +routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) +routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube')) +try: + app.add_routes(routes) +except ValueError as e: + if 'ui/dist/metube' in str(e): + raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e + raise e + +# https://github.com/aio-libs/aiohttp/pull/4615 waiting for release +# @routes.options(config.URL_PREFIX + 'add') +async def add_cors(request): + return web.Response(text=serializer.encode({"status": "ok"})) + +app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) + +async def on_prepare(request, response): + if 'Origin' in request.headers: + response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] + response.headers['Access-Control-Allow-Headers'] = 'Content-Type' + +app.on_response_prepare.append(on_prepare) + +if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) - log.info(f"Initializing application on {config.HOST}:{config.PORT}") - - loop = asyncio.get_event_loop() - app = loop.run_until_complete(init_app()) - - app.on_startup.append(start_background_tasks) - app.on_cleanup.append(cleanup_background_tasks) - + log.info(f"Listening on {config.HOST}:{config.PORT}") try: if sys.platform.startswith('win'): web.run_app(app, host=config.HOST, port=int(config.PORT)) @@ -258,9 +257,4 @@ def main(): web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) except Exception as e: log.error(f"Failed to start the server: {str(e)}") - sys.exit(1) - -if __name__ == '__main__': - if sys.platform.startswith('win'): - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - main() \ No newline at end of file + sys.exit(1) \ No newline at end of file diff --git a/app/ytdl.py b/app/ytdl.py index fe8eba4..16133ec 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -4,6 +4,7 @@ from collections import OrderedDict import shelve import time import asyncio +import multiprocessing import logging import re from dl_formats import get_format, get_opts, AUDIO_FORMATS @@ -43,6 +44,8 @@ class DownloadInfo: self.error = error class Download: + manager = None + def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info): self.download_dir = download_dir self.temp_dir = temp_dir @@ -53,48 +56,113 @@ class Download: self.info = info self.canceled = False self.tmpfilename = None - - async def start(self, notifier): - self.info.status = 'preparing' - await notifier.updated(self.info) - - try: - result = await asyncio.get_event_loop().run_in_executor(None, self._download) - if result['status'] == 'finished': - self.info.status = 'finished' - self.info.filename = result.get('filename') - self.info.size = os.path.getsize(result['filename']) if os.path.exists(result['filename']) else None - else: - self.info.status = 'error' - self.info.msg = result.get('msg', 'Unknown error occurred') - except Exception as e: - self.info.status = 'error' - self.info.msg = str(e) - - await notifier.updated(self.info) + self.status_queue = None + self.proc = None + self.loop = None + self.notifier = None def _download(self): - ydl_opts = { - 'quiet': True, - 'no_color': True, - 'paths': {"home": self.download_dir, "temp": self.temp_dir}, - 'outtmpl': {"default": self.output_template, "chapter": self.output_template_chapter}, - 'format': self.format, - 'socket_timeout': 30, - 'ignore_no_formats_error': True, - **self.ytdl_opts, - } + log.info(f"Starting download for: {self.info.title} ({self.info.url})") + try: + def put_status(st): + self.status_queue.put({k: v for k, v in st.items() if k in ( + 'tmpfilename', + 'filename', + 'status', + 'msg', + 'total_bytes', + 'total_bytes_estimate', + 'downloaded_bytes', + 'speed', + 'eta', + )}) - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - try: - info = ydl.extract_info(self.info.url, download=True) - return {'status': 'finished', 'filename': ydl.prepare_filename(info)} - except yt_dlp.utils.DownloadError as e: - return {'status': 'error', 'msg': str(e)} + def put_status_postprocessor(d): + if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished': + if '__finaldir' in d['info_dict']: + filename = os.path.join(d['info_dict']['__finaldir'], os.path.basename(d['info_dict']['filepath'])) + else: + filename = d['info_dict']['filepath'] + self.status_queue.put({'status': 'finished', 'filename': filename}) + + ret = yt_dlp.YoutubeDL(params={ + 'quiet': True, + 'no_color': True, + 'paths': {"home": self.download_dir, "temp": self.temp_dir}, + 'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter }, + 'format': self.format, + 'socket_timeout': 30, + 'ignore_no_formats_error': True, + 'progress_hooks': [put_status], + 'postprocessor_hooks': [put_status_postprocessor], + **self.ytdl_opts, + }).download([self.info.url]) + self.status_queue.put({'status': 'finished' if ret == 0 else 'error'}) + log.info(f"Finished download for: {self.info.title}") + except yt_dlp.utils.YoutubeDLError as exc: + log.error(f"Download error for {self.info.title}: {str(exc)}") + self.status_queue.put({'status': 'error', 'msg': str(exc)}) + + async def start(self, notifier): + log.info(f"Preparing download for: {self.info.title}") + if Download.manager is None: + Download.manager = multiprocessing.Manager() + self.status_queue = Download.manager.Queue() + self.proc = multiprocessing.Process(target=self._download) + self.proc.start() + self.loop = asyncio.get_running_loop() + self.notifier = notifier + self.info.status = 'preparing' + await self.notifier.updated(self.info) + asyncio.create_task(self.update_status()) + return await self.loop.run_in_executor(None, self.proc.join) def cancel(self): + log.info(f"Cancelling download: {self.info.title}") + if self.running(): + self.proc.kill() self.canceled = True + def close(self): + log.info(f"Closing download process for: {self.info.title}") + if self.started(): + self.proc.close() + self.status_queue.put(None) + + def running(self): + try: + return self.proc is not None and self.proc.is_alive() + except ValueError: + return False + + def started(self): + return self.proc is not None + + async def update_status(self): + while True: + status = await self.loop.run_in_executor(None, self.status_queue.get) + if status is None: + log.info(f"Status update finished for: {self.info.title}") + return + self.tmpfilename = status.get('tmpfilename') + if 'filename' in status: + fileName = status.get('filename') + self.info.filename = os.path.relpath(fileName, self.download_dir) + self.info.size = os.path.getsize(fileName) if os.path.exists(fileName) else None + + if self.info.format == 'thumbnail': + self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename) + self.info.status = status['status'] + self.info.msg = status.get('msg') + if 'downloaded_bytes' in status: + total = status.get('total_bytes') or status.get('total_bytes_estimate') + if total: + self.info.percent = status['downloaded_bytes'] / total * 100 + self.info.speed = status.get('speed') + self.info.eta = status.get('eta') + log.info(f"Updating status for {self.info.title}: {status}") + await self.notifier.updated(self.info) + class PersistentQueue: def __init__(self, path): pdir = os.path.dirname(path) @@ -147,26 +215,17 @@ class DownloadQueue: self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') - self.done.load() self.active_downloads = set() - self.max_concurrent_downloads = 3 # Adjust this value as needed - self.event = asyncio.Event() - - async def initialize(self): - await self.__import_queue() - - async def run(self): - while True: - try: - await self.__manage_downloads() - except Exception as e: - log.error(f"Error in download queue: {str(e)}") - await asyncio.sleep(5) # Wait a bit before retrying + self.done.load() async def __import_queue(self): for k, v in self.queue.saved_items(): await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix) + async def initialize(self): + log.info("Initializing DownloadQueue") + asyncio.create_task(self.__import_queue()) + def __extract_info(self, url): return yt_dlp.YoutubeDL(params={ 'quiet': True, @@ -234,8 +293,9 @@ class DownloadQueue: if property.startswith("playlist"): output = output.replace(f"%({property})s", str(value)) if auto_start is True: - self.queue.put(Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, self.config.YTDL_OPTIONS, dl)) - self.event.set() + download = Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, self.config.YTDL_OPTIONS, dl) + self.queue.put(download) + asyncio.create_task(self.__start_download(download)) else: self.pending.put(Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, self.config.YTDL_OPTIONS, dl)) await self.notifier.added(dl) @@ -244,6 +304,24 @@ class DownloadQueue: return await self.add(entry['url'], quality, format, folder, custom_name_prefix, auto_start, already) return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'} + async def __start_download(self, download): + await download.start(self.notifier) + if download.info.status != 'finished': + if download.tmpfilename and os.path.isfile(download.tmpfilename): + try: + os.remove(download.tmpfilename) + except: + pass + download.info.status = 'error' + download.close() + if self.queue.exists(download.info.url): + self.queue.delete(download.info.url) + if download.canceled: + await self.notifier.canceled(download.info.url) + else: + self.done.put(download) + await self.notifier.completed(download.info) + async def add(self, url, quality, format, folder, custom_name_prefix, auto_start=True, already=None): log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=}') already = set() if already is None else already @@ -253,15 +331,10 @@ class DownloadQueue: else: already.add(url) try: - entry = await asyncio.get_event_loop().run_in_executor(None, self.__extract_info, url) + entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url) except yt_dlp.utils.YoutubeDLError as exc: return {'status': 'error', 'msg': str(exc)} - result = await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already) - - if result['status'] == 'ok' and auto_start: - self.event.set() # Signal that new items are available for download - - return result + return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already) async def start_pending(self, ids): for id in ids: @@ -271,7 +344,7 @@ class DownloadQueue: dl = self.pending.get(id) self.queue.put(dl) self.pending.delete(id) - self.event.set() + asyncio.create_task(self.__start_download(dl)) return {'status': 'ok'} async def cancel(self, ids): @@ -283,11 +356,11 @@ class DownloadQueue: if not self.queue.exists(id): log.warn(f'requested cancel for non-existent download {id}') continue - dl = self.queue.get(id) - if isinstance(dl, Download): - dl.cancel() - self.queue.delete(id) - await self.notifier.canceled(id) + if self.queue.get(id).started(): + self.queue.get(id).cancel() + else: + self.queue.delete(id) + await self.notifier.canceled(id) return {'status': 'ok'} async def clear(self, ids): @@ -307,34 +380,6 @@ class DownloadQueue: return {'status': 'ok'} def get(self): - return(list((k, v.info) for k, v in self.queue.items()) + list((k, v.info) for k, v in self.pending.items()), - list((k, v.info) for k, v in self.done.items())) - - async def __manage_downloads(self): - while True: - while not self.queue.empty() and len(self.active_downloads) < self.max_concurrent_downloads: - id, entry = self.queue.next() - if id not in self.active_downloads: - self.active_downloads.add(id) - asyncio.create_task(self.__download(id, entry)) - await asyncio.sleep(1) # Add a small delay to prevent busy waiting - - async def __download(self, id, entry): - try: - log.info(f'downloading {entry.info.title}') - await entry.start(self.notifier) - if entry.info.status != 'finished': - if entry.tmpfilename and os.path.isfile(entry.tmpfilename): - try: - os.remove(entry.tmpfilename) - except: - pass - if self.queue.exists(id): - self.queue.delete(id) - if entry.canceled: - await self.notifier.canceled(id) - else: - self.done.put(entry) - await self.notifier.completed(entry.info) - finally: - self.active_downloads.remove(id) \ No newline at end of file + return (list((k, v.info) for k, v in self.queue.items()) + + list((k, v.info) for k, v in self.pending.items()), + list((k, v.info) for k, v in self.done.items())) From 2097a7adfa20033ee3fb688a922eaa74eafbf8ab Mon Sep 17 00:00:00 2001 From: evilmonkeydiaz Date: Sat, 17 Aug 2024 16:09:42 -0700 Subject: [PATCH 05/12] Added concurrent and limited modes --- app/main.py | 12 ++++++---- app/ytdl.py | 66 ++++++++++++++++++++++++++++++++++++++--------------- 2 files changed, 55 insertions(+), 23 deletions(-) diff --git a/app/main.py b/app/main.py index 7e1fefc..77e7ecf 100644 --- a/app/main.py +++ b/app/main.py @@ -33,17 +33,19 @@ class Config: 'HOST': '0.0.0.0', 'PORT': '8081', 'BASE_DIR': '', - 'DEFAULT_THEME': 'auto' + 'DEFAULT_THEME': 'auto', + 'DOWNLOAD_MODE': 'sequential', # Can be 'sequential', 'concurrent', or 'limited' + 'MAX_CONCURRENT_DOWNLOADS': 3, # Used if DOWNLOAD_MODE is 'limited' } _BOOLEAN = ('DOWNLOAD_DIRS_INDEXABLE', 'CUSTOM_DIRS', 'CREATE_CUSTOM_DIRS', 'DELETE_FILE_ON_TRASHCAN') def __init__(self): for k, v in self._DEFAULTS.items(): - setattr(self, k, os.environ[k] if k in os.environ else v) + setattr(self, k, os.environ.get(k, v)) for k, v in self.__dict__.items(): - if v.startswith('%%'): + if isinstance(v, str) and v.startswith('%%'): setattr(self, k, getattr(self, v[2:])) if k in self._BOOLEAN: if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'): @@ -160,9 +162,9 @@ async def start(request): async def history(request): history = { 'done': [], 'queue': []} - for _ ,v in dqueue.queue.saved_items(): + for _, v in dqueue.queue.saved_items(): history['queue'].append(v) - for _ ,v in dqueue.done.saved_items(): + for _, v in dqueue.done.saved_items(): history['done'].append(v) log.info("Sending download history") diff --git a/app/ytdl.py b/app/ytdl.py index 16133ec..b009257 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -216,6 +216,11 @@ class DownloadQueue: self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') self.active_downloads = set() + self.semaphore = None + + if self.config.DOWNLOAD_MODE == 'limited': + self.semaphore = asyncio.Semaphore(self.config.MAX_CONCURRENT_DOWNLOADS) + self.done.load() async def __import_queue(self): @@ -226,6 +231,49 @@ class DownloadQueue: log.info("Initializing DownloadQueue") asyncio.create_task(self.__import_queue()) + async def __start_download(self, download): + if self.config.DOWNLOAD_MODE == 'sequential': + await self.__sequential_download(download) + elif self.config.DOWNLOAD_MODE == 'limited' and self.semaphore is not None: + await self.__limited_concurrent_download(download) + else: # concurrent without limit + await self.__concurrent_download(download) + + async def __sequential_download(self, download): + log.info("Starting sequential download.") + await download.start(self.notifier) + self._post_download_cleanup(download) + + async def __concurrent_download(self, download): + log.info("Starting concurrent download without limits.") + asyncio.create_task(self._run_download(download)) + + async def __limited_concurrent_download(self, download): + log.info("Starting limited concurrent download.") + async with self.semaphore: + await self._run_download(download) + + async def _run_download(self, download): + await download.start(self.notifier) + self._post_download_cleanup(download) + + def _post_download_cleanup(self, download): + if download.info.status != 'finished': + if download.tmpfilename and os.path.isfile(download.tmpfilename): + try: + os.remove(download.tmpfilename) + except: + pass + download.info.status = 'error' + download.close() + if self.queue.exists(download.info.url): + self.queue.delete(download.info.url) + if download.canceled: + asyncio.create_task(self.notifier.canceled(download.info.url)) + else: + self.done.put(download) + asyncio.create_task(self.notifier.completed(download.info)) + def __extract_info(self, url): return yt_dlp.YoutubeDL(params={ 'quiet': True, @@ -304,24 +352,6 @@ class DownloadQueue: return await self.add(entry['url'], quality, format, folder, custom_name_prefix, auto_start, already) return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'} - async def __start_download(self, download): - await download.start(self.notifier) - if download.info.status != 'finished': - if download.tmpfilename and os.path.isfile(download.tmpfilename): - try: - os.remove(download.tmpfilename) - except: - pass - download.info.status = 'error' - download.close() - if self.queue.exists(download.info.url): - self.queue.delete(download.info.url) - if download.canceled: - await self.notifier.canceled(download.info.url) - else: - self.done.put(download) - await self.notifier.completed(download.info) - async def add(self, url, quality, format, folder, custom_name_prefix, auto_start=True, already=None): log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=}') already = set() if already is None else already From e4fa9723ba65e1fcf732bd0364f6c3b8c2f5c33d Mon Sep 17 00:00:00 2001 From: evilmonkeydiaz Date: Sat, 17 Aug 2024 18:02:48 -0700 Subject: [PATCH 06/12] Update main.py --- app/main.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/main.py b/app/main.py index 77e7ecf..d3791e0 100644 --- a/app/main.py +++ b/app/main.py @@ -34,7 +34,7 @@ class Config: 'PORT': '8081', 'BASE_DIR': '', 'DEFAULT_THEME': 'auto', - 'DOWNLOAD_MODE': 'sequential', # Can be 'sequential', 'concurrent', or 'limited' + 'DOWNLOAD_MODE': 'concurrent', # Can be 'sequential', 'concurrent', or 'limited' 'MAX_CONCURRENT_DOWNLOADS': 3, # Used if DOWNLOAD_MODE is 'limited' } From ee48b3d59559606a8cdc7e2f2a8a95a6e7a04ba7 Mon Sep 17 00:00:00 2001 From: rdiaz738 Date: Sat, 1 Mar 2025 14:36:06 -0800 Subject: [PATCH 07/12] Update app.component.html --- ui/src/app/app.component.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui/src/app/app.component.html b/ui/src/app/app.component.html index dc5e3bb..f401e89 100644 --- a/ui/src/app/app.component.html +++ b/ui/src/app/app.component.html @@ -168,7 +168,7 @@
Completed
- + From 11cb4a1d281b6cc661f66146ee88df8d599be8ad Mon Sep 17 00:00:00 2001 From: rdiaz738 Date: Sat, 1 Mar 2025 14:38:51 -0800 Subject: [PATCH 08/12] Update app.component.html --- ui/src/app/app.component.html | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui/src/app/app.component.html b/ui/src/app/app.component.html index f401e89..dc5e3bb 100644 --- a/ui/src/app/app.component.html +++ b/ui/src/app/app.component.html @@ -168,7 +168,7 @@
Completed
- + From 8d70ed9d36edcbe086dfc1e07d71337dba5a6d45 Mon Sep 17 00:00:00 2001 From: rdiaz738 Date: Sat, 1 Mar 2025 17:43:48 -0800 Subject: [PATCH 09/12] Updated ui and backend Added Sequential, limited and concurrent downloading and import export buttons --- app/main.py | 2 +- app/ytdl.py | 63 +++++++++------ docker-compose.yml | 6 +- ui/src/app/app.component.html | 43 ++++++++++ ui/src/app/app.component.sass | 40 +++++++++- ui/src/app/app.component.ts | 135 ++++++++++++++++++++++++++++++++ ui/src/app/downloads.service.ts | 22 ++++++ 7 files changed, 282 insertions(+), 29 deletions(-) diff --git a/app/main.py b/app/main.py index 053c2d6..0b7554a 100644 --- a/app/main.py +++ b/app/main.py @@ -43,7 +43,7 @@ class Config: 'KEYFILE': '', 'BASE_DIR': '', 'DEFAULT_THEME': 'auto', - 'DOWNLOAD_MODE': 'concurrent', # Can be 'sequential', 'concurrent', or 'limited' + 'DOWNLOAD_MODE': 'limited', # Can be 'sequential', 'concurrent', or 'limited' 'MAX_CONCURRENT_DOWNLOADS': 3, # Used if DOWNLOAD_MODE is 'limited' } diff --git a/app/ytdl.py b/app/ytdl.py index 61d47cb..0000455 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -122,14 +122,20 @@ class Download: def cancel(self): log.info(f"Cancelling download: {self.info.title}") if self.running(): - self.proc.kill() + try: + self.proc.kill() + except Exception as e: + log.error(f"Error killing process for {self.info.title}: {e}") self.canceled = True + if self.status_queue is not None: + self.status_queue.put(None) def close(self): log.info(f"Closing download process for: {self.info.title}") if self.started(): self.proc.close() - self.status_queue.put(None) + if self.status_queue is not None: + self.status_queue.put(None) def running(self): try: @@ -146,12 +152,14 @@ class Download: if status is None: log.info(f"Status update finished for: {self.info.title}") return + if self.canceled: + log.info(f"Download {self.info.title} is canceled; stopping status updates.") + return self.tmpfilename = status.get('tmpfilename') if 'filename' in status: fileName = status.get('filename') self.info.filename = os.path.relpath(fileName, self.download_dir) self.info.size = os.path.getsize(fileName) if os.path.exists(fileName) else None - if self.info.format == 'thumbnail': self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename) self.info.status = status['status'] @@ -199,9 +207,10 @@ class PersistentQueue: shelf[key] = value.info def delete(self, key): - del self.dict[key] - with shelve.open(self.path, 'w') as shelf: - shelf.pop(key) + if key in self.dict: + del self.dict[key] + with shelve.open(self.path, 'w') as shelf: + shelf.pop(key, None) def next(self): k, v = next(iter(self.dict.items())) @@ -219,8 +228,10 @@ class DownloadQueue: self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') self.active_downloads = set() self.semaphore = None - - if self.config.DOWNLOAD_MODE == 'limited': + # For sequential mode, use an asyncio lock to ensure one-at-a-time execution. + if self.config.DOWNLOAD_MODE == 'sequential': + self.seq_lock = asyncio.Lock() + elif self.config.DOWNLOAD_MODE == 'limited': self.semaphore = asyncio.Semaphore(self.config.MAX_CONCURRENT_DOWNLOADS) self.done.load() @@ -234,18 +245,19 @@ class DownloadQueue: asyncio.create_task(self.__import_queue()) async def __start_download(self, download): + if download.canceled: + log.info(f"Download {download.info.title} was canceled, skipping start.") + return if self.config.DOWNLOAD_MODE == 'sequential': - await self.__sequential_download(download) + async with self.seq_lock: + log.info("Starting sequential download.") + await download.start(self.notifier) + self._post_download_cleanup(download) elif self.config.DOWNLOAD_MODE == 'limited' and self.semaphore is not None: await self.__limited_concurrent_download(download) - else: # concurrent without limit + else: await self.__concurrent_download(download) - async def __sequential_download(self, download): - log.info("Starting sequential download.") - await download.start(self.notifier) - self._post_download_cleanup(download) - async def __concurrent_download(self, download): log.info("Starting concurrent download without limits.") asyncio.create_task(self._run_download(download)) @@ -256,6 +268,9 @@ class DownloadQueue: await self._run_download(download) async def _run_download(self, download): + if download.canceled: + log.info(f"Download {download.info.title} is canceled; skipping start.") + return await download.start(self.notifier) self._post_download_cleanup(download) @@ -332,7 +347,7 @@ class DownloadQueue: log.info(f'Playlist item limit is set. Processing only first {playlist_item_limit} entries') entries = entries[:playlist_item_limit] for index, etr in enumerate(entries, start=1): - etr["_type"] = "video" # Prevents video to be treated as url and lose below properties during processing + etr["_type"] = "video" etr["playlist"] = entry["id"] etr["playlist_index"] = '{{0:0{0:d}d}}'.format(playlist_index_digits).format(index) for property in ("id", "title", "uploader", "uploader_id"): @@ -342,10 +357,11 @@ class DownloadQueue: if any(res['status'] == 'error' for res in results): return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)} return {'status': 'ok'} - elif etype == 'video' or etype.startswith('url') and 'id' in entry and 'title' in entry: + elif etype == 'video' or (etype.startswith('url') and 'id' in entry and 'title' in entry): log.debug('Processing as a video') - if not self.queue.exists(entry['id']): - dl = DownloadInfo(entry['id'], entry.get('title') or entry['id'], entry.get('webpage_url') or entry['url'], quality, format, folder, custom_name_prefix, error) + key = entry.get('webpage_url') or entry['url'] + if not self.queue.exists(key): + dl = DownloadInfo(entry['id'], entry.get('title') or entry['id'], key, quality, format, folder, custom_name_prefix, error) dldirectory, error_message = self.__calc_download_path(quality, format, folder) if error_message is not None: return error_message @@ -354,21 +370,20 @@ class DownloadQueue: if 'playlist' in entry and entry['playlist'] is not None: if len(self.config.OUTPUT_TEMPLATE_PLAYLIST): output = self.config.OUTPUT_TEMPLATE_PLAYLIST - for property, value in entry.items(): if property.startswith("playlist"): output = output.replace(f"%({property})s", str(value)) - ytdl_options = dict(self.config.YTDL_OPTIONS) - if playlist_item_limit > 0: log.info(f'playlist limit is set. Processing only first {playlist_item_limit} entries') ytdl_options['playlistend'] = playlist_item_limit - if auto_start is True: download = Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, ytdl_options, dl) self.queue.put(download) - asyncio.create_task(self.__start_download(download)) + if self.config.DOWNLOAD_MODE == 'sequential': + asyncio.create_task(self.__start_download(download)) + else: + asyncio.create_task(self.__start_download(download)) else: self.pending.put(Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, ytdl_options, dl)) await self.notifier.added(dl) diff --git a/docker-compose.yml b/docker-compose.yml index bb5f68c..7a98088 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -1,10 +1,10 @@ version: "3" services: metube: - image: ghcr.io/alexta69/metube - container_name: metube + image: notataco/metube:latest + container_name: metube-notataco restart: unless-stopped ports: - "8081:8081" volumes: - - c:/Users/Roger/Downloads:/downloads \ No newline at end of file + - C:/Users/ilike/Downloads:/downloads diff --git a/ui/src/app/app.component.html b/ui/src/app/app.component.html index dc5e3bb..7762a70 100644 --- a/ui/src/app/app.component.html +++ b/ui/src/app/app.component.html @@ -119,6 +119,49 @@
+
+ + + + + + +
+ + + + -
+
+
+ + + +
-
- - - - - - -