"""Telegram command handlers (/start, /help, /about, /info, /user) for the bot."""

from datetime import datetime, timedelta

from telegram import (
    InlineKeyboardButton,
    InlineKeyboardMarkup,
    LinkPreviewOptions,
    Update,
)
from telegram.constants import ParseMode
from telegram.ext import ContextTypes

import html_parser
# NOTE(review): `app` and `general` used below are not defined in this module,
# so they presumably come from this star import -- confirm against config.
from config import *
from extensions import get_json, format_rating, format_status, humanize_filesize


def _site_url(path: str) -> str:
    """Return the absolute URL of *path* on the configured instance."""
    return f"{app.protocol}://{app.hostname}/{path}"


def _post_count(tag_query: str):
    """Return the post count the API reports for the tag search *tag_query*."""
    return get_json("counts/posts", [f"tags={tag_query}"])["counts"]["posts"]


async def _reply_usage(update: Update, usage: str) -> None:
    """Reply to the triggering message with a ``Usage: <usage>`` hint."""
    # effective_message also covers edited messages, where update.message is None.
    await update.effective_message.reply_text(
        " ".join([html_parser.bold("Usage:"), html_parser.code(usage)]),
        parse_mode=ParseMode.HTML,
    )


async def _edit_not_found(
    context: ContextTypes.DEFAULT_TYPE, chat_id: int, message_id: int
) -> None:
    """Replace the placeholder message with a "record not found" error."""
    await context.bot.edit_message_text(
        " ".join([html_parser.bold("Error:"), "That record was not found."]),
        chat_id,
        message_id,
        parse_mode=ParseMode.HTML,
    )


async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /start: greet the user and point them at /help."""
    await update.effective_message.reply_text(
        "\n".join(
            [
                f"hello, i'm {html_parser.bold(context.bot.first_name)}, an inline image grabber.\n",
                "to get help, use /help",
            ]
        ),
        parse_mode=ParseMode.HTML,
    )


async def help_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /help: explain how to invoke the bot inline."""
    await update.effective_message.reply_text(
        "\n".join(
            [
                html_parser.bold(f"how to use {context.bot.first_name}\n"),
                "1. open your message box and type:",
                html_parser.code(f"@{context.bot.username} <post ID>"),
                "2. click on the box that has popped up",
                "3. ???",
                "4. done!",
            ]
        ),
        parse_mode=ParseMode.HTML,
    )


async def about_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle /about: describe the bot and the configured instance.

    A "source code" button is attached only when ``general.sourceurl`` is set.
    """
    reply_markup = None
    if general.sourceurl is not None:
        reply_markup = InlineKeyboardMarkup(
            [[InlineKeyboardButton("source code", url=general.sourceurl)]]
        )

    await update.effective_message.reply_text(
        "\n".join(
            [
                html_parser.bold(f"about {context.bot.first_name}"),
                f"{context.bot.first_name} is an inline image grabber written in python that grabs images from "
                "Danbooru (or other similar services).\n",
                html_parser.bold("currently configured instance:"),
                f"{html_parser.italic(app.name)} ({app.hostname})",
            ]
        ),
        parse_mode=ParseMode.HTML,
        reply_markup=reply_markup,
    )


async def info_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/info <post ID>``: show metadata for a single post.

    Sends a "Fetching..." placeholder, then edits it into either the post
    summary or a "not found" error. Without an argument a usage hint is sent.
    """
    # Validate arguments up front so that only a missing argument produces
    # the usage hint (not unrelated errors raised while formatting API data).
    if not context.args:
        await _reply_usage(update, "/info <post ID>")
        return

    post_id = context.args[0]
    chat_id = update.effective_chat.id
    message = await context.bot.send_message(
        chat_id,
        "\n".join([html_parser.bold("Information"), "Fetching..."]),
        parse_mode=ParseMode.HTML,
    )

    # NOTE(review): get_json is called synchronously; if it performs blocking
    # network I/O it will stall the event loop -- confirm.
    post_data = get_json(f"posts/{post_id}")
    if post_data is None:
        await _edit_not_found(context, chat_id, message.message_id)
        return

    # well, we could check the uploader, but why would we do that?
    uploader_data = get_json(f"users/{post_data['uploader_id']}")

    reply_markup = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    f"Open in {app.name}",
                    url=_site_url(f"posts/{post_id}"),
                )
            ]
        ]
    )

    asset = post_data["media_asset"]
    m = [
        " ".join(["ID:", html_parser.code(post_data["id"])]),
        " ".join(
            [
                "Uploader:",
                html_parser.hyperlink(
                    uploader_data["name"],
                    _site_url(f"users/{post_data['uploader_id']}"),
                ),
                html_parser.hyperlink(
                    "»",
                    _site_url(f"posts?tags=user:{uploader_data['name']}"),
                ),
            ]
        ),
    ]

    created_at = datetime.fromisoformat(post_data["created_at"])
    # Hoisted out of the f-string: reusing the enclosing quote character
    # inside an f-string expression only parses on Python 3.12+.
    created_day = created_at.strftime("%Y-%m-%d")
    m.append(
        " ".join(
            [
                "Date:",
                html_parser.hyperlink(
                    created_at.strftime("%Y-%m-%d %X (%z)"),
                    _site_url(f"posts?tags=date:{created_day}"),
                ),
            ]
        )
    )

    if post_data["approver_id"] is not None:
        approver_data = get_json(f"users/{post_data['approver_id']}")
        m.append(
            " ".join(
                [
                    "Approver:",
                    html_parser.hyperlink(
                        approver_data["name"],
                        _site_url(f"users/{post_data['approver_id']}"),
                    ),
                    html_parser.hyperlink(
                        "»",
                        _site_url(f"posts?tags=approver:{approver_data['name']}"),
                    ),
                ]
            )
        )

    m.append(
        " ".join(
            [
                "Size:",
                html_parser.hyperlink(
                    f"{humanize_filesize(asset['file_size'])} .{asset['file_ext']}",
                    # Banned posts get an empty link target instead of file_url.
                    "" if post_data["is_banned"] else post_data["file_url"],
                ),
                f"({asset['image_width']}x{asset['image_height']})",
                html_parser.hyperlink(
                    "»",
                    _site_url(f"media_assets/{asset['id']}"),
                ),
            ]
        )
    )
    m.append(
        " ".join(
            ["Source:", post_data["source"] if post_data["source"] != "" else "🚫"]
        )
    )
    m.append(" ".join(["Rating:", format_rating(post_data["rating"])]))
    m.append(
        " ".join(
            [
                "Score:",
                html_parser.hyperlink(
                    post_data["score"],
                    _site_url(
                        f"post_votes?search[post_id]={post_data['id']}&variant=compact"
                    ),
                ),
                f"(+{post_data['up_score']} / -{post_data['down_score']})",
            ]
        )
    )
    m.append(
        " ".join(
            [
                "Favorites:",
                html_parser.hyperlink(
                    post_data["fav_count"],
                    _site_url(f"posts/{post_data['id']}/favorites"),
                ),
            ]
        )
    )
    m.append(" ".join(["Status:", format_status(post_data)]))

    # Preview the large image unless the post is banned, in which case the
    # link preview is disabled entirely.
    if post_data["is_banned"]:
        link_preview_options = LinkPreviewOptions(is_disabled=True)
    else:
        link_preview_options = LinkPreviewOptions(url=post_data["large_file_url"])

    await context.bot.edit_message_text(
        "\n".join([html_parser.bold("Information")] + m),
        chat_id,
        message.message_id,
        parse_mode=ParseMode.HTML,
        reply_markup=reply_markup,
        link_preview_options=link_preview_options,
    )


