feat: add /info command #1

Merged
mctaylors merged 3 commits from info_command into master 2025-05-20 18:15:13 +03:00
4 changed files with 115 additions and 7 deletions

View file

@ -1,9 +1,11 @@
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, LinkPreviewOptions
from telegram.constants import ParseMode from telegram.constants import ParseMode
from telegram.ext import ContextTypes from telegram.ext import ContextTypes
import html_parser import html_parser
from datetime import datetime
from config import * from config import *
from extensions import get_json, format_rating, format_status, humanize_filesize
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
@ -45,3 +47,70 @@ async def about_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N
parse_mode=ParseMode.HTML, parse_mode=ParseMode.HTML,
reply_markup=reply_markup reply_markup=reply_markup
) )
async def info_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
post_id = context.args[0]
message = await context.bot.send_message(update.effective_chat.id,
f"{html_parser.bold("Information")}\n"
f"Fetching...",
parse_mode=ParseMode.HTML)
post_data = get_json(f"posts/{post_id}")
if post_data is None:
await update.message.reply_text(
f"{html_parser.bold("Error")}: That record was not found.",
parse_mode=ParseMode.HTML)
return
uploader_data = get_json(f"users/{post_data['uploader_id']}")
# well, we could check the uploader, but why would we do that?
keyboard = [
[
InlineKeyboardButton(f"Open in {app.name}",
url=f"https://{app.hostname}/posts/{post_id}")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
# noinspection PyListCreation
m = []
m.append(f"ID: {html_parser.code(post_data['id'])}")
m.append(f"Uploader: {html_parser.hyperlink(uploader_data['name'],
f"http://{app.host}/users/{post_data['uploader_id']}")} "
f"{html_parser.hyperlink("»", f"http://{app.host}/posts?tags=user:{uploader_data['name']}")}")
created_at = datetime.fromisoformat(post_data['created_at'])
m.append(f"Date: {html_parser.hyperlink(
f"{created_at.strftime("%Y-%m-%d %X (%z)")}",
f"http://{app.host}/posts?tags=date:{created_at.strftime("%Y-%m-%d")}")}")
if post_data['approver_id'] is not None:
approver_data = get_json(f"users/{post_data['approver_id']}")
m.append(f"Approver: {html_parser.hyperlink(approver_data['name'],
f"http://{app.host}/users/{post_data['approver_id']}")} "
f"{html_parser.hyperlink("»", f"http://{app.host}/posts?tags=approver:{approver_data['name']}")}")
m.append(f"Size: {html_parser.hyperlink(
f"{humanize_filesize(post_data['media_asset']['file_size'])} .{post_data['media_asset']['file_ext']}",
post_data['file_url'])} "
f"({post_data['media_asset']['image_width']}x{post_data['media_asset']['image_height']}) "
f"{html_parser.hyperlink("»", f"http://{app.host}/media_assets/{post_data['media_asset']['id']}")}")
m.append(f"Source: {post_data['source'] if post_data['source'] != "" else "🚫"}")
m.append(f"Rating: {format_rating(post_data['rating'])}")
m.append(f"Score: {html_parser.hyperlink(post_data['score'],
f"http://{app.host}/post_votes?search[post_id]={post_data['id']}&variant=compact")} "
f"(+{post_data['up_score']} / -{post_data['down_score']})")
m.append(f"Favorites: {html_parser.hyperlink(post_data['fav_count'],
f"http://{app.host}/posts/{post_data['id']}/favorites")}")
m.append(f"Status: {format_status(post_data)}")
link_preview_options = LinkPreviewOptions(True)
if not post_data['is_banned']:
link_preview_options = LinkPreviewOptions(url=post_data['large_file_url'])
await context.bot.edit_message_text(
f"{html_parser.bold("Information")}\n" + "\n".join(m),
update.effective_chat.id, message.message_id,
parse_mode=ParseMode.HTML, reply_markup=reply_markup, link_preview_options=link_preview_options)
except (IndexError, ValueError):
await update.message.reply_text(
f"{html_parser.bold("Usage")}: {html_parser.code(f"/info <post ID>")}",
parse_mode=ParseMode.HTML)

View file

@ -1,4 +1,8 @@
import re import re
import requests
from typing import Any
from config import app
def humanize_tags_from_json(value: str, default: str) -> str: def humanize_tags_from_json(value: str, default: str) -> str:
@ -7,6 +11,14 @@ def humanize_tags_from_json(value: str, default: str) -> str:
return default return default
def humanize_filesize(value: int) -> str:
for unit in ['B', 'KiB', 'MiB', 'GiB']:
if value < 1024 or unit == 'GiB':
break
value /= 1024
return f"{value:.2f} {unit}"
def format_rating(value: str) -> str | None: def format_rating(value: str) -> str | None:
match value: match value:
case "g": case "g":
@ -22,3 +34,32 @@ def format_rating(value: str) -> str | None:
# Negative Squared Latin Capital Letter E # Negative Squared Latin Capital Letter E
return "🅴" return "🅴"
return None return None
def format_status(data) -> str:
is_active = True
is_pending = data['is_pending']
is_flagged = data['is_flagged']
is_deleted = data['is_deleted']
is_banned = data['is_banned']
if is_pending | is_flagged | is_deleted:
is_active = False
status = []
if is_active:
status.append("Active")
if is_pending:
status.append("Pending")
if is_flagged:
status.append("Flagged")
if is_banned:
status.append("Banned")
return " ".join(status)
def get_json(pathname: str) -> Any | None:
r = requests.get(f"http://{app.host}/{pathname}.json")
if r.status_code != 200:
return None
return r.json()

View file

@ -6,7 +6,7 @@ from telegram.constants import ParseMode
from telegram.ext import ContextTypes from telegram.ext import ContextTypes
from config import * from config import *
from extensions import humanize_tags_from_json, format_rating from extensions import humanize_tags_from_json, format_rating, get_json
# noinspection PyUnusedLocal # noinspection PyUnusedLocal
@ -19,14 +19,11 @@ async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
if not query.isdigit(): if not query.isdigit():
return return
response = requests.get(f"http://{app.host}/posts/{query}.json") data = get_json(f"posts/{query}")
if data is None:
if response.status_code != 200:
await invalid_query(update, query) await invalid_query(update, query)
return return
data = response.json()
await answer_query(update, query, data) await answer_query(update, query, data)

View file

@ -21,6 +21,7 @@ def main() -> None:
application.add_handler(CommandHandler("start", commands.start_command)) application.add_handler(CommandHandler("start", commands.start_command))
application.add_handler(CommandHandler("help", commands.help_command)) application.add_handler(CommandHandler("help", commands.help_command))
application.add_handler(CommandHandler("about", commands.about_command)) application.add_handler(CommandHandler("about", commands.about_command))
application.add_handler(CommandHandler("info", commands.info_command))
from inline_query import inline_query from inline_query import inline_query
application.add_handler(InlineQueryHandler(inline_query)) application.add_handler(InlineQueryHandler(inline_query))