Refactor: unify YDL object and proxy cfg field
This would allow to add sites much simplier or even drop the ydl_fn_keys. Main purpose of the refactoring is a code cleanup.
This commit is contained in:
parent
7a75805423
commit
9071017dbf
5 changed files with 68 additions and 113 deletions
|
@ -20,11 +20,8 @@ class Config:
|
||||||
os.getenv('TRACK_FILE_TMPL') or '%(track)S.%(ext)s',
|
os.getenv('TRACK_FILE_TMPL') or '%(track)S.%(ext)s',
|
||||||
)
|
)
|
||||||
|
|
||||||
# Proxy URL for yt_proxied downloader (can be used for geo-restricted content)
|
# `or None` -- defaults to None also if PROXY is an empty string
|
||||||
self.yt_proxy = os.getenv('YT_PROXY') or None
|
self.proxy = os.getenv('PROXY') or None
|
||||||
|
|
||||||
# Proxy for soundcloud
|
|
||||||
self.sc_proxy = os.getenv('SC_PROXY') or None
|
|
||||||
|
|
||||||
self.save_lyrics = _parse_bool(os.getenv('SAVE_LYRICS'), True)
|
self.save_lyrics = _parse_bool(os.getenv('SAVE_LYRICS'), True)
|
||||||
self.save_cover = _parse_bool(os.getenv('SAVE_COVER'), True)
|
self.save_cover = _parse_bool(os.getenv('SAVE_COVER'), True)
|
||||||
|
|
|
@ -10,19 +10,24 @@ import genius
|
||||||
ENC_UTF8 = 3
|
ENC_UTF8 = 3
|
||||||
|
|
||||||
|
|
||||||
class InfoYouTubePP(PostProcessor):
|
class InfoGenPP(PostProcessor):
|
||||||
'''Generates YT Music fields in info if necessary.
|
'''Generates track, artist(s), album, track_number fields
|
||||||
|
from other info fields if they are not present in current info dict.
|
||||||
Must be run before downloading and post-processing with
|
Must be run before downloading and post-processing with
|
||||||
FFmpegExtractAudioPP and ID3YouTubePP, so use only with
|
FFmpegExtractAudioPP and ID3TagsPP, so use only with
|
||||||
when<="before_dl" ("pre_process" also suits, see yt_dlp.utils.POSTPROCESS_WHEN)
|
when="pre_process" for correct file path and ID3 tags'''
|
||||||
for correct file path and ID3 tags'''
|
|
||||||
|
|
||||||
def run(self, information):
|
def run(self, information):
|
||||||
|
|
||||||
if not 'track' in information:
|
if not 'track' in information:
|
||||||
information['track'] = information['title']
|
information['track'] = information['title']
|
||||||
if not 'artist' in information:
|
if not 'artist' in information:
|
||||||
information['artist'] = information['uploader'].removesuffix(' - Topic')
|
artist = information['uploader'].removesuffix(' - Topic')
|
||||||
|
# <Really-dumb-fix>
|
||||||
|
if artist == 'LINKIN PARK':
|
||||||
|
artist = 'Linkin Park'
|
||||||
|
# </Really-dumb-fix>
|
||||||
|
information['artist'] = artist
|
||||||
if not 'artists' in information:
|
if not 'artists' in information:
|
||||||
information['artists'] = [information['artist']]
|
information['artists'] = [information['artist']]
|
||||||
|
|
||||||
|
@ -44,8 +49,8 @@ class InfoYouTubePP(PostProcessor):
|
||||||
|
|
||||||
|
|
||||||
class ID3TagsPP(PostProcessor):
|
class ID3TagsPP(PostProcessor):
|
||||||
'''Inserts ID3 tags after all PPs (for YT: InfoYouTubePP and FFmpegExtractAudioPP),
|
'''Inserts ID3 tags including lyrics (see parser in backend/genius.py).
|
||||||
triggers searching and parsing lyrics from Genius'''
|
Must be run after all PPs ()'''
|
||||||
|
|
||||||
def __init__(self) -> None:
|
def __init__(self) -> None:
|
||||||
self.cfg = config.get()
|
self.cfg = config.get()
|
||||||
|
|
|
@ -30,21 +30,19 @@ async def handler(socket: SocketT) -> None:
|
||||||
|
|
||||||
try:
|
try:
|
||||||
data = json.loads(message)
|
data = json.loads(message)
|
||||||
|
print(data)
|
||||||
|
|
||||||
match data['action']:
|
match data['action']:
|
||||||
|
|
||||||
case 'list': # list tracks in album
|
case 'list': # list tracks in album
|
||||||
ydls.choose_ydl(data['site'])
|
ydls.update_params(data['site'], data.get('proxy', False))
|
||||||
await socket.send(response.playlist(
|
await socket.send(response.playlist(
|
||||||
await ydls.get_playlist_items(data['url']),
|
await ydls.get_playlist_items(data['url']),
|
||||||
))
|
))
|
||||||
|
|
||||||
case 'download': # download by URL
|
case 'download': # download by URL
|
||||||
ydls.choose_ydl(data['site'])
|
ydls.update_params(data['site'], data.get('proxy', False), data.get('items'))
|
||||||
ret = await ydls.download(
|
ret = await ydls.download(data['url'])
|
||||||
data['url'],
|
|
||||||
data.get('items'),
|
|
||||||
)
|
|
||||||
await socket.send(response.ydl_end(ret))
|
await socket.send(response.ydl_end(ret))
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
|
@ -57,6 +55,8 @@ async def handler(socket: SocketT) -> None:
|
||||||
if not socket.closed:
|
if not socket.closed:
|
||||||
await socket.send(response.error(ex))
|
await socket.send(response.error(ex))
|
||||||
|
|
||||||
|
ydls.reset_params()
|
||||||
|
|
||||||
ydls.cleanup()
|
ydls.cleanup()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,7 @@ TEST_MP3 = 'music/test.mp3'
|
||||||
INFO_BEFORE = {
|
INFO_BEFORE = {
|
||||||
"filepath": TEST_MP3,
|
"filepath": TEST_MP3,
|
||||||
"title": "A Line in the Sand",
|
"title": "A Line in the Sand",
|
||||||
"channel": "Linkin Park - Topic",
|
"uploader": "Linkin Park - Topic",
|
||||||
"playlist": "Album \u2013 The Hunting Party",
|
"playlist": "Album \u2013 The Hunting Party",
|
||||||
"playlist_index": 12,
|
"playlist_index": 12,
|
||||||
"release_year": 2015,
|
"release_year": 2015,
|
||||||
|
@ -44,7 +44,7 @@ class TestPostProcessorsOnFakeData(TestCase):
|
||||||
|
|
||||||
def test_infoytpp(self) -> None:
|
def test_infoytpp(self) -> None:
|
||||||
|
|
||||||
_, info = id3pp.InfoYouTubePP().run(INFO_BEFORE)
|
_, info = id3pp.InfoGenPP().run(INFO_BEFORE)
|
||||||
self.assertDictEqual(info, INFO_AFTER)
|
self.assertDictEqual(info, INFO_AFTER)
|
||||||
|
|
||||||
def test_id3(self) -> None:
|
def test_id3(self) -> None:
|
||||||
|
|
|
@ -10,46 +10,7 @@ import response
|
||||||
import id3pp
|
import id3pp
|
||||||
|
|
||||||
|
|
||||||
class _CreateYDL:
|
ydl_fn_keys = {'youtube', 'yandex', 'soundcloud'}
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def youtube() -> YoutubeDL:
|
|
||||||
ydl = YoutubeDL({'format': 'ba'})
|
|
||||||
ydl.add_post_processor(id3pp.InfoYouTubePP(), when='before_dl')
|
|
||||||
ydl.add_post_processor(FFmpegExtractAudioPP(preferredcodec='mp3'), when='post_process')
|
|
||||||
return ydl
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def yt_proxied() -> YoutubeDL:
|
|
||||||
ydl = _CreateYDL.youtube()
|
|
||||||
proxy = config.get().yt_proxy
|
|
||||||
if proxy is not None:
|
|
||||||
ydl.params['proxy'] = proxy
|
|
||||||
return ydl
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def soundcloud() -> YoutubeDL:
|
|
||||||
ydl = YoutubeDL({'format': 'ba[ext=mp3]/ba'})
|
|
||||||
ydl.add_post_processor(id3pp.InfoYouTubePP(), when='pre_process')
|
|
||||||
proxy = config.get().sc_proxy
|
|
||||||
if proxy is not None:
|
|
||||||
ydl.params['proxy'] = proxy
|
|
||||||
ydl.add_post_processor(FFmpegExtractAudioPP(preferredcodec='mp3'), when='post_process')
|
|
||||||
return ydl
|
|
||||||
|
|
||||||
@staticmethod
|
|
||||||
def yandex() -> YoutubeDL:
|
|
||||||
return YoutubeDL()
|
|
||||||
|
|
||||||
|
|
||||||
create_ydl_fn = {
|
|
||||||
'youtube': _CreateYDL.youtube,
|
|
||||||
'yt_proxied': _CreateYDL.yt_proxied,
|
|
||||||
'soundcloud': _CreateYDL.soundcloud,
|
|
||||||
'yandex': _CreateYDL.yandex,
|
|
||||||
}
|
|
||||||
|
|
||||||
ydl_fn_keys = create_ydl_fn.keys()
|
|
||||||
|
|
||||||
# need process=True for track title in extract_info output
|
# need process=True for track title in extract_info output
|
||||||
NP_YDLS = {'yandex', 'soundcloud'}
|
NP_YDLS = {'yandex', 'soundcloud'}
|
||||||
|
@ -83,98 +44,90 @@ class Downloader:
|
||||||
|
|
||||||
def __init__(self, logger: YdlLogger | None = None) -> None:
|
def __init__(self, logger: YdlLogger | None = None) -> None:
|
||||||
|
|
||||||
self.ydls: dict[str, YoutubeDL | None] = {
|
self.cfg = config.get()
|
||||||
key: None
|
|
||||||
for key in ydl_fn_keys
|
|
||||||
}
|
|
||||||
|
|
||||||
self.cur_ydl: YoutubeDL | None = None
|
self.ydl = YoutubeDL({'format': 'ba[ext=mp3]/ba/b'})
|
||||||
self.cur_site = ''
|
self.ydl.add_post_processor(id3pp.InfoGenPP(), when='pre_process')
|
||||||
|
# Note: it skips converting if downloaded file is already MP3
|
||||||
|
self.ydl.add_post_processor(FFmpegExtractAudioPP(preferredcodec='mp3'), when='post_process')
|
||||||
|
# ID3 tags are set after all pre/post-processings
|
||||||
|
self.ydl.add_post_processor(id3pp.ID3TagsPP(), when='post_process')
|
||||||
|
|
||||||
self.id3pp_obj = id3pp.ID3TagsPP()
|
if logger is not None:
|
||||||
|
self.ydl.params['logger'] = logger
|
||||||
|
|
||||||
self.logger = logger
|
self.ydl.params['outtmpl']['default'] = self.cfg.tmpl
|
||||||
|
|
||||||
def choose_ydl(self, site: str) -> None:
|
self.need_process = False
|
||||||
|
self.updated_params = []
|
||||||
|
|
||||||
ydl = self.ydls[site]
|
def update_params(
|
||||||
cfg = config.get()
|
self,
|
||||||
|
site: str,
|
||||||
|
proxy: bool = False,
|
||||||
|
items: Iterable[int] | None = None) -> None:
|
||||||
|
|
||||||
if ydl is None:
|
self.need_process = site in NP_YDLS
|
||||||
ydl = create_ydl_fn[site]()
|
|
||||||
|
|
||||||
if self.logger is not None:
|
cookies = self.cfg.cookies_dir / (site + '.txt')
|
||||||
ydl.params['logger'] = self.logger
|
if cookies.exists():
|
||||||
|
self.ydl.params['cookiefile'] = str(cookies)
|
||||||
|
self.updated_params.append('cookiefile')
|
||||||
|
|
||||||
ydl.params['outtmpl']['default'] = cfg.tmpl
|
if proxy and self.cfg.proxy is not None:
|
||||||
ydl.add_post_processor(self.id3pp_obj, when='post_process')
|
self.ydl.params['proxy'] = self.cfg.proxy
|
||||||
|
self.updated_params.append('proxy')
|
||||||
|
|
||||||
cookies = cfg.cookies_dir / (site + '.txt')
|
if items:
|
||||||
if cookies.exists():
|
self.ydl.params['playlist_items'] = (
|
||||||
ydl.params['cookiefile'] = str(cookies)
|
','.join(str(i) for i in items)
|
||||||
|
)
|
||||||
|
self.updated_params.append('playlist_items')
|
||||||
|
|
||||||
self.cur_ydl = ydl
|
def reset_params(self) -> None:
|
||||||
self.cur_site = site
|
|
||||||
|
|
||||||
def get_cur_ydl(self) -> YoutubeDL:
|
for param in self.updated_params:
|
||||||
|
del self.ydl.params[param]
|
||||||
ydl = self.cur_ydl
|
self.updated_params.clear()
|
||||||
if ydl is None:
|
self.need_process = False
|
||||||
raise RuntimeError('ydl object not initialized')
|
|
||||||
return ydl
|
|
||||||
|
|
||||||
async def get_playlist_items(self, url: str) -> list[str]:
|
async def get_playlist_items(self, url: str) -> list[str]:
|
||||||
|
|
||||||
return await asyncio.get_event_loop().run_in_executor(
|
return await asyncio.get_event_loop().run_in_executor(
|
||||||
None,
|
None,
|
||||||
Downloader._target_get_playlist_items,
|
Downloader._target_get_playlist_items,
|
||||||
self.get_cur_ydl(),
|
self.ydl,
|
||||||
url,
|
url,
|
||||||
self.cur_site in NP_YDLS,
|
self.need_process,
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _target_get_playlist_items(ydl: YoutubeDL, url: str, process: bool) -> list[str]:
|
def _target_get_playlist_items(ydl: YoutubeDL, url: str, process: bool) -> list[str]:
|
||||||
|
|
||||||
info = ydl.extract_info(url, download=False, process=process)
|
info = ydl.extract_info(url, download=False, process=process)
|
||||||
|
|
||||||
if info is None:
|
if info is None:
|
||||||
raise RuntimeError('ydl.extract_info returned None')
|
raise RuntimeError('ydl.extract_info returned None')
|
||||||
|
|
||||||
return [
|
return [
|
||||||
entry['track'] if 'track' in entry else entry['title']
|
entry['track'] if 'track' in entry else entry['title']
|
||||||
for entry in info['entries']
|
for entry in info['entries']
|
||||||
]
|
]
|
||||||
|
|
||||||
async def download(
|
async def download(self, url: str) -> int:
|
||||||
self,
|
|
||||||
url: str,
|
|
||||||
playlist_items: Iterable[int] | None = None) -> int:
|
|
||||||
|
|
||||||
return await asyncio.get_event_loop().run_in_executor(
|
return await asyncio.get_event_loop().run_in_executor(
|
||||||
None,
|
None,
|
||||||
Downloader._target_download,
|
Downloader._target_download,
|
||||||
self.get_cur_ydl(),
|
self.ydl,
|
||||||
url,
|
url,
|
||||||
playlist_items,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _target_download(
|
def _target_download(ydl: YoutubeDL, url: str) -> int:
|
||||||
ydl: YoutubeDL,
|
|
||||||
url: str,
|
|
||||||
playlist_items: Iterable[int] | None = None) -> int:
|
|
||||||
|
|
||||||
if playlist_items:
|
return ydl.download(url)
|
||||||
ydl.params['playlist_items'] = ','.join(str(i) for i in playlist_items)
|
|
||||||
|
|
||||||
ret = ydl.download(url)
|
|
||||||
|
|
||||||
if playlist_items:
|
|
||||||
del ydl.params['playlist_items']
|
|
||||||
|
|
||||||
return ret
|
|
||||||
|
|
||||||
def cleanup(self) -> None:
|
def cleanup(self) -> None:
|
||||||
|
|
||||||
for ydl in self.ydls.values():
|
self.ydl.close()
|
||||||
if ydl is not None:
|
|
||||||
ydl.close()
|
|
||||||
|
|
Loading…
Add table
Reference in a new issue