1
Fork 0
mirror of https://github.com/RGBCube/VReplBot synced 2025-07-27 00:17:46 +00:00
This commit is contained in:
RGBCube 2023-02-03 22:48:05 +03:00
parent ede57775f4
commit 710a20c328
2 changed files with 142 additions and 78 deletions

View file

@ -23,6 +23,10 @@ if TYPE_CHECKING:
from .. import ReplBot from .. import ReplBot
class StopCommandExecution(Exception):
pass
class ErrorHandler(Cog): class ErrorHandler(Cog):
def __init__(self, bot: ReplBot) -> None: def __init__(self, bot: ReplBot) -> None:
self.bot = bot self.bot = bot
@ -36,7 +40,7 @@ class ErrorHandler(Cog):
if cog._get_overridden_method(cog.cog_command_error) is not None: if cog._get_overridden_method(cog.cog_command_error) is not None:
return return
ignored = (CommandNotFound,) ignored = (CommandNotFound, StopCommandExecution)
error = getattr(error, "original", error) error = getattr(error, "original", error)
if isinstance(error, ignored): if isinstance(error, ignored):

View file

@ -8,6 +8,8 @@ from discord import File
from discord.ext.commands import Cog, command from discord.ext.commands import Cog, command
from jishaku.codeblocks import codeblock_converter from jishaku.codeblocks import codeblock_converter
from .error_handler import StopCommandExecution
if TYPE_CHECKING: if TYPE_CHECKING:
from discord import MessageReference, TextChannel from discord import MessageReference, TextChannel
from discord.ext.commands import Context from discord.ext.commands import Context
@ -15,18 +17,6 @@ if TYPE_CHECKING:
from .. import ReplBot from .. import ReplBot
def sanitize_str_for_codeblock(string: str) -> str:
return string.replace("`", "\u200B`\u200B") # Zero-width space.
async def get_message_content(channel: TextChannel, ref: MessageReference) -> str:
if ref.resolved:
return ref.resolved.content
else:
message = await channel.fetch_message(ref.message_id)
return message.content
class Playground( class Playground(
Cog, Cog,
name = "Playground", name = "Playground",
@ -35,46 +25,117 @@ class Playground(
def __init__(self, bot: ReplBot) -> None: def __init__(self, bot: ReplBot) -> None:
self.bot = bot self.bot = bot
async def run_test_common( async def get_code(self, ctx: Context, query: str) -> str:
self,
ctx: Context,
code_or_query: str | None,
*,
type: Literal["run"] | Literal["run_test"]
) -> None:
if not code_or_query:
if not (reply := ctx.message.reference):
await ctx.reply("No code provided.")
return
code_or_query = await get_message_content(ctx.channel, reply)
if (c_stripped := code_or_query.lstrip("https://")).startswith("play.vlang.io/?query="):
query = c_stripped.lstrip("play.vlang.io/?query=").split(" ", 1)[0]
code = await self.get_query_content(query)
if not code:
await ctx.reply("Invalid query.")
return
else:
code = codeblock_converter(code_or_query).content
async with await self.bot.session.post( async with await self.bot.session.post(
f"https://play.vlang.io/{type}", f"https://play.vlang.io/query",
data = { "hash": query }
) as response:
text = await response.text()
if text == "Not found.":
await ctx.reply("Invalid query.")
raise StopCommandExecution()
return text
async def share_code(self, code: str) -> str:
async with await self.bot.session.post(
f"https://play.vlang.io/share",
data = { "code": code },
) as response:
return await response.text()
async def run_code(self, code: str) -> tuple[bool, str]:
async with await self.bot.session.post(
f"https://play.vlang.io/run",
data = { "code": code }, data = { "code": code },
) as response: ) as response:
body = json.loads(await response.text()) body = json.loads(await response.text())
text = sanitize_str_for_codeblock(body["output"])
if len(text) + 7 > 2000: return body["ok"], body["output"]
await ctx.reply(
"The output was too long to be sent as a message. Here is a file instead:", async def test_code(self, code: str) -> tuple[bool, str]:
file = File(BytesIO(text.encode()), filename = "output.txt") async with await self.bot.session.post(
) f"https://play.vlang.io/run_test",
data = { "code": code },
) as response:
body = json.loads(await response.text())
return body["ok"], body["output"]
@staticmethod
async def get_message_content(channel: TextChannel, ref: MessageReference) -> str:
if ref.resolved:
return ref.resolved.content
else:
message = await channel.fetch_message(ref.message_id)
return message.content
@staticmethod
def grep_code(content: str) -> str:
content = "`" + content.split("`", 1)[1].rsplit("`", 1)[0] + "`"
return codeblock_converter(content).content
@staticmethod
def grep_link_query(content: str) -> str | None:
if "play.vlang.io/?query=" not in content:
return None
query = content.split("play.vlang.io/?query=", 1)[1].split(" ", 1)[0]
if not query: # Empty string.
return None
return query
@staticmethod
def extract_link_query(content: str) -> str | None:
if (no_http_content := content.lstrip("https://")).startswith("play.vlang.io/?query="):
return no_http_content.lstrip("play.vlang.io/?query=").split(" ", 1)[0]
@staticmethod
def sanitize(string: str) -> str:
return string.replace("`", "\u200B`\u200B") # Zero-width space.
async def run_test_common(
self,
ctx: Context,
query_or_code: str | None,
*,
type: Literal["run", "test"],
):
if not query_or_code:
if not (ref := ctx.message.reference):
await ctx.reply("No code provided.")
return return
content = await self.get_message_content(ctx.channel, ref)
if query := self.grep_link_query(content):
code = await self.get_code(ctx, query)
else:
code = self.grep_code(content)
elif query := self.extract_link_query(query_or_code):
code = await self.get_code(ctx, query)
else:
code = codeblock_converter(query_or_code).content
ok, output = await (self.run_code(code) if type == "run" else self.test_code(code))
sanitized_output = self.sanitize(output)
sentence = "Success!" if ok else "Failure!"
if len(sanitized_output) > 1900:
await ctx.reply( await ctx.reply(
"```\n" + text + "```" f"**{sentence}**",
file = File(BytesIO(output.encode()), "output.txt")
)
else:
await ctx.reply(
f"**{sentence}**\n"
f"```v\n{self.sanitize(sanitized_output)}```"
) )
@command( @command(
@ -100,19 +161,7 @@ class Playground(
*, *,
query_or_code: str | None = None query_or_code: str | None = None
) -> None: ) -> None:
await self.run_test_common(ctx, query_or_code, type = "run_test") await self.run_test_common(ctx, query_or_code, type = "test")
async def get_query_content(self, query: str) -> str | None:
async with await self.bot.session.post(
f"https://play.vlang.io/query",
data = { "hash": query }
) as response:
text = sanitize_str_for_codeblock(await response.text())
if text == "Not found.":
return None
return text
@command( @command(
aliases = ("download",), aliases = ("download",),
@ -121,39 +170,50 @@ class Playground(
) )
async def show(self, ctx: Context, query: str | None = None) -> None: async def show(self, ctx: Context, query: str | None = None) -> None:
if not query: if not query:
if not (reply := ctx.message.reference): if not (ref := ctx.message.reference):
await ctx.reply("No query provided.") await ctx.reply("No query provided.")
return return
content = await get_message_content(ctx.channel, reply) content = await self.get_message_content(ctx.channel, ref)
if "play.vlang.io/?query=" in content: query = self.grep_link_query(content)
query = content.split("play.vlang.io/?query=", 1)[1].split(" ", 1)[0] else:
else: query = self.extract_link_query(query)
query = content.split(" ", 1)[0]
query = query.lstrip("https://").lstrip("play.vlang.io/?query=")
if not query: if not query:
await ctx.reply("No query provided.") await ctx.reply("No query provided.")
return return
code = await self.get_query_content(query) code = await self.get_code(ctx, query)
sanitized_code = self.sanitize(code)
if not code: if len(sanitized_code) > 1900:
await ctx.reply("Invalid link.")
return
if len(code) + 8 > 2000:
await ctx.reply( await ctx.reply(
"The code was too long to be sent as a message. Here is a file instead:", "The code is too long to be shown. Here's a file instead:",
file = File(BytesIO(code.encode()), filename = "code.v") file = File(BytesIO(code.encode()), "code.v")
) )
return else:
await ctx.reply(f"```v\n{sanitized_code}```")
await ctx.reply( @command(
"```v\n" + code + "```" aliases = ("upload",),
) brief = "Uploads code to V playground.",
help = "Uploads code to V playground."
)
async def share(self, ctx: Context, *, code: str | None = None) -> None:
if not code:
if not (ref := ctx.message.reference):
await ctx.reply("No code provided.")
return
content = await self.get_message_content(ctx.channel, ref)
code = self.grep_code(content)
else:
code = codeblock_converter(code).content
link = await self.share_code(code)
await ctx.reply(f"<{link}>")
async def setup(bot: ReplBot) -> None: async def setup(bot: ReplBot) -> None: