Compare commits

...

12 commits

Author SHA1 Message Date
270e1630d4
feat: add /info command, pt. 2
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:35:05 +03:00
b079d56250
feat: add /info command, pt. 1
# Conflicts:
#	extensions.py
2025-05-18 12:34:58 +03:00
c70d7fe3be
refactor: completely rewrite humanize_tags_from_json
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:30:22 +03:00
f07a51f76a
change: make inline query answer more compact
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:30:22 +03:00
df1735ef36
feat: use LinkPreviewOptions in answer_query
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:30:22 +03:00
0cc77dd42c
change: make return None statement explicit in format_rating
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:30:22 +03:00
d646920210
fix: set encoding for _c.read
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:30:22 +03:00
0ee018e2ce
refactor: define parameter types *again*
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:30:22 +03:00
7fdf18b5cb
i18n: use strings from Danbooru
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:30:22 +03:00
ff86c78b21
fix: add is_banned check
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:30:22 +03:00
05cbca7509
chore: remove unused imports
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:30:22 +03:00
b503234b85
pip: add requirements.txt
Signed-off-by: mctaylors <cantsendmails@mctaylors.ru>
2025-05-18 12:30:22 +03:00
6 changed files with 131 additions and 27 deletions

View file

@ -1,9 +1,10 @@
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup, LinkPreviewOptions
from telegram.constants import ParseMode
from telegram.ext import ContextTypes
import html_parser
from config import *
from extensions import get_json, format_rating, format_status
async def start_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
@ -45,3 +46,65 @@ async def about_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> N
parse_mode=ParseMode.HTML,
reply_markup=reply_markup
)
async def info_command(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
try:
post_id = context.args[0]
message = await context.bot.send_message(update.effective_chat.id,
f"{html_parser.bold("Information")}\n"
f"Fetching...",
parse_mode=ParseMode.HTML)
post_data = get_json(f"posts/{post_id}")
if post_data is None:
await update.message.reply_text(
f"{html_parser.bold("Error")}: That record was not found.",
parse_mode=ParseMode.HTML)
return
uploader_data = get_json(f"users/{post_data['uploader_id']}")
# well, we could check the uploader, but why would we do that?
keyboard = [
[
InlineKeyboardButton(f"Open in {app.name}",
url=f"https://{app.hostname}/posts/{post_id}")
]
]
reply_markup = InlineKeyboardMarkup(keyboard)
# noinspection PyListCreation
m = []
m.append(f"ID: {html_parser.code(post_data['id'])}")
m.append(f"Uploader: {html_parser.hyperlink(uploader_data['name'],
f"http://{app.host}/users/{post_data['uploader_id']}")} "
f"{html_parser.hyperlink("»", f"http://{app.host}/posts?tags=user:{uploader_data['name']}")}")
m.append(f"Date: {post_data['created_at']}")
if post_data['approver_id'] is not None:
approver_data = get_json(f"users/{post_data['approver_id']}")
m.append(f"Approver: {html_parser.hyperlink(approver_data['name'],
f"http://{app.host}/users/{post_data['approver_id']}")} "
f"{html_parser.hyperlink("»", f"http://{app.host}/posts?tags=approver:{approver_data['name']}")}")
m.append(f"Size: {post_data['media_asset']['file_size']} .{post_data['media_asset']['file_ext']} "
f"({post_data['media_asset']['image_width']}x{post_data['media_asset']['image_height']}) "
f"{html_parser.hyperlink("»", f"http://{app.host}/media_assets/{post_data['media_asset']['id']}")}")
m.append(f"Source: {post_data['source'] if post_data['source'] != "" else "🚫"}")
m.append(f"Rating: {format_rating(post_data['rating'])}")
m.append(f"Score: {html_parser.hyperlink(post_data['score'],
f"http://{app.host}/post_votes?search[post_id]={post_data['id']}&variant=compact")} "
f"(+{post_data['up_score']} / -{post_data['down_score']})")
m.append(f"Favorites: {html_parser.hyperlink(post_data['fav_count'],
f"http://{app.host}/posts/{post_data['id']}/favorites")}")
m.append(f"Status: {format_status(post_data)}")
link_preview_options = LinkPreviewOptions(True)
if not post_data['is_banned']:
link_preview_options = LinkPreviewOptions(url=post_data['large_file_url'])
await context.bot.edit_message_text(
f"{html_parser.bold("Information")}\n" + "\n".join(m),
update.effective_chat.id, message.message_id,
parse_mode=ParseMode.HTML, reply_markup=reply_markup, link_preview_options=link_preview_options)
except (IndexError, ValueError):
await update.message.reply_text(
f"{html_parser.bold("Usage")}: {html_parser.code(f"/info &lt;post ID&gt;")}",
parse_mode=ParseMode.HTML)

View file

@ -4,7 +4,7 @@ from typing import Optional
import requests
_c = configparser.ConfigParser()
_c.read("config.ini")
_c.read("config.ini", "utf-8")
class _General:

View file

