mirror of
https://github.com/RGBCube/VReplBot
synced 2025-05-14 13:34:59 +00:00
225 lines
6.8 KiB
Python
225 lines
6.8 KiB
Python
from __future__ import annotations
|
|
|
|
import json
|
|
from io import BytesIO
|
|
from typing import Literal, TYPE_CHECKING
|
|
|
|
from discord import File
|
|
from discord.ext.commands import Cog, command
|
|
from jishaku.codeblocks import codeblock_converter
|
|
|
|
from .error_handler import StopCommandExecution
|
|
|
|
if TYPE_CHECKING:
|
|
from discord import MessageReference, TextChannel
|
|
from discord.ext.commands import Context
|
|
|
|
from .. import ReplBot
|
|
|
|
|
|
class Playground(
|
|
Cog,
|
|
name = "Playground",
|
|
description = "V Playground commands.",
|
|
):
|
|
def __init__(self, bot: ReplBot) -> None:
|
|
self.bot = bot
|
|
|
|
async def get_code(self, ctx: Context, query: str) -> str:
|
|
async with await self.bot.session.post(
|
|
f"https://play.vlang.io/query",
|
|
data = { "hash": query }
|
|
) as response:
|
|
text = await response.text()
|
|
|
|
if text == "Not found.":
|
|
await ctx.reply("Invalid query.")
|
|
raise StopCommandExecution()
|
|
|
|
return text
|
|
|
|
async def share_code(self, code: str) -> str:
|
|
async with await self.bot.session.post(
|
|
f"https://play.vlang.io/share",
|
|
data = { "code": code },
|
|
) as response:
|
|
return await response.text()
|
|
|
|
async def run_code(self, code: str) -> tuple[bool, str]:
|
|
async with await self.bot.session.post(
|
|
f"https://play.vlang.io/run",
|
|
data = { "code": code },
|
|
) as response:
|
|
body = json.loads(await response.json(encoding = "utf-8"))
|
|
|
|
return body["ok"], body["output"]
|
|
|
|
async def test_code(self, code: str) -> tuple[bool, str]:
|
|
async with await self.bot.session.post(
|
|
f"https://play.vlang.io/run_test",
|
|
data = { "code": code },
|
|
) as response:
|
|
body = json.loads(await response.json(encoding = "utf-8"))
|
|
|
|
return body["ok"], body["output"]
|
|
|
|
@staticmethod
|
|
async def get_message_content(channel: TextChannel, ref: MessageReference) -> str:
|
|
if ref.resolved:
|
|
return ref.resolved.content
|
|
else:
|
|
message = await channel.fetch_message(ref.message_id)
|
|
return message.content
|
|
|
|
@staticmethod
|
|
def grep_code(content: str) -> str:
|
|
content = "`" + (splitted := content.split("`", 1))[len(splitted) - 1].rsplit("`", 1)[
|
|
0] + "`"
|
|
|
|
return codeblock_converter(content).content
|
|
|
|
@staticmethod
|
|
def grep_link_query(content: str) -> str | None:
|
|
if "play.vlang.io/?query=" not in content:
|
|
return None
|
|
|
|
query = content.split("play.vlang.io/?query=", 1)[1].split(" ", 1)[0].removesuffix(">")
|
|
|
|
if not query: # Empty string.
|
|
return None
|
|
|
|
return query
|
|
|
|
@staticmethod
|
|
def extract_link_query(content: str) -> str | None:
|
|
if (no_http_content := content.removeprefix("<").removeprefix("https://")).startswith(
|
|
"play.vlang.io/?query="
|
|
):
|
|
return no_http_content.removeprefix("play.vlang.io/?query=").split(" ", 1)[
|
|
0].removesuffix(">")
|
|
|
|
@staticmethod
|
|
def sanitize(string: str) -> str:
|
|
return string.replace("`", "\u200B`\u200B") # Zero-width space.
|
|
|
|
async def run_test_common(
|
|
self,
|
|
ctx: Context,
|
|
query_or_code: str | None,
|
|
*,
|
|
type: Literal["run", "test"],
|
|
):
|
|
if not query_or_code:
|
|
if not (ref := ctx.message.reference):
|
|
await ctx.reply("No code provided.")
|
|
return
|
|
|
|
content = await self.get_message_content(ctx.channel, ref)
|
|
|
|
if query := self.grep_link_query(content):
|
|
code = await self.get_code(ctx, query)
|
|
else:
|
|
code = self.grep_code(content)
|
|
|
|
elif query := self.extract_link_query(query_or_code):
|
|
code = await self.get_code(ctx, query)
|
|
else:
|
|
code = codeblock_converter(query_or_code).content
|
|
|
|
ok, output = await (self.run_code(code) if type == "run" else self.test_code(code))
|
|
sanitized_output = self.sanitize(output)
|
|
|
|
sentence = "Success!" if ok else "Failure!"
|
|
|
|
if len(sanitized_output) > 1900:
|
|
await ctx.reply(
|
|
f"**{sentence}**",
|
|
file = File(BytesIO(output.encode()), "output.txt")
|
|
)
|
|
else:
|
|
await ctx.reply(
|
|
f"**{sentence}**\n"
|
|
f"```v\n{self.sanitize(sanitized_output)}```"
|
|
)
|
|
|
|
@command(
|
|
aliases = ("eval", "repl"),
|
|
brief = "Runs V code.",
|
|
help = "Runs V code."
|
|
)
|
|
async def run(
|
|
self,
|
|
ctx: Context,
|
|
*,
|
|
query_or_code: str | None = None
|
|
) -> None:
|
|
await self.run_test_common(ctx, query_or_code, type = "run")
|
|
|
|
@command(
|
|
brief = "Runs tests of V code.",
|
|
help = "Runs tests of V code."
|
|
)
|
|
async def test(
|
|
self,
|
|
ctx: Context,
|
|
*,
|
|
query_or_code: str | None = None
|
|
) -> None:
|
|
await self.run_test_common(ctx, query_or_code, type = "test")
|
|
|
|
@command(
|
|
aliases = ("download",),
|
|
brief = "Shows the code in a V playground link.",
|
|
help = "Shows the code in a V playground link."
|
|
)
|
|
async def show(self, ctx: Context, query: str | None = None) -> None:
|
|
if not query:
|
|
if not (ref := ctx.message.reference):
|
|
await ctx.reply("No query provided.")
|
|
return
|
|
|
|
content = await self.get_message_content(ctx.channel, ref)
|
|
|
|
query = self.grep_link_query(content)
|
|
|
|
else:
|
|
query = self.extract_link_query(query)
|
|
|
|
if not query:
|
|
await ctx.reply("No query provided.")
|
|
return
|
|
|
|
code = await self.get_code(ctx, query)
|
|
sanitized_code = self.sanitize(code)
|
|
|
|
if len(sanitized_code) > 1900:
|
|
await ctx.reply(
|
|
"The code is too long to be shown. Here's a file instead:",
|
|
file = File(BytesIO(code.encode()), "code.v")
|
|
)
|
|
else:
|
|
await ctx.reply(f"```v\n{sanitized_code}```")
|
|
|
|
@command(
|
|
aliases = ("upload",),
|
|
brief = "Uploads code to V playground.",
|
|
help = "Uploads code to V playground."
|
|
)
|
|
async def share(self, ctx: Context, *, code: str | None = None) -> None:
|
|
if not code:
|
|
if not (ref := ctx.message.reference):
|
|
await ctx.reply("No code provided.")
|
|
return
|
|
|
|
content = await self.get_message_content(ctx.channel, ref)
|
|
code = self.grep_code(content)
|
|
else:
|
|
code = codeblock_converter(code).content
|
|
|
|
link = await self.share_code(code)
|
|
|
|
await ctx.reply(f"<https://play.vlang.io/?query={link}>")
|
|
|
|
|
|
async def setup(bot: ReplBot) -> None:
|
|
await bot.add_cog(Playground(bot))
|