Shirino/main.py

197 lines
4.6 KiB
Python
Raw Normal View History

2023-05-30 13:47:06 +03:00
#!/usr/bin/env python3
2023-03-08 13:31:45 +03:00
import json
import hashlib
2023-05-30 13:33:41 +03:00
import logging
import string
2023-05-30 13:33:41 +03:00
from typing import Any, Dict, List, Optional
2023-03-08 13:31:45 +03:00
2023-05-30 13:33:41 +03:00
import requests
import math
2023-03-08 13:31:45 +03:00
2023-05-30 13:33:41 +03:00
from pydantic import BaseSettings
2023-03-08 13:31:45 +03:00
2023-05-30 13:47:06 +03:00
from aiogram import Bot # type: ignore
from aiogram.dispatcher import Dispatcher # type: ignore
from aiogram.utils import executor # type: ignore
2023-03-08 13:31:45 +03:00
2023-05-30 13:47:06 +03:00
from aiogram.types import InlineQuery # type: ignore
2023-05-30 13:33:41 +03:00
from aiogram.types import InlineQueryResultArticle
from aiogram.types import InputTextMessageContent
2023-03-08 13:31:45 +03:00
2023-05-30 13:33:41 +03:00
# Constants
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
# ---
2023-05-30 13:47:06 +03:00
2023-05-30 13:33:41 +03:00
# Config from .env
class Settings(BaseSettings):
debug: bool
2023-05-30 13:47:06 +03:00
timeout: int = 2
2023-05-30 13:33:41 +03:00
coinapi_key: str
telegram_token: str
2023-05-30 13:33:41 +03:00
class Config:
env_file = '.env'
env_file_encoding = 'utf-8'
2023-05-30 13:47:06 +03:00
settings = Settings() # type: ignore
2023-05-30 13:33:41 +03:00
# ---
2023-05-30 13:47:06 +03:00
2023-05-30 13:33:41 +03:00
# Logging
log = logging.getLogger('shirino')
handler = logging.StreamHandler()
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
2023-03-08 13:31:45 +03:00
2023-05-30 13:33:41 +03:00
handler.setFormatter(fmt)
log.addHandler(handler)
2023-03-08 13:31:45 +03:00
2023-05-30 13:33:41 +03:00
if settings.debug:
handler.setLevel(logging.DEBUG)
log.setLevel(logging.DEBUG)
# ---
2023-05-30 13:47:06 +03:00
2023-05-30 13:33:41 +03:00
bot = Bot(token=settings.telegram_token)
dp = Dispatcher(bot)
2023-03-08 13:31:45 +03:00
2023-05-30 13:33:41 +03:00
class CurrencyConverter:
2023-05-30 13:47:06 +03:00
2023-05-30 13:33:41 +03:00
def __init__(self) -> None:
2023-05-30 13:47:06 +03:00
2023-05-30 13:33:41 +03:00
self.amount = 1.0
self.conv_amount = 0.0
self.from_currency = ''
self.conv_currency = ''
2023-05-30 13:47:06 +03:00
2023-05-30 13:33:41 +03:00
def convert(self) -> None:
"""Currency conversion"""
if not self.ddgapi():
self.coinapi()
2023-05-30 13:47:06 +03:00
s = f'{self.conv_amount}'
point = s.find(".")
a = s[point+1:]
fnz = min([a.index(i) for i in string.digits[1:] if i in a], default=-1)
if fnz == -1:
self.conv_amount = int(float(s))
self.conv_amount = s[:point] + '.' + a[:fnz+3]
2023-05-30 13:33:41 +03:00
def ddgapi(self) -> bool:
"""Get data from DuckDuckGo's currency API
2023-05-30 13:47:06 +03:00
2023-05-30 13:33:41 +03:00
Returns:
`False` if the currency does not exist,
`True` on successful conversion
"""
# API request
resp = requests.get(
2023-05-30 13:47:06 +03:00
(
f'{DDG_URL}/{self.amount}/{self.from_currency}'
f'/{self.conv_currency}'
),
timeout=settings.timeout,
2023-05-30 13:33:41 +03:00
)
log.debug(resp.text)
# Parsing JSON data
data: Dict[str, Any] = json.loads(
2023-05-30 13:47:06 +03:00
resp.text
.replace('ddg_spice_currency(', '')
2023-05-30 13:33:41 +03:00
.replace(');', '')
)
log.debug(data)
# If the currency does not exist
descr = data.get('headers', {}).get('description', '')
if descr.find('ERROR') != -1:
return False
# Otherwise
conv: Dict[str, str] = data.get('conversion', {})
self.conv_amount = float(conv.get('converted-amount', 0))
log.debug(conv)
return True
2023-05-30 13:47:06 +03:00
2023-05-30 13:33:41 +03:00
def coinapi(self) -> None:
"""Get data from CoinAPI (for cryptocurrencies)"""
resp = requests.get(
(
f'{COINAPI_URL}/{self.from_currency}'
f'/{self.conv_currency}'
),
headers={
'X-CoinAPI-Key': settings.coinapi_key,
},
2023-05-30 13:47:06 +03:00
timeout=settings.timeout,
2023-05-30 13:33:41 +03:00
)
data: Dict[str, Any] = resp.json()
self.conv_amount = float(data.get('rate', 0)*self.amount)
2023-03-08 13:31:45 +03:00
@dp.inline_handler()
2023-05-30 13:33:41 +03:00
async def currency(inline_query: InlineQuery) -> None:
2023-03-08 13:31:45 +03:00
2023-05-30 13:33:41 +03:00
query = inline_query.query
article: List[Optional[InlineQueryResultArticle]] = [None]
2023-03-08 13:31:45 +03:00
2023-05-30 13:33:41 +03:00
text = query.split()
len_ = len(text)
2023-03-08 13:31:45 +03:00
2023-05-30 13:33:41 +03:00
result_id = hashlib.md5(query.encode()).hexdigest()
conv = CurrencyConverter()
2023-03-08 13:31:45 +03:00
try:
2023-05-30 13:33:41 +03:00
if len_ == 3:
conv.amount = float(text[0])
conv.from_currency = text[1].upper()
conv.conv_currency = text[2].upper()
conv.convert()
elif len_ == 2:
conv.from_currency = text[0].upper()
conv.conv_currency = text[1].upper()
conv.convert()
2023-03-08 13:31:45 +03:00
else:
2023-05-30 13:33:41 +03:00
raise ValueError('Надо 2 или 3 аргумента')
2023-05-30 13:33:41 +03:00
result = (
f'{conv.amount} {conv.from_currency} = '
f'{conv.conv_amount} {conv.conv_currency}'
2023-05-30 13:33:41 +03:00
)
2023-05-30 13:33:41 +03:00
except Exception as ex:
2023-05-30 13:49:37 +03:00
result = f'{type(ex).__name__}: {ex}'
2023-05-30 13:33:41 +03:00
article[0] = InlineQueryResultArticle(
2023-03-08 13:31:45 +03:00
id=result_id,
title=result,
2023-05-30 13:33:41 +03:00
input_message_content=InputTextMessageContent(
message_text=result,
),
)
await inline_query.answer(
article,
cache_time=1,
is_personal=True,
)
2023-03-08 13:31:45 +03:00
executor.start_polling(dp, skip_updates=True)