Added concurrent and limited modes

This commit is contained in:
evilmonkeydiaz 2024-08-17 16:09:42 -07:00
parent 8552faf9c5
commit 2097a7adfa
2 changed files with 55 additions and 23 deletions

View file

@ -33,17 +33,19 @@ class Config:
'HOST': '0.0.0.0',
'PORT': '8081',
'BASE_DIR': '',
'DEFAULT_THEME': 'auto'
'DEFAULT_THEME': 'auto',
'DOWNLOAD_MODE': 'sequential', # Can be 'sequential', 'concurrent', or 'limited'
'MAX_CONCURRENT_DOWNLOADS': 3, # Used if DOWNLOAD_MODE is 'limited'
}
_BOOLEAN = ('DOWNLOAD_DIRS_INDEXABLE', 'CUSTOM_DIRS', 'CREATE_CUSTOM_DIRS', 'DELETE_FILE_ON_TRASHCAN')
def __init__(self):
for k, v in self._DEFAULTS.items():
setattr(self, k, os.environ[k] if k in os.environ else v)
setattr(self, k, os.environ.get(k, v))
for k, v in self.__dict__.items():
if v.startswith('%%'):
if isinstance(v, str) and v.startswith('%%'):
setattr(self, k, getattr(self, v[2:]))
if k in self._BOOLEAN:
if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'):
@ -160,9 +162,9 @@ async def start(request):
async def history(request):
history = { 'done': [], 'queue': []}
for _ ,v in dqueue.queue.saved_items():
for _, v in dqueue.queue.saved_items():
history['queue'].append(v)
for _ ,v in dqueue.done.saved_items():
for _, v in dqueue.done.saved_items():
history['done'].append(v)
log.info("Sending download history")

View file

@ -216,6 +216,11 @@ class DownloadQueue:
self.done = PersistentQueue(self.config.STATE_DIR + '/completed')
self.pending = PersistentQueue(self.config.STATE_DIR + '/pending')
self.active_downloads = set()
self.semaphore = None
if self.config.DOWNLOAD_MODE == 'limited':
self.semaphore = asyncio.Semaphore(self.config.MAX_CONCURRENT_DOWNLOADS)
self.done.load()
async def __import_queue(self):
@ -226,6 +231,49 @@ class DownloadQueue:
log.info("Initializing DownloadQueue")
asyncio.create_task(self.__import_queue())
async def __start_download(self, download):
if self.config.DOWNLOAD_MODE == 'sequential':
await self.__sequential_download(download)
elif self.config.DOWNLOAD_MODE == 'limited' and self.semaphore is not None:
await self.__limited_concurrent_download(download)
else: # concurrent without limit
await self.__concurrent_download(download)
async def __sequential_download(self, download):
log.info("Starting sequential download.")
await download.start(self.notifier)
self._post_download_cleanup(download)
async def __concurrent_download(self, download):
log.info("Starting concurrent download without limits.")
asyncio.create_task(self._run_download(download))
async def __limited_concurrent_download(self, download):
log.info("Starting limited concurrent download.")
async with self.semaphore:
await self._run_download(download)
async def _run_download(self, download):
await download.start(self.notifier)
self._post_download_cleanup(download)
def _post_download_cleanup(self, download):
if download.info.status != 'finished':
if download.tmpfilename and os.path.isfile(download.tmpfilename):
try:
os.remove(download.tmpfilename)
except:
pass
download.info.status = 'error'
download.close()
if self.queue.exists(download.info.url):
self.queue.delete(download.info.url)
if download.canceled:
asyncio.create_task(self.notifier.canceled(download.info.url))
else:
self.done.put(download)
asyncio.create_task(self.notifier.completed(download.info))
def __extract_info(self, url):
return yt_dlp.YoutubeDL(params={
'quiet': True,
@ -304,24 +352,6 @@ class DownloadQueue:
return await self.add(entry['url'], quality, format, folder, custom_name_prefix, auto_start, already)
return {'status': 'error', 'msg': f'Unsupported resource "{etype}"'}
async def __start_download(self, download):
await download.start(self.notifier)
if download.info.status != 'finished':
if download.tmpfilename and os.path.isfile(download.tmpfilename):
try:
os.remove(download.tmpfilename)
except:
pass
download.info.status = 'error'
download.close()
if self.queue.exists(download.info.url):
self.queue.delete(download.info.url)
if download.canceled:
await self.notifier.canceled(download.info.url)
else:
self.done.put(download)
await self.notifier.completed(download.info)
async def add(self, url, quality, format, folder, custom_name_prefix, auto_start=True, already=None):
log.info(f'adding {url}: {quality=} {format=} {already=} {folder=} {custom_name_prefix=}')
already = set() if already is None else already