diff --git a/app/main.py b/app/main.py index 3daed83..2caf1fb 100644 --- a/app/main.py +++ b/app/main.py @@ -8,7 +8,7 @@ import socketio import logging import json import pathlib - +import sys from ytdl import DownloadQueueNotifier, DownloadQueue log = logging.getLogger('main') @@ -238,4 +238,7 @@ app.on_response_prepare.append(on_prepare) if __name__ == '__main__': logging.basicConfig(level=logging.DEBUG) log.info(f"Listening on {config.HOST}:{config.PORT}") - web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) + if sys.platform.startswith('win'): + web.run_app(app, host=config.HOST, port=int(config.PORT)) + else: + web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) diff --git a/app/ytdl.py b/app/ytdl.py index 086d7c9..da28631 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -5,6 +5,7 @@ import shelve import time import asyncio import multiprocessing +from concurrent.futures import ThreadPoolExecutor import logging import re from dl_formats import get_format, get_opts, AUDIO_FORMATS @@ -104,14 +105,13 @@ class Download: if Download.manager is None: Download.manager = multiprocessing.Manager() self.status_queue = Download.manager.Queue() - self.proc = multiprocessing.Process(target=self._download) - self.proc.start() + self.proc = await asyncio.get_running_loop().run_in_executor(None, self._download) self.loop = asyncio.get_running_loop() self.notifier = notifier self.info.status = 'preparing' await self.notifier.updated(self.info) asyncio.create_task(self.update_status()) - return await self.loop.run_in_executor(None, self.proc.join) + return self.proc def cancel(self): if self.running(): @@ -208,15 +208,50 @@ class DownloadQueue: self.queue = PersistentQueue(self.config.STATE_DIR + '/queue') self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') + self.max_concurrent_downloads = 5 # Adjust this number as needed + self.semaphore = asyncio.Semaphore(self.max_concurrent_downloads) + self.executor = ThreadPoolExecutor(max_workers=self.max_concurrent_downloads) self.done.load() - + + async def __download_entry(self, id, entry): + async with self.semaphore: + await entry.start(self.notifier) + if entry.info.status != 'finished': + if entry.tmpfilename and os.path.isfile(entry.tmpfilename): + try: + os.remove(entry.tmpfilename) + except: + pass + entry.info.status = 'error' + entry.close() + if self.queue.exists(id): + self.queue.delete(id) + if entry.canceled: + await self.notifier.canceled(id) + else: + self.done.put(entry) + await self.notifier.completed(entry.info) + + async def __process_queue(self): + while True: + while self.queue.empty(): + await self.event.wait() + self.event.clear() + + tasks = [] + while not self.queue.empty() and len(tasks) < self.max_concurrent_downloads: + id, entry = self.queue.next() + task = asyncio.create_task(self.__download_entry(id, entry)) + tasks.append(task) + + await asyncio.gather(*tasks) async def __import_queue(self): for k, v in self.queue.saved_items(): await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix) async def initialize(self): self.event = asyncio.Event() - asyncio.create_task(self.__download()) + asyncio.create_task(self.__process_queue()) asyncio.create_task(self.__import_queue()) def __extract_info(self, url): diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..bb5f68c --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,10 @@ +version: "3" +services: + metube: + image: ghcr.io/alexta69/metube + container_name: metube + restart: unless-stopped + ports: + - "8081:8081" + volumes: + - c:/Users/Roger/Downloads:/downloads \ No newline at end of file