async def user_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Handle ``/user <username>``: show a profile summary for a user.

    The first search hit for the given name pattern is used. Sends a
    "Fetching..." placeholder, then edits it into either the profile summary
    or a "not found" error. Without an argument a usage hint is sent.
    """
    if not context.args:
        await _reply_usage(update, "/user <username>")
        return

    name_matches = context.args[0]
    chat_id = update.effective_chat.id
    message = await context.bot.send_message(
        chat_id,
        "\n".join([html_parser.bold(name_matches), "Fetching..."]),
        parse_mode=ParseMode.HTML,
    )

    user_search_data = get_json("users", [f"search[name_matches]={name_matches}"])
    # Truthiness covers both an empty result list and a None response.
    if not user_search_data:
        await _edit_not_found(context, chat_id, message.message_id)
        return

    user_id = user_search_data[0]["id"]
    user_data = get_json(f"users/{user_id}")
    name = user_data["name"]

    reply_markup = InlineKeyboardMarkup(
        [
            [
                InlineKeyboardButton(
                    f"Open in {app.name}",
                    url=_site_url(f"users/{user_id}"),
                )
            ]
        ]
    )

    m = [" ".join(["ID:", str(user_data["id"])])]

    created_at = datetime.fromisoformat(user_data["created_at"])
    m.append(" ".join(["Join Date:", created_at.strftime("%Y-%m-%d")]))
    m.append(" ".join(["Level:", user_data["level_string"]]))
    m.append(
        " ".join(
            [
                "Uploads:",
                html_parser.hyperlink(
                    user_data["post_upload_count"],
                    _site_url(f"posts?tags=user:{name}"),
                ),
            ]
        )
    )

    if user_data["is_banned"]:
        bans = get_json("bans", [f"search[user_id]={user_data['id']}"])
        # Skip the line when no ban record is returned rather than failing
        # the whole command on an empty list.
        if bans:
            ban_data = bans[0]
            m.append(
                " ".join(
                    [
                        "Ban reason:",
                        ban_data["reason"],
                        f"(banned for {timedelta(seconds=ban_data['duration'])})",
                    ]
                )
            )

    fav_post_count = _post_count(f"fav:{name}")
    m.append(
        " ".join(
            [
                "Favorites:",
                html_parser.hyperlink(
                    fav_post_count,
                    _site_url(f"posts?tags=ordfav:{name}"),
                ),
            ]
        )
    )

    upvote_post_count = _post_count(f"upvote:{name}")
    downvote_post_count = _post_count(f"downvote:{name}")
    vote_post_count = upvote_post_count + downvote_post_count
    m.append(
        " ".join(
            [
                "Post Votes:",
                html_parser.hyperlink(
                    f"up:{upvote_post_count}",
                    _site_url(
                        f"post_votes?search[user_name]={name}&search[score]=1"
                    ),
                ),
                html_parser.hyperlink(
                    f"down:{downvote_post_count}",
                    _site_url(
                        f"post_votes?search[user_name]={name}&search[score]=-1"
                    ),
                ),
                html_parser.hyperlink(
                    f"total:{vote_post_count}",
                    _site_url(f"post_votes?search[user_name]={name}"),
                ),
            ]
        )
    )
    m.append(
        " ".join(
            [
                "Favorite Groups:",
                html_parser.hyperlink(
                    user_data["favorite_group_count"],
                    _site_url(f"favorite_groups?search[creator_name]={name}"),
                ),
            ]
        )
    )
    m.append(
        " ".join(
            [
                "Post Changes:",
                html_parser.hyperlink(
                    user_data["post_update_count"],
                    _site_url(f"post_versions?search[updater_name]={name}"),
                ),
            ]
        )
    )

    noteupdater_post_count = _post_count(f"noteupdater:{name}")
    m.append(
        " ".join(
            [
                "Note Changes:",
                html_parser.hyperlink(
                    user_data["note_update_count"],
                    _site_url(f"note_versions?search[updater_name]={name}"),
                ),
                "in",
                html_parser.hyperlink(
                    noteupdater_post_count,
                    _site_url(f"posts?tags=noteupdater:{name}+order:note"),
                ),
                "posts",
            ]
        )
    )
    m.append(
        " ".join(
            [
                "Wiki Page Changes:",
                html_parser.hyperlink(
                    user_data["wiki_page_version_count"],
                    _site_url(f"wiki_page_versions?search[updater_name]={name}"),
                ),
            ]
        )
    )
    m.append(
        " ".join(
            [
                "Artist Changes:",
                html_parser.hyperlink(
                    user_data["artist_version_count"],
                    _site_url(f"artist_versions?search[updater_name]={name}"),
                ),
            ]
        )
    )
    m.append(
        " ".join(
            [
                "Commentary Changes:",
                html_parser.hyperlink(
                    user_data["artist_commentary_version_count"],
                    _site_url(
                        f"artist_commentary_versions?search[updater_name]={name}"
                    ),
                ),
            ]
        )
    )
    m.append(
        " ".join(
            [
                "Forum Posts:",
                html_parser.hyperlink(
                    user_data["forum_post_count"],
                    _site_url(f"forum_posts?search[creator_name]={name}"),
                ),
            ]
        )
    )

    commenter_post_count = _post_count(f"commenter:{name}")
    m.append(
        " ".join(
            [
                "Comments:",
                html_parser.hyperlink(
                    user_data["comment_count"],
                    _site_url(
                        f"comments?group_by=comment&search[creator_name]={name}"
                    ),
                ),
                "in",
                html_parser.hyperlink(
                    commenter_post_count,
                    _site_url(
                        f"posts?tags=commenter:{name}+order:comment_bumped"
                    ),
                ),
                "posts",
            ]
        )
    )
    m.append(
        " ".join(
            [
                "Appeals:",
                html_parser.hyperlink(
                    user_data["appeal_count"],
                    _site_url(f"post_appeals?search[creator_name]={name}"),
                ),
            ]
        )
    )

    # Shown under the "avg:" label, but computed as positive minus negative.
    net_feedback_count = (
        user_data["positive_feedback_count"] - user_data["negative_feedback_count"]
    )
    m.append(
        " ".join(
            [
                "Feedback:",
                html_parser.hyperlink(
                    f"positive:{user_data['positive_feedback_count']}",
                    _site_url(
                        f"user_feedbacks?search[user_name]={name}&search[category]=positive"
                    ),
                ),
                html_parser.hyperlink(
                    f"neutral:{user_data['neutral_feedback_count']}",
                    _site_url(
                        f"user_feedbacks?search[user_name]={name}&search[category]=neutral"
                    ),
                ),
                html_parser.hyperlink(
                    f"negative:{user_data['negative_feedback_count']}",
                    _site_url(
                        f"user_feedbacks?search[user_name]={name}&search[category]=negative"
                    ),
                ),
                html_parser.hyperlink(
                    f"avg:{net_feedback_count}",
                    _site_url(f"user_feedbacks?search[user_name]={name}"),
                ),
            ]
        )
    )

    await context.bot.edit_message_text(
        "\n".join([html_parser.bold(name)] + m),
        chat_id,
        message.message_id,
        parse_mode=ParseMode.HTML,
        reply_markup=reply_markup,
        # Replaces the deprecated disable_web_page_preview=True, matching the
        # LinkPreviewOptions usage in info_command.
        link_preview_options=LinkPreviewOptions(is_disabled=True),
    )