#!/usr/bin/env python3 import asyncio import hashlib import json import logging import re import string from typing import List, Any, Dict, Optional import requests from aiogram import Dispatcher, types, Bot import aiohttp.client_exceptions from pydantic.v1 import BaseSettings # Constants DDG_URL = 'https://duckduckgo.com/js/spice/currency' COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate' # --- # Config from .env class Settings(BaseSettings): debug: bool timeout: int = 2 ndigits: int = 3 coinapi_keys: List[str] telegram_token: str class Config: env_file = '.env' env_file_encoding = 'utf-8' settings = Settings() # type: ignore # --- # Logging log = logging.getLogger('shirino') handler = logging.StreamHandler() fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s') handler.setFormatter(fmt) log.addHandler(handler) if settings.debug: handler.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG) # --- coinapi_len = len(settings.coinapi_keys) coinapi_active = [0] # API key index dp = Dispatcher() class CurrencyConverter: def __init__(self) -> None: self.amount: float = 1.0 self.conv_amount: float = 0.0 self.from_currency = '' self.conv_currency = '' def convert(self) -> None: """Currency conversion""" if not self.ddgapi(): self.coinapi() str_amount = f'{self.conv_amount}' point = str_amount.find(".") after_point = str_amount[point + 1:] fnz = min( # index of first non-zero digit ( after_point.index(i) for i in string.digits[1:] if i in after_point ), default=-1, ) if fnz == -1: # it is an integer like 81.0 return # how many digits should be after the point: # ndigits (3 by default) after first non-zero ndigits = fnz + settings.ndigits self.conv_amount = round(self.conv_amount, ndigits) def ddgapi(self) -> bool: """Получение данных фиатной валюты через DuckDuckGo e.g: https://duckduckgo.com/js/spice/currency/1/USD/RUB Returns: `False` если валюты нет в API `True` если конвертация прошла успешно """ # Запрос к API res = requests.get(f'{DDG_URL}/{self.amount}/{self.from_currency}/{self.conv_currency}') data: Dict[str, Any] = json.loads(re.findall(r'(.+)\);', res.text)[0]) log.debug(data) if len(data.get('to')) == 0: return False conv: Dict[str, str] = data.get('to')[0] conv_amount = conv.get("mid") if conv_amount is None: raise RuntimeError('Ошибка при конвертации валюты через DuckDuckGo') log.debug(conv) log.debug(conv_amount) self.conv_amount = float(conv_amount) return True def coinapi(self, depth: int = coinapi_len) -> None: """Получение данных с CoinAPI для получения курса криптовалюты Args: depth (int, optional): Счетчик, защищающий от рекурсии """ if depth <= 0: raise RecursionError('Рейтлимит на всех токенах') resp = requests.get( ( f'{COINAPI_URL}/{self.from_currency}' f'/{self.conv_currency}' ), headers={ 'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]], }, timeout=settings.timeout, ) if resp.status_code == 429: log.warning('CoinAPI ratelimited, rotating token') rotate_token(settings.coinapi_keys, coinapi_active) self.coinapi(depth - 1) data: Dict[str, Any] = resp.json() rate = data.get('rate') if rate is None: raise RuntimeError('Не удалось получить курс валюты от CoinAPI') self.conv_amount = float(rate * self.amount) def rotate_token(lst: List[str], active: List[int]) -> None: """Смена API-ключей CoinAPI при ratelimits Args: lst (List[str]): Список ключей active (List[str]): Изменяемый объект с текущим ключевым индексом """ active[0] = (active[0] + 1) % len(lst) @dp.inline_query() async def currency(inline_query: types.InlineQuery) -> None: query = inline_query.query article: List[Optional[types.InlineQueryResultArticle]] = [None] args = query.split() result_id = hashlib.md5(query.encode()).hexdigest() conv = CurrencyConverter() try: if len(args) == 3: conv.amount = float(args[0]) conv.from_currency = args[1].upper() conv.conv_currency = args[2].upper() conv.convert() elif len(args) == 2: conv.from_currency = args[0].upper() conv.conv_currency = args[1].upper() conv.convert() else: raise ValueError('Надо 2 или 3 аргумента') result = ( f'{conv.amount} {conv.from_currency} = ' f'{conv.conv_amount} {conv.conv_currency}' ) except aiohttp.client_exceptions.ClientError as ex: article[0] = types.InlineQueryResultArticle( id=result_id, title="Rate-limit от API Telegram, повторите запрос позже.", input_message_content=types.InputTextMessageContent( message_text="Rate-limit от API Telegram, повторите запрос позже.", ) ) log.debug(ex) await asyncio.sleep(1) except Exception as ex: log.debug(ex) result = f'{type(ex).__name__}: {ex}' article[0] = types.InlineQueryResultArticle( id=result_id, title=result, input_message_content=types.InputTextMessageContent( message_text=result, ), ) await inline_query.answer( article, # type: ignore cache_time=1, is_personal=True, ) async def main() -> None: bot = Bot(settings.telegram_token) await dp.start_polling(bot) if __name__ == '__main__': asyncio.run(main())