mirror of
https://github.com/RGBCube/GitHubWrapper
synced 2025-05-17 22:45:08 +00:00
102 lines
3.4 KiB
Python
102 lines
3.4 KiB
Python
#== main.py ==#
|
|
|
|
__all__ = (
|
|
'GHClient',
|
|
)
|
|
|
|
from getpass import getpass
|
|
import aiohttp
|
|
import asyncio
|
|
|
|
from Github.objects.repo import Issue
|
|
|
|
from . import http
|
|
from . import exceptions
|
|
from .objects import *
|
|
|
|
class GHClient:
|
|
_auth = None
|
|
has_started = False
|
|
def __init__(
|
|
self,
|
|
*,
|
|
username: str | None = None,
|
|
token: str | None = None,
|
|
custom_headers: dict[str, str | int] = {}
|
|
):
|
|
"""The main client, used to start most use-cases."""
|
|
self._headers = custom_headers
|
|
if username and token:
|
|
self._auth = aiohttp.BasicAuth(username, token)
|
|
|
|
def __await__(self) -> 'GHClient':
|
|
return self.start().__await__()
|
|
|
|
def __repr__(self) -> str:
|
|
return f'<Github Client; has_auth={bool(self._auth)}>'
|
|
|
|
def __del__(self):
|
|
asyncio.create_task(self.session.close())
|
|
|
|
def check_limits(self, as_dict: bool = False) -> dict[str, str | int] | list[str]:
|
|
if not self.has_started:
|
|
raise exceptions.NotStarted
|
|
if not as_dict:
|
|
output = []
|
|
for key, value in self.session._rates._asdict().items():
|
|
output.append(f'{key} : {value}')
|
|
return output
|
|
return self.session._rates._asdict()
|
|
|
|
def update_auth(self) -> None:
|
|
"""Allows you to input auth information after instantiating the client."""
|
|
username = input('Enter your username: ')
|
|
token = getpass('Enter your token: ')
|
|
self._auth = aiohttp.BasicAuth(username, token)
|
|
|
|
async def start(self) -> 'GHClient':
|
|
"""Main entry point to the wrapper, this creates the ClientSession."""
|
|
if self.has_started:
|
|
raise exceptions.AlreadyStarted
|
|
self.session = await http.make_session(headers=self._headers, authorization=self._auth)
|
|
self.has_started = True
|
|
return self
|
|
|
|
def _cache(func, cache_type: str):
|
|
async def wrapper(self: 'GHClient', name: str):
|
|
if cache_type == 'user':
|
|
if (user := self._user_cache.get(name)):
|
|
return user
|
|
else:
|
|
return await func(self, name)
|
|
if cache_type == 'repo':
|
|
if (repo := self._repo_cache.get(name)):
|
|
return repo
|
|
else:
|
|
return await func(self, name)
|
|
return wrapper
|
|
|
|
async def get_user(self, **kwargs) -> User:
|
|
"""Fetch a Github user from their username."""
|
|
username = kwargs.get('user')
|
|
return User(await http.get_user(self.session, username), self.session)
|
|
|
|
async def get_repo(self, **kwargs) -> Repository:
|
|
"""Fetch a Github repository from it's name."""
|
|
owner = kwargs.get('owner')
|
|
repo_name = kwargs.get('repo')
|
|
return Repository(await http.get_repo_from_name(self.session, owner, repo_name), self.session)
|
|
|
|
async def get_repo_issue(self, **kwargs) -> Issue:
|
|
"""Fetch a Github repository from it's name."""
|
|
owner = kwargs.get('owner')
|
|
repo_name = kwargs.get('repo')
|
|
issue_number = kwargs.get('issue')
|
|
return Issue(await http.get_repo_issue(self.session, owner, repo_name, issue_number), self.session)
|
|
|
|
async def get_org(self, **kwargs) -> Organization:
|
|
"""Fetch a Github organization from it's name"""
|
|
org_name = kwargs.get('org')
|
|
return Organization(await http.get_org(self.session, org_name), self.session)
|
|
|
|
|