# musicdlp/backend/ydl_pool.py
#
# Repository listing metadata: 166 lines, 4.3 KiB, Python

import asyncio
import logging
from typing import Callable, Awaitable, Iterable
from yt_dlp import YoutubeDL
from yt_dlp.postprocessor import FFmpegExtractAudioPP
import config
import response
import id3pp
class _CreateYDL:
    """Namespace of factory functions, one per supported site key.

    Every factory returns a freshly constructed ``YoutubeDL`` object.
    """

    @staticmethod
    def youtube() -> YoutubeDL:
        """Build the YouTube instance.

        The instance is created with the ``'ba'`` format selector and gets
        two post-processors: ``id3pp.InfoYouTubePP`` at the ``before_dl``
        stage and an mp3 ``FFmpegExtractAudioPP`` at the ``post_process``
        stage.
        """
        options = {'format': 'ba'}
        downloader = YoutubeDL(options)

        info_pp = id3pp.InfoYouTubePP()
        downloader.add_post_processor(info_pp, when='before_dl')

        mp3_pp = FFmpegExtractAudioPP(preferredcodec='mp3')
        downloader.add_post_processor(mp3_pp, when='post_process')

        return downloader

    @staticmethod
    def yt_proxied() -> YoutubeDL:
        """Build a YouTube instance whose ``proxy`` param is ``yt_proxy`` from config."""
        downloader = _CreateYDL.youtube()
        proxy_url = config.get().yt_proxy
        downloader.params['proxy'] = proxy_url
        return downloader

    @staticmethod
    def yandex() -> YoutubeDL:
        """Build the Yandex instance: a ``YoutubeDL`` with default options."""
        downloader = YoutubeDL()
        return downloader
# Registry: site key -> zero-argument factory returning that site's YoutubeDL.
create_ydl_fn: dict[str, Callable[[], YoutubeDL]] = dict(
    youtube=_CreateYDL.youtube,
    yt_proxied=_CreateYDL.yt_proxied,
    yandex=_CreateYDL.yandex,
)

# Live view of the supported site keys (tracks changes to the registry).
ydl_fn_keys = create_ydl_fn.keys()

# Sites for which extract_info must be called with process=True, because the
# track title is only present in the processed extract_info output.
NP_YDLS: set[str] = {'yandex'}
class YdlLogger:
    """Logger object handed to yt-dlp that forwards messages to an async callback.

    Each forwarded message is scheduled on ``loop`` with
    ``asyncio.run_coroutine_threadsafe``, so the methods may be called from
    a thread other than the one running the loop. The resulting future is
    not awaited or inspected.
    """

    def __init__(
            self,
            log_cb: Callable[[response.YdlLogLevel, str], Awaitable],
            loop: asyncio.AbstractEventLoop) -> None:
        """Store the callback, the target event loop and the app logger.

        Args:
            log_cb: Coroutine function called as ``log_cb(level, msg)``.
            loop: Event loop on which ``log_cb`` coroutines are scheduled.
        """
        self.log_cb = log_cb
        self.loop = loop
        self.mdlp_logger = logging.getLogger('musicdlp')

    def _forward(self, level: response.YdlLogLevel, msg: str) -> None:
        """Schedule ``log_cb(level, msg)`` on the stored event loop."""
        coro = self.log_cb(level, msg)
        asyncio.run_coroutine_threadsafe(coro, self.loop)

    def debug(self, msg: str) -> None:
        """Forward ``msg`` to the callback with level ``'debug'``."""
        self._forward('debug', msg)

    def info(self, _: str) -> None:
        """Ignore the message (afaik not used in yt-dlp)."""
        return None

    def warning(self, msg: str) -> None:
        """Forward ``msg`` to the callback with level ``'warning'``."""
        self._forward('warning', msg)

    def error(self, msg: str) -> None:
        """Log ``msg`` to the ``'musicdlp'`` logger, then forward it as ``'error'``."""
        self.mdlp_logger.error(msg)
        self._forward('error', msg)
