1
Fork 0
mirror of https://github.com/RGBCube/VReplBot synced 2025-05-14 13:34:59 +00:00
VReplBot/v_repl_bot/cogs/playground.py
2023-02-05 13:31:46 +03:00

225 lines
6.8 KiB
Python

from __future__ import annotations
import json
from io import BytesIO
from typing import Literal, TYPE_CHECKING
from discord import File
from discord.ext.commands import Cog, command
from jishaku.codeblocks import codeblock_converter
from .error_handler import StopCommandExecution
if TYPE_CHECKING:
from discord import MessageReference, TextChannel
from discord.ext.commands import Context
from .. import ReplBot
class Playground(
Cog,
name = "Playground",
description = "V Playground commands.",
):
def __init__(self, bot: ReplBot) -> None:
self.bot = bot
async def get_code(self, ctx: Context, query: str) -> str:
async with await self.bot.session.post(
f"https://play.vlang.io/query",
data = { "hash": query }
) as response:
text = await response.text()
if text == "Not found.":
await ctx.reply("Invalid query.")
raise StopCommandExecution()
return text
async def share_code(self, code: str) -> str:
async with await self.bot.session.post(
f"https://play.vlang.io/share",
data = { "code": code },
) as response:
return await response.text()
async def run_code(self, code: str) -> tuple[bool, str]:
async with await self.bot.session.post(
f"https://play.vlang.io/run",
data = { "code": code },
) as response:
body = json.loads(await response.json(encoding = "utf-8"))
return body["ok"], body["output"]
async def test_code(self, code: str) -> tuple[bool, str]:
async with await self.bot.session.post(
f"https://play.vlang.io/run_test",
data = { "code": code },
) as response:
body = json.loads(await response.json(encoding = "utf-8"))
return body["ok"], body["output"]
@staticmethod
async def get_message_content(channel: TextChannel, ref: MessageReference) -> str:
if ref.resolved:
return ref.resolved.content
else:
message = await channel.fetch_message(ref.message_id)
return message.content
@staticmethod
def grep_code(content: str) -> str:
content = "`" + (splitted := content.split("`", 1))[len(splitted) - 1].rsplit("`", 1)[
0] + "`"
return codeblock_converter(content).content
@staticmethod
def grep_link_query(content: str) -> str | None:
if "play.vlang.io/?query=" not in content:
return None
query = content.split("play.vlang.io/?query=", 1)[1].split(" ", 1)[0].removesuffix(">")
if not query: # Empty string.
return None
return query
@staticmethod
def extract_link_query(content: str) -> str | None:
if (no_http_content := content.removeprefix("<").removeprefix("https://")).startswith(
"play.vlang.io/?query="
):
return no_http_content.removeprefix("play.vlang.io/?query=").split(" ", 1)[
0].removesuffix(">")
@staticmethod
def sanitize(string: str) -> str:
return string.replace("`", "\u200B`\u200B") # Zero-width space.
async def run_test_common(
self,
ctx: Context,
query_or_code: str | None,
*,
type: Literal["run", "test"],
):
if not query_or_code:
if not (ref := ctx.message.reference):
await ctx.reply("No code provided.")
return
content = await self.get_message_content(ctx.channel, ref)
if query := self.grep_link_query(content):
code = await self.get_code(ctx, query)
else:
code = self.grep_code(content)
elif query := self.extract_link_query(query_or_code):
code = await self.get_code(ctx, query)
else:
code = codeblock_converter(query_or_code).content
ok, output = await (self.run_code(code) if type == "run" else self.test_code(code))
sanitized_output = self.sanitize(output)
sentence = "Success!" if ok else "Failure!"
if len(sanitized_output) > 1900:
await ctx.reply(
f"**{sentence}**",
file = File(BytesIO(output.encode()), "output.txt")
)
else:
await ctx.reply(
f"**{sentence}**\n"
f"```v\n{self.sanitize(sanitized_output)}```"
)
@command(
aliases = ("eval", "repl"),
brief = "Runs V code.",
help = "Runs V code."
)
async def run(
self,
ctx: Context,
*,
query_or_code: str | None = None
) -> None:
await self.run_test_common(ctx, query_or_code, type = "run")
@command(
brief = "Runs tests of V code.",
help = "Runs tests of V code."
)
async def test(
self,
ctx: Context,
*,
query_or_code: str | None = None
) -> None:
await self.run_test_common(ctx, query_or_code, type = "test")
@command(
aliases = ("download",),
brief = "Shows the code in a V playground link.",
help = "Shows the code in a V playground link."
)
async def show(self, ctx: Context, query: str | None = None) -> None:
if not query:
if not (ref := ctx.message.reference):
await ctx.reply("No query provided.")
return
content = await self.get_message_content(ctx.channel, ref)
query = self.grep_link_query(content)
else:
query = self.extract_link_query(query)
if not query:
await ctx.reply("No query provided.")
return
code = await self.get_code(ctx, query)
sanitized_code = self.sanitize(code)
if len(sanitized_code) > 1900:
await ctx.reply(
"The code is too long to be shown. Here's a file instead:",
file = File(BytesIO(code.encode()), "code.v")
)
else:
await ctx.reply(f"```v\n{sanitized_code}```")
@command(
aliases = ("upload",),
brief = "Uploads code to V playground.",
help = "Uploads code to V playground."
)
async def share(self, ctx: Context, *, code: str | None = None) -> None:
if not code:
if not (ref := ctx.message.reference):
await ctx.reply("No code provided.")
return
content = await self.get_message_content(ctx.channel, ref)
code = self.grep_code(content)
else:
code = codeblock_converter(code).content
link = await self.share_code(code)
await ctx.reply(f"<https://play.vlang.io/?query={link}>")
async def setup(bot: ReplBot) -> None:
await bot.add_cog(Playground(bot))