Compare commits

..

No commits in common. "6b943ad40abb425d8943b60a11f343c47e502d37" and "4b47f618406f12c24f4d5350fe12c9ad0e238432" have entirely different histories.

173
main.py
View file

@ -1,32 +1,24 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
import asyncio
import hashlib
import json
import logging
import re
import string
from typing import List, Any, Dict, Optional
import requests
from aiogram import Dispatcher, types, Bot
import aiohttp.client_exceptions import aiohttp.client_exceptions
from pydantic.v1 import BaseSettings from pydantic.v1 import BaseSettings
from typing import List, Any, Dict, Optional
import logging
# Constants from aiogram import Dispatcher, types, Bot
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate' import requests
import json
import hashlib
import asyncio
import re
import string
# ---
# Config from .env
class Settings(BaseSettings): class Settings(BaseSettings):
debug: bool debug: bool
timeout: int = 2
ndigits: int = 3
coinapi_keys: List[str] coinapi_keys: List[str]
telegram_token: str telegram_token: str
@ -35,11 +27,8 @@ class Settings(BaseSettings):
env_file_encoding = 'utf-8' env_file_encoding = 'utf-8'
settings = Settings() # type: ignore settings = Settings()
# ---
# Logging
log = logging.getLogger('shirino') log = logging.getLogger('shirino')
handler = logging.StreamHandler() handler = logging.StreamHandler()
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s') fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
@ -50,34 +39,32 @@ log.addHandler(handler)
if settings.debug: if settings.debug:
handler.setLevel(logging.DEBUG) handler.setLevel(logging.DEBUG)
log.setLevel(logging.DEBUG) log.setLevel(logging.DEBUG)
# ---
coinapi_len = len(settings.coinapi_keys) coinapi_len = len(settings.coinapi_keys)
coinapi_active = [0] # API key index coinapi_active = [0]
dp = Dispatcher() dp = Dispatcher()
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
class CurrencyConverter:
class Currency:
def __init__(self) -> None: def __init__(self) -> None:
self.amount: float = 1.0 self.amount: float = 1.0
self.conv_amount: float = 0.0 self.conv_amount: float = 1.0
self.from_currency = '' self.from_currency = ''
self.conv_currency = '' self.conv_currency = ''
def convert(self) -> None: def convert(self) -> None:
"""Currency conversion""" if not self.ddg():
if not self.ddgapi():
self.coinapi() self.coinapi()
str_amount = f'{self.conv_amount}' str_amount = f'{self.conv_amount}'
point = str_amount.find(".") point = str_amount.find('.')
after_point = str_amount[point + 1:] after_point = str_amount[point + 1:]
fnz = min( # index of first non-zero digit fnz = min(
( (
after_point.index(i) after_point.index(i)
for i in string.digits[1:] for i in string.digits[1:]
@ -92,21 +79,16 @@ class CurrencyConverter:
# how many digits should be after the point: # how many digits should be after the point:
# ndigits (3 by default) after first non-zero # ndigits (3 by default) after first non-zero
ndigits = fnz + settings.ndigits ndigits = fnz + 3
self.conv_amount = round(self.conv_amount, ndigits) self.conv_amount = round(self.conv_amount, ndigits)
def ddgapi(self) -> bool: def ddg(self) -> float:
"""Получение данных фиатной валюты через DuckDuckGo """Получение данных фиатной валюты через DuckDuckGo
e.g: https://duckduckgo.com/js/spice/currency/1/USD/RUB e.g: https://duckduckgo.com/js/spice/currency/1/USD/RUB
Returns:
`False` если валюты нет в API
`True` если конвертация прошла успешно
""" """
# Запрос к API
res = requests.get(f'{DDG_URL}/{self.amount}/{self.from_currency}/{self.conv_currency}') res = requests.get(f'{DDG_URL}/{self.amount}/{self.from_currency}/{self.conv_currency}')
data: Dict[str, Any] = json.loads(re.findall(r'(.+)\);', res.text)[0]) data: Dict[str, Any] = json.loads(re.findall(r'(.+)\);', res.text)[0])
@ -119,6 +101,7 @@ class CurrencyConverter:
conv_amount = conv.get("mid") conv_amount = conv.get("mid")
if conv_amount is None: if conv_amount is None:
print("FUCK")
raise RuntimeError('Ошибка при конвертации валюты через DuckDuckGo') raise RuntimeError('Ошибка при конвертации валюты через DuckDuckGo')
log.debug(conv) log.debug(conv)
@ -132,7 +115,7 @@ class CurrencyConverter:
"""Получение данных с CoinAPI для получения курса криптовалюты """Получение данных с CoinAPI для получения курса криптовалюты
Args: Args:
depth (int, optional): Счетчик, защищающий от рекурсии depth (int, optional): Счетчик, защищающий от бесконечной рекурсии
""" """
if depth <= 0: if depth <= 0:
@ -146,7 +129,7 @@ class CurrencyConverter:
headers={ headers={
'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]], 'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]],
}, },
timeout=settings.timeout, timeout=3,
) )
if resp.status_code == 429: if resp.status_code == 429:
@ -172,15 +155,57 @@ def rotate_token(lst: List[str], active: List[int]) -> None:
active[0] = (active[0] + 1) % len(lst) active[0] = (active[0] + 1) % len(lst)
async def inline_reply(result_id: str, title: str, description: str or None, inline_query: types.InlineQuery) -> None: @dp.inline_query()
async def currency(inline_query: types.InlineQuery) -> None:
query = inline_query.query
article: List[Optional[types.InlineQueryResultArticle]] = [None] article: List[Optional[types.InlineQueryResultArticle]] = [None]
args = query.split()
result_id = hashlib.md5(query.encode()).hexdigest()
conv = Currency()
try:
if len(args) == 0:
article[0] = types.InlineQueryResultArticle(
id=result_id,
title="Требуется 2, либо 3 аргумента",
description=f"@shirino_bot USD RUB \n@shirino_bot 12 USD RUB",
input_message_content=types.InputTextMessageContent(
message_text="Требуется 2, либо 3 аргумента"
)
)
elif args[0].isdigit() or re.match(r'^-?\d+(?:\.\d+)$', args[0]) is not None:
conv.from_currency = args[1].upper()
conv.conv_currency = args[2].upper()
conv.convert()
elif type(args[0]) is str:
conv.from_currency = args[0].upper()
conv.conv_currency = args[1].upper()
conv.convert()
result_title = f'{conv.amount} {conv.from_currency} = {conv.conv_amount} {conv.conv_currency}'
result_desc = None
except aiohttp.client_exceptions.ClientError:
result_title = 'Произошла ошибка'
result_desc = 'Рейт-лимит от api telegram, попробуйте позже'
await asyncio.sleep(1)
except Exception as ex:
log.debug(ex)
result_title = 'Произошла ошибка'
result_desc = f'{type(ex).__name__}: {ex}'
article[0] = types.InlineQueryResultArticle( article[0] = types.InlineQueryResultArticle(
id=result_id, id=result_id,
title=title, title=result_title,
description=description, description=result_desc,
input_message_content=types.InputTextMessageContent( input_message_content=types.InputTextMessageContent(
message_text=title message_text=f"{result_title} \n{result_desc}",
) ),
) )
await inline_query.answer( await inline_query.answer(
@ -190,54 +215,6 @@ async def inline_reply(result_id: str, title: str, description: str or None, inl
) )
@dp.inline_query()
async def currency(inline_query: types.InlineQuery) -> None:
query = inline_query.query
args = query.split()
result_id = hashlib.md5(query.encode()).hexdigest()
conv = CurrencyConverter()
try:
log.debug(len(args))
if len(args) <= 1:
await inline_reply(result_id,
"Требуется 2, либо 3 аргумента",
f"@shirino_bot USD RUB \n@shirino_bot 12 USD RUB", inline_query)
if len(args) == 3:
conv.amount = float(args[0])
conv.from_currency = args[1].upper()
conv.conv_currency = args[2].upper()
conv.convert()
elif len(args) == 2:
conv.from_currency = args[0].upper()
conv.conv_currency = args[1].upper()
conv.convert()
result = (
f'{conv.amount} {conv.from_currency} = '
f'{conv.conv_amount} {conv.conv_currency}'
)
except aiohttp.client_exceptions.ClientError as ex:
await inline_reply(result_id,
"Rate-limit от API Telegram, повторите запрос позже",
None,
inline_query)
log.debug(ex)
await asyncio.sleep(1)
except Exception as ex:
log.debug(ex)
await inline_reply(result_id, "Неверный формат данных",
"@shirino_bot USD RUB \n@shirino_bot 12 USD RUB",
inline_query)
await inline_reply(result_id, result, None, inline_query)
async def main() -> None: async def main() -> None:
bot = Bot(settings.telegram_token) bot = Bot(settings.telegram_token)
await dp.start_polling(bot) await dp.start_polling(bot)