Compare commits

..

No commits in common. "4b47f618406f12c24f4d5350fe12c9ad0e238432" and "53c861698b1b220579497a61eba4da0eed2fa02c" have entirely different histories.

2 changed files with 98 additions and 75 deletions

168
main.py
View file

@ -1,24 +1,33 @@
#!/usr/bin/env python3
import aiohttp.client_exceptions
from pydantic.v1 import BaseSettings
from typing import List, Any, Dict, Optional
import logging
from aiogram import Dispatcher, types, Bot
import requests
import json
import hashlib
import logging
import string
from typing import Any, Dict, List, Optional
import aiohttp.client_exceptions
import requests
from pydantic.v1 import BaseSettings
from aiogram import Bot, Dispatcher, types # type: ignore
import asyncio
import re
import string
# Constants
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
# ---
# Config from .env
class Settings(BaseSettings):
debug: bool
timeout: int = 2
ndigits: int = 3
coinapi_keys: List[str]
telegram_token: str
@ -27,8 +36,11 @@ class Settings(BaseSettings):
env_file_encoding = 'utf-8'
settings = Settings()
settings = Settings() # type: ignore
# ---
# Logging
log = logging.getLogger('shirino')
handler = logging.StreamHandler()
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
@ -39,32 +51,34 @@ log.addHandler(handler)
if settings.debug:
handler.setLevel(logging.DEBUG)
log.setLevel(logging.DEBUG)
# ---
coinapi_len = len(settings.coinapi_keys)
coinapi_active = [0]
coinapi_active = [0] # API key index
dp = Dispatcher()
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
class CurrencyConverter:
class Currency:
def __init__(self) -> None:
self.amount: float = 1.0
self.conv_amount: float = 1.0
self.conv_amount: float = 0.0
self.from_currency = ''
self.conv_currency = ''
def convert(self) -> None:
if not self.ddg():
"""Currency conversion"""
if not self.ddgapi():
self.coinapi()
str_amount = f'{self.conv_amount}'
point = str_amount.find('.')
after_point = str_amount[point + 1:]
fnz = min(
fnz = min( # index of first non-zero digit
(
after_point.index(i)
for i in string.digits[1:]
@ -79,43 +93,62 @@ class Currency:
# how many digits should be after the point:
# ndigits (3 by default) after first non-zero
ndigits = fnz + 3
ndigits = fnz + settings.ndigits
self.conv_amount = round(self.conv_amount, ndigits)
def ddg(self) -> float:
"""Получение данных фиатной валюты через DuckDuckGo
def ddgapi(self) -> bool:
"""Get data from DuckDuckGo's currency API
e.g: https://duckduckgo.com/js/spice/currency/1/USD/RUB
Returns:
`False` if the currency does not exist,
`True` on successful conversion
"""
res = requests.get(f'{DDG_URL}/{self.amount}/{self.from_currency}/{self.conv_currency}')
data: Dict[str, Any] = json.loads(re.findall(r'(.+)\);', res.text)[0])
# API request
resp = requests.get(
(
f'{DDG_URL}/{self.amount}/{self.from_currency}'
f'/{self.conv_currency}'
),
timeout=settings.timeout,
)
log.debug(resp.text)
# Parsing JSON data
data: Dict[str, Any] = json.loads(
resp.text
.replace('ddg_spice_currency(', '')
.replace(');', '')
)
log.debug(data)
if len(data.get('to')) == 0:
# If the currency does not exist
error = data.get('to')
if len(error) == 0:
return False
# Otherwise
conv: Dict[str, str] = data.get('to')[0]
conv_amount = conv.get("mid")
conv_amount = conv.get('mid')
if conv_amount is None:
print("FUCK")
raise RuntimeError('Ошибка при конвертации валюты через DuckDuckGo')
log.debug(conv)
log.debug(conv_amount)
if not conv_amount:
raise RuntimeError('Ошибка при конвертации через DDG')
self.conv_amount = float(conv_amount)
log.debug(conv)
return True
def coinapi(self, depth: int = coinapi_len) -> None:
"""Получение данных с CoinAPI для получения курса криптовалюты
"""Get data from CoinAPI (for cryptocurrencies)
Args:
depth (int, optional): Счетчик, защищающий от бесконечной рекурсии
depth (int, optional): Counter protecting from infinite recursion
"""
if depth <= 0:
@ -129,7 +162,7 @@ class Currency:
headers={
'X-CoinAPI-Key': settings.coinapi_keys[coinapi_active[0]],
},
timeout=3,
timeout=settings.timeout,
)
if resp.status_code == 429:
@ -145,11 +178,11 @@ class Currency:
def rotate_token(lst: List[str], active: List[int]) -> None:
"""Смена API-ключей CoinAPI при ratelimits
"""Rotates API key to prevent ratelimits
Args:
lst (List[str]): Список ключей
active (List[str]): Изменяемый объект с текущим ключевым индексом
lst (List[str]): Keys list
active (List[str]): Mutable object with current key index
"""
active[0] = (active[0] + 1) % len(lst)
@ -157,59 +190,50 @@ def rotate_token(lst: List[str], active: List[int]) -> None:
@dp.inline_query()
async def currency(inline_query: types.InlineQuery) -> None:
query = inline_query.query
article: List[Optional[types.InlineQueryResultArticle]] = [None]
args = query.split()
text = query.split()
len_ = len(text)
result_id = hashlib.md5(query.encode()).hexdigest()
conv = Currency()
conv = CurrencyConverter()
try:
if len(args) == 0:
article[0] = types.InlineQueryResultArticle(
id=result_id,
title="Требуется 2, либо 3 аргумента",
description=f"@shirino_bot USD RUB \n@shirino_bot 12 USD RUB",
input_message_content=types.InputTextMessageContent(
message_text="Требуется 2, либо 3 аргумента"
)
)
elif args[0].isdigit() or re.match(r'^-?\d+(?:\.\d+)$', args[0]) is not None:
conv.from_currency = args[1].upper()
conv.conv_currency = args[2].upper()
if len_ == 3:
conv.amount = float(text[0])
conv.from_currency = text[1].upper()
conv.conv_currency = text[2].upper()
conv.convert()
elif type(args[0]) is str:
conv.from_currency = args[0].upper()
conv.conv_currency = args[1].upper()
elif len_ == 2:
conv.from_currency = text[0].upper()
conv.conv_currency = text[1].upper()
conv.convert()
else:
raise ValueError('Надо 2 или 3 аргумента')
result_title = f'{conv.amount} {conv.from_currency} = {conv.conv_amount} {conv.conv_currency}'
result_desc = None
result = (
f'{conv.amount} {conv.from_currency} = '
f'{conv.conv_amount} {conv.conv_currency}'
)
except aiohttp.client_exceptions.ClientError:
result_title = 'Произошла ошибка'
result_desc = 'Рейт-лимит от api telegram, попробуйте позже'
await asyncio.sleep(1)
result = "Рейт-лимит от api telegram, попробуйте позже"
except Exception as ex:
log.debug(ex)
result_title = 'Произошла ошибка'
result_desc = f'{type(ex).__name__}: {ex}'
result = f'{type(ex).__name__}: {ex}'
article[0] = types.InlineQueryResultArticle(
id=result_id,
title=result_title,
description=result_desc,
title=result,
input_message_content=types.InputTextMessageContent(
message_text=f"{result_title} \n{result_desc}",
message_text=result,
),
)
await inline_query.answer(
article,
article, # type: ignore
cache_time=1,
is_personal=True,
)

View file

@ -1,4 +1,3 @@
requests==2.31.0
aiogram==3.1.1
aiohttp==3.8.5
pydantic[dotenv]==2.5.2
pydantic[dotenv]==2.4.2
aiogram==3.1.1