feat: add /info command #1
4 changed files with 115 additions and 7 deletions
71
commands.py
71
commands.py
|
@ -1,9 +1,11 @@
|
|||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
|
||||
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, LinkPreviewOptions
|
||||
from telegram.constants import ParseMode
|
||||
from telegram.ext import ContextTypes
|
||||
|
||||
import html_parser
|
||||
from datetime import datetime
|
||||
from config import *
|
||||
from extensions import get_json, format_rating, format_status, humanize_filesize
|
||||
|
||||
|
||||
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
|
@ -45,3 +47,70 @@ async def about_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N
|
|||
parse_mode=ParseMode.HTML,
|
||||
reply_markup=reply_markup
|
||||
)
|
||||
|
||||
|
||||
async def info_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
|
||||
try:
|
||||
post_id = context.args[0]
|
||||
message = await context.bot.send_message(update.effective_chat.id,
|
||||
f"{html_parser.bold("Information")}\n"
|
||||
f"Fetching...",
|
||||
parse_mode=ParseMode.HTML)
|
||||
post_data = get_json(f"posts/{post_id}")
|
||||
if post_data is None:
|
||||
await update.message.reply_text(
|
||||
f"{html_parser.bold("Error")}: That record was not found.",
|
||||
parse_mode=ParseMode.HTML)
|
||||
return
|
||||
uploader_data = get_json(f"users/{post_data['uploader_id']}")
|
||||
# well, we could check the uploader, but why would we do that?
|
||||
|
||||
keyboard = [
|
||||
[
|
||||
InlineKeyboardButton(f"Open in {app.name}",
|
||||
url=f"https://{app.hostname}/posts/{post_id}")
|
||||
]
|
||||
]
|
||||
reply_markup = InlineKeyboardMarkup(keyboard)
|
||||
|
||||
# noinspection PyListCreation
|
||||
m = []
|
||||
m.append(f"ID: {html_parser.code(post_data['id'])}")
|
||||
m.append(f"Uploader: {html_parser.hyperlink(uploader_data['name'],
|
||||
f"http://{app.host}/users/{post_data['uploader_id']}")} "
|
||||
f"{html_parser.hyperlink("»", f"http://{app.host}/posts?tags=user:{uploader_data['name']}")}")
|
||||
created_at = datetime.fromisoformat(post_data['created_at'])
|
||||
m.append(f"Date: {html_parser.hyperlink(
|
||||
f"{created_at.strftime("%Y-%m-%d %X (%z)")}",
|
||||
f"http://{app.host}/posts?tags=date:{created_at.strftime("%Y-%m-%d")}")}")
|
||||
if post_data['approver_id'] is not None:
|
||||
approver_data = get_json(f"users/{post_data['approver_id']}")
|
||||
m.append(f"Approver: {html_parser.hyperlink(approver_data['name'],
|
||||
f"http://{app.host}/users/{post_data['approver_id']}")} "
|
||||
f"{html_parser.hyperlink("»", f"http://{app.host}/posts?tags=approver:{approver_data['name']}")}")
|
||||
m.append(f"Size: {html_parser.hyperlink(
|
||||
f"{humanize_filesize(post_data['media_asset']['file_size'])} .{post_data['media_asset']['file_ext']}",
|
||||
post_data['file_url'])} "
|
||||
f"({post_data['media_asset']['image_width']}x{post_data['media_asset']['image_height']}) "
|
||||
f"{html_parser.hyperlink("»", f"http://{app.host}/media_assets/{post_data['media_asset']['id']}")}")
|
||||
m.append(f"Source: {post_data['source'] if post_data['source'] != "" else "🚫"}")
|
||||
m.append(f"Rating: {format_rating(post_data['rating'])}")
|
||||
m.append(f"Score: {html_parser.hyperlink(post_data['score'],
|
||||
f"http://{app.host}/post_votes?search[post_id]={post_data['id']}&variant=compact")} "
|
||||
f"(+{post_data['up_score']} / -{post_data['down_score']})")
|
||||
m.append(f"Favorites: {html_parser.hyperlink(post_data['fav_count'],
|
||||
f"http://{app.host}/posts/{post_data['id']}/favorites")}")
|
||||
m.append(f"Status: {format_status(post_data)}")
|
||||
|
||||
link_preview_options = LinkPreviewOptions(True)
|
||||
if not post_data['is_banned']:
|
||||
link_preview_options = LinkPreviewOptions(url=post_data['large_file_url'])
|
||||
|
||||
await context.bot.edit_message_text(
|
||||
f"{html_parser.bold("Information")}\n" + "\n".join(m),
|
||||
update.effective_chat.id, message.message_id,
|
||||
parse_mode=ParseMode.HTML, reply_markup=reply_markup, link_preview_options=link_preview_options)
|
||||
except (IndexError, ValueError):
|
||||
await update.message.reply_text(
|
||||
f"{html_parser.bold("Usage")}: {html_parser.code(f"/info <post ID>")}",
|
||||
parse_mode=ParseMode.HTML)
|
||||
|
|
|
@ -1,4 +1,8 @@
|
|||
import re
|
||||
import requests
|
||||
|
||||
from typing import Any
|
||||
from config import app
|
||||
|
||||
|
||||
def humanize_tags_from_json(value: str, default: str) -> str:
|
||||
|
@ -7,6 +11,14 @@ def humanize_tags_from_json(value: str, default: str) -> str:
|
|||
return default
|
||||
|
||||
|
||||
def humanize_filesize(value: int) -> str:
|
||||
for unit in ['B', 'KiB', 'MiB', 'GiB']:
|
||||
if value < 1024 or unit == 'GiB':
|
||||
break
|
||||
value /= 1024
|
||||
return f"{value:.2f} {unit}"
|
||||
|
||||
|
||||
def format_rating(value: str) -> str | None:
|
||||
match value:
|
||||
case "g":
|
||||
|
@ -22,3 +34,32 @@ def format_rating(value: str) -> str | None:
|
|||
# Negative Squared Latin Capital Letter E
|
||||
return "🅴"
|
||||
return None
|
||||
|
||||
|
||||
def format_status(data) -> str:
|
||||
is_active = True
|
||||
is_pending = data['is_pending']
|
||||
is_flagged = data['is_flagged']
|
||||
is_deleted = data['is_deleted']
|
||||
is_banned = data['is_banned']
|
||||
if is_pending | is_flagged | is_deleted:
|
||||
is_active = False
|
||||
|
||||
status = []
|
||||
if is_active:
|
||||
status.append("Active")
|
||||
if is_pending:
|
||||
status.append("Pending")
|
||||
if is_flagged:
|
||||
status.append("Flagged")
|
||||
if is_banned:
|
||||
status.append("Banned")
|
||||
return " ".join(status)
|
||||
|
||||
|
||||
def get_json(pathname: str) -> Any | None:
|
||||
r = requests.get(f"http://{app.host}/{pathname}.json")
|
||||
|
||||
if r.status_code != 200:
|
||||
return None
|
||||
return r.json()
|
||||
|
|
|
@ -6,7 +6,7 @@ from telegram.constants import ParseMode
|
|||
from telegram.ext import ContextTypes
|
||||
|
||||
from config import *
|
||||
from extensions import humanize_tags_from_json, format_rating
|
||||
from extensions import humanize_tags_from_json, format_rating, get_json
|
||||
|
||||
|
||||
# noinspection PyUnusedLocal
|
||||
|
@ -19,14 +19,11 @@ async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
|
|||
if not query.isdigit():
|
||||
return
|
||||
|
||||
response = requests.get(f"http://{app.host}/posts/{query}.json")
|
||||
|
||||
if response.status_code != 200:
|
||||
data = get_json(f"posts/{query}")
|
||||
if data is None:
|
||||
await invalid_query(update, query)
|
||||
return
|
||||
|
||||
data = response.json()
|
||||
|
||||
await answer_query(update, query, data)
|
||||
|
||||
|
||||
|
|
1
main.py
1
main.py
|
@ -21,6 +21,7 @@ def main() -> None:
|
|||
application.add_handler(CommandHandler("start", commands.start_command))
|
||||
application.add_handler(CommandHandler("help", commands.help_command))
|
||||
application.add_handler(CommandHandler("about", commands.about_command))
|
||||
application.add_handler(CommandHandler("info", commands.info_command))
|
||||
|
||||
from inline_query import inline_query
|
||||
application.add_handler(InlineQueryHandler(inline_query))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue