134 lines
3.5 KiB
Python
134 lines
3.5 KiB
Python
import asyncio
|
|
from typing import Callable, Awaitable, Iterable
|
|
|
|
from yt_dlp import YoutubeDL
|
|
from yt_dlp.postprocessor import FFmpegExtractAudioPP
|
|
|
|
import config
|
|
import id3pp
|
|
|
|
|
|
class _CreateYDL:
    """Namespace of zero-argument factories, one per supported site key.

    Each public factory returns a newly constructed ``YoutubeDL``; nothing is
    cached here.
    """

    @staticmethod
    def _youtube_audio(extra_params: dict | None = None) -> YoutubeDL:
        """Build the YouTube downloader used by both YouTube factories.

        Args:
            extra_params: Additional ``YoutubeDL`` options merged over the
                base options before the instance is constructed.

        Returns:
            A ``YoutubeDL`` with the ``'ba'`` format selector, the
            ``id3pp.InfoYouTubePP`` post-processor registered for
            ``before_dl`` and an MP3 ``FFmpegExtractAudioPP`` registered for
            ``post_process``.
        """
        params = {'format': 'ba'}
        if extra_params:
            params.update(extra_params)

        # Options go through the constructor rather than being patched into
        # `ydl.params` afterwards, so they are already in place before yt-dlp
        # sets up anything that reads them.
        ydl = YoutubeDL(params)
        ydl.add_post_processor(id3pp.InfoYouTubePP(), when='before_dl')
        ydl.add_post_processor(FFmpegExtractAudioPP(preferredcodec='mp3'), when='post_process')
        return ydl

    @staticmethod
    def youtube() -> YoutubeDL:
        """Return a YouTube audio downloader that connects directly."""
        return _CreateYDL._youtube_audio()

    @staticmethod
    def yt_proxied() -> YoutubeDL:
        """Return a YouTube audio downloader using the proxy from ``config.get().yt_proxy``."""
        return _CreateYDL._youtube_audio({'proxy': config.get().yt_proxy})

    @staticmethod
    def yandex() -> YoutubeDL:
        """Return a ``YoutubeDL`` with default options for the ``'yandex'`` site key."""
        return YoutubeDL()
|
|
|
|
|
|
# Site key -> zero-argument factory that builds a new YoutubeDL for that site.
create_ydl_fn = dict(
    youtube=_CreateYDL.youtube,
    yt_proxied=_CreateYDL.yt_proxied,
    yandex=_CreateYDL.yandex,
)

# Live key view over the registry above: the set of supported site keys.
ydl_fn_keys = create_ydl_fn.keys()
|
|
|
|
|
|
class Downloader:
    """Async front end over per-site ``YoutubeDL`` instances.

    One ``YoutubeDL`` is created lazily per site key and reused afterwards.
    ``choose_ydl`` selects the active instance; the coroutine methods then run
    the blocking yt-dlp calls in the event loop's default executor.
    """

    def __init__(
            self,
            progress_cb: Callable[[str], Awaitable],
            lyrics_cb: Callable[[list[str]], Awaitable]) -> None:
        """Store the callbacks and start with no ``YoutubeDL`` created.

        Args:
            progress_cb: Awaitable callback taking a string.
            lyrics_cb: Awaitable callback taking a list of strings.

        NOTE(review): both callbacks are only stored here; no method of this
        class calls them, so they are presumably used by other code.
        """
        # Lazily filled cache: site key -> YoutubeDL (None until first chosen).
        self.ydls: dict[str, YoutubeDL | None] = {
            'youtube': None,
            'yt_proxied': None,
            'yandex': None,
        }

        # Instance selected by the most recent choose_ydl() call.
        self.cur_ydl: YoutubeDL | None = None

        self.progress_cb = progress_cb
        self.lyrics_cb = lyrics_cb

    def choose_ydl(self, site: str) -> None:
        """Make the ``YoutubeDL`` for ``site`` the current one, creating it on first use.

        Args:
            site: One of the keys of ``self.ydls``.

        Raises:
            KeyError: If ``site`` is not a known site key.
        """
        ydl = self.ydls[site]
        cfg = config.get()

        if ydl is None:
            ydl = create_ydl_fn[site]()
            # Registered only at creation: the instance is reused from now on,
            # and registering again on every call would stack duplicates.
            ydl.add_post_processor(id3pp.ID3TagsPP(), when='post_process')
            # Remember the instance so later calls reuse it and cleanup() can close it.
            self.ydls[site] = ydl

        # Plain assignments, so they are safe to repeat; refreshing them on
        # every call keeps a reused instance in step with the current config.
        ydl.params['trim_file_name'] = cfg.path_length  # Note: not only filename, but path in outtmpl
        ydl.params['outtmpl']['default'] = cfg.tmpl

        cookies = cfg.cookies_dir / (site + '.txt')
        if cookies.exists():
            ydl.params['cookiefile'] = str(cookies)

        self.cur_ydl = ydl

    def get_cur_ydl(self) -> YoutubeDL:
        """Return the currently selected ``YoutubeDL``.

        Raises:
            RuntimeError: If ``choose_ydl`` has not selected an instance yet.
        """
        ydl = self.cur_ydl
        if ydl is None:
            raise RuntimeError('ydl object not initialized')
        return ydl

    async def get_playlist_items(self, url: str) -> list[str]:
        """Return the display names of the entries found at ``url``.

        The blocking extraction runs in the loop's default executor.

        Raises:
            RuntimeError: If no instance is selected, or extraction returned ``None``.
        """
        return await asyncio.get_running_loop().run_in_executor(
            None,
            Downloader._target_get_playlist_items,
            self.get_cur_ydl(),
            url,
        )

    @staticmethod
    def _target_get_playlist_items(ydl: YoutubeDL, url: str) -> list[str]:
        """Blocking worker for ``get_playlist_items``.

        Returns:
            One name per item of ``info['entries']``: the entry's ``track``
            when it is present and non-empty, otherwise its ``title``.

        Raises:
            RuntimeError: If ``ydl.extract_info`` returned ``None``.
        """
        info = ydl.extract_info(url, download=False, process=True)
        if info is None:
            raise RuntimeError('ydl.extract_info returned None')
        # `.get(...) or` also covers a 'track' key that is present but None/empty,
        # which would otherwise leak a non-string into the result.
        return [
            entry.get('track') or entry['title']
            for entry in info['entries']
        ]

    async def download(
            self,
            url: str,
            playlist_items: Iterable[int] | None = None) -> int:
        """Download ``url`` with the current ``YoutubeDL`` in the default executor.

        Args:
            url: URL handed to ``YoutubeDL.download``.
            playlist_items: Optional item numbers restricting which playlist
                entries are downloaded; ``None`` or empty means no restriction.

        Returns:
            The value returned by ``YoutubeDL.download``.

        Raises:
            RuntimeError: If no instance is selected.
        """
        return await asyncio.get_running_loop().run_in_executor(
            None,
            Downloader._target_download,
            self.get_cur_ydl(),
            url,
            playlist_items,
        )

    @staticmethod
    def _target_download(
            ydl: YoutubeDL,
            url: str,
            playlist_items: Iterable[int] | None = None) -> int:
        """Blocking worker for ``download``.

        Sets ``params['playlist_items']`` to the comma-joined item numbers for
        the duration of the call when ``playlist_items`` is truthy, and always
        removes the key again afterwards.
        """
        if playlist_items:
            ydl.params['playlist_items'] = ','.join(str(i) for i in playlist_items)

        try:
            return ydl.download(url)
        finally:
            # pop() instead of del: the key is absent when no items were given,
            # and the filter must not survive into the next call on this
            # reused instance even if the download raised.
            ydl.params.pop('playlist_items', None)

    def cleanup(self) -> None:
        """Close every created ``YoutubeDL`` and forget it."""
        for site, ydl in self.ydls.items():
            if ydl is not None:
                ydl.close()
                # Assigning to an existing key while iterating is safe (no resize).
                self.ydls[site] = None
        # Nothing closed may remain selectable; choose_ydl() must run again.
        self.cur_ydl = None
|