Разделил код на файлы. Рефакторинг кода. Теперь можно писать на родном языке курс валюты (e.g: 2 рубля в доллары)

This commit is contained in:
Данил 2024-08-25 18:04:34 +03:00
parent 4e0090e90e
commit e2316f69b4
3 changed files with 147 additions and 129 deletions

85
function/convert.py Normal file
View file

@ -0,0 +1,85 @@
import yaml
import requests
import json
import re
from decimal import Decimal, ROUND_DOWN
from function.format_number import format_number
config = yaml.safe_load(open('config.yaml'))
coinapi_len = len(config['coinapi_keys'])
coinapi_active = [0]
class Converter:
def __init__(self):
self.amount: float = 1.0
self.conv_amount: float = 0.0
self.from_currency: str = ''
self.conv_currency: str = ''
def convert(self) -> None:
if not self.ddg():
self.coinapi()
number = Decimal(str(self.conv_amount))
self.conv_amount = format_number(number.quantize(Decimal('1.00'), rounding=ROUND_DOWN))
def ddg(self) -> bool:
res = requests.get('https://duckduckgo.com/js/spice/currency'
f'/{self.amount}'
f'/{self.from_currency}'
f'/{self.conv_currency}')
data = json.loads(re.findall(r'\(\s*(.*)\s*\);$', res.text)[0])
del data['terms']
del data['privacy']
del data['timestamp']
if len(data.get('to')) == 0:
return False
conv = data.get('to')[0]
conv_amount = conv.get('mid')
if conv_amount is None:
raise RuntimeError('Ошибка при конвертации валюты через DuckDuckGo')
self.conv_amount = float(conv_amount)
return True
def coinapi(self, depth: int = config['coinapi_keys']) -> None:
if depth <= 0:
raise RecursionError('Rate limit on all tokens')
resp = requests.get(
(
'https://rest.coinapi.io/v1/exchangerate'
f'/{self.from_currency}'
f'/{self.conv_currency}'
),
headers={
'X-CoinAPI-Key': config['coinapi_keys'][coinapi_active[0]],
},
timeout=config['timeout'],
)
if resp.status_code == 429:
rotate_token(config['coinapi_keys'], coinapi_active)
self.coinapi(depth - 1)
data = resp.json()
rate = data.get('rate')
if rate is None:
raise RuntimeError('Failed to get the exchange rate from CoinAPI')
self.conv_amount = float(rate * self.amount)
def rotate_token(lst, active) -> None:
active[0] = (active[0] + 1) % len(lst)

14
function/format_number.py Normal file
View file

@ -0,0 +1,14 @@
def format_number(number):
number_str = str(number)
if '.' in number_str:
integer_part, fractional_part = number_str.split('.')
else:
integer_part, fractional_part = number_str, ''
formatted_integer_part = '{:,}'.format(int(integer_part)).replace(',', ' ')
if fractional_part:
return formatted_integer_part + '.' + fractional_part
else:
return formatted_integer_part

175
main.py
View file

