Queue persistence for download and completed

This commit is contained in:
Erazor2 2022-01-15 12:17:12 +00:00
parent 622ca428e3
commit eb1f031b33
3 changed files with 79 additions and 17 deletions

View file

@ -26,5 +26,6 @@ COPY --from=builder /metube/dist/metube ./ui/dist/metube
ENV DOWNLOAD_DIR /downloads
VOLUME /downloads
VOLUME /queue
EXPOSE 8081
CMD ["python3", "app/main.py"]

View file

@ -92,6 +92,7 @@ async def delete(request):
@sio.event
async def connect(sid, environ):
await dqueue.importQueue()
await sio.emit('all', serializer.encode(dqueue.get()), to=sid)
@routes.get(config.URL_PREFIX)

View file

@ -1,6 +1,7 @@
import os
import yt_dlp
from collections import OrderedDict
import shelve
import asyncio
import multiprocessing
import logging
@ -131,13 +132,71 @@ class Download:
self.info.eta = status.get('eta')
await self.notifier.updated(self.info)
class PersistentQueue:
def __init__(self, filePath, load = False):
self.dict = OrderedDict()
self.shelvePath = filePath
self.__createShelve()
if load:
self.__loadShelve()
def __createShelve(self):
shelf = shelve.open(self.shelvePath, 'c')
shelf.close()
def __loadShelve(self):
with shelve.open(self.shelvePath, 'r') as shelf:
for key in shelf.keys():
self.dict[key] = shelf[key]
def exists(self, key):
return key in self.dict
def get(self, key):
return self.dict[key]
def items(self):
return self.dict.items()
def savedItems(self):
with shelve.open(self.shelvePath, 'r') as shelf:
return dict(shelf).items()
def put(self, key, value):
self.dict[key] = value
with shelve.open(self.shelvePath, 'w') as shelf:
shelf[key] = value.info
def delete(self, key):
del self.dict[key]
with shelve.open(self.shelvePath, 'w') as shelf:
shelf.pop(key)
def next(self):
k, v = next(iter(self.dict.items()))
return k, v
def empty(self):
return not bool(self.dict)
class DownloadQueue:
def __init__(self, config, notifier):
self.config = config
self.notifier = notifier
self.queue = OrderedDict()
self.done = OrderedDict()
self.queue = PersistentQueue('/queue/queue')
self.done = PersistentQueue('/queue/done', True)
self.initialized = False
self.imported = False
async def importQueue(self):
if not self.imported:
for item in self.queue.savedItems():
await self.add(
item[1].url,
item[1].quality,
item[1].format)
self.imported = True
def __initialize(self):
if not self.initialized:
@ -165,10 +224,10 @@ class DownloadQueue:
return {'status': 'error', 'msg': ', '.join(res['msg'] for res in results if res['status'] == 'error' and 'msg' in res)}
return {'status': 'ok'}
elif etype == 'video' or etype.startswith('url') and 'id' in entry and 'title' in entry:
if entry['id'] not in self.queue:
if not self.queue.exists(entry['id']):
dl = DownloadInfo(entry['id'], entry['title'], entry.get('webpage_url') or entry['url'], quality, format)
dldirectory = self.config.DOWNLOAD_DIR if (quality != 'audio' and format != 'mp3') else self.config.AUDIO_DOWNLOAD_DIR
self.queue[entry['id']] = Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl)
self.queue.put(entry['id'], Download(dldirectory, self.config.OUTPUT_TEMPLATE, quality, format, self.config.YTDL_OPTIONS, dl))
self.event.set()
await self.notifier.added(dl)
return {'status': 'ok'}
@ -193,36 +252,37 @@ class DownloadQueue:
async def cancel(self, ids):
for id in ids:
if id not in self.queue:
if not self.queue.exists(id):
log.warn(f'requested cancel for non-existent download {id}')
continue
if self.queue[id].started():
self.queue[id].cancel()
if self.queue.get(id).started():
self.queue.get(id).cancel()
else:
del self.queue[id]
self.queue.delete(id)
await self.notifier.canceled(id)
return {'status': 'ok'}
async def clear(self, ids):
for id in ids:
if id not in self.done:
if not self.done.exists(id):
log.warn(f'requested delete for non-existent download {id}')
continue
del self.done[id]
self.done.delete(id)
await self.notifier.cleared(id)
return {'status': 'ok'}
def get(self):
return(list((k, v.info) for k, v in self.queue.items()),
list((k, v.info) for k, v in self.done.items()))
item = (list((k, v) for k, v in self.queue.savedItems()),
list((k, v) for k, v in self.done.savedItems()))
return item
async def __download(self):
while True:
while not self.queue:
while self.queue.empty():
log.info('waiting for item to download')
await self.event.wait()
self.event.clear()
id, entry = next(iter(self.queue.items()))
id, entry = self.queue.next()
log.info(f'downloading {entry.info.title}')
await entry.start(self.notifier)
if entry.info.status != 'finished':
@ -233,10 +293,10 @@ class DownloadQueue:
pass
entry.info.status = 'error'
entry.close()
if id in self.queue:
del self.queue[id]
if self.queue.exists(id):
self.queue.delete(id)
if entry.canceled:
await self.notifier.canceled(id)
else:
self.done[id] = entry
self.done.put(id, entry)
await self.notifier.completed(entry.info)