diff --git a/app/main.py b/app/main.py index 7e1fefc..77e7ecf 100644 --- a/app/main.py +++ b/app/main.py @@ -33,17 +33,19 @@ class Config: 'HOST': '0.0.0.0', 'PORT': '8081', 'BASE_DIR': '', - 'DEFAULT_THEME': 'auto' + 'DEFAULT_THEME': 'auto', + 'DOWNLOAD_MODE': 'sequential', # Can be 'sequential', 'concurrent', or 'limited' + 'MAX_CONCURRENT_DOWNLOADS': 3, # Used if DOWNLOAD_MODE is 'limited' } _BOOLEAN = ('DOWNLOAD_DIRS_INDEXABLE', 'CUSTOM_DIRS', 'CREATE_CUSTOM_DIRS', 'DELETE_FILE_ON_TRASHCAN') def __init__(self): for k, v in self._DEFAULTS.items(): - setattr(self, k, os.environ[k] if k in os.environ else v) + setattr(self, k, os.environ.get(k, v)) for k, v in self.__dict__.items(): - if v.startswith('%%'): + if isinstance(v, str) and v.startswith('%%'): setattr(self, k, getattr(self, v[2:])) if k in self._BOOLEAN: if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'): @@ -160,9 +162,9 @@ async def start(request): async def history(request): history = { 'done': [], 'queue': []} - for _ ,v in dqueue.queue.saved_items(): + for _, v in dqueue.queue.saved_items(): history['queue'].append(v) - for _ ,v in dqueue.done.saved_items(): + for _, v in dqueue.done.saved_items(): history['done'].append(v) log.info("Sending download history") diff --git a/app/ytdl.py b/app/ytdl.py index 16133ec..b009257 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -216,6 +216,11 @@ class DownloadQueue: self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') self.active_downloads = set() + self.semaphore = None + + if self.config.DOWNLOAD_MODE == 'limited': + self.semaphore = asyncio.Semaphore(self.config.MAX_CONCURRENT_DOWNLOADS) + self.done.load() async def __import_queue(self): @@ -226,6 +231,49 @@ class DownloadQueue: log.info("Initializing DownloadQueue") asyncio.create_task(self.__import_queue()) + async def __start_download(self, download): + if self.config.DOWNLOAD_MODE == 'sequential': + await self.__sequential_download(download) + elif self.config.DOWNLOAD_MODE == 'limited' and self.semaphore is not None: + await self.__limited_concurrent_download(download) + else: # concurrent without limit + await self.__concurrent_download(download) + + async def __sequential_download(self, download): + log.info("Starting sequential download.") + await download.start(self.notifier) + self._post_download_cleanup(download) + + async def __concurrent_download(self, download): + log.info("Starting concurrent download without limits.") + asyncio.create_task(self._run_download(download)) + + async def __limited_concurrent_download(self, download): + log.info("Starting limited concurrent download.") + async with self.semaphore: + await self._run_download(download) + + async def _run_download(self, download): + await download.start(self.notifier) + self._post_download_cleanup(download) + + def _post_download_cleanup(self, download): + if download.info.status != 'finished': + if download.tmpfilename and os.path.isfile(download.tmpfilename): + try: + os.remove(download.tmpfilename) + except: + pass + download.info.status = 'error' + download.close() + if self.queue.exists(download.info.url): + self.queue.delete(download.info.url) + if download.canceled: + asyncio.create_task(self.notifier.canceled(download.info.url)) + else: + self.done.put(download) + asyncio.create_task(self.notifier.completed(download.info)) + def __extract_info(self, url): return yt_dlp.YoutubeDL(params={ 'quiet': True, @@ -304,24 +352,6 @@ class DownloadQueue: return await self.add(entry['url'], quality, format, folder, custom_name_prefix, auto_start, already) return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'} - async def __start_download(self, download): - await download.start(self.notifier) - if download.info.status != 'finished': - if download.tmpfilename and os.path.isfile(download.tmpfilename): - try: - os.remove(download.tmpfilename) - except: - pass - download.info.status = 'error' - download.close() - if self.queue.exists(download.info.url): - self.queue.delete(download.info.url) - if download.canceled: - await self.notifier.canceled(download.info.url) - else: - self.done.put(download) - await self.notifier.completed(download.info) - async def add(self, url, quality, format, folder, custom_name_prefix, auto_start=True, already=None): log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=}') already = set() if already is None else already