Compare commits

..

No commits in common. "4b47f618406f12c24f4d5350fe12c9ad0e238432" and "53c861698b1b220579497a61eba4da0eed2fa02c" have entirely different histories.

2 changed files with 98 additions and 75 deletions

168
main.py
View file

@ -1,24 +1,33 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import aiohttp.client_exceptions
from pydantic.v1 import BaseSettings
from typing import List, Any, Dict, Optional
import logging
from aiogram import Dispatcher, types, Bot
import requests
import json import json
import hashlib import hashlib
import logging
import string
from typing import Any, Dict, List, Optional
import aiohttp.client_exceptions
import requests
from pydantic.v1 import BaseSettings
from aiogram import Bot, Dispatcher, types # type: ignore
import asyncio import asyncio
import re
import string # Constants
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
# ---
# Config from .env
class Settings(BaseSettings): class Settings(BaseSettings):
debug: bool debug: bool
timeout: int = 2
ndigits: int = 3
coinapi_keys: List[str] coinapi_keys: List[str]
telegram_token: str telegram_token: str
@ -27,8 +36,11 @@ class Settings(BaseSettings):
env_file_encoding = 'utf-8' env_file_encoding = 'utf-8'
settings = Settings() settings = Settings() # type: ignore
# ---
# Logging
log = logging.getLogger('shirino') log = logging.getLogger('shirino')
handler = logging.StreamHandler() handler = logging.StreamHandler()
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s') fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
@ -39,32 +51,34 @@ log.addHandler(handler)
if settings.debug: if settings.debug:
handler.setLevel(logging.DEBUG) handler.setLevel(logging.DEBUG)
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
# ---
coinapi_len = len(settings.coinapi_keys) coinapi_len = len(settings.coinapi_keys)
coinapi_active = [0] coinapi_active = [0] # API key index
dp = Dispatcher() dp = Dispatcher()
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
class CurrencyConverter:
class Currency:
def __init__(self) -> None: def __init__(self) -> None:
self.amount: float = 1.0 self.amount: float = 1.0
self.conv_amount: float = 1.0 self.conv_amount: float = 0.0
self.from_currency = '' self.from_currency = ''
self.conv_currency = '' self.conv_currency = ''
def convert(self) -> None: def convert(self) -> None:
if not self.ddg(): """Currency conversion"""
if not self.ddgapi():
self.coinapi() self.coinapi()
str_amount = f'{self.conv_amount}' str_amount = f'{self.conv_amount}'
point = str_amount.find('.') point = str_amount.find('.')
after_point = str_amount[point + 1:] after_point = str_amount[point + 1:]
fnz = min( fnz = min( # index of first non-zero digit
( (
after_point.index(i) after_point.index(i)
for i in string.digits[1:] for i in string.digits[1:]
@ -79,43 +93,62 @@ class Currency:
# how many digits should be after the point: # how many digits should be after the point:
# ndigits (3 by default) after first non-zero # ndigits (3 by default) after first non-zero
ndigits = fnz + 3 ndigits = fnz + settings.ndigits
self.conv_amount = round(self.conv_amount, ndigits) self.conv_amount = round(self.conv_amount, ndigits)
def ddg(self) -> float: def ddgapi(self) -> bool:
"""Получение данных фиатной валюты через DuckDuckGo """Get data from DuckDuckGo's currency API
e.g: https://duckduckgo.com/js/spice/currency/1/USD/RUB Returns:
`False` if the currency does not exist,
`True` on successful conversion
""" """
res = requests.get(f'{DDG_URL}/{self.amount}/{self.from_currency}/{self.conv_currency}') # API request
data: Dict[str, Any] = json.loads(re.findall(r'(.+)\);', res.text)[0]) resp = requests.get(
(
f'{DDG_URL}/{self.amount}/{self.from_currency}'
f'/{self.conv_currency}'
),
timeout=settings.timeout,
)
log.debug(resp.text)
# Parsing JSON data
data: Dict[str, Any] = json.loads(
resp.text
.replace('ddg_spice_currency(', '')
.replace(');', '')
)
log.debug(data) log.debug(data)
if len(data.get('to')) == 0: # If the currency does not exist
error = data.get('to')
if len(error) == 0:
return False return False
# Otherwise
conv: Dict[str, str] = data.get('to')[0] conv: Dict[str, str] = data.get('to')[0]
conv_amount = conv.get("mid") conv_amount = conv.get('mid')
if conv_amount is None: if not conv_amount:
print("FUCK") raise RuntimeError('Ошибка при конвертации через DDG')
raise RuntimeError('Ошибка при конвертации валюты через DuckDuckGo')
log.debug(conv)
log.debug(conv_amount)
self.conv_amount = float(conv_amount) self.conv_amount = float(conv_amount)
log.debug(conv)
return True return True
def coinapi(self, depth: int = coinapi_len) -> None: def coinapi(self, depth: int = coinapi_len) -> None:
"""Получение данных с CoinAPI для получения курса криптовалюты """Get data from CoinAPI (for cryptocurrencies)
Args: Args:
depth (int, optional): Счетчик, защищающий от бесконечной рекурсии depth (int, optional): Counter protecting from infinite recursion
""" """
if depth <= 0: if depth <= 0:
@ -129,7 +162,7 @@ class Currency:
headers={ headers={
'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]], 'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]],
}, },
timeout=3, timeout=settings.timeout,
) )
if resp.status_code == 429: if resp.status_code == 429:
@ -145,11 +178,11 @@ class Currency:
def rotate_token(lst: List[str], active: List[int]) -> None: def rotate_token(lst: List[str], active: List[int]) -> None:
"""Смена API-ключей CoinAPI при ratelimits """Rotates API key to prevent ratelimits
Args: Args:
lst (List[str]): Список ключей lst (List[str]): Keys list
active (List[str]): Изменяемый объект с текущим ключевым индексом active (List[str]): Mutable object with current key index
""" """
active[0] = (active[0] + 1) % len(lst) active[0] = (active[0] + 1) % len(lst)
@ -157,59 +190,50 @@ def rotate_token(lst: List[str], active: List[int]) -> None:
@dp.inline_query() @dp.inline_query()
async def currency(inline_query: types.InlineQuery) -> None: async def currency(inline_query: types.InlineQuery) -> None:
query = inline_query.query query = inline_query.query
article: List[Optional[types.InlineQueryResultArticle]] = [None] article: List[Optional[types.InlineQueryResultArticle]] = [None]
args = query.split() text = query.split()
len_ = len(text)
result_id = hashlib.md5(query.encode()).hexdigest() result_id = hashlib.md5(query.encode()).hexdigest()
conv = Currency() conv = CurrencyConverter()
try: try:
if len(args) == 0: if len_ == 3:
article[0] = types.InlineQueryResultArticle( conv.amount = float(text[0])
id=result_id, conv.from_currency = text[1].upper()
title="Требуется 2, либо 3 аргумента", conv.conv_currency = text[2].upper()
description=f"@shirino_bot USD RUB \n@shirino_bot 12 USD RUB",
input_message_content=types.InputTextMessageContent(
message_text="Требуется 2, либо 3 аргумента"
)
)
elif args[0].isdigit() or re.match(r'^-?\d+(?:\.\d+)$', args[0]) is not None:
conv.from_currency = args[1].upper()
conv.conv_currency = args[2].upper()
conv.convert() conv.convert()
elif type(args[0]) is str: elif len_ == 2:
conv.from_currency = args[0].upper() conv.from_currency = text[0].upper()
conv.conv_currency = args[1].upper() conv.conv_currency = text[1].upper()
conv.convert() conv.convert()
else:
raise ValueError('Надо 2 или 3 аргумента')
result_title = f'{conv.amount} {conv.from_currency} = {conv.conv_amount} {conv.conv_currency}' result = (
result_desc = None f'{conv.amount} {conv.from_currency} = '
f'{conv.conv_amount} {conv.conv_currency}'
)
except aiohttp.client_exceptions.ClientError: except aiohttp.client_exceptions.ClientError:
result_title = 'Произошла ошибка' result = "Рейт-лимит от api telegram, попробуйте позже"
result_desc = 'Рейт-лимит от api telegram, попробуйте позже'
await asyncio.sleep(1)
except Exception as ex: except Exception as ex:
log.debug(ex) result = f'{type(ex).__name__}: {ex}'
result_title = 'Произошла ошибка'
result_desc = f'{type(ex).__name__}: {ex}'
article[0] = types.InlineQueryResultArticle( article[0] = types.InlineQueryResultArticle(
id=result_id, id=result_id,
title=result_title, title=result,
description=result_desc,
input_message_content=types.InputTextMessageContent( input_message_content=types.InputTextMessageContent(
message_text=f"{result_title} \n{result_desc}", message_text=result,
), ),
) )
await inline_query.answer( await inline_query.answer(
article, article, # type: ignore
cache_time=1, cache_time=1,
is_personal=True, is_personal=True,
) )

View file

@ -1,4 +1,3 @@
requests==2.31.0 requests==2.31.0
aiogram==3.1.1 pydantic[dotenv]==2.4.2
aiohttp==3.8.5 aiogram==3.1.1
pydantic[dotenv]==2.5.2