@ -1,130 +1,59 @@
#!/usr/bin/env python3
from aiogram import Bot, Dispatcher, types
import asyncio
import hashlib
import json
import string
import aiohttp
import requests
import re
import logging
import yaml
import hashlib
import aiohttp
import json
from aiogram import Dispatcher, types, Bot
from aiogram.filters import CommandStart
# Constants
DDG_URL = 'https://duckduckgo.com/js/spice/currency'
COINAPI_URL = 'https://rest.coinapi.io/v1/exchangerate'
from function.convert import Converter
config = yaml.safe_load(open("config.yaml"))
log = logging.getLogger('shirino')
handler = logging.StreamHandler()
fmt = logging.Formatter('%(asctime)s %(levelname)s %(message)s')
handler.setFormatter(fmt)
log.addHandler(handler)
if config['debug']:
handler.setLevel(logging.DEBUG)
log.setLevel(logging.DEBUG)
coinapi_len = len(config['coinapi_keys'])
coinapi_active = [0]
dp = Dispatcher()
class CurrencyConverter:
def __init__(self):
self.amount = 1.0
self.conv_amount = 0.0
self.from_currency = 'RUB'
self.conv_currency = 'USD'
@dp.message()
async def message_conv(message: types.Message):
query = message.text.split(' ')
conv = Converter()
def convert(self):
if not self.ddgapi():
self.coinapi()
amount = query[0]
source_currency_alias = query[1]
target_currency_alias = query[3]
str_amount = f'{self.conv_amount}'
point = str_amount.find(".")
after_point = str_amount[point + 1:]
with open('currency.json', encoding='utf-8') as file:
currency_json = json.loads(file.read())
fnz = min(
(
after_point.index(i)
for i in string.digits[1:]
if i in after_point
),
default=-1,
)
source_currency_code = None
target_currency_code = None
if fnz == -1:
for currency_code, aliases in currency_json.items():
if source_currency_alias in aliases:
source_currency_code = currency_code
elif target_currency_alias in aliases:
target_currency_code = currency_code
elif source_currency_code and target_currency_code:
break
else:
return
ndigits = fnz + config['ndigits']
if source_currency_code and target_currency_code:
conv.amount = float(amount)
conv.from_currency = source_currency_code.upper()
conv.conv_currency = target_currency_code.upper()
conv.convert()
self.conv_amount = round(self.conv_amount, ndigits)
def ddgapi(self):
res = requests.get(f'{DDG_URL}/{self.amount}/{self.from_currency}/{self.conv_currency}')
data = json.loads(re.findall(r'\(\s*(.*)\s*\);$', res.text)[0])
del data['terms']
del data['privacy']
del data['timestamp']
log.debug(data)
if len(data.get('to')) == 0:
return False
conv = data.get('to')[0]
conv_amount = conv.get("mid")
if conv_amount is None:
raise RuntimeError('Ошибка при конвертации валюты через DuckDuckGo')
log.debug(conv)
log.debug(conv_amount)
self.conv_amount = float(conv_amount)
return True
def coinapi(self, depth: int = config['coinapi_keys']):
if depth <= 0:
raise RecursionError('Рейтлимит на всех токенах')
resp = requests.get(
(
f'{COINAPI_URL}/{self.from_currency}'
f'/{self.conv_currency}'
),
headers={
'X-CoinAPI-Key': config['coinapi_keys'][coinapi_active[0]],
},
timeout=config['timeout'],
result = (
f'{conv.amount} {conv.from_currency} = '
f'{conv.conv_amount} {conv.conv_currency}'
)
if resp.status_code == 429:
log.warning('CoinAPI ratelimited, rotating token')
rotate_token(config['coinapi_keys'], coinapi_active)
self.coinapi(depth - 1)
data = resp.json()
rate = data.get('rate')
if rate is None:
raise RuntimeError('Не удалось получить курс валюты от CoinAPI')
self.conv_amount = float(rate * self.amount)
await message.reply(result)
def rotate_token(lst, active):
active[0] = (active[0] + 1) % len(lst)
async def inline_reply(result_id: str, title: str, description: str or None, inline_query: types.InlineQuery):
async def inline_reply(result_id: str, title: str, description: str or None, inline_query: types.InlineQuery) -> None:
article = [None]
article[0] = types.InlineQueryResultArticle(
id=result_id,
@ -143,18 +72,17 @@ async def inline_reply(result_id: str, title: str, description: str or None, inl
@dp.inline_query()
async def currency(inline_query: types.InlineQuery):
async def currency(inline_query: types.InlineQuery) -> None:
query = inline_query.query
args = query.split()
result_id = hashlib.md5(query.encode()).hexdigest()
conv = CurrencyConverter()
conv = Converter()
try:
log.debug(len(args))
if len(args) <= 1:
await inline_reply(result_id,
"Требуется 2, либо 3 аргумента",
"2 or 3 arguments are required",
f"@shirino_bot USD RUB \n@shirino_bot 12 USD RUB", inline_query)
if len(args) == 3:
@ -172,35 +100,26 @@ async def currency(inline_query: types.InlineQuery):
f'{conv.conv_amount} {conv.conv_currency}'
)
except aiohttp.client_exceptions.ClientError as ex:
except aiohttp.client_exceptions.ClientError:
await inline_reply(result_id,
"Rate-limit от API Telegram, повторите запрос позже",
"Rate-limit from the Telegram API, repeat the request later",
None,
inline_query)
log.debug(ex)
await asyncio.sleep(1)
except Exception as ex:
log.debug(ex)
await inline_reply(result_id, "Неверный формат данных",
except Exception as e:
print(e)
await inline_reply(result_id, "Invalid data format",
"@shirino_bot USD RUB \n@shirino_bot 12 USD RUB",
inline_query)
except UnboundLocalError:
pass
await inline_reply(result_id, result, None, inline_query)
@dp.message(CommandStart())
async def start(message: types.Message):
await message.answer("Привет! Бот может показывать курс обмена криптовалюты и фиатной валюты. "
"Бот используется с помощью инлайн-команд: "
"`@shirino_bot 12 usd rub` или `@shirino_bot usd rub`"
"\n\nИсходный код опубликован на [Github](https://github.com/redume/shirino)",
parse_mode="markdown")
async def main() -> None:
log.debug(config)
bot = Bot(config['telegram_token'])
await dp.start_polling(bot)