From 50e67e1459219db96db41f031435938c124abeef Mon Sep 17 00:00:00 2001 From: evilmonkeydiaz Date: Sat, 17 Aug 2024 13:27:35 -0700 Subject: [PATCH] Update ytdl.py --- app/ytdl.py | 87 ++++++++++++++++++++++++++--------------------------- 1 file changed, 43 insertions(+), 44 deletions(-) diff --git a/app/ytdl.py b/app/ytdl.py index da28631..e9b4ee6 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -5,7 +5,6 @@ import shelve import time import asyncio import multiprocessing -from concurrent.futures import ThreadPoolExecutor import logging import re from dl_formats import get_format, get_opts, AUDIO_FORMATS @@ -105,13 +104,14 @@ class Download: if Download.manager is None: Download.manager = multiprocessing.Manager() self.status_queue = Download.manager.Queue() - self.proc = await asyncio.get_running_loop().run_in_executor(None, self._download) + self.proc = multiprocessing.Process(target=self._download) + self.proc.start() self.loop = asyncio.get_running_loop() self.notifier = notifier self.info.status = 'preparing' await self.notifier.updated(self.info) asyncio.create_task(self.update_status()) - return self.proc + return await self.loop.run_in_executor(None, self.proc.join) def cancel(self): if self.running(): @@ -208,51 +208,19 @@ class DownloadQueue: self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') - self.max_concurrent_downloads = 5 # Adjust this number as needed - self.semaphore = asyncio.Semaphore(self.max_concurrent_downloads) - self.executor = ThreadPoolExecutor(max_workers=self.max_concurrent_downloads) self.done.load() - - async def __download_entry(self, id, entry): - async with self.semaphore: - await entry.start(self.notifier) - if entry.info.status != 'finished': - if entry.tmpfilename and os.path.isfile(entry.tmpfilename): - try: - os.remove(entry.tmpfilename) - except: - pass - entry.info.status = 'error' - entry.close() - if self.queue.exists(id): - self.queue.delete(id) - if entry.canceled: - await self.notifier.canceled(id) - else: - self.done.put(entry) - await self.notifier.completed(entry.info) - - async def __process_queue(self): - while True: - while self.queue.empty(): - await self.event.wait() - self.event.clear() - - tasks = [] - while not self.queue.empty() and len(tasks) < self.max_concurrent_downloads: - id, entry = self.queue.next() - task = asyncio.create_task(self.__download_entry(id, entry)) - tasks.append(task) - - await asyncio.gather(*tasks) + self.max_concurrent_downloads = self.config.MAX_CONCURRENT_DOWNLOADS # New configuration option + self.active_downloads = set() + self.download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads) + async def __import_queue(self): for k, v in self.queue.saved_items(): await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix) async def initialize(self): self.event = asyncio.Event() - asyncio.create_task(self.__process_queue()) - asyncio.create_task(self.__import_queue()) + asyncio.create_task(self.__manage_downloads()) + await self.__import_queue() def __extract_info(self, url): return yt_dlp.YoutubeDL(params={ @@ -286,7 +254,17 @@ class DownloadQueue: else: dldirectory = base_directory return dldirectory, None - + async def __manage_downloads(self): + while True: + while self.queue.empty(): + log.info('waiting for item to download') + await self.event.wait() + self.event.clear() + + async with self.download_semaphore: + id, entry = self.queue.next() + self.active_downloads.add(id) + asyncio.create_task(self.__process_download(id, entry)) async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, auto_start, already): if not entry: return {'status': 'error', 'msg': "Invalid/empty data was given."} @@ -397,7 +375,28 @@ class DownloadQueue: def get(self): return(list((k, v.info) for k, v in self.queue.items()) + list((k, v.info) for k, v in self.pending.items()), list((k, v.info) for k, v in self.done.items())) - + async def __process_download(self, id, entry): + try: + log.info(f'downloading {entry.info.title}') + await entry.start(self.notifier) + if entry.info.status != 'finished': + if entry.tmpfilename and os.path.isfile(entry.tmpfilename): + try: + os.remove(entry.tmpfilename) + except: + pass + entry.info.status = 'error' + entry.close() + finally: + if self.queue.exists(id): + self.queue.delete(id) + if entry.canceled: + await self.notifier.canceled(id) + else: + self.done.put(entry) + await self.notifier.completed(entry.info) + self.active_downloads.remove(id) + self.download_semaphore.release() async def __download(self): while True: while self.queue.empty(): @@ -421,4 +420,4 @@ class DownloadQueue: await self.notifier.canceled(id) else: self.done.put(entry) - await self.notifier.completed(entry.info) + await self.notifier.completed(entry.info) \ No newline at end of file