diff --git a/app/main.py b/app/main.py index c00231f..7e1fefc 100644 --- a/app/main.py +++ b/app/main.py @@ -3,7 +3,6 @@ import os import sys -import asyncio from aiohttp import web import socketio import logging @@ -16,7 +15,7 @@ log = logging.getLogger('main') class Config: _DEFAULTS = { - 'DOWNLOAD_DIR': 'C:/Users/Roger/Desktop/MeTube', + 'DOWNLOAD_DIR': '.', 'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR', 'TEMP_DIR': '%%DOWNLOAD_DIR', 'DOWNLOAD_DIRS_INDEXABLE': 'false', @@ -41,10 +40,10 @@ class Config: def __init__(self): for k, v in self._DEFAULTS.items(): - setattr(self, k, os.environ.get(k, v)) + setattr(self, k, os.environ[k] if k in os.environ else v) for k, v in self.__dict__.items(): - if isinstance(v, str) and v.startswith('%%'): + if v.startswith('%%'): setattr(self, k, getattr(self, v[2:])) if k in self._BOOLEAN: if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'): @@ -86,141 +85,115 @@ class ObjectSerializer(json.JSONEncoder): return json.JSONEncoder.default(self, obj) serializer = ObjectSerializer() +app = web.Application() +sio = socketio.AsyncServer(cors_allowed_origins='*') +sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io') +routes = web.RouteTableDef() class Notifier(DownloadQueueNotifier): async def added(self, dl): + log.info(f"Notifier: Download added - {dl.title}") await sio.emit('added', serializer.encode(dl)) async def updated(self, dl): + log.info(f"Notifier: Download updated - {dl.title}") await sio.emit('updated', serializer.encode(dl)) async def completed(self, dl): + log.info(f"Notifier: Download completed - {dl.title}") await sio.emit('completed', serializer.encode(dl)) async def canceled(self, id): + log.info(f"Notifier: Download canceled - {id}") await sio.emit('canceled', serializer.encode(id)) async def cleared(self, id): + log.info(f"Notifier: Download cleared - {id}") await sio.emit('cleared', serializer.encode(id)) -async def init_app(): - app = web.Application() - global sio - sio = socketio.AsyncServer(cors_allowed_origins='*') - sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io') - routes = web.RouteTableDef() +dqueue = DownloadQueue(config, Notifier()) +app.on_startup.append(lambda app: dqueue.initialize()) - @routes.post(config.URL_PREFIX + 'add') - async def add(request): - post = await request.json() - url = post.get('url') - quality = post.get('quality') - if not url or not quality: - raise web.HTTPBadRequest() - format = post.get('format') - folder = post.get('folder') - custom_name_prefix = post.get('custom_name_prefix') - auto_start = post.get('auto_start') - if custom_name_prefix is None: - custom_name_prefix = '' - if auto_start is None: - auto_start = True - status = await request.app['dqueue'].add(url, quality, format, folder, custom_name_prefix, auto_start) - return web.Response(text=serializer.encode(status)) +@routes.post(config.URL_PREFIX + 'add') +async def add(request): + log.info("Received request to add download") + post = await request.json() + log.info(f"Request data: {post}") + url = post.get('url') + quality = post.get('quality') + if not url or not quality: + log.error("Bad request: missing 'url' or 'quality'") + raise web.HTTPBadRequest() + format = post.get('format') + folder = post.get('folder') + custom_name_prefix = post.get('custom_name_prefix') + auto_start = post.get('auto_start') + if custom_name_prefix is None: + custom_name_prefix = '' + if auto_start is None: + auto_start = True + status = await dqueue.add(url, quality, format, folder, custom_name_prefix, auto_start) + log.info(f"Download added to queue: {url}") + return web.Response(text=serializer.encode(status)) - @routes.post(config.URL_PREFIX + 'delete') - async def delete(request): - post = await request.json() - ids = post.get('ids') - where = post.get('where') - if not ids or where not in ['queue', 'done']: - raise web.HTTPBadRequest() - status = await (request.app['dqueue'].cancel(ids) if where == 'queue' else request.app['dqueue'].clear(ids)) - return web.Response(text=serializer.encode(status)) +@routes.post(config.URL_PREFIX + 'delete') +async def delete(request): + post = await request.json() + ids = post.get('ids') + where = post.get('where') + if not ids or where not in ['queue', 'done']: + log.error("Bad request: missing 'ids' or incorrect 'where' value") + raise web.HTTPBadRequest() + status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids)) + log.info(f"Download delete request processed for ids: {ids}, where: {where}") + return web.Response(text=serializer.encode(status)) - @routes.post(config.URL_PREFIX + 'start') - async def start(request): - post = await request.json() - ids = post.get('ids') - status = await request.app['dqueue'].start_pending(ids) - return web.Response(text=serializer.encode(status)) +@routes.post(config.URL_PREFIX + 'start') +async def start(request): + post = await request.json() + ids = post.get('ids') + log.info(f"Received request to start pending downloads for ids: {ids}") + status = await dqueue.start_pending(ids) + return web.Response(text=serializer.encode(status)) - @routes.get(config.URL_PREFIX + 'history') - async def history(request): - history = { 'done': [], 'queue': []} +@routes.get(config.URL_PREFIX + 'history') +async def history(request): + history = { 'done': [], 'queue': []} - for _ ,v in request.app['dqueue'].queue.saved_items(): - history['queue'].append(v) - for _ ,v in request.app['dqueue'].done.saved_items(): - history['done'].append(v) + for _ ,v in dqueue.queue.saved_items(): + history['queue'].append(v) + for _ ,v in dqueue.done.saved_items(): + history['done'].append(v) - return web.Response(text=serializer.encode(history)) + log.info("Sending download history") + return web.Response(text=serializer.encode(history)) - @sio.event - async def connect(sid, environ): - await sio.emit('all', serializer.encode(request.app['dqueue'].get()), to=sid) - await sio.emit('configuration', serializer.encode(config), to=sid) - if config.CUSTOM_DIRS: - await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid) - - @routes.get(config.URL_PREFIX) - def index(request): - response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/index.html')) - if 'metube_theme' not in request.cookies: - response.set_cookie('metube_theme', config.DEFAULT_THEME) - return response - - if config.URL_PREFIX != '/': - @routes.get('/') - def index_redirect_root(request): - return web.HTTPFound(config.URL_PREFIX) - - @routes.get(config.URL_PREFIX[:-1]) - def index_redirect_dir(request): - return web.HTTPFound(config.URL_PREFIX) - - routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) - routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) - routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube')) - - try: - app.add_routes(routes) - except ValueError as e: - if 'ui/dist/metube' in str(e): - raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e - raise e - - async def add_cors(request): - return web.Response(text=serializer.encode({"status": "ok"})) - - app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) - - async def on_prepare(request, response): - if 'Origin' in request.headers: - response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] - response.headers['Access-Control-Allow-Headers'] = 'Content-Type' - - app.on_response_prepare.append(on_prepare) - - dqueue = DownloadQueue(config, Notifier()) - await dqueue.initialize() - app['dqueue'] = dqueue - - return app +@sio.event +async def connect(sid, environ): + log.info(f"Client connected: {sid}") + await sio.emit('all', serializer.encode(dqueue.get()), to=sid) + await sio.emit('configuration', serializer.encode(config), to=sid) + if config.CUSTOM_DIRS: + await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid) def get_custom_dirs(): def recursive_dirs(base): path = pathlib.Path(base) + # Converts PosixPath object to string, and remove base/ prefix def convert(p): s = str(p) if s.startswith(base): s = s[len(base):] + if s.startswith('/'): s = s[1:] + return s + # Recursively lists all subdirectories of DOWNLOAD_DIR dirs = list(filter(None, map(convert, path.glob('**')))) + return dirs download_dir = recursive_dirs(config.DOWNLOAD_DIR) @@ -234,23 +207,49 @@ def get_custom_dirs(): "audio_download_dir": audio_download_dir } -async def start_background_tasks(app): - app['dqueue_task'] = asyncio.create_task(app['dqueue'].run()) +@routes.get(config.URL_PREFIX) +def index(request): + response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/index.html')) + if 'metube_theme' not in request.cookies: + response.set_cookie('metube_theme', config.DEFAULT_THEME) + return response -async def cleanup_background_tasks(app): - app['dqueue_task'].cancel() - await app['dqueue_task'] +if config.URL_PREFIX != '/': + @routes.get('/') + def index_redirect_root(request): + return web.HTTPFound(config.URL_PREFIX) -def main(): + @routes.get(config.URL_PREFIX[:-1]) + def index_redirect_dir(request): + return web.HTTPFound(config.URL_PREFIX) + +routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) +routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) +routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube')) +try: + app.add_routes(routes) +except ValueError as e: + if 'ui/dist/metube' in str(e): + raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e + raise e + +# https://github.com/aio-libs/aiohttp/pull/4615 waiting for release +# @routes.options(config.URL_PREFIX + 'add') +async def add_cors(request): + return web.Response(text=serializer.encode({"status": "ok"})) + +app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) + +async def on_prepare(request, response): + if 'Origin' in request.headers: + response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] + response.headers['Access-Control-Allow-Headers'] = 'Content-Type' + +app.on_response_prepare.append(on_prepare) + +if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) - log.info(f"Initializing application on {config.HOST}:{config.PORT}") - - loop = asyncio.get_event_loop() - app = loop.run_until_complete(init_app()) - - app.on_startup.append(start_background_tasks) - app.on_cleanup.append(cleanup_background_tasks) - + log.info(f"Listening on {config.HOST}:{config.PORT}") try: if sys.platform.startswith('win'): web.run_app(app, host=config.HOST, port=int(config.PORT)) @@ -258,9 +257,4 @@ def main(): web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) except Exception as e: log.error(f"Failed to start the server: {str(e)}") - sys.exit(1) - -if __name__ == '__main__': - if sys.platform.startswith('win'): - asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy()) - main() \ No newline at end of file + sys.exit(1) \ No newline at end of file diff --git a/app/ytdl.py b/app/ytdl.py index fe8eba4..16133ec 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -4,6 +4,7 @@ from collections import OrderedDict import shelve import time import asyncio +import multiprocessing import logging import re from dl_formats import get_format, get_opts, AUDIO_FORMATS @@ -43,6 +44,8 @@ class DownloadInfo: self.error = error class Download: + manager = None + def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info): self.download_dir = download_dir self.temp_dir = temp_dir @@ -53,48 +56,113 @@ class Download: self.info = info self.canceled = False self.tmpfilename = None - - async def start(self, notifier): - self.info.status = 'preparing' - await notifier.updated(self.info) - - try: - result = await asyncio.get_event_loop().run_in_executor(None, self._download) - if result['status'] == 'finished': - self.info.status = 'finished' - self.info.filename = result.get('filename') - self.info.size = os.path.getsize(result['filename']) if os.path.exists(result['filename']) else None - else: - self.info.status = 'error' - self.info.msg = result.get('msg', 'Unknown error occurred') - except Exception as e: - self.info.status = 'error' - self.info.msg = str(e) - - await notifier.updated(self.info) + self.status_queue = None + self.proc = None + self.loop = None + self.notifier = None def _download(self): - ydl_opts = { - 'quiet': True, - 'no_color': True, - 'paths': {"home": self.download_dir, "temp": self.temp_dir}, - 'outtmpl': {"default": self.output_template, "chapter": self.output_template_chapter}, - 'format': self.format, - 'socket_timeout': 30, - 'ignore_no_formats_error': True, - **self.ytdl_opts, - } + log.info(f"Starting download for: {self.info.title} ({self.info.url})") + try: + def put_status(st): + self.status_queue.put({k: v for k, v in st.items() if k in ( + 'tmpfilename', + 'filename', + 'status', + 'msg', + 'total_bytes', + 'total_bytes_estimate', + 'downloaded_bytes', + 'speed', + 'eta', + )}) - with yt_dlp.YoutubeDL(ydl_opts) as ydl: - try: - info = ydl.extract_info(self.info.url, download=True) - return {'status': 'finished', 'filename': ydl.prepare_filename(info)} - except yt_dlp.utils.DownloadError as e: - return {'status': 'error', 'msg': str(e)} + def put_status_postprocessor(d): + if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished': + if '__finaldir' in d['info_dict']: + filename = os.path.join(d['info_dict']['__finaldir'], os.path.basename(d['info_dict']['filepath'])) + else: + filename = d['info_dict']['filepath'] + self.status_queue.put({'status': 'finished', 'filename': filename}) + + ret = yt_dlp.YoutubeDL(params={ + 'quiet': True, + 'no_color': True, + 'paths': {"home": self.download_dir, "temp": self.temp_dir}, + 'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter }, + 'format': self.format, + 'socket_timeout': 30, + 'ignore_no_formats_error': True, + 'progress_hooks': [put_status], + 'postprocessor_hooks': [put_status_postprocessor], + **self.ytdl_opts, + }).download([self.info.url]) + self.status_queue.put({'status': 'finished' if ret == 0 else 'error'}) + log.info(f"Finished download for: {self.info.title}") + except yt_dlp.utils.YoutubeDLError as exc: + log.error(f"Download error for {self.info.title}: {str(exc)}") + self.status_queue.put({'status': 'error', 'msg': str(exc)}) + + async def start(self, notifier): + log.info(f"Preparing download for: {self.info.title}") + if Download.manager is None: + Download.manager = multiprocessing.Manager() + self.status_queue = Download.manager.Queue() + self.proc = multiprocessing.Process(target=self._download) + self.proc.start() + self.loop = asyncio.get_running_loop() + self.notifier = notifier + self.info.status = 'preparing' + await self.notifier.updated(self.info) + asyncio.create_task(self.update_status()) + return await self.loop.run_in_executor(None, self.proc.join) def cancel(self): + log.info(f"Cancelling download: {self.info.title}") + if self.running(): + self.proc.kill() self.canceled = True + def close(self): + log.info(f"Closing download process for: {self.info.title}") + if self.started(): + self.proc.close() + self.status_queue.put(None) + + def running(self): + try: + return self.proc is not None and self.proc.is_alive() + except ValueError: + return False + + def started(self): + return self.proc is not None + + async def update_status(self): + while True: + status = await self.loop.run_in_executor(None, self.status_queue.get) + if status is None: + log.info(f"Status update finished for: {self.info.title}") + return + self.tmpfilename = status.get('tmpfilename') + if 'filename' in status: + fileName = status.get('filename') + self.info.filename = os.path.relpath(fileName, self.download_dir) + self.info.size = os.path.getsize(fileName) if os.path.exists(fileName) else None + + if self.info.format == 'thumbnail': + self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename) + self.info.status = status['status'] + self.info.msg = status.get('msg') + if 'downloaded_bytes' in status: + total = status.get('total_bytes') or status.get('total_bytes_estimate') + if total: + self.info.percent = status['downloaded_bytes'] / total * 100 + self.info.speed = status.get('speed') + self.info.eta = status.get('eta') + log.info(f"Updating status for {self.info.title}: {status}") + await self.notifier.updated(self.info) + class PersistentQueue: def __init__(self, path): pdir = os.path.dirname(path) @@ -147,26 +215,17 @@ class DownloadQueue: self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') - self.done.load() self.active_downloads = set() - self.max_concurrent_downloads = 3 # Adjust this value as needed - self.event = asyncio.Event() - - async def initialize(self): - await self.__import_queue() - - async def run(self): - while True: - try: - await self.__manage_downloads() - except Exception as e: - log.error(f"Error in download queue: {str(e)}") - await asyncio.sleep(5) # Wait a bit before retrying + self.done.load() async def __import_queue(self): for k, v in self.queue.saved_items(): await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix) + async def initialize(self): + log.info("Initializing DownloadQueue") + asyncio.create_task(self.__import_queue()) + def __extract_info(self, url): return yt_dlp.YoutubeDL(params={ 'quiet': True, @@ -234,8 +293,9 @@ class DownloadQueue: if property.startswith("playlist"): output = output.replace(f"%({property})s", str(value)) if auto_start is True: - self.queue.put(Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, self.config.YTDL_OPTIONS, dl)) - self.event.set() + download = Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, self.config.YTDL_OPTIONS, dl) + self.queue.put(download) + asyncio.create_task(self.__start_download(download)) else: self.pending.put(Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, self.config.YTDL_OPTIONS, dl)) await self.notifier.added(dl) @@ -244,6 +304,24 @@ class DownloadQueue: return await self.add(entry['url'], quality, format, folder, custom_name_prefix, auto_start, already) return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'} + async def __start_download(self, download): + await download.start(self.notifier) + if download.info.status != 'finished': + if download.tmpfilename and os.path.isfile(download.tmpfilename): + try: + os.remove(download.tmpfilename) + except: + pass + download.info.status = 'error' + download.close() + if self.queue.exists(download.info.url): + self.queue.delete(download.info.url) + if download.canceled: + await self.notifier.canceled(download.info.url) + else: + self.done.put(download) + await self.notifier.completed(download.info) + async def add(self, url, quality, format, folder, custom_name_prefix, auto_start=True, already=None): log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=}') already = set() if already is None else already @@ -253,15 +331,10 @@ class DownloadQueue: else: already.add(url) try: - entry = await asyncio.get_event_loop().run_in_executor(None, self.__extract_info, url) + entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url) except yt_dlp.utils.YoutubeDLError as exc: return {'status': 'error', 'msg': str(exc)} - result = await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already) - - if result['status'] == 'ok' and auto_start: - self.event.set() # Signal that new items are available for download - - return result + return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already) async def start_pending(self, ids): for id in ids: @@ -271,7 +344,7 @@ class DownloadQueue: dl = self.pending.get(id) self.queue.put(dl) self.pending.delete(id) - self.event.set() + asyncio.create_task(self.__start_download(dl)) return {'status': 'ok'} async def cancel(self, ids): @@ -283,11 +356,11 @@ class DownloadQueue: if not self.queue.exists(id): log.warn(f'requested cancel for non-existent download {id}') continue - dl = self.queue.get(id) - if isinstance(dl, Download): - dl.cancel() - self.queue.delete(id) - await self.notifier.canceled(id) + if self.queue.get(id).started(): + self.queue.get(id).cancel() + else: + self.queue.delete(id) + await self.notifier.canceled(id) return {'status': 'ok'} async def clear(self, ids): @@ -307,34 +380,6 @@ class DownloadQueue: return {'status': 'ok'} def get(self): - return(list((k, v.info) for k, v in self.queue.items()) + list((k, v.info) for k, v in self.pending.items()), - list((k, v.info) for k, v in self.done.items())) - - async def __manage_downloads(self): - while True: - while not self.queue.empty() and len(self.active_downloads) < self.max_concurrent_downloads: - id, entry = self.queue.next() - if id not in self.active_downloads: - self.active_downloads.add(id) - asyncio.create_task(self.__download(id, entry)) - await asyncio.sleep(1) # Add a small delay to prevent busy waiting - - async def __download(self, id, entry): - try: - log.info(f'downloading {entry.info.title}') - await entry.start(self.notifier) - if entry.info.status != 'finished': - if entry.tmpfilename and os.path.isfile(entry.tmpfilename): - try: - os.remove(entry.tmpfilename) - except: - pass - if self.queue.exists(id): - self.queue.delete(id) - if entry.canceled: - await self.notifier.canceled(id) - else: - self.done.put(entry) - await self.notifier.completed(entry.info) - finally: - self.active_downloads.remove(id) \ No newline at end of file + return (list((k, v.info) for k, v in self.queue.items()) + + list((k, v.info) for k, v in self.pending.items()), + list((k, v.info) for k, v in self.done.items()))