Shirino/main.py

224 lines
6.2 KiB
Python

#!/usr/bin/env python3
import aiohttp.client_exceptions
from pydantic.v1 import BaseSettings
from typing import List, Any, Dict, Optional
import logging
from aiogram import Dispatcher, types, Bot
import requests
import json
import hashlib
import asyncio
import re
import string
class Settings(BaseSettings):
debug: bool
coinapi_keys: List[str]
telegram_token: str
class Config:
env_file = '.env'
env_file_encoding = 'utf-8'
settings = Settings()
log = logging.getLogger('shirino')
handler = logging.StreamHandler()
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler.setFormatter(fmt)
log.addHandler(handler)
if settings.debug:
handler.setLevel(logging.DEBUG)
log.setLevel(logging.DEBUG)
coinapi_len = len(settings.coinapi_keys)
coinapi_active = [0]
dp = Dispatcher()
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
class Currency:
def __init__(self) -> None:
self.amount: float = 1.0
self.conv_amount: float = 1.0
self.from_currency = ''
self.conv_currency = ''
def convert(self) -> None:
if not self.ddg():
self.coinapi()
str_amount = f'{self.conv_amount}'
point = str_amount.find('.')
after_point = str_amount[point + 1:]
fnz = min(
(
after_point.index(i)
for i in string.digits[1:]
if i in after_point
),
default=-1,
)
if fnz == -1:
# it is an integer like 81.0
return
# how many digits should be after the point:
# ndigits (3 by default) after first non-zero
ndigits = fnz + 3
self.conv_amount = round(self.conv_amount, ndigits)
def ddg(self) -> float:
"""Получение данных фиатной валюты через DuckDuckGo
e.g: https://duckduckgo.com/js/spice/currency/1/USD/RUB
"""
res = requests.get(f'{DDG_URL}/{self.amount}/{self.from_currency}/{self.conv_currency}')
data: Dict[str, Any] = json.loads(re.findall(r'(.+)\);', res.text)[0])
log.debug(data)
if len(data.get('to')) == 0:
return False
conv: Dict[str, str] = data.get('to')[0]
conv_amount = conv.get("mid")
if conv_amount is None:
print("FUCK")
raise RuntimeError('Ошибка при конвертации валюты через DuckDuckGo')
log.debug(conv)
log.debug(conv_amount)
self.conv_amount = float(conv_amount)
return True
def coinapi(self, depth: int = coinapi_len) -> None:
"""Получение данных с CoinAPI для получения курса криптовалюты
Args:
depth (int, optional): Счетчик, защищающий от бесконечной рекурсии
"""
if depth <= 0:
raise RecursionError('Рейтлимит на всех токенах')
resp = requests.get(
(
f'{COINAPI_URL}/{self.from_currency}'
f'/{self.conv_currency}'
),
headers={
'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]],
},
timeout=3,
)
if resp.status_code == 429:
log.warning('CoinAPI ratelimited, rotating token')
rotate_token(settings.coinapi_keys, coinapi_active)
self.coinapi(depth - 1)
data: Dict[str, Any] = resp.json()
rate = data.get('rate')
if rate is None:
raise RuntimeError('Не удалось получить курс валюты от CoinAPI')
self.conv_amount = float(rate * self.amount)
def rotate_token(lst: List[str], active: List[int]) -> None:
"""Смена API-ключей CoinAPI при ratelimits
Args:
lst (List[str]): Список ключей
active (List[str]): Изменяемый объект с текущим ключевым индексом
"""
active[0] = (active[0] + 1) % len(lst)
@dp.inline_query()
async def currency(inline_query: types.InlineQuery) -> None:
query = inline_query.query
article: List[Optional[types.InlineQueryResultArticle]] = [None]
args = query.split()
result_id = hashlib.md5(query.encode()).hexdigest()
conv = Currency()
try:
if len(args) == 0:
article[0] = types.InlineQueryResultArticle(
id=result_id,
title="Требуется 2, либо 3 аргумента",
description=f"@shirino_bot USD RUB \n@shirino_bot 12 USD RUB",
input_message_content=types.InputTextMessageContent(
message_text="Требуется 2, либо 3 аргумента"
)
)
elif args[0].isdigit() or re.match(r'^-?\d+(?:\.\d+)$', args[0]) is not None:
conv.from_currency = args[1].upper()
conv.conv_currency = args[2].upper()
conv.convert()
elif type(args[0]) is str:
conv.from_currency = args[0].upper()
conv.conv_currency = args[1].upper()
conv.convert()
result_title = f'{conv.amount} {conv.from_currency} = {conv.conv_amount} {conv.conv_currency}'
result_desc = None
except aiohttp.client_exceptions.ClientError:
result_title = 'Произошла ошибка'
result_desc = 'Рейт-лимит от api telegram, попробуйте позже'
await asyncio.sleep(1)
except Exception as ex:
log.debug(ex)
result_title = 'Произошла ошибка'
result_desc = f'{type(ex).__name__}: {ex}'
article[0] = types.InlineQueryResultArticle(
id=result_id,
title=result_title,
description=result_desc,
input_message_content=types.InputTextMessageContent(
message_text=f"{result_title} \n{result_desc}",
),
)
await inline_query.answer(
article,
cache_time=1,
is_personal=True,
)
async def main() -> None:
bot = Bot(settings.telegram_token)
await dp.start_polling(bot)
if __name__ == '__main__':
asyncio.run(main())