diff --git a/README.md b/README.md index abde8f0..edc49a3 100644 --- a/README.md +++ b/README.md @@ -61,6 +61,13 @@ Certain values can be set via environment variables, using the `-e` parameter on * __YTDL_OPTIONS__: Additional options to pass to youtube-dl, in JSON format. [See available options here](https://github.com/yt-dlp/yt-dlp/blob/master/yt_dlp/YoutubeDL.py#L183). They roughly correspond to command-line options, though some do not have exact equivalents here, for example `--recode-video` has to be specified via `postprocessors`. Also note that dashes are replaced with underscores. * __YTDL_OPTIONS_FILE__: A path to a JSON file that will be loaded and used for populating `YTDL_OPTIONS` above. Please note that if both `YTDL_OPTIONS_FILE` and `YTDL_OPTIONS` are specified, the options in `YTDL_OPTIONS` take precedence. * __ROBOTS_TXT__: A path to a `robots.txt` file mounted in the container +* __DOWNLOAD_MODE__ :This flag controls how downloads are scheduled and executed. Options are `sequential`, `concurrent`, and `limited`. Defaults to `limited`: + * `sequential`: Downloads are processed one at a time. A new download won’t start until the previous one has finished. This mode is useful for conserving system resources or ensuring downloads occur in a strict order. + * `concurrent`: Downloads are started immediately as they are added, with no built-in limit on how many run simultaneously. This mode may overwhelm your system if too many downloads start at once. + * `limited`: Downloads are started concurrently but are capped by a concurrency limit. In this mode, a semaphore is used so that at most a fixed number of downloads run at any given time. +* **MAX\_CONCURRENT\_DOWNLOADS** This flag is used only when **DOWNLOAD\_MODE** is set to **limited**. + It specifies the maximum number of simultaneous downloads allowed. For example, if set to `5`, then at most five downloads will run concurrently, and any additional downloads will wait until one of the active downloads completes. Defaults to `3`. + The following example value for `YTDL_OPTIONS` embeds English subtitles and chapter markers (for videos that have them), and also changes the permissions on the downloaded video and sets the file modification timestamp to the date of when it was downloaded: diff --git a/app/main.py b/app/main.py index ae17291..98ca411 100644 --- a/app/main.py +++ b/app/main.py @@ -43,17 +43,19 @@ class Config: 'CERTFILE': '', 'KEYFILE': '', 'BASE_DIR': '', - 'DEFAULT_THEME': 'auto' + 'DEFAULT_THEME': 'auto', + 'DOWNLOAD_MODE': 'limited', + 'MAX_CONCURRENT_DOWNLOADS': 3, } _BOOLEAN = ('DOWNLOAD_DIRS_INDEXABLE', 'CUSTOM_DIRS', 'CREATE_CUSTOM_DIRS', 'DELETE_FILE_ON_TRASHCAN', 'DEFAULT_OPTION_PLAYLIST_STRICT_MODE', 'HTTPS') def __init__(self): for k, v in self._DEFAULTS.items(): - setattr(self, k, os.environ[k] if k in os.environ else v) + setattr(self, k, os.environ.get(k, v)) for k, v in self.__dict__.items(): - if v.startswith('%%'): + if isinstance(v, str) and v.startswith('%%'): setattr(self, k, getattr(self, v[2:])) if k in self._BOOLEAN: if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'): @@ -102,18 +104,23 @@ routes = web.RouteTableDef() class Notifier(DownloadQueueNotifier): async def added(self, dl): + log.info(f"Notifier: Download added - {dl.title}") await sio.emit('added', serializer.encode(dl)) async def updated(self, dl): + log.info(f"Notifier: Download updated - {dl.title}") await sio.emit('updated', serializer.encode(dl)) async def completed(self, dl): + log.info(f"Notifier: Download completed - {dl.title}") await sio.emit('completed', serializer.encode(dl)) async def canceled(self, id): + log.info(f"Notifier: Download canceled - {id}") await sio.emit('canceled', serializer.encode(id)) async def cleared(self, id): + log.info(f"Notifier: Download cleared - {id}") await sio.emit('cleared', serializer.encode(id)) dqueue = DownloadQueue(config, Notifier()) @@ -121,10 +128,13 @@ app.on_startup.append(lambda app: dqueue.initialize()) @routes.post(config.URL_PREFIX + 'add') async def add(request): + log.info("Received request to add download") post = await request.json() + log.info(f"Request data: {post}") url = post.get('url') quality = post.get('quality') if not url or not quality: + log.error("Bad request: missing 'url' or 'quality'") raise web.HTTPBadRequest() format = post.get('format') folder = post.get('folder') @@ -153,14 +163,17 @@ async def delete(request): ids = post.get('ids') where = post.get('where') if not ids or where not in ['queue', 'done']: + log.error("Bad request: missing 'ids' or incorrect 'where' value") raise web.HTTPBadRequest() status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids)) + log.info(f"Download delete request processed for ids: {ids}, where: {where}") return web.Response(text=serializer.encode(status)) @routes.post(config.URL_PREFIX + 'start') async def start(request): post = await request.json() ids = post.get('ids') + log.info(f"Received request to start pending downloads for ids: {ids}") status = await dqueue.start_pending(ids) return web.Response(text=serializer.encode(status)) @@ -168,17 +181,19 @@ async def start(request): async def history(request): history = { 'done': [], 'queue': [], 'pending': []} - for _ ,v in dqueue.queue.saved_items(): + for _, v in dqueue.queue.saved_items(): history['queue'].append(v) - for _ ,v in dqueue.done.saved_items(): + for _, v in dqueue.done.saved_items(): history['done'].append(v) - for _ ,v in dqueue.pending.saved_items(): + for _, v in dqueue.pending.saved_items(): history['pending'].append(v) + log.info("Sending download history") return web.Response(text=serializer.encode(history)) @sio.event async def connect(sid, environ): + log.info(f"Client connected: {sid}") await sio.emit('all', serializer.encode(dqueue.get()), to=sid) await sio.emit('configuration', serializer.encode(config), to=sid) if config.CUSTOM_DIRS: @@ -262,14 +277,13 @@ async def add_cors(request): app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) - async def on_prepare(request, response): if 'Origin' in request.headers: response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] response.headers['Access-Control-Allow-Headers'] = 'Content-Type' app.on_response_prepare.append(on_prepare) - + def supports_reuse_port(): try: sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) diff --git a/app/ytdl.py b/app/ytdl.py index 3434358..b8fd72e 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -63,8 +63,8 @@ class Download: self.loop = None self.notifier = None - def _download(self): + log.info(f"Starting download for: {self.info.title} ({self.info.url})") try: def put_status(st): self.status_queue.put({k: v for k, v in st.items() if k in ( @@ -78,6 +78,7 @@ class Download: 'speed', 'eta', )}) + def put_status_postprocessor(d): if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished': if '__finaldir' in d['info_dict']: @@ -85,10 +86,10 @@ class Download: else: filename = d['info_dict']['filepath'] self.status_queue.put({'status': 'finished', 'filename': filename}) + ret = yt_dlp.YoutubeDL(params={ 'quiet': True, 'no_color': True, - #'skip_download': True, 'paths': {"home": self.download_dir, "temp": self.temp_dir}, 'outtmpl': { "default": self.output_template, "chapter": self.output_template_chapter }, 'format': self.format, @@ -99,10 +100,13 @@ class Download: **self.ytdl_opts, }).download([self.info.url]) self.status_queue.put({'status': 'finished' if ret == 0 else 'error'}) + log.info(f"Finished download for: {self.info.title}") except yt_dlp.utils.YoutubeDLError as exc: + log.error(f"Download error for {self.info.title}: {str(exc)}") self.status_queue.put({'status': 'error', 'msg': str(exc)}) async def start(self, notifier): + log.info(f"Preparing download for: {self.info.title}") if Download.manager is None: Download.manager = multiprocessing.Manager() self.status_queue = Download.manager.Queue() @@ -116,14 +120,22 @@ class Download: return await self.loop.run_in_executor(None, self.proc.join) def cancel(self): + log.info(f"Cancelling download: {self.info.title}") if self.running(): - self.proc.kill() + try: + self.proc.kill() + except Exception as e: + log.error(f"Error killing process for {self.info.title}: {e}") self.canceled = True + if self.status_queue is not None: + self.status_queue.put(None) def close(self): + log.info(f"Closing download process for: {self.info.title}") if self.started(): self.proc.close() - self.status_queue.put(None) + if self.status_queue is not None: + self.status_queue.put(None) def running(self): try: @@ -138,15 +150,17 @@ class Download: while True: status = await self.loop.run_in_executor(None, self.status_queue.get) if status is None: + log.info(f"Status update finished for: {self.info.title}") + return + if self.canceled: + log.info(f"Download {self.info.title} is canceled; stopping status updates.") return self.tmpfilename = status.get('tmpfilename') if 'filename' in status: fileName = status.get('filename') self.info.filename = os.path.relpath(fileName, self.download_dir) self.info.size = os.path.getsize(fileName) if os.path.exists(fileName) else None - - # Set correct file extension for thumbnails - if(self.info.format == 'thumbnail'): + if self.info.format == 'thumbnail': self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename) self.info.status = status['status'] self.info.msg = status.get('msg') @@ -156,6 +170,7 @@ class Download: self.info.percent = status['downloaded_bytes'] / total * 100 self.info.speed = status.get('speed') self.info.eta = status.get('eta') + log.info(f"Updating status for {self.info.title}: {status}") await self.notifier.updated(self.info) class PersistentQueue: @@ -192,9 +207,10 @@ class PersistentQueue: shelf[key] = value.info def delete(self, key): - del self.dict[key] - with shelve.open(self.path, 'w') as shelf: - shelf.pop(key) + if key in self.dict: + del self.dict[key] + with shelve.open(self.path, 'w') as shelf: + shelf.pop(key, None) def next(self): k, v = next(iter(self.dict.items())) @@ -210,6 +226,14 @@ class DownloadQueue: self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') + self.active_downloads = set() + self.semaphore = None + # For sequential mode, use an asyncio lock to ensure one-at-a-time execution. + if self.config.DOWNLOAD_MODE == 'sequential': + self.seq_lock = asyncio.Lock() + elif self.config.DOWNLOAD_MODE == 'limited': + self.semaphore = asyncio.Semaphore(self.config.MAX_CONCURRENT_DOWNLOADS) + self.done.load() async def __import_queue(self): @@ -217,10 +241,56 @@ class DownloadQueue: await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix, v.playlist_strict_mode, v.playlist_item_limit) async def initialize(self): - self.event = asyncio.Event() - asyncio.create_task(self.__download()) + log.info("Initializing DownloadQueue") asyncio.create_task(self.__import_queue()) + async def __start_download(self, download): + if download.canceled: + log.info(f"Download {download.info.title} was canceled, skipping start.") + return + if self.config.DOWNLOAD_MODE == 'sequential': + async with self.seq_lock: + log.info("Starting sequential download.") + await download.start(self.notifier) + self._post_download_cleanup(download) + elif self.config.DOWNLOAD_MODE == 'limited' and self.semaphore is not None: + await self.__limited_concurrent_download(download) + else: + await self.__concurrent_download(download) + + async def __concurrent_download(self, download): + log.info("Starting concurrent download without limits.") + asyncio.create_task(self._run_download(download)) + + async def __limited_concurrent_download(self, download): + log.info("Starting limited concurrent download.") + async with self.semaphore: + await self._run_download(download) + + async def _run_download(self, download): + if download.canceled: + log.info(f"Download {download.info.title} is canceled; skipping start.") + return + await download.start(self.notifier) + self._post_download_cleanup(download) + + def _post_download_cleanup(self, download): + if download.info.status != 'finished': + if download.tmpfilename and os.path.isfile(download.tmpfilename): + try: + os.remove(download.tmpfilename) + except: + pass + download.info.status = 'error' + download.close() + if self.queue.exists(download.info.url): + self.queue.delete(download.info.url) + if download.canceled: + asyncio.create_task(self.notifier.canceled(download.info.url)) + else: + self.done.put(download) + asyncio.create_task(self.notifier.completed(download.info)) + def __extract_info(self, url, playlist_strict_mode): return yt_dlp.YoutubeDL(params={ 'quiet': True, @@ -234,12 +304,6 @@ class DownloadQueue: }).extract_info(url, download=False) def __calc_download_path(self, quality, format, folder): - """Calculates download path from quality, format and folder attributes. - - Returns: - Tuple dldirectory, error_message both of which might be None (but not at the same time) - """ - # Keep consistent with frontend base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR if folder: if not self.config.CUSTOM_DIRS: @@ -283,7 +347,7 @@ class DownloadQueue: log.info(f'Playlist item limit is set. Processing only first {playlist_item_limit} entries') entries = entries[:playlist_item_limit] for index, etr in enumerate(entries, start=1): - etr["_type"] = "video" # Prevents video to be treated as url and lose below properties during processing + etr["_type"] = "video" etr["playlist"] = entry["id"] etr["playlist_index"] = '{{0:0{0:d}d}}'.format(playlist_index_digits).format(index) for property in ("id", "title", "uploader", "uploader_id"): @@ -293,10 +357,11 @@ class DownloadQueue: if any(res['status'] == 'error' for res in results): return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)} return {'status': 'ok'} - elif etype == 'video' or etype.startswith('url') and 'id' in entry and 'title' in entry: + elif etype == 'video' or (etype.startswith('url') and 'id' in entry and 'title' in entry): log.debug('Processing as a video') - if not self.queue.exists(entry['id']): - dl = DownloadInfo(entry['id'], entry.get('title') or entry['id'], entry.get('webpage_url') or entry['url'], quality, format, folder, custom_name_prefix, error) + key = entry.get('webpage_url') or entry['url'] + if not self.queue.exists(key): + dl = DownloadInfo(entry['id'], entry.get('title') or entry['id'], key, quality, format, folder, custom_name_prefix, error) dldirectory, error_message = self.__calc_download_path(quality, format, folder) if error_message is not None: return error_message @@ -305,20 +370,17 @@ class DownloadQueue: if 'playlist' in entry and entry['playlist'] is not None: if len(self.config.OUTPUT_TEMPLATE_PLAYLIST): output = self.config.OUTPUT_TEMPLATE_PLAYLIST - for property, value in entry.items(): if property.startswith("playlist"): output = output.replace(f"%({property})s", str(value)) - ytdl_options = dict(self.config.YTDL_OPTIONS) - if playlist_item_limit > 0: log.info(f'playlist limit is set. Processing only first {playlist_item_limit} entries') ytdl_options['playlistend'] = playlist_item_limit - if auto_start is True: - self.queue.put(Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, ytdl_options, dl)) - self.event.set() + download = Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, ytdl_options, dl) + self.queue.put(download) + asyncio.create_task(self.__start_download(download)) else: self.pending.put(Download(dldirectory, self.config.TEMP_DIR, output, output_chapter, quality, format, ytdl_options, dl)) await self.notifier.added(dl) @@ -347,7 +409,7 @@ class DownloadQueue: dl = self.pending.get(id) self.queue.put(dl) self.pending.delete(id) - self.event.set() + asyncio.create_task(self.__start_download(dl)) return {'status': 'ok'} async def cancel(self, ids): @@ -383,30 +445,6 @@ class DownloadQueue: return {'status': 'ok'} def get(self): - return(list((k, v.info) for k, v in self.queue.items()) + list((k, v.info) for k, v in self.pending.items()), - list((k, v.info) for k, v in self.done.items())) - - async def __download(self): - while True: - while self.queue.empty(): - log.info('waiting for item to download') - await self.event.wait() - self.event.clear() - id, entry = self.queue.next() - log.info(f'downloading {entry.info.title}') - await entry.start(self.notifier) - if entry.info.status != 'finished': - if entry.tmpfilename and os.path.isfile(entry.tmpfilename): - try: - os.remove(entry.tmpfilename) - except: - pass - entry.info.status = 'error' - entry.close() - if self.queue.exists(id): - self.queue.delete(id) - if entry.canceled: - await self.notifier.canceled(id) - else: - self.done.put(entry) - await self.notifier.completed(entry.info) + return (list((k, v.info) for k, v in self.queue.items()) + + list((k, v.info) for k, v in self.pending.items()), + list((k, v.info) for k, v in self.done.items())) diff --git a/ui/src/app/app.component.html b/ui/src/app/app.component.html index 781f60b..e17a454 100644 --- a/ui/src/app/app.component.html +++ b/ui/src/app/app.component.html @@ -75,7 +75,7 @@ -