update class

This commit is contained in:
evilmonkeydiaz 2024-08-17 14:22:03 -07:00
parent 50e67e1459
commit d2bf4bd385
2 changed files with 199 additions and 260 deletions

View file

@ -3,19 +3,20 @@
import os import os
import sys import sys
import asyncio
from aiohttp import web from aiohttp import web
import socketio import socketio
import logging import logging
import json import json
import pathlib import pathlib
import sys
from ytdl import DownloadQueueNotifier, DownloadQueue from ytdl import DownloadQueueNotifier, DownloadQueue
log = logging.getLogger('main') log = logging.getLogger('main')
class Config: class Config:
_DEFAULTS = { _DEFAULTS = {
'DOWNLOAD_DIR': '.', 'DOWNLOAD_DIR': 'C:/Users/Roger/Desktop/MeTube',
'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR', 'AUDIO_DOWNLOAD_DIR': '%%DOWNLOAD_DIR',
'TEMP_DIR': '%%DOWNLOAD_DIR', 'TEMP_DIR': '%%DOWNLOAD_DIR',
'DOWNLOAD_DIRS_INDEXABLE': 'false', 'DOWNLOAD_DIRS_INDEXABLE': 'false',
@ -40,10 +41,10 @@ class Config:
def __init__(self): def __init__(self):
for k, v in self._DEFAULTS.items(): for k, v in self._DEFAULTS.items():
setattr(self, k, os.environ[k] if k in os.environ else v) setattr(self, k, os.environ.get(k, v))
for k, v in self.__dict__.items(): for k, v in self.__dict__.items():
if v.startswith('%%'): if isinstance(v, str) and v.startswith('%%'):
setattr(self, k, getattr(self, v[2:])) setattr(self, k, getattr(self, v[2:]))
if k in self._BOOLEAN: if k in self._BOOLEAN:
if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'): if v not in ('true', 'false', 'True', 'False', 'on', 'off', '1', '0'):
@ -85,10 +86,6 @@ class ObjectSerializer(json.JSONEncoder):
return json.JSONEncoder.default(self, obj) return json.JSONEncoder.default(self, obj)
serializer = ObjectSerializer() serializer = ObjectSerializer()
app = web.Application()
sio = socketio.AsyncServer(cors_allowed_origins='*')
sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io')
routes = web.RouteTableDef()
class Notifier(DownloadQueueNotifier): class Notifier(DownloadQueueNotifier):
async def added(self, dl): async def added(self, dl):
@ -106,8 +103,12 @@ class Notifier(DownloadQueueNotifier):
async def cleared(self, id): async def cleared(self, id):
await sio.emit('cleared', serializer.encode(id)) await sio.emit('cleared', serializer.encode(id))
dqueue = DownloadQueue(config, Notifier()) async def init_app():
app.on_startup.append(lambda app: dqueue.initialize()) app = web.Application()
global sio
sio = socketio.AsyncServer(cors_allowed_origins='*')
sio.attach(app, socketio_path=config.URL_PREFIX + 'socket.io')
routes = web.RouteTableDef()
@routes.post(config.URL_PREFIX + 'add') @routes.post(config.URL_PREFIX + 'add')
async def add(request): async def add(request):
@ -124,7 +125,7 @@ async def add(request):
custom_name_prefix = '' custom_name_prefix = ''
if auto_start is None: if auto_start is None:
auto_start = True auto_start = True
status = await dqueue.add(url, quality, format, folder, custom_name_prefix, auto_start) status = await request.app['dqueue'].add(url, quality, format, folder, custom_name_prefix, auto_start)
return web.Response(text=serializer.encode(status)) return web.Response(text=serializer.encode(status))
@routes.post(config.URL_PREFIX + 'delete') @routes.post(config.URL_PREFIX + 'delete')
@ -134,65 +135,34 @@ async def delete(request):
where = post.get('where') where = post.get('where')
if not ids or where not in ['queue', 'done']: if not ids or where not in ['queue', 'done']:
raise web.HTTPBadRequest() raise web.HTTPBadRequest()
status = await (dqueue.cancel(ids) if where == 'queue' else dqueue.clear(ids)) status = await (request.app['dqueue'].cancel(ids) if where == 'queue' else request.app['dqueue'].clear(ids))
return web.Response(text=serializer.encode(status)) return web.Response(text=serializer.encode(status))
@routes.post(config.URL_PREFIX + 'start') @routes.post(config.URL_PREFIX + 'start')
async def start(request): async def start(request):
post = await request.json() post = await request.json()
ids = post.get('ids') ids = post.get('ids')
status = await dqueue.start_pending(ids) status = await request.app['dqueue'].start_pending(ids)
return web.Response(text=serializer.encode(status)) return web.Response(text=serializer.encode(status))
@routes.get(config.URL_PREFIX + 'history') @routes.get(config.URL_PREFIX + 'history')
async def history(request): async def history(request):
history = { 'done': [], 'queue': []} history = { 'done': [], 'queue': []}
for _ ,v in dqueue.queue.saved_items(): for _ ,v in request.app['dqueue'].queue.saved_items():
history['queue'].append(v) history['queue'].append(v)
for _ ,v in dqueue.done.saved_items(): for _ ,v in request.app['dqueue'].done.saved_items():
history['done'].append(v) history['done'].append(v)
return web.Response(text=serializer.encode(history)) return web.Response(text=serializer.encode(history))
@sio.event @sio.event
async def connect(sid, environ): async def connect(sid, environ):
await sio.emit('all', serializer.encode(dqueue.get()), to=sid) await sio.emit('all', serializer.encode(request.app['dqueue'].get()), to=sid)
await sio.emit('configuration', serializer.encode(config), to=sid) await sio.emit('configuration', serializer.encode(config), to=sid)
if config.CUSTOM_DIRS: if config.CUSTOM_DIRS:
await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid) await sio.emit('custom_dirs', serializer.encode(get_custom_dirs()), to=sid)
def get_custom_dirs():
def recursive_dirs(base):
path = pathlib.Path(base)
# Converts PosixPath object to string, and remove base/ prefix
def convert(p):
s = str(p)
if s.startswith(base):
s = s[len(base):]
if s.startswith('/'):
s = s[1:]
return s
# Recursively lists all subdirectories of DOWNLOAD_DIR
dirs = list(filter(None, map(convert, path.glob('**'))))
return dirs
download_dir = recursive_dirs(config.DOWNLOAD_DIR)
audio_download_dir = download_dir
if config.DOWNLOAD_DIR != config.AUDIO_DOWNLOAD_DIR:
audio_download_dir = recursive_dirs(config.AUDIO_DOWNLOAD_DIR)
return {
"download_dir": download_dir,
"audio_download_dir": audio_download_dir
}
@routes.get(config.URL_PREFIX) @routes.get(config.URL_PREFIX)
def index(request): def index(request):
response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/index.html')) response = web.FileResponse(os.path.join(config.BASE_DIR, 'ui/dist/metube/index.html'))
@ -212,6 +182,7 @@ if config.URL_PREFIX != '/':
routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) routes.static(config.URL_PREFIX + 'download/', config.DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE)
routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE) routes.static(config.URL_PREFIX + 'audio_download/', config.AUDIO_DOWNLOAD_DIR, show_index=config.DOWNLOAD_DIRS_INDEXABLE)
routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube')) routes.static(config.URL_PREFIX, os.path.join(config.BASE_DIR, 'ui/dist/metube'))
try: try:
app.add_routes(routes) app.add_routes(routes)
except ValueError as e: except ValueError as e:
@ -219,14 +190,11 @@ except ValueError as e:
raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e raise RuntimeError('Could not find the frontend UI static assets. Please run `node_modules/.bin/ng build` inside the ui folder') from e
raise e raise e
# https://github.com/aio-libs/aiohttp/pull/4615 waiting for release
# @routes.options(config.URL_PREFIX + 'add')
async def add_cors(request): async def add_cors(request):
return web.Response(text=serializer.encode({"status": "ok"})) return web.Response(text=serializer.encode({"status": "ok"}))
app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors) app.router.add_route('OPTIONS', config.URL_PREFIX + 'add', add_cors)
async def on_prepare(request, response): async def on_prepare(request, response):
if 'Origin' in request.headers: if 'Origin' in request.headers:
response.headers['Access-Control-Allow-Origin'] = request.headers['Origin'] response.headers['Access-Control-Allow-Origin'] = request.headers['Origin']
@ -234,11 +202,65 @@ async def on_prepare(request, response):
app.on_response_prepare.append(on_prepare) app.on_response_prepare.append(on_prepare)
dqueue = DownloadQueue(config, Notifier())
await dqueue.initialize()
app['dqueue'] = dqueue
if __name__ == '__main__': return app
def get_custom_dirs():
def recursive_dirs(base):
path = pathlib.Path(base)
def convert(p):
s = str(p)
if s.startswith(base):
s = s[len(base):]
if s.startswith('/'):
s = s[1:]
return s
dirs = list(filter(None, map(convert, path.glob('**'))))
return dirs
download_dir = recursive_dirs(config.DOWNLOAD_DIR)
audio_download_dir = download_dir
if config.DOWNLOAD_DIR != config.AUDIO_DOWNLOAD_DIR:
audio_download_dir = recursive_dirs(config.AUDIO_DOWNLOAD_DIR)
return {
"download_dir": download_dir,
"audio_download_dir": audio_download_dir
}
async def start_background_tasks(app):
app['dqueue_task'] = asyncio.create_task(app['dqueue'].run())
async def cleanup_background_tasks(app):
app['dqueue_task'].cancel()
await app['dqueue_task']
def main():
logging.basicConfig(level=logging.DEBUG) logging.basicConfig(level=logging.DEBUG)
log.info(f"Listening on {config.HOST}:{config.PORT}") log.info(f"Initializing application on {config.HOST}:{config.PORT}")
loop = asyncio.get_event_loop()
app = loop.run_until_complete(init_app())
app.on_startup.append(start_background_tasks)
app.on_cleanup.append(cleanup_background_tasks)
try:
if sys.platform.startswith('win'): if sys.platform.startswith('win'):
web.run_app(app, host=config.HOST, port=int(config.PORT)) web.run_app(app, host=config.HOST, port=int(config.PORT))
else: else:
web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True) web.run_app(app, host=config.HOST, port=int(config.PORT), reuse_port=True)
except Exception as e:
log.error(f"Failed to start the server: {str(e)}")
sys.exit(1)
if __name__ == '__main__':
if sys.platform.startswith('win'):
asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
main()

View file

@ -4,7 +4,6 @@ from collections import OrderedDict
import shelve import shelve
import time import time
import asyncio import asyncio
import multiprocessing
import logging import logging
import re import re
from dl_formats import get_format, get_opts, AUDIO_FORMATS from dl_formats import get_format, get_opts, AUDIO_FORMATS
@ -44,8 +43,6 @@ class DownloadInfo:
self.error = error self.error = error
class Download: class Download:
manager = None
def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info): def __init__(self, download_dir, temp_dir, output_template, output_template_chapter, quality, format, ytdl_opts, info):
self.download_dir = download_dir self.download_dir = download_dir
self.temp_dir = temp_dir self.temp_dir = temp_dir
@ -56,106 +53,48 @@ class Download:
self.info = info self.info = info
self.canceled = False self.canceled = False
self.tmpfilename = None self.tmpfilename = None
self.status_queue = None
self.proc = None
self.loop = None
self.notifier = None
async def start(self, notifier):
self.info.status = 'preparing'
await notifier.updated(self.info)
try:
result = await asyncio.get_event_loop().run_in_executor(None, self._download)
if result['status'] == 'finished':
self.info.status = 'finished'
self.info.filename = result.get('filename')
self.info.size = os.path.getsize(result['filename']) if os.path.exists(result['filename']) else None
else:
self.info.status = 'error'
self.info.msg = result.get('msg', 'Unknown error occurred')
except Exception as e:
self.info.status = 'error'
self.info.msg = str(e)
await notifier.updated(self.info)
def _download(self): def _download(self):
try: ydl_opts = {
def put_status(st):
self.status_queue.put({k: v for k, v in st.items() if k in (
'tmpfilename',
'filename',
'status',
'msg',
'total_bytes',
'total_bytes_estimate',
'downloaded_bytes',
'speed',
'eta',
)})
def put_status_postprocessor(d):
if d['postprocessor'] == 'MoveFiles' and d['status'] == 'finished':
if '__finaldir' in d['info_dict']:
filename = os.path.join(d['info_dict']['__finaldir'], os.path.basename(d['info_dict']['filepath']))
else:
filename = d['info_dict']['filepath']
self.status_queue.put({'status': 'finished', 'filename': filename})
ret = yt_dlp.YoutubeDL(params={
'quiet': True, 'quiet': True,
'no_color': True, 'no_color': True,
#'skip_download': True,
'paths': {"home": self.download_dir, "temp": self.temp_dir}, 'paths': {"home": self.download_dir, "temp": self.temp_dir},
'outtmpl': {"default": self.output_template, "chapter": self.output_template_chapter}, 'outtmpl': {"default": self.output_template, "chapter": self.output_template_chapter},
'format': self.format, 'format': self.format,
'socket_timeout': 30, 'socket_timeout': 30,
'ignore_no_formats_error': True, 'ignore_no_formats_error': True,
'progress_hooks': [put_status],
'postprocessor_hooks': [put_status_postprocessor],
**self.ytdl_opts, **self.ytdl_opts,
}).download([self.info.url]) }
self.status_queue.put({'status': 'finished' if ret == 0 else 'error'})
except yt_dlp.utils.YoutubeDLError as exc:
self.status_queue.put({'status': 'error', 'msg': str(exc)})
async def start(self, notifier): with yt_dlp.YoutubeDL(ydl_opts) as ydl:
if Download.manager is None: try:
Download.manager = multiprocessing.Manager() info = ydl.extract_info(self.info.url, download=True)
self.status_queue = Download.manager.Queue() return {'status': 'finished', 'filename': ydl.prepare_filename(info)}
self.proc = multiprocessing.Process(target=self._download) except yt_dlp.utils.DownloadError as e:
self.proc.start() return {'status': 'error', 'msg': str(e)}
self.loop = asyncio.get_running_loop()
self.notifier = notifier
self.info.status = 'preparing'
await self.notifier.updated(self.info)
asyncio.create_task(self.update_status())
return await self.loop.run_in_executor(None, self.proc.join)
def cancel(self): def cancel(self):
if self.running():
self.proc.kill()
self.canceled = True self.canceled = True
def close(self):
if self.started():
self.proc.close()
self.status_queue.put(None)
def running(self):
try:
return self.proc is not None and self.proc.is_alive()
except ValueError:
return False
def started(self):
return self.proc is not None
async def update_status(self):
while True:
status = await self.loop.run_in_executor(None, self.status_queue.get)
if status is None:
return
self.tmpfilename = status.get('tmpfilename')
if 'filename' in status:
fileName = status.get('filename')
self.info.filename = os.path.relpath(fileName, self.download_dir)
self.info.size = os.path.getsize(fileName) if os.path.exists(fileName) else None
# Set correct file extension for thumbnails
if(self.info.format == 'thumbnail'):
self.info.filename = re.sub(r'\.webm$', '.jpg', self.info.filename)
self.info.status = status['status']
self.info.msg = status.get('msg')
if 'downloaded_bytes' in status:
total = status.get('total_bytes') or status.get('total_bytes_estimate')
if total:
self.info.percent = status['downloaded_bytes'] / total * 100
self.info.speed = status.get('speed')
self.info.eta = status.get('eta')
await self.notifier.updated(self.info)
class PersistentQueue: class PersistentQueue:
def __init__(self, path): def __init__(self, path):
pdir = os.path.dirname(path) pdir = os.path.dirname(path)
@ -209,19 +148,25 @@ class DownloadQueue:
self.done = PersistentQueue(self.config.STATE_DIR + '/completed') self.done = PersistentQueue(self.config.STATE_DIR + '/completed')
self.pending = PersistentQueue(self.config.STATE_DIR + '/pending') self.pending = PersistentQueue(self.config.STATE_DIR + '/pending')
self.done.load() self.done.load()
self.max_concurrent_downloads = self.config.MAX_CONCURRENT_DOWNLOADS # New configuration option
self.active_downloads = set() self.active_downloads = set()
self.download_semaphore = asyncio.Semaphore(self.max_concurrent_downloads) self.max_concurrent_downloads = 3 # Adjust this value as needed
self.event = asyncio.Event()
async def initialize(self):
await self.__import_queue()
async def run(self):
while True:
try:
await self.__manage_downloads()
except Exception as e:
log.error(f"Error in download queue: {str(e)}")
await asyncio.sleep(5) # Wait a bit before retrying
async def __import_queue(self): async def __import_queue(self):
for k, v in self.queue.saved_items(): for k, v in self.queue.saved_items():
await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix) await self.add(v.url, v.quality, v.format, v.folder, v.custom_name_prefix)
async def initialize(self):
self.event = asyncio.Event()
asyncio.create_task(self.__manage_downloads())
await self.__import_queue()
def __extract_info(self, url): def __extract_info(self, url):
return yt_dlp.YoutubeDL(params={ return yt_dlp.YoutubeDL(params={
'quiet': True, 'quiet': True,
@ -233,12 +178,6 @@ class DownloadQueue:
}).extract_info(url, download=False) }).extract_info(url, download=False)
def __calc_download_path(self, quality, format, folder): def __calc_download_path(self, quality, format, folder):
"""Calculates download path from quality, format and folder attributes.
Returns:
Tuple dldirectory, error_message both of which might be None (but not at the same time)
"""
# Keep consistent with frontend
base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR base_directory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format not in AUDIO_FORMATS) else self.config.AUDIO_DOWNLOAD_DIR
if folder: if folder:
if not self.config.CUSTOM_DIRS: if not self.config.CUSTOM_DIRS:
@ -254,17 +193,7 @@ class DownloadQueue:
else: else:
dldirectory = base_directory dldirectory = base_directory
return dldirectory, None return dldirectory, None
async def __manage_downloads(self):
while True:
while self.queue.empty():
log.info('waiting for item to download')
await self.event.wait()
self.event.clear()
async with self.download_semaphore:
id, entry = self.queue.next()
self.active_downloads.add(id)
asyncio.create_task(self.__process_download(id, entry))
async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, auto_start, already): async def __add_entry(self, entry, quality, format, folder, custom_name_prefix, auto_start, already):
if not entry: if not entry:
return {'status': 'error', 'msg': "Invalid/empty data was given."} return {'status': 'error', 'msg': "Invalid/empty data was given."}
@ -324,10 +253,15 @@ class DownloadQueue:
else: else:
already.add(url) already.add(url)
try: try:
entry = await asyncio.get_running_loop().run_in_executor(None, self.__extract_info, url) entry = await asyncio.get_event_loop().run_in_executor(None, self.__extract_info, url)
except yt_dlp.utils.YoutubeDLError as exc: except yt_dlp.utils.YoutubeDLError as exc:
return {'status': 'error', 'msg': str(exc)} return {'status': 'error', 'msg': str(exc)}
return await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already) result = await self.__add_entry(entry, quality, format, folder, custom_name_prefix, auto_start, already)
if result['status'] == 'ok' and auto_start:
self.event.set() # Signal that new items are available for download
return result
async def start_pending(self, ids): async def start_pending(self, ids):
for id in ids: for id in ids:
@ -349,9 +283,9 @@ class DownloadQueue:
if not self.queue.exists(id): if not self.queue.exists(id):
log.warn(f'requested cancel for non-existent download {id}') log.warn(f'requested cancel for non-existent download {id}')
continue continue
if self.queue.get(id).started(): dl = self.queue.get(id)
self.queue.get(id).cancel() if isinstance(dl, Download):
else: dl.cancel()
self.queue.delete(id) self.queue.delete(id)
await self.notifier.canceled(id) await self.notifier.canceled(id)
return {'status': 'ok'} return {'status': 'ok'}
@ -375,35 +309,18 @@ class DownloadQueue:
def get(self): def get(self):
return(list((k, v.info) for k, v in self.queue.items()) + list((k, v.info) for k, v in self.pending.items()), return(list((k, v.info) for k, v in self.queue.items()) + list((k, v.info) for k, v in self.pending.items()),
list((k, v.info) for k, v in self.done.items())) list((k, v.info) for k, v in self.done.items()))
async def __process_download(self, id, entry):
try: async def __manage_downloads(self):
log.info(f'downloading {entry.info.title}')
await entry.start(self.notifier)
if entry.info.status != 'finished':
if entry.tmpfilename and os.path.isfile(entry.tmpfilename):
try:
os.remove(entry.tmpfilename)
except:
pass
entry.info.status = 'error'
entry.close()
finally:
if self.queue.exists(id):
self.queue.delete(id)
if entry.canceled:
await self.notifier.canceled(id)
else:
self.done.put(entry)
await self.notifier.completed(entry.info)
self.active_downloads.remove(id)
self.download_semaphore.release()
async def __download(self):
while True: while True:
while self.queue.empty(): while not self.queue.empty() and len(self.active_downloads) < self.max_concurrent_downloads:
log.info('waiting for item to download')
await self.event.wait()
self.event.clear()
id, entry = self.queue.next() id, entry = self.queue.next()
if id not in self.active_downloads:
self.active_downloads.add(id)
asyncio.create_task(self.__download(id, entry))
await asyncio.sleep(1) # Add a small delay to prevent busy waiting
async def __download(self, id, entry):
try:
log.info(f'downloading {entry.info.title}') log.info(f'downloading {entry.info.title}')
await entry.start(self.notifier) await entry.start(self.notifier)
if entry.info.status != 'finished': if entry.info.status != 'finished':
@ -412,8 +329,6 @@ class DownloadQueue:
os.remove(entry.tmpfilename) os.remove(entry.tmpfilename)
except: except:
pass pass
entry.info.status = 'error'
entry.close()
if self.queue.exists(id): if self.queue.exists(id):
self.queue.delete(id) self.queue.delete(id)
if entry.canceled: if entry.canceled:
@ -421,3 +336,5 @@ class DownloadQueue:
else: else:
self.done.put(entry) self.done.put(entry)
await self.notifier.completed(entry.info) await self.notifier.completed(entry.info)
finally:
self.active_downloads.remove(id)