Shirino/main.py

253 lines
6.9 KiB
Python

#!/usr/bin/env python3
import asyncio
import hashlib
import json
import logging
import re
import string
from typing import List, Any, Dict, Optional
import requests
from aiogram import Dispatcher, types, Bot
import aiohttp.client_exceptions
from pydantic.v1 import BaseSettings
# Constants
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
# ---
# Config from .env
class Settings(BaseSettings):
debug: bool
timeout: int = 2
ndigits: int = 3
coinapi_keys: List[str]
telegram_token: str
class Config:
env_file = '.env'
env_file_encoding = 'utf-8'
settings = Settings() # type: ignore
# ---
# Logging
log = logging.getLogger('shirino')
handler = logging.StreamHandler()
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler.setFormatter(fmt)
log.addHandler(handler)
if settings.debug:
handler.setLevel(logging.DEBUG)
log.setLevel(logging.DEBUG)
# ---
coinapi_len = len(settings.coinapi_keys)
coinapi_active = [0] # API key index
dp = Dispatcher()
class CurrencyConverter:
def __init__(self) -> None:
self.amount: float = 1.0
self.conv_amount: float = 0.0
self.from_currency = ''
self.conv_currency = ''
def convert(self) -> None:
"""Currency conversion"""
if not self.ddgapi():
self.coinapi()
str_amount = f'{self.conv_amount}'
point = str_amount.find(".")
after_point = str_amount[point + 1:]
fnz = min( # index of first non-zero digit
(
after_point.index(i)
for i in string.digits[1:]
if i in after_point
),
default=-1,
)
if fnz == -1:
# it is an integer like 81.0
return
# how many digits should be after the point:
# ndigits (3 by default) after first non-zero
ndigits = fnz + settings.ndigits
self.conv_amount = round(self.conv_amount, ndigits)
def ddgapi(self) -> bool:
"""Получение данных фиатной валюты через DuckDuckGo
e.g: https://duckduckgo.com/js/spice/currency/1/USD/RUB
Returns:
`False` если валюты нет в API
`True` если конвертация прошла успешно
"""
# Запрос к API
res = requests.get(f'{DDG_URL}/{self.amount}/{self.from_currency}/{self.conv_currency}')
data: Dict[str, Any] = json.loads(re.findall(r'(.+)\);', res.text)[0])
log.debug(data)
if len(data.get('to')) == 0:
return False
conv: Dict[str, str] = data.get('to')[0]
conv_amount = conv.get("mid")
if conv_amount is None:
raise RuntimeError('Ошибка при конвертации валюты через DuckDuckGo')
log.debug(conv)
log.debug(conv_amount)
self.conv_amount = float(conv_amount)
return True
def coinapi(self, depth: int = coinapi_len) -> None:
"""Получение данных с CoinAPI для получения курса криптовалюты
Args:
depth (int, optional): Счетчик, защищающий от рекурсии
"""
if depth <= 0:
raise RecursionError('Рейтлимит на всех токенах')
resp = requests.get(
(
f'{COINAPI_URL}/{self.from_currency}'
f'/{self.conv_currency}'
),
headers={
'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]],
},
timeout=settings.timeout,
)
if resp.status_code == 429:
log.warning('CoinAPI ratelimited, rotating token')
rotate_token(settings.coinapi_keys, coinapi_active)
self.coinapi(depth - 1)
data: Dict[str, Any] = resp.json()
rate = data.get('rate')
if rate is None:
raise RuntimeError('Не удалось получить курс валюты от CoinAPI')
self.conv_amount = float(rate * self.amount)
def rotate_token(lst: List[str], active: List[int]) -> None:
"""Смена API-ключей CoinAPI при ratelimits
Args:
lst (List[str]): Список ключей
active (List[str]): Изменяемый объект с текущим ключевым индексом
"""
active[0] = (active[0] + 1) % len(lst)
@dp.inline_query()
async def currency(inline_query: types.InlineQuery) -> None:
query = inline_query.query
article: List[Optional[types.InlineQueryResultArticle]] = [None]
args = query.split()
result_id = hashlib.md5(query.encode()).hexdigest()
conv = CurrencyConverter()
try:
log.debug(len(args))
if len(args) <= 1:
article[0] = types.InlineQueryResultArticle(
id=result_id,
title="Требуется 2, либо 3 аргумента",
description=f"@shirino_bot USD RUB \n@shirino_bot 12 USD RUB",
input_message_content=types.InputTextMessageContent(
message_text="Требуется 2, либо 3 аргумента"
)
)
await inline_query.answer(
article,
cache_time=1,
is_personal=True,
)
if len(args) == 3:
conv.amount = float(args[0])
conv.from_currency = args[1].upper()
conv.conv_currency = args[2].upper()
conv.convert()
elif len(args) == 2:
conv.from_currency = args[0].upper()
conv.conv_currency = args[1].upper()
conv.convert()
result = (
f'{conv.amount} {conv.from_currency} = '
f'{conv.conv_amount} {conv.conv_currency}'
)
except aiohttp.client_exceptions.ClientError as ex:
article[0] = types.InlineQueryResultArticle(
id=result_id,
title="Rate-limit от API Telegram, повторите запрос позже.",
input_message_content=types.InputTextMessageContent(
message_text="Rate-limit от API Telegram, повторите запрос позже.",
)
)
log.debug(ex)
await asyncio.sleep(1)
except Exception as ex:
log.debug(ex)
result = f'{type(ex).__name__}: {ex}'
article[0] = types.InlineQueryResultArticle(
id=result_id,
title=result,
input_message_content=types.InputTextMessageContent(
message_text=result,
),
)
await inline_query.answer(
article, # type: ignore
cache_time=1,
is_personal=True,
)
async def main() -> None:
bot = Bot(settings.telegram_token)
await dp.start_polling(bot)
if __name__ == '__main__':
asyncio.run(main())