@ -1,20 +1,17 @@
import re
import requests
from typing import Any
from config import app
def humanize_tags_from_json(value: str, default: str):
def humanize_tags_from_json(value: str, default: str) -> str:
if value != "":
output = str()
tags = value.split()
for t in tags:
output += f"{re.sub('_\\(.*', '', t)}, "
output = output[:-2]
return output
return ", ".join([re.sub('_\\(.*', '', t) for t in value.split()])
return default
def format_rating(value: str):
def format_rating(value: str) -> str | None:
match value:
case "g":
# Negative Squared Latin Capital Letter G
@ -28,3 +25,32 @@ def format_rating(value: str):
case "e":
# Negative Squared Latin Capital Letter E
return "🅴"
return None
def format_status(data) -> str:
is_active = True
is_pending = data['is_pending']
is_flagged = data['is_flagged']
is_deleted = data['is_deleted']
is_banned = data['is_banned']
if is_pending | is_flagged | is_deleted:
is_active = False
status = []
if is_active:
status.append("Active")
if is_pending:
status.append("Pending")
if is_flagged:
status.append("Flagged")
if is_banned:
status.append("Banned")
return " ".join(status)
def get_json(pathname: str) -> Any | None:
r = requests.get(f"http://{app.host}/{pathname}.json")
if r.status_code != 200:
return None
return r.json()

View file

@ -1,13 +1,12 @@
from uuid import uuid4
import requests
from telegram import Update, InlineKeyboardButton, InlineQueryResultArticle, InputTextMessageContent, \
InlineKeyboardMarkup
InlineKeyboardMarkup, LinkPreviewOptions
from telegram.constants import ParseMode
from telegram.ext import ContextTypes
from config import *
from extensions import humanize_tags_from_json, format_rating
from extensions import humanize_tags_from_json, format_rating, get_json
# noinspection PyUnusedLocal
@ -20,14 +19,11 @@ async def inline_query(update: Update, context: ContextTypes.DEFAULT_TYPE) -> No
if not query.isdigit():
return
response = requests.get(f"http://{app.host}/posts/{query}.json")
if response.status_code != 200:
data = get_json(f"posts/{query}")
if data is None:
await invalid_query(update, query)
return
data = response.json()
await answer_query(update, query, data)
@ -36,6 +32,24 @@ async def answer_query(update: Update, query: str, data) -> None:
copyrights = humanize_tags_from_json(data['tag_string_copyright'], "unknown copyright")
artists = humanize_tags_from_json(data['tag_string_artist'], "unknown artist")
rating = format_rating(data['rating'])
if data['is_banned']:
results = [
InlineQueryResultArticle(
id=str(uuid4()),
title=f"ID: {query}",
description=f"{characters} ({copyrights}) drawn by {artists}",
input_message_content=InputTextMessageContent(
f"<code>#{query}</code> <s><b>{characters} ({copyrights})</b> drawn by <b>{artists}</b></s> {rating}\n"
f"This post has been removed because of a takedown request or rule violation.",
parse_mode=ParseMode.HTML
)
)
]
await update.inline_query.answer(results)
return
keyboard = [
[
InlineKeyboardButton(f"Open in {app.name}",
@ -51,10 +65,9 @@ async def answer_query(update: Update, query: str, data) -> None:
description=f"{characters} ({copyrights}) drawn by {artists}",
thumbnail_url=data['preview_file_url'],
input_message_content=InputTextMessageContent(
f"ID: <code>{query}</code> {rating}\n"
f"<a href='{data['large_file_url']}'><b>{characters} ({copyrights})</b> "
f"drawn by <b>{artists}</b></a>",
parse_mode=ParseMode.HTML
f"<code>#{query}</code> <b>{characters} ({copyrights})</b> drawn by <b>{artists}</b> {rating}",
parse_mode=ParseMode.HTML,
link_preview_options=LinkPreviewOptions(url=data['large_file_url'])
),
reply_markup=InlineKeyboardMarkup(keyboard)
)
@ -68,9 +81,10 @@ async def invalid_query(update: Update, query: str) -> None:
InlineQueryResultArticle(
id=str(uuid4()),
title=f"ID: {query}",
description="not found.",
description="Error",
input_message_content=InputTextMessageContent(
f"ID: <code>{query}</code>\n<b>requested post does not exist.</b>",
f"<code>#{query}</code>\n"
f"That record was not found.",
parse_mode=ParseMode.HTML
)
)

View file

@ -21,6 +21,7 @@ def main() -> None:
application.add_handler(CommandHandler("start", commands.start_command))
application.add_handler(CommandHandler("help", commands.help_command))
application.add_handler(CommandHandler("about", commands.about_command))
application.add_handler(CommandHandler("info", commands.info_command))
from inline_query import inline_query
application.add_handler(InlineQueryHandler(inline_query))
@ -28,7 +29,7 @@ def main() -> None:
application.run_polling(allowed_updates=commands.Update.ALL_TYPES)
def get_token():
def get_token() -> None:
if os.getenv("BOT_TOKEN") is not None:
return os.getenv("BOT_TOKEN")

BIN
requirements.txt Normal file

Binary file not shown.