danbooru-bot/commands.py
mctaylors 1d80adc8d8
change: reformat post votes and feedback
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-06-08 10:54:36 +03:00

467 lines
17 KiB
Python

from telegram import (
Update,
InlineKeyboardButton,
InlineKeyboardMarkup,
LinkPreviewOptions,
)
from telegram.constants import ParseMode
from telegram.ext import ContextTypes
import html_parser
from datetime import datetime, timedelta
from config import *
from extensions import get_json, format_rating, format_status, humanize_filesize
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text(
"\n".join(
[
f"hello, i'm {html_parser.bold(context.bot.first_name)}, an inline image grabber.\n",
"to get help, use /help",
]
),
parse_mode=ParseMode.HTML,
)
async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
await update.message.reply_text(
"\n".join(
[
html_parser.bold(f"how to use {context.bot.first_name}\n"),
"1. open your message box and type:",
html_parser.code(f"@{context.bot.username} &lt;post ID&gt;"),
"2. click on the box that has popped up",
"3. ???",
"4. done!",
]
),
parse_mode=ParseMode.HTML,
)
async def about_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
reply_markup = None
if general.sourceurl is not None:
keyboard = [[InlineKeyboardButton(f"source code", url=general.sourceurl)]]
reply_markup = InlineKeyboardMarkup(keyboard)
await update.message.reply_text(
"\n".join(
[
html_parser.bold(f"about {context.bot.first_name}"),
f"{context.bot.first_name} is an inline image grabber written in python that grabs images from "
"Danbooru (or other similar services).\n",
html_parser.bold("currently configured instance:"),
f"{html_parser.italic(app.name)} ({app.hostname})",
]
),
parse_mode=ParseMode.HTML,
reply_markup=reply_markup,
)
async def info_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
post_id = context.args[0]
message = await context.bot.send_message(
update.effective_chat.id,
"\n".join([html_parser.bold("Information"), "Fetching..."]),
parse_mode=ParseMode.HTML,
)
post_data = get_json(f"posts/{post_id}")
if post_data is None:
await context.bot.edit_message_text(
" ".join([html_parser.bold("Error:"), "That record was not found."]),
update.effective_chat.id,
message.message_id,
parse_mode=ParseMode.HTML,
)
return
uploader_data = get_json(f"users/{post_data['uploader_id']}")
# well, we could check the uploader, but why would we do that?
keyboard = [
[
InlineKeyboardButton(
f"Open in {app.name}",
url=f"{app.protocol}://{app.hostname}/posts/{post_id}",
)
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
m = [
" ".join(["ID:", html_parser.code(post_data["id"])]),
" ".join(
[
"Uploader:",
html_parser.hyperlink(
uploader_data["name"],
f"{app.protocol}://{app.hostname}/users/{post_data['uploader_id']}",
),
html_parser.hyperlink(
"»",
f"{app.protocol}://{app.hostname}/posts?tags=user:{uploader_data['name']}",
),
]
),
]
created_at = datetime.fromisoformat(post_data["created_at"])
m.append(
" ".join(
[
"Date:",
html_parser.hyperlink(
created_at.strftime("%Y-%m-%d %X (%z)"),
f"{app.protocol}://{app.hostname}/posts?tags=date:{created_at.strftime("%Y-%m-%d")}",
),
]
)
)
if post_data["approver_id"] is not None:
approver_data = get_json(f"users/{post_data['approver_id']}")
m.append(
" ".join(
[
"Approver:",
html_parser.hyperlink(
approver_data["name"],
f"{app.protocol}://{app.hostname}/users/{post_data['approver_id']}",
),
html_parser.hyperlink(
"»",
f"{app.protocol}://{app.hostname}/posts?tags=approver:{approver_data['name']}",
),
]
)
)
m.append(
" ".join(
[
"Size:",
html_parser.hyperlink(
f"{humanize_filesize(post_data['media_asset']['file_size'])} .{post_data['media_asset']['file_ext']}",
"" if post_data["is_banned"] else post_data["file_url"],
),
f"({post_data['media_asset']['image_width']}x{post_data['media_asset']['image_height']})",
html_parser.hyperlink(
"»",
f"{app.protocol}://{app.hostname}/media_assets/{post_data['media_asset']['id']}",
),
]
)
)
m.append(
" ".join(
["Source:", post_data["source"] if post_data["source"] != "" else "🚫"]
)
)
m.append(" ".join(["Rating:", format_rating(post_data["rating"])]))
m.append(
" ".join(
[
"Score:",
html_parser.hyperlink(
post_data["score"],
f"{app.protocol}://{app.hostname}/post_votes?search[post_id]={post_data['id']}&variant=compact",
),
f"(+{post_data['up_score']} / -{post_data['down_score']})",
]
)
)
m.append(
" ".join(
[
"Favorites:",
html_parser.hyperlink(
post_data["fav_count"],
f"{app.protocol}://{app.hostname}/posts/{post_data['id']}/favorites",
),
]
)
)
m.append(" ".join(["Status:", format_status(post_data)]))
link_preview_options = LinkPreviewOptions(True)
if not post_data["is_banned"]:
link_preview_options = LinkPreviewOptions(url=post_data["large_file_url"])
await context.bot.edit_message_text(
"\n".join([html_parser.bold("Information")] + m),
update.effective_chat.id,
message.message_id,
parse_mode=ParseMode.HTML,
reply_markup=reply_markup,
link_preview_options=link_preview_options,
)
except (IndexError, ValueError):
await update.message.reply_text(
" ".join(
[html_parser.bold("Usage:"), html_parser.code("/info &lt;post ID&gt;")]
),
parse_mode=ParseMode.HTML,
)
async def user_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
name_matches = context.args[0]
message = await context.bot.send_message(
update.effective_chat.id,
"\n".join([html_parser.bold(name_matches), "Fetching..."]),
parse_mode=ParseMode.HTML,
)
user_search_data = get_json(f"users", [f"search[name_matches]={name_matches}"])
if len(user_search_data) == 0:
await context.bot.edit_message_text(
" ".join([html_parser.bold("Error:"), "That record was not found."]),
update.effective_chat.id,
message.message_id,
parse_mode=ParseMode.HTML,
)
return
user_id = user_search_data[0]["id"]
user_data = get_json(f"users/{user_id}")
keyboard = [
[
InlineKeyboardButton(
f"Open in {app.name}",
url=f"{app.protocol}://{app.hostname}/users/{user_id}",
)
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
m = [" ".join(["ID:", str(user_data["id"])])]
created_at = datetime.fromisoformat(user_data["created_at"])
m.append(" ".join(["Join Date:", created_at.strftime("%Y-%m-%d")]))
m.append(" ".join(["Level:", user_data["level_string"]]))
m.append(
" ".join(
[
"Uploads:",
html_parser.hyperlink(
user_data["post_upload_count"],
f"{app.protocol}://{app.hostname}/posts?tags=user:{user_data['name']}",
),
]
)
)
if user_data["is_banned"]:
ban_data = get_json(f"bans", [f"search[user_id]={user_data['id']}"])[0]
m.append(
" ".join(
[
"Ban reason:",
ban_data["reason"],
f"(banned for {timedelta(seconds=ban_data['duration'])})",
]
)
)
fav_post_count = get_json(f"counts/posts", [f"tags=fav:{user_data['name']}"])[
"counts"
]["posts"]
m.append(
" ".join(
[
"Favorites:",
html_parser.hyperlink(
fav_post_count,
f"{app.protocol}://{app.hostname}/posts?tags=ordfav:{user_data['name']}",
),
]
)
)
upvote_post_count = get_json(
f"counts/posts", [f"tags=upvote:{user_data['name']}"]
)["counts"]["posts"]
downvote_post_count = get_json(
f"counts/posts", [f"tags=downvote:{user_data['name']}"]
)["counts"]["posts"]
vote_post_count = upvote_post_count + downvote_post_count
m.append(
" ".join(
[
"Post Votes:",
html_parser.hyperlink(
vote_post_count,
f"{app.protocol}://{app.hostname}/post_votes?search[user_name]={user_data['name']}",
),
f"({' '.join([
html_parser.hyperlink(
f"up:{upvote_post_count}",
f"{app.protocol}://{app.hostname}/post_votes?search[user_name]={user_data['name']}&search[score]=1",
),
html_parser.hyperlink(
f"down:{downvote_post_count}",
f"{app.protocol}://{app.hostname}/post_votes?search[user_name]={user_data['name']}&search[score]=-1",
),
])})",
]
)
)
m.append(
" ".join(
[
"Favorite Groups:",
html_parser.hyperlink(
user_data["favorite_group_count"],
f"{app.protocol}://{app.hostname}/favorite_groups?search[creator_name]={user_data['name']}",
),
]
)
)
m.append(
" ".join(
[
"Post Changes:",
html_parser.hyperlink(
user_data["post_update_count"],
f"{app.protocol}://{app.hostname}/post_versions?search[updater_name]={user_data['name']}",
),
]
)
)
noteupdater_post_count = get_json(
f"counts/posts", [f"tags=noteupdater:{user_data['name']}"]
)["counts"]["posts"]
m.append(
" ".join(
[
"Note Changes:",
html_parser.hyperlink(
user_data["note_update_count"],
f"{app.protocol}://{app.hostname}/note_versions?search[updater_name]={user_data['name']}",
),
"in",
html_parser.hyperlink(
noteupdater_post_count,
f"{app.protocol}://{app.hostname}/posts?tags=noteupdater:{user_data['name']}+order:note",
),
"posts",
]
)
)
m.append(
" ".join(
[
"Wiki Page Changes:",
html_parser.hyperlink(
user_data["wiki_page_version_count"],
f"{app.protocol}://{app.hostname}/wiki_page_versions?search[updater_name]={user_data['name']}",
),
]
)
)
m.append(
" ".join(
[
"Artist Changes:",
html_parser.hyperlink(
user_data["artist_version_count"],
f"{app.protocol}://{app.hostname}/artist_versions?search[updater_name]={user_data['name']}",
),
]
)
)
m.append(
" ".join(
[
"Commentary Changes:",
html_parser.hyperlink(
user_data["artist_commentary_version_count"],
f"{app.protocol}://{app.hostname}/artist_commentary_versions?search[updater_name]={user_data['name']}",
),
]
)
)
m.append(
" ".join(
[
"Forum Posts:",
html_parser.hyperlink(
user_data["forum_post_count"],
f"{app.protocol}://{app.hostname}/forum_posts?search[creator_name]={user_data['name']}",
),
]
)
)
commenter_post_count = get_json(
f"counts/posts", [f"tags=commenter:{user_data['name']}"]
)["counts"]["posts"]
m.append(
" ".join(
[
"Comments:",
html_parser.hyperlink(
user_data["comment_count"],
f"{app.protocol}://{app.hostname}/comments?group_by=comment&search[creator_name]={user_data['name']}",
),
"in",
html_parser.hyperlink(
commenter_post_count,
f"{app.protocol}://{app.hostname}/posts?tags=commenter:{user_data['name']}+order:comment_bumped",
),
"posts",
]
)
)
m.append(
" ".join(
[
"Appeals:",
html_parser.hyperlink(
user_data["appeal_count"],
f"{app.protocol}://{app.hostname}/post_appeals?search[creator_name]={user_data['name']}",
),
]
)
)
total_feedback_count = (
user_data["positive_feedback_count"]
+ user_data["neutral_feedback_count"]
+ user_data["negative_feedback_count"]
)
m.append(
" ".join(
[
"Feedback:",
html_parser.hyperlink(
total_feedback_count,
f"{app.protocol}://{app.hostname}/user_feedbacks?search[user_name]={user_data['name']}",
),
f"({' '.join([
html_parser.hyperlink(
f"positive:{user_data['positive_feedback_count']}",
f"{app.protocol}://{app.hostname}/user_feedbacks?search[user_name]={user_data['name']}&search[category]=positive",
),
html_parser.hyperlink(
f"neutral:{user_data['neutral_feedback_count']}",
f"{app.protocol}://{app.hostname}/user_feedbacks?search[user_name]={user_data['name']}&search[category]=neutral",
),
html_parser.hyperlink(
f"negative:{user_data['negative_feedback_count']}",
f"{app.protocol}://{app.hostname}/user_feedbacks?search[user_name]={user_data['name']}&search[category]=negative",
),
])})"
]
)
)
await context.bot.edit_message_text(
"\n".join([html_parser.bold(user_data["name"])] + m),
update.effective_chat.id,
message.message_id,
parse_mode=ParseMode.HTML,
reply_markup=reply_markup,
disable_web_page_preview=True,
)
except (IndexError, ValueError):
await update.message.reply_text(
" ".join(
[html_parser.bold("Usage:"), html_parser.code("/user &lt;username&gt;")]
),
parse_mode=ParseMode.HTML,
)