From 6141d674a99025ef4429540f6588343f86ee776c Mon Sep 17 00:00:00 2001 From: mctaylors Date: Tue, 20 May 2025 18:15:12 +0300 Subject: [PATCH] feat: add /info command (#1) Reviewed-on: https://git.mctaylors.ru/mctaylors/danbooru-bot/pulls/1 Co-authored-by: mctaylors Co-committed-by: mctaylors --- commands.py | 71 ++++++++++++++++++++++++++++++++++++++++++++++++- extensions.py | 41 ++++++++++++++++++++++++++++ inline_query.py | 9 +++---- main.py | 1 + 4 files changed, 115 insertions(+), 7 deletions(-) diff --git a/commands.py b/commands.py index 5b001c8..c6d2584 100644 --- a/commands.py +++ b/commands.py @@ -1,9 +1,11 @@ -from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup +from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, LinkPreviewOptions from telegram.constants import ParseMode from telegram.ext import ContextTypes import html_parser +from datetime import datetime from config import * +from extensions import get_json, format_rating, format_status, humanize_filesize async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: @@ -45,3 +47,70 @@ async def about_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N parse_mode=ParseMode.HTML, reply_markup=reply_markup ) + + +async def info_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: + try: + post_id = context.args[0] + message = await context.bot.send_message(update.effective_chat.id, + f"{html_parser.bold("Information")}\n" + f"Fetching...", + parse_mode=ParseMode.HTML) + post_data = get_json(f"posts/{post_id}") + if post_data is None: + await update.message.reply_text( + f"{html_parser.bold("Error")}: That record was not found.", + parse_mode=ParseMode.HTML) + return + uploader_data = get_json(f"users/{post_data['uploader_id']}") + # well, we could check the uploader, but why would we do that? + + keyboard = [ + [ + InlineKeyboardButton(f"Open in {app.name}", + url=f"https://{app.hostname}/posts/{post_id}") + ] + ] + reply_markup = InlineKeyboardMarkup(keyboard) + + # noinspection PyListCreation + m = [] + m.append(f"ID: {html_parser.code(post_data['id'])}") + m.append(f"Uploader: {html_parser.hyperlink(uploader_data['name'], + f"http://{app.host}/users/{post_data['uploader_id']}")} " + f"{html_parser.hyperlink("»", f"http://{app.host}/posts?tags=user:{uploader_data['name']}")}") + created_at = datetime.fromisoformat(post_data['created_at']) + m.append(f"Date: {html_parser.hyperlink( + f"{created_at.strftime("%Y-%m-%d %X (%z)")}", + f"http://{app.host}/posts?tags=date:{created_at.strftime("%Y-%m-%d")}")}") + if post_data['approver_id'] is not None: + approver_data = get_json(f"users/{post_data['approver_id']}") + m.append(f"Approver: {html_parser.hyperlink(approver_data['name'], + f"http://{app.host}/users/{post_data['approver_id']}")} " + f"{html_parser.hyperlink("»", f"http://{app.host}/posts?tags=approver:{approver_data['name']}")}") + m.append(f"Size: {html_parser.hyperlink( + f"{humanize_filesize(post_data['media_asset']['file_size'])} .{post_data['media_asset']['file_ext']}", + post_data['file_url'])} " + f"({post_data['media_asset']['image_width']}x{post_data['media_asset']['image_height']}) " + f"{html_parser.hyperlink("»", f"http://{app.host}/media_assets/{post_data['media_asset']['id']}")}") + m.append(f"Source: {post_data['source'] if post_data['source'] != "" else "🚫"}") + m.append(f"Rating: {format_rating(post_data['rating'])}") + m.append(f"Score: {html_parser.hyperlink(post_data['score'], + f"http://{app.host}/post_votes?search[post_id]={post_data['id']}&variant=compact")} " + f"(+{post_data['up_score']} / -{post_data['down_score']})") + m.append(f"Favorites: {html_parser.hyperlink(post_data['fav_count'], + f"http://{app.host}/posts/{post_data['id']}/favorites")}") + m.append(f"Status: {format_status(post_data)}") + + link_preview_options = LinkPreviewOptions(True) + if not post_data['is_banned']: + link_preview_options = LinkPreviewOptions(url=post_data['large_file_url']) + + await context.bot.edit_message_text( + f"{html_parser.bold("Information")}\n" + "\n".join(m), + update.effective_chat.id, message.message_id, + parse_mode=ParseMode.HTML, reply_markup=reply_markup, link_preview_options=link_preview_options) + except (IndexError, ValueError): + await update.message.reply_text( + f"{html_parser.bold("Usage")}: {html_parser.code(f"/info <post ID>")}", + parse_mode=ParseMode.HTML) diff --git a/extensions.py b/extensions.py index 49b2d1a..8ee8db9 100644 --- a/extensions.py +++ b/extensions.py @@ -1,4 +1,8 @@ import re +import requests + +from typing import Any +from config import app def humanize_tags_from_json(value: str, default: str) -> str: @@ -7,6 +11,14 @@ def humanize_tags_from_json(value: str, default: str) -> str: return default +def humanize_filesize(value: int) -> str: + for unit in ['B', 'KiB', 'MiB', 'GiB']: + if value < 1024 or unit == 'GiB': + break + value /= 1024 + return f"{value:.2f} {unit}" + + def format_rating(value: str) -> str | None: match value: case "g": @@ -22,3 +34,32 @@ def format_rating(value: str) -> str | None: # Negative Squared Latin Capital Letter E return "🅴" return None + + +def format_status(data) -> str: + is_active = True + is_pending = data['is_pending'] + is_flagged = data['is_flagged'] + is_deleted = data['is_deleted'] + is_banned = data['is_banned'] + if is_pending | is_flagged | is_deleted: + is_active = False + + status = [] + if is_active: + status.append("Active") + if is_pending: + status.append("Pending") + if is_flagged: + status.append("Flagged") + if is_banned: + status.append("Banned") + return " ".join(status) + + +def get_json(pathname: str) -> Any | None: + r = requests.get(f"http://{app.host}/{pathname}.json") + + if r.status_code != 200: + return None + return r.json() diff --git a/inline_query.py b/inline_query.py index a3f19c4..657e0b9 100644 --- a/inline_query.py +++ b/inline_query.py @@ -6,7 +6,7 @@ from telegram.constants import ParseMode from telegram.ext import ContextTypes from config import * -from extensions import humanize_tags_from_json, format_rating +from extensions import humanize_tags_from_json, format_rating, get_json # noinspection PyUnusedLocal @@ -19,14 +19,11 @@ async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No if not query.isdigit(): return - response = requests.get(f"http://{app.host}/posts/{query}.json") - - if response.status_code != 200: + data = get_json(f"posts/{query}") + if data is None: await invalid_query(update, query) return - data = response.json() - await answer_query(update, query, data) diff --git a/main.py b/main.py index 5bd891a..3bc63d2 100644 --- a/main.py +++ b/main.py @@ -21,6 +21,7 @@ def main() -> None: application.add_handler(CommandHandler("start", commands.start_command)) application.add_handler(CommandHandler("help", commands.help_command)) application.add_handler(CommandHandler("about", commands.about_command)) + application.add_handler(CommandHandler("info", commands.info_command)) from inline_query import inline_query application.add_handler(InlineQueryHandler(inline_query))