class Downloader:
    """Create, cache and drive one ``YoutubeDL`` instance per supported site.

    Usage: call ``choose_ydl(site)`` to select (and lazily build) the
    instance for a site, await ``get_playlist_items`` / ``download``, and
    call ``cleanup`` to close every instance that was built. The blocking
    yt-dlp calls are run through ``loop.run_in_executor(None, ...)``.
    """

    def __init__(self, logger: YdlLogger | None = None) -> None:
        """Initialise empty per-site slots.

        Args:
            logger: Optional logger installed as the ``'logger'`` param of
                every ``YoutubeDL`` instance this object creates.
        """
        # One slot per site key; a slot is filled on the first choose_ydl().
        self.ydls: dict[str, YoutubeDL | None] = {
            'youtube': None,
            'yt_proxied': None,
            'yandex': None,
        }
        self.cur_ydl: YoutubeDL | None = None
        self.cur_site = ''
        # Single post-processor object shared by all instances created here.
        self.id3pp_obj = id3pp.ID3TagsPP()
        self.logger = logger

    def choose_ydl(self, site: str) -> None:
        """Make the instance for ``site`` current, creating it on first use.

        A newly created instance gets the logger (if any), the output
        template from config, the shared ``id3pp_obj`` post-processor and,
        when ``<cookies_dir>/<site>.txt`` exists, that file as cookie file.
        The instance is then cached, so later calls for the same site reuse
        it (config is only applied at creation time) and ``cleanup`` can
        close it.

        Raises:
            KeyError: If ``site`` is not one of the known site keys.
        """
        ydl = self.ydls[site]
        cfg = config.get()
        if ydl is None:
            ydl = create_ydl_fn[site]()
            if self.logger is not None:
                ydl.params['logger'] = self.logger
            ydl.params['outtmpl']['default'] = cfg.tmpl
            ydl.add_post_processor(self.id3pp_obj, when='post_process')
            cookies = cfg.cookies_dir / (site + '.txt')
            if cookies.exists():
                ydl.params['cookiefile'] = str(cookies)
            # Remember the instance: without this the cache stays empty, a
            # new YoutubeDL is built on every call and cleanup() closes nothing.
            self.ydls[site] = ydl
        else:
            # yt-dlp's add_post_processor() binds a post-processor to the
            # instance it was last added to. The shared id3pp_obj is added to
            # every instance, so re-bind it when switching back to a cached one.
            self.id3pp_obj.set_downloader(ydl)
        self.cur_ydl = ydl
        self.cur_site = site

    def get_cur_ydl(self) -> YoutubeDL:
        """Return the current instance.

        Raises:
            RuntimeError: If ``choose_ydl`` has not selected an instance yet.
        """
        ydl = self.cur_ydl
        if ydl is None:
            raise RuntimeError('ydl object not initialized')
        return ydl

    async def get_playlist_items(self, url: str) -> list[str]:
        """Return the track names of the playlist at ``url``.

        Runs ``_target_get_playlist_items`` in the default executor with the
        current instance; ``process`` is True only for sites in ``NP_YDLS``.

        Raises:
            RuntimeError: If no instance is selected, or extraction yields None.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            Downloader._target_get_playlist_items,
            self.get_cur_ydl(),
            url,
            self.cur_site in NP_YDLS,
        )

    @staticmethod
    def _target_get_playlist_items(ydl: YoutubeDL, url: str, process: bool) -> list[str]:
        """Blocking worker: extract info without downloading and list entry names.

        For each entry the ``'track'`` value is used when that key is
        present, otherwise the ``'title'`` value.

        Raises:
            RuntimeError: If ``extract_info`` returns None.
            KeyError: If the result has no ``'entries'`` key, or an entry has
                neither ``'track'`` nor ``'title'``.
        """
        info = ydl.extract_info(url, download=False, process=process)
        if info is None:
            raise RuntimeError('ydl.extract_info returned None')
        return [
            entry['track'] if 'track' in entry else entry['title']
            for entry in info['entries']
        ]

    async def download(
            self,
            url: str,
            playlist_items: Iterable[int] | None = None) -> int:
        """Download ``url`` with the current instance in the default executor.

        Args:
            url: URL passed to ``YoutubeDL.download``.
            playlist_items: Optional playlist indices restricting the download.

        Returns:
            The value returned by ``YoutubeDL.download``.

        Raises:
            RuntimeError: If no instance is selected.
        """
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            None,
            Downloader._target_download,
            self.get_cur_ydl(),
            url,
            playlist_items,
        )

    @staticmethod
    def _target_download(
            ydl: YoutubeDL,
            url: str,
            playlist_items: Iterable[int] | None = None) -> int:
        """Blocking worker: download ``url``, optionally limited to some items.

        When ``playlist_items`` is truthy it is joined with commas into the
        ``'playlist_items'`` param for the duration of this call. The param
        is always removed afterwards, including when the download raises or
        when it was never set, so it cannot leak into later downloads made
        with the same instance.
        """
        if playlist_items:
            ydl.params['playlist_items'] = ','.join(str(i) for i in playlist_items)
        try:
            return ydl.download(url)
        finally:
            # pop() with a default: the key is absent when no items were given.
            ydl.params.pop('playlist_items', None)

    def cleanup(self) -> None:
        """Close every ``YoutubeDL`` instance that has been created."""
        for ydl in self.ydls.values():
            if ydl is not None:
                ydl.close()