danbooru-bot/inline_query.py
mctaylors 1b8c096db6
All checks were successful
Build / Upload to production (push) Successful in 1m44s
fix: whoops
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-06-09 13:43:19 +03:00

141 lines
4.5 KiB
Python

from uuid import uuid4
from telegram import (
Update,
InlineKeyboardButton,
InlineQueryResultArticle,
InputTextMessageContent,
InlineKeyboardMarkup,
LinkPreviewOptions,
helpers,
)
from telegram.constants import ParseMode
from telegram.ext import ContextTypes
import config
from extensions import humanize_tags_from_json, format_rating, get_json, HtmlFormat
async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle an inline query whose text is a numeric post ID.

    Queries that are empty, non-numeric, or not a positive integer are
    ignored without sending an answer. Otherwise the post is looked up with
    ``get_json``; a ``None`` result is answered through ``invalid_query`` and
    any other result through ``answer_query``.

    Args:
        update: Incoming update; its ``inline_query.query`` text is read.
        context: Handler context, forwarded to ``answer_query``.
    """
    query = update.inline_query.query
    if not query:
        return

    if not query.isdigit():
        return

    # str.isdigit() is also true for characters such as "²", which int()
    # cannot parse; guard the conversion so such input is ignored like any
    # other non-numeric query instead of raising out of the handler.
    try:
        post_id = int(query)
    except ValueError:
        return

    if post_id <= 0:
        return

    data = get_json(f"posts/{post_id}")
    if data is None:
        await invalid_query(update, post_id)
        return

    await answer_query(update, context, post_id, data)
async def answer_query(
    update: Update, context: ContextTypes.DEFAULT_TYPE, query: int, data
) -> None:
    """Answer the inline query with a single article describing post ``query``.

    The article title, description and message text are built from the tag
    strings and rating found in ``data``. When ``data["is_banned"]`` is truthy
    the credit line is struck through, a removal notice is appended and only
    the "Show information" button is attached. Otherwise a "View original"
    button, a thumbnail, a hyperlink to the post page and a link preview are
    added as well.

    Args:
        update: Incoming update whose inline query is answered.
        context: Handler context; ``context.bot.username`` is used to build
            the deep link of the "Show information" button.
        query: Numeric post ID shown in the result and used in the links.
        data: Mapping with the post fields read below (tag strings, rating,
            ``is_banned`` and, for non-banned posts, the file URLs).
    """
    characters = humanize_tags_from_json(data["tag_string_character"], "no characters")
    copyrights = humanize_tags_from_json(
        data["tag_string_copyright"], "unknown copyright"
    )
    artists = humanize_tags_from_json(data["tag_string_artist"], "unknown artist")
    rating = format_rating(data["rating"])

    # Single button row; the non-banned path adds a second button to it.
    button_row = [
        InlineKeyboardButton(
            "Show information",
            url=helpers.create_deep_linked_url(context.bot.username, str(query)),
        )
    ]

    # Pieces shared by both kinds of result.
    subject = f"{characters} ({copyrights})"
    title = f"#{query} {rating}"
    description = f"{subject} drawn by {artists}"

    if data["is_banned"]:
        post_tag = HtmlFormat.code(f"#{query}")
        credit_line = " ".join(
            [HtmlFormat.bold(subject), "drawn by", HtmlFormat.bold(artists)]
        )
        message_text = " ".join(
            [
                post_tag,
                HtmlFormat.strikethrough(credit_line),
                rating,
                "\nThis post has been removed because of a takedown request or rule violation.",
            ]
        )
        banned_article = InlineQueryResultArticle(
            id=str(uuid4()),
            title=title,
            description=description,
            input_message_content=InputTextMessageContent(
                message_text,
                parse_mode=ParseMode.HTML,
            ),
            reply_markup=InlineKeyboardMarkup([button_row]),
        )
        await update.inline_query.answer([banned_article])
        return

    # URL fields are read in the same order as before: file, preview, large.
    button_row.append(InlineKeyboardButton("View original", url=data["file_url"]))
    thumbnail = data["preview_file_url"]

    post_tag = HtmlFormat.code(f"#{query}")
    credit_line = " ".join(
        [HtmlFormat.bold(subject), "drawn by", HtmlFormat.bold(artists)]
    )
    post_url = f"{config.app.protocol}://{config.app.hostname}/posts/{query}"
    message_text = " ".join(
        [post_tag, HtmlFormat.hyperlink(credit_line, post_url), rating]
    )
    preview = LinkPreviewOptions(url=data["large_file_url"])

    article = InlineQueryResultArticle(
        id=str(uuid4()),
        title=title,
        description=description,
        thumbnail_url=thumbnail,
        input_message_content=InputTextMessageContent(
            message_text,
            parse_mode=ParseMode.HTML,
            link_preview_options=preview,
        ),
        reply_markup=InlineKeyboardMarkup([button_row]),
    )
    await update.inline_query.answer([article])
async def invalid_query(update: Update, query: int) -> None:
    """Answer the inline query with a single "not found" article.

    Args:
        update: Incoming update whose inline query is answered.
        query: Post ID that was looked up; shown in the title and message.
    """
    post_tag = HtmlFormat.code(f"#{query}")
    not_found_message = InputTextMessageContent(
        " ".join((post_tag, "That record was not found.")),
        parse_mode=ParseMode.HTML,
    )
    error_article = InlineQueryResultArticle(
        id=str(uuid4()),
        title=f"#{query}",
        description="Error",
        input_message_content=not_found_message,
    )
    await update.inline_query.answer([error_article])