From c3550ec17f967a500e5191eb4a15d90c004128cd Mon Sep 17 00:00:00 2001 From: Alex Date: Sat, 7 Dec 2019 21:49:31 +0200 Subject: [PATCH] fixed potential race --- app/ytdl.py | 174 +++++++++++++++++++++++++++------------------------- 1 file changed, 92 insertions(+), 82 deletions(-) diff --git a/app/ytdl.py b/app/ytdl.py index 1993300..2cd7eb2 100644 --- a/app/ytdl.py +++ b/app/ytdl.py @@ -8,78 +8,6 @@ import logging log = logging.getLogger('ytdl') -class DownloadInfo: - def __init__(self, id, title, url): - self.id, self.title, self.url = id, title, url - self.status = self.msg = self.percent = self.speed = self.eta = None - -class Download: - manager = None - - def __init__(self, download_dir, info): - self.download_dir = download_dir - self.info = info - self.tmpfilename = None - self.status_queue = None - self.proc = None - self.loop = None - - def _download(self): - try: - ret = youtube_dl.YoutubeDL(params={ - 'quiet': True, - 'no_color': True, - #'skip_download': True, - 'outtmpl': os.path.join(self.download_dir, '%(title)s.%(ext)s'), - 'cachedir': False, - 'socket_timeout': 30, - 'progress_hooks': [lambda d: self.status_queue.put(d)], - }).download([self.info.url]) - self.status_queue.put({'status': 'finished' if ret == 0 else 'error'}) - except youtube_dl.utils.YoutubeDLError as exc: - self.status_queue.put({'status': 'error', 'msg': str(exc)}) - - async def start(self): - if Download.manager is None: - Download.manager = multiprocessing.Manager() - self.status_queue = Download.manager.Queue() - self.proc = multiprocessing.Process(target=self._download) - self.proc.start() - self.loop = asyncio.get_running_loop() - return await self.loop.run_in_executor(None, self.proc.join) - - def cancel(self): - if self.running(): - self.proc.kill() - - def close(self): - if self.proc is not None: - self.proc.close() - self.status_queue.put(None) - - def running(self): - try: - return self.proc is not None and self.proc.is_alive() - except ValueError: - return False - - async def update_status(self, updated_cb): - await updated_cb() - while self.running(): - status = await self.loop.run_in_executor(None, self.status_queue.get) - if status is None: - return - self.tmpfilename = status.get('tmpfilename') - self.info.status = status['status'] - self.info.msg = status.get('msg') - if 'downloaded_bytes' in status: - total = status.get('total_bytes') or status.get('total_bytes_estimate') - if total: - self.info.percent = status['downloaded_bytes'] / total * 100 - self.info.speed = status.get('speed') - self.info.eta = status.get('eta') - await updated_cb() - class DownloadQueueNotifier: async def added(self, dl): raise NotImplementedError @@ -96,6 +24,87 @@ class DownloadQueueNotifier: async def cleared(self, id): raise NotImplementedError +class DownloadInfo: + def __init__(self, id, title, url): + self.id, self.title, self.url = id, title, url + self.status = self.msg = self.percent = self.speed = self.eta = None + +class Download: + manager = None + + def __init__(self, download_dir, info): + self.download_dir = download_dir + self.info = info + self.canceled = False + self.tmpfilename = None + self.status_queue = None + self.proc = None + self.loop = None + self.notifier = None + + def _download(self): + try: + ret = youtube_dl.YoutubeDL(params={ + 'quiet': True, + 'no_color': True, + #'skip_download': True, + 'outtmpl': os.path.join(self.download_dir, '%(title)s.%(ext)s'), + 'cachedir': False, + 'socket_timeout': 30, + 'progress_hooks': [lambda d: self.status_queue.put(d)], + }).download([self.info.url]) + self.status_queue.put({'status': 'finished' if ret == 0 else 'error'}) + except youtube_dl.utils.YoutubeDLError as exc: + self.status_queue.put({'status': 'error', 'msg': str(exc)}) + + async def start(self, notifier): + if Download.manager is None: + Download.manager = multiprocessing.Manager() + self.status_queue = Download.manager.Queue() + self.proc = multiprocessing.Process(target=self._download) + self.proc.start() + self.loop = asyncio.get_running_loop() + self.notifier = notifier + self.info.status = 'preparing' + await self.notifier.updated(self.info) + asyncio.ensure_future(self.update_status()) + return await self.loop.run_in_executor(None, self.proc.join) + + def cancel(self): + if self.running(): + self.proc.kill() + self.canceled = True + + def close(self): + if self.started(): + self.proc.close() + self.status_queue.put(None) + + def running(self): + try: + return self.proc is not None and self.proc.is_alive() + except ValueError: + return False + + def started(self): + return self.proc is not None + + async def update_status(self): + while True: + status = await self.loop.run_in_executor(None, self.status_queue.get) + if status is None: + return + self.tmpfilename = status.get('tmpfilename') + self.info.status = status['status'] + self.info.msg = status.get('msg') + if 'downloaded_bytes' in status: + total = status.get('total_bytes') or status.get('total_bytes_estimate') + if total: + self.info.percent = status['downloaded_bytes'] / total * 100 + self.info.speed = status.get('speed') + self.info.eta = status.get('eta') + await self.notifier.updated(self.info) + class DownloadQueue: def __init__(self, config, notifier): self.config = config @@ -140,9 +149,11 @@ class DownloadQueue: if id not in self.queue: log.warn(f'requested cancel for non-existent download {id}') continue - self.queue[id].cancel() - del self.queue[id] - await self.notifier.canceled(id) + if self.queue[id].started(): + self.queue[id].cancel() + else: + del self.queue[id] + await self.notifier.canceled(id) return {'status': 'ok'} async def clear(self, ids): @@ -166,11 +177,7 @@ class DownloadQueue: self.event.clear() id, entry = next(iter(self.queue.items())) log.info(f'downloading {entry.info.title}') - entry.info.status = 'preparing' - start_aw = entry.start() - async def updated_cb(): await self.notifier.updated(entry.info) - asyncio.ensure_future(entry.update_status(updated_cb)) - await start_aw + await entry.start(self.notifier) if entry.info.status != 'finished': if entry.tmpfilename and os.path.isfile(entry.tmpfilename): try: @@ -181,5 +188,8 @@ class DownloadQueue: entry.close() if id in self.queue: del self.queue[id] - self.done[id] = entry - await self.notifier.completed(entry.info) + if entry.canceled: + await self.notifier.canceled(id) + else: + self.done[id] = entry + await self.notifier.completed(entry.info)