feat: add /user command (#5)
All checks were successful
Build / Upload to production (push) Successful in 1m43s

Reviewed-on: #5
Co-authored-by: mctaylors <cantsendmails@mctaylors.ru>
Co-committed-by: mctaylors <cantsendmails@mctaylors.ru>
This commit is contained in:
Macintxsh 2025-06-08 10:59:52 +03:00 committed by Forgejo
parent 6ccf2ace4a
commit 86e9c04e1e
Signed by: Forgejo
GPG key ID: F2B3CB736682B201
3 changed files with 271 additions and 4 deletions

View file

@@ -8,7 +8,7 @@ from telegram.constants import ParseMode
from telegram.ext import ContextTypes
import html_parser
from datetime import datetime
from datetime import datetime, timedelta
from config import *
from extensions import get_json, format_rating, format_status, humanize_filesize
@@ -203,3 +203,265 @@ async def info_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
),
parse_mode=ParseMode.HTML,
)
def _site_url(path: str) -> str:
    # All profile links point at the configured application host.
    return f"{app.protocol}://{app.hostname}/{path}"


def _tag_post_count(tag: str) -> int:
    # The counts API wraps the number as {"counts": {"posts": N}}.
    return get_json("counts/posts", [f"tags={tag}"])["counts"]["posts"]


def _count_row(label: str, count, path: str) -> str:
    # One "Label: <linked count>" line of the profile summary.
    return " ".join([label, html_parser.hyperlink(count, _site_url(path))])


async def user_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/user <username>``: fetch a user's profile and post a summary.

    Immediately sends a "Fetching..." placeholder message, then edits it in
    place with either the assembled profile (ID, join date, level, counts
    with links, ban info, feedback breakdown) or an error line when no
    record matches.  A missing argument (IndexError) or unparsable data
    (ValueError, e.g. a bad ``created_at`` date) yields a usage hint.
    """
    try:
        name_matches = context.args[0]
        message = await context.bot.send_message(
            update.effective_chat.id,
            "\n".join([html_parser.bold(name_matches), "Fetching..."]),
            parse_mode=ParseMode.HTML,
        )
        user_search_data = get_json("users", [f"search[name_matches]={name_matches}"])
        # Truthiness covers both an empty result list and a failed request
        # (get_json returns None on non-200, which len() would crash on).
        if not user_search_data:
            await context.bot.edit_message_text(
                " ".join([html_parser.bold("Error:"), "That record was not found."]),
                update.effective_chat.id,
                message.message_id,
                parse_mode=ParseMode.HTML,
            )
            return
        user_id = user_search_data[0]["id"]
        user_data = get_json(f"users/{user_id}")
        name = user_data["name"]
        reply_markup = InlineKeyboardMarkup(
            [
                [
                    InlineKeyboardButton(
                        f"Open in {app.name}",
                        url=_site_url(f"users/{user_id}"),
                    )
                ]
            ]
        )

        # Profile lines, joined with newlines below the bolded username.
        m = [" ".join(["ID:", html_parser.code(user_data["id"])])]
        created_at = datetime.fromisoformat(user_data["created_at"])
        m.append(" ".join(["Join Date:", created_at.strftime("%Y-%m-%d")]))
        m.append(" ".join(["Level:", user_data["level_string"]]))
        m.append(
            _count_row(
                "Uploads:",
                user_data["post_upload_count"],
                f"posts?tags=user:{name}",
            )
        )
        if user_data["is_banned"]:
            ban_data = get_json("bans", [f"search[user_id]={user_data['id']}"])[0]
            m.append(
                " ".join(
                    [
                        "Ban reason:",
                        ban_data["reason"],
                        f"(banned for {timedelta(seconds=ban_data['duration'])})",
                    ]
                )
            )
        m.append(
            _count_row(
                "Favorites:",
                _tag_post_count(f"fav:{name}"),
                f"posts?tags=ordfav:{name}",
            )
        )
        upvote_post_count = _tag_post_count(f"upvote:{name}")
        downvote_post_count = _tag_post_count(f"downvote:{name}")
        vote_breakdown = " ".join(
            [
                html_parser.hyperlink(
                    f"up:{upvote_post_count}",
                    _site_url(f"post_votes?search[user_name]={name}&search[score]=1"),
                ),
                html_parser.hyperlink(
                    f"down:{downvote_post_count}",
                    _site_url(f"post_votes?search[user_name]={name}&search[score]=-1"),
                ),
            ]
        )
        m.append(
            " ".join(
                [
                    _count_row(
                        "Post Votes:",
                        upvote_post_count + downvote_post_count,
                        f"post_votes?search[user_name]={name}",
                    ),
                    f"({vote_breakdown})",
                ]
            )
        )
        m.append(
            _count_row(
                "Favorite Groups:",
                user_data["favorite_group_count"],
                f"favorite_groups?search[creator_name]={name}",
            )
        )
        m.append(
            _count_row(
                "Post Changes:",
                user_data["post_update_count"],
                f"post_versions?search[updater_name]={name}",
            )
        )
        m.append(
            " ".join(
                [
                    _count_row(
                        "Note Changes:",
                        user_data["note_update_count"],
                        f"note_versions?search[updater_name]={name}",
                    ),
                    "in",
                    html_parser.hyperlink(
                        _tag_post_count(f"noteupdater:{name}"),
                        _site_url(f"posts?tags=noteupdater:{name}+order:note"),
                    ),
                    "posts",
                ]
            )
        )
        m.append(
            _count_row(
                "Wiki Page Changes:",
                user_data["wiki_page_version_count"],
                f"wiki_page_versions?search[updater_name]={name}",
            )
        )
        m.append(
            _count_row(
                "Artist Changes:",
                user_data["artist_version_count"],
                f"artist_versions?search[updater_name]={name}",
            )
        )
        m.append(
            _count_row(
                "Commentary Changes:",
                user_data["artist_commentary_version_count"],
                f"artist_commentary_versions?search[updater_name]={name}",
            )
        )
        m.append(
            _count_row(
                "Forum Posts:",
                user_data["forum_post_count"],
                f"forum_posts?search[creator_name]={name}",
            )
        )
        m.append(
            " ".join(
                [
                    _count_row(
                        "Comments:",
                        user_data["comment_count"],
                        f"comments?group_by=comment&search[creator_name]={name}",
                    ),
                    "in",
                    html_parser.hyperlink(
                        _tag_post_count(f"commenter:{name}"),
                        _site_url(f"posts?tags=commenter:{name}+order:comment_bumped"),
                    ),
                    "posts",
                ]
            )
        )
        m.append(
            _count_row(
                "Appeals:",
                user_data["appeal_count"],
                f"post_appeals?search[creator_name]={name}",
            )
        )
        total_feedback_count = (
            user_data["positive_feedback_count"]
            + user_data["neutral_feedback_count"]
            + user_data["negative_feedback_count"]
        )
        feedback_breakdown = " ".join(
            html_parser.hyperlink(
                f"{category}:{count}",
                _site_url(
                    f"user_feedbacks?search[user_name]={name}&search[category]={category}"
                ),
            )
            for category, count in (
                ("positive", user_data["positive_feedback_count"]),
                ("neutral", user_data["neutral_feedback_count"]),
                ("negative", user_data["negative_feedback_count"]),
            )
        )
        m.append(
            " ".join(
                [
                    _count_row(
                        "Feedback:",
                        total_feedback_count,
                        f"user_feedbacks?search[user_name]={name}",
                    ),
                    f"({feedback_breakdown})",
                ]
            )
        )
        await context.bot.edit_message_text(
            "\n".join([html_parser.bold(name)] + m),
            update.effective_chat.id,
            message.message_id,
            parse_mode=ParseMode.HTML,
            reply_markup=reply_markup,
            disable_web_page_preview=True,
        )
    except (IndexError, ValueError):
        # No argument given, or malformed data: reply with the usage line.
        await update.message.reply_text(
            " ".join(
                [html_parser.bold("Usage:"), html_parser.code("/user &lt;username&gt;")]
            ),
            parse_mode=ParseMode.HTML,
        )

View file

@@ -1,7 +1,7 @@
import re
import requests
from typing import Any
from typing import Any, Optional
from config import app
@@ -57,8 +57,12 @@ def format_status(data) -> str:
return " ".join(status)
def get_json(pathname: str) -> Any | None:
r = requests.get(f"http://{app.host}/{pathname}.json")
def get_json(pathname: str, query: Optional[list] = None) -> Any | None:
url = [f"http://{app.host}/{pathname}.json"]
if query is not None:
url.append("?")
url.append("&".join(query))
r = requests.get("".join(url))
if r.status_code != 200:
return None

View file

@@ -23,6 +23,7 @@ def main() -> None:
application.add_handler(CommandHandler("help", commands.help_command))
application.add_handler(CommandHandler("about", commands.about_command))
application.add_handler(CommandHandler("info", commands.info_command))
application.add_handler(CommandHandler("user", commands.user_command))
from inline_query import inline_query