Native concurrent downloading of fragments (#166)

* Option `--concurrent-fragments` (`-N`) to set the number of threads

Related: #165

Known issues:
* When receiving Ctrl+C, the process will exit only after finishing the currently downloading fragments
* The download progress shows the speed of only one thread

Authored by shirt-dev
This commit is contained in:
shirt 2021-03-12 23:46:58 -05:00 committed by GitHub
parent 0a473f2f0f
commit 4cf1e5d2f9
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 250 additions and 104 deletions

View file

@ -7,6 +7,11 @@ try:
can_decrypt_frag = True
except ImportError:
can_decrypt_frag = False
try:
import concurrent.futures
can_threaded_download = True
except ImportError:
can_threaded_download = False
from ..downloader import _get_real_downloader
from .fragment import FragmentFD
@ -19,6 +24,7 @@ from ..compat import (
)
from ..utils import (
parse_m3u8_attributes,
sanitize_open,
update_url_query,
)
@ -151,7 +157,6 @@ class HlsFD(FragmentFD):
ad_frag_next = False
for line in s.splitlines():
line = line.strip()
download_frag = False
if line:
if not line.startswith('#'):
if format_index and discontinuity_count != format_index:
@ -168,13 +173,13 @@ class HlsFD(FragmentFD):
if extra_query:
frag_url = update_url_query(frag_url, extra_query)
if real_downloader:
fragments.append({
'url': frag_url,
'decrypt_info': decrypt_info,
})
continue
download_frag = True
fragments.append({
'frag_index': frag_index,
'url': frag_url,
'decrypt_info': decrypt_info,
'byte_range': byte_range,
'media_sequence': media_sequence,
})
elif line.startswith('#EXT-X-MAP'):
if format_index and discontinuity_count != format_index:
@ -191,12 +196,14 @@ class HlsFD(FragmentFD):
else compat_urlparse.urljoin(man_url, map_info.get('URI')))
if extra_query:
frag_url = update_url_query(frag_url, extra_query)
if real_downloader:
fragments.append({
'url': frag_url,
'decrypt_info': decrypt_info,
})
continue
fragments.append({
'frag_index': frag_index,
'url': frag_url,
'decrypt_info': decrypt_info,
'byte_range': byte_range,
'media_sequence': media_sequence
})
if map_info.get('BYTERANGE'):
splitted_byte_range = map_info.get('BYTERANGE').split('@')
@ -205,7 +212,6 @@ class HlsFD(FragmentFD):
'start': sub_range_start,
'end': sub_range_start + int(splitted_byte_range[0]),
}
download_frag = True
elif line.startswith('#EXT-X-KEY'):
decrypt_url = decrypt_info.get('URI')
@ -236,53 +242,12 @@ class HlsFD(FragmentFD):
ad_frag_next = False
elif line.startswith('#EXT-X-DISCONTINUITY'):
discontinuity_count += 1
i += 1
media_sequence += 1
if download_frag:
count = 0
headers = info_dict.get('http_headers', {})
if byte_range:
headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
while count <= fragment_retries:
try:
success, frag_content = self._download_fragment(
ctx, frag_url, info_dict, headers)
if not success:
return False
break
except compat_urllib_error.HTTPError as err:
# Unavailable (possibly temporary) fragments may be served.
# First we try to retry then either skip or abort.
# See https://github.com/ytdl-org/youtube-dl/issues/10165,
# https://github.com/ytdl-org/youtube-dl/issues/10448).
count += 1
if count <= fragment_retries:
self.report_retry_fragment(err, frag_index, count, fragment_retries)
if count > fragment_retries:
if skip_unavailable_fragments:
i += 1
media_sequence += 1
self.report_skip_fragment(frag_index)
continue
self.report_error(
'giving up after %s fragment retries' % fragment_retries)
return False
if decrypt_info['METHOD'] == 'AES-128':
iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence)
decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
# Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
# size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
# not what it decrypts to.
if not test:
frag_content = AES.new(
decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)
self._append_fragment(ctx, frag_content)
# We only download the first fragment during the test
if test:
break
i += 1
media_sequence += 1
# We only download the first fragment during the test
if test:
fragments = [fragments[0] if fragments else None]
if real_downloader:
info_copy = info_dict.copy()
@ -295,5 +260,106 @@ class HlsFD(FragmentFD):
if not success:
return False
else:
def download_fragment(fragment):
frag_index = fragment['frag_index']
frag_url = fragment['url']
decrypt_info = fragment['decrypt_info']
byte_range = fragment['byte_range']
media_sequence = fragment['media_sequence']
ctx['fragment_index'] = frag_index
count = 0
headers = info_dict.get('http_headers', {})
if byte_range:
headers['Range'] = 'bytes=%d-%d' % (byte_range['start'], byte_range['end'] - 1)
while count <= fragment_retries:
try:
success, frag_content = self._download_fragment(
ctx, frag_url, info_dict, headers)
if not success:
return False, frag_index
break
except compat_urllib_error.HTTPError as err:
# Unavailable (possibly temporary) fragments may be served.
# First we try to retry then either skip or abort.
# See https://github.com/ytdl-org/youtube-dl/issues/10165,
# https://github.com/ytdl-org/youtube-dl/issues/10448).
count += 1
if count <= fragment_retries:
self.report_retry_fragment(err, frag_index, count, fragment_retries)
if count > fragment_retries:
return False, frag_index
if decrypt_info['METHOD'] == 'AES-128':
iv = decrypt_info.get('IV') or compat_struct_pack('>8xq', media_sequence)
decrypt_info['KEY'] = decrypt_info.get('KEY') or self.ydl.urlopen(
self._prepare_url(info_dict, info_dict.get('_decryption_key_url') or decrypt_info['URI'])).read()
# Don't decrypt the content in tests since the data is explicitly truncated and it's not to a valid block
# size (see https://github.com/ytdl-org/youtube-dl/pull/27660). Tests only care that the correct data downloaded,
# not what it decrypts to.
if not test:
frag_content = AES.new(
decrypt_info['KEY'], AES.MODE_CBC, iv).decrypt(frag_content)
return frag_content, frag_index
def append_fragment(frag_content, frag_index):
if frag_content:
fragment_filename = '%s-Frag%d' % (ctx['tmpfilename'], frag_index)
try:
file, frag_sanitized = sanitize_open(fragment_filename, 'rb')
ctx['fragment_filename_sanitized'] = frag_sanitized
file.close()
self._append_fragment(ctx, frag_content)
return True
except FileNotFoundError:
if skip_unavailable_fragments:
self.report_skip_fragment(frag_index)
return True
else:
self.report_error(
'fragment %s not found, unable to continue' % frag_index)
return False
else:
if skip_unavailable_fragments:
self.report_skip_fragment(frag_index)
return True
else:
self.report_error(
'fragment %s not found, unable to continue' % frag_index)
return False
max_workers = self.params.get('concurrent_fragment_downloads', 1)
if can_threaded_download and max_workers > 1:
self.report_warning('The download speed shown is only of one thread. This is a known issue')
with concurrent.futures.ThreadPoolExecutor(max_workers) as pool:
futures = [pool.submit(download_fragment, fragment) for fragment in fragments]
# timeout must be 0 to return instantly
done, not_done = concurrent.futures.wait(futures, timeout=0)
try:
while not_done:
# Check every 1 second for KeyboardInterrupt
freshly_done, not_done = concurrent.futures.wait(not_done, timeout=1)
done |= freshly_done
except KeyboardInterrupt:
for future in not_done:
future.cancel()
# timeout must be none to cancel
concurrent.futures.wait(not_done, timeout=None)
raise KeyboardInterrupt
results = [future.result() for future in futures]
for frag_content, frag_index in results:
result = append_fragment(frag_content, frag_index)
if not result:
return False
else:
for fragment in fragments:
frag_content, frag_index = download_fragment(fragment)
result = append_fragment(frag_content, frag_index)
if not result:
return False
self._finish_frag_download(ctx)
return True