Compare commits

...

2 commits

2 changed files with 77 additions and 100 deletions

172
main.py
View file

@ -1,33 +1,24 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import json
import hashlib
import logging
import string
from typing import Any, Dict, List, Optional
import aiohttp.client_exceptions import aiohttp.client_exceptions
import requests
from pydantic.v1 import BaseSettings from pydantic.v1 import BaseSettings
from typing import List, Any, Dict, Optional
import logging
from aiogram import Bot, Dispatcher, types # type: ignore from aiogram import Dispatcher, types, Bot
import requests
import json
import hashlib
import asyncio import asyncio
import re
# Constants import string
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
# ---
# Config from .env
class Settings(BaseSettings): class Settings(BaseSettings):
debug: bool debug: bool
timeout: int = 2
ndigits: int = 3
coinapi_keys: List[str] coinapi_keys: List[str]
telegram_token: str telegram_token: str
@ -36,11 +27,8 @@ class Settings(BaseSettings):
env_file_encoding = 'utf-8' env_file_encoding = 'utf-8'
settings = Settings() # type: ignore settings = Settings()
# ---
# Logging
log = logging.getLogger('shirino') log = logging.getLogger('shirino')
handler = logging.StreamHandler() handler = logging.StreamHandler()
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s') fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
@ -51,34 +39,32 @@ log.addHandler(handler)
if settings.debug: if settings.debug:
handler.setLevel(logging.DEBUG) handler.setLevel(logging.DEBUG)
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
# ---
coinapi_len = len(settings.coinapi_keys) coinapi_len = len(settings.coinapi_keys)
coinapi_active = [0] # API key index coinapi_active = [0]
dp = Dispatcher() dp = Dispatcher()
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
class CurrencyConverter:
class Currency:
def __init__(self) -> None: def __init__(self) -> None:
self.amount: float = 1.0 self.amount: float = 1.0
self.conv_amount: float = 0.0 self.conv_amount: float = 1.0
self.from_currency = '' self.from_currency = ''
self.conv_currency = '' self.conv_currency = ''
def convert(self) -> None: def convert(self) -> None:
"""Currency conversion""" if not self.ddg():
if not self.ddgapi():
self.coinapi() self.coinapi()
str_amount = f'{self.conv_amount}' str_amount = f'{self.conv_amount}'
point = str_amount.find('.') point = str_amount.find('.')
after_point = str_amount[point + 1:] after_point = str_amount[point + 1:]
fnz = min( # index of first non-zero digit fnz = min(
( (
after_point.index(i) after_point.index(i)
for i in string.digits[1:] for i in string.digits[1:]
@ -93,62 +79,43 @@ class CurrencyConverter:
# how many digits should be after the point: # how many digits should be after the point:
# ndigits (3 by default) after first non-zero # ndigits (3 by default) after first non-zero
ndigits = fnz + settings.ndigits ndigits = fnz + 3
self.conv_amount = round(self.conv_amount, ndigits) self.conv_amount = round(self.conv_amount, ndigits)
def ddgapi(self) -> bool: def ddg(self) -> float:
"""Get data from DuckDuckGo's currency API """Получение данных фиатной валюты через DuckDuckGo
Returns: e.g: https://duckduckgo.com/js/spice/currency/1/USD/RUB
`False` if the currency does not exist,
`True` on successful conversion
""" """
# API request res = requests.get(f'{DDG_URL}/{self.amount}/{self.from_currency}/{self.conv_currency}')
resp = requests.get( data: Dict[str, Any] = json.loads(re.findall(r'(.+)\);', res.text)[0])
(
f'{DDG_URL}/{self.amount}/{self.from_currency}'
f'/{self.conv_currency}'
),
timeout=settings.timeout,
)
log.debug(resp.text)
# Parsing JSON data
data: Dict[str, Any] = json.loads(
resp.text
.replace('ddg_spice_currency(', '')
.replace(');', '')
)
log.debug(data) log.debug(data)
# If the currency does not exist if len(data.get('to')) == 0:
error = data.get('to')
if len(error) == 0:
return False return False
# Otherwise
conv: Dict[str, str] = data.get('to')[0] conv: Dict[str, str] = data.get('to')[0]
conv_amount = conv.get('mid') conv_amount = conv.get("mid")
if not conv_amount: if conv_amount is None:
raise RuntimeError('Ошибка при конвертации через DDG') print("FUCK")
raise RuntimeError('Ошибка при конвертации валюты через DuckDuckGo')
self.conv_amount = float(conv_amount)
log.debug(conv) log.debug(conv)
log.debug(conv_amount)
self.conv_amount = float(conv_amount)
return True return True
def coinapi(self, depth: int = coinapi_len) -> None: def coinapi(self, depth: int = coinapi_len) -> None:
"""Get data from CoinAPI (for cryptocurrencies) """Получение данных с CoinAPI для получения курса криптовалюты
Args: Args:
depth (int, optional): Counter protecting from infinite recursion depth (int, optional): Счетчик, защищающий от бесконечной рекурсии
""" """
if depth <= 0: if depth <= 0:
@ -162,7 +129,7 @@ class CurrencyConverter:
headers={ headers={
'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]], 'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]],
}, },
timeout=settings.timeout, timeout=3,
) )
if resp.status_code == 429: if resp.status_code == 429:
@ -178,11 +145,11 @@ class CurrencyConverter:
def rotate_token(lst: List[str], active: List[int]) -> None: def rotate_token(lst: List[str], active: List[int]) -> None:
"""Rotates API key to prevent ratelimits """Смена API-ключей CoinAPI при ratelimits
Args: Args:
lst (List[str]): Keys list lst (List[str]): Список ключей
active (List[str]): Mutable object with current key index active (List[str]): Изменяемый объект с текущим ключевым индексом
""" """
active[0] = (active[0] + 1) % len(lst) active[0] = (active[0] + 1) % len(lst)
@ -190,50 +157,59 @@ def rotate_token(lst: List[str], active: List[int]) -> None:
@dp.inline_query() @dp.inline_query()
async def currency(inline_query: types.InlineQuery) -> None: async def currency(inline_query: types.InlineQuery) -> None:
query = inline_query.query query = inline_query.query
article: List[Optional[types.InlineQueryResultArticle]] = [None] article: List[Optional[types.InlineQueryResultArticle]] = [None]
text = query.split() args = query.split()
len_ = len(text)
result_id = hashlib.md5(query.encode()).hexdigest() result_id = hashlib.md5(query.encode()).hexdigest()
conv = CurrencyConverter() conv = Currency()
try: try:
if len_ == 3: if len(args) == 0:
conv.amount = float(text[0]) article[0] = types.InlineQueryResultArticle(
conv.from_currency = text[1].upper() id=result_id,
conv.conv_currency = text[2].upper() title="Требуется 2, либо 3 аргумента",
conv.convert() description=f"@shirino_bot USD RUB \n@shirino_bot 12 USD RUB",
elif len_ == 2: input_message_content=types.InputTextMessageContent(
conv.from_currency = text[0].upper() message_text="Требуется 2, либо 3 аргумента"
conv.conv_currency = text[1].upper() )
conv.convert() )
else:
raise ValueError('Надо 2 или 3 аргумента') elif args[0].isdigit() or re.match(r'^-?\d+(?:\.\d+)$', args[0]) is not None:
conv.from_currency = args[1].upper()
conv.conv_currency = args[2].upper()
conv.convert()
elif type(args[0]) is str:
conv.from_currency = args[0].upper()
conv.conv_currency = args[1].upper()
conv.convert()
result_title = f'{conv.amount} {conv.from_currency} = {conv.conv_amount} {conv.conv_currency}'
result_desc = None
result = (
f'{conv.amount} {conv.from_currency} = '
f'{conv.conv_amount} {conv.conv_currency}'
)
except aiohttp.client_exceptions.ClientError: except aiohttp.client_exceptions.ClientError:
result = "Рейт-лимит от api telegram, попробуйте позже" result_title = 'Произошла ошибка'
result_desc = 'Рейт-лимит от api telegram, попробуйте позже'
await asyncio.sleep(1)
except Exception as ex: except Exception as ex:
result = f'{type(ex).__name__}: {ex}' log.debug(ex)
result_title = 'Произошла ошибка'
result_desc = f'{type(ex).__name__}: {ex}'
article[0] = types.InlineQueryResultArticle( article[0] = types.InlineQueryResultArticle(
id=result_id, id=result_id,
title=result, title=result_title,
description=result_desc,
input_message_content=types.InputTextMessageContent( input_message_content=types.InputTextMessageContent(
message_text=result, message_text=f"{result_title} \n{result_desc}",
), ),
) )
await inline_query.answer( await inline_query.answer(
article, # type: ignore article,
cache_time=1, cache_time=1,
is_personal=True, is_personal=True,
) )

View file

@ -1,3 +1,4 @@
requests==2.31.0 requests==2.31.0
pydantic[dotenv]==2.4.2 aiogram==3.1.1
aiogram==3.1.1 aiohttp==3.8.5
pydantic[dotenv]==2.5.2