1
Fork 0
mirror of https://github.com/RGBCube/GitHubWrapper synced 2025-05-16 14:15:00 +00:00

Rewrote the library

- Only HTTPClient, File and Object exists currently
This commit is contained in:
RGBCube 2022-06-25 14:11:17 +03:00
parent 7bd3d48eab
commit b23d5b78eb
16 changed files with 1048 additions and 1534 deletions

View file

@ -1,13 +1,34 @@
# == __init__.py ==# """
MIT License
__title__ = 'Github-Api-Wrapper' Copyright (c) 2022-present VarMonke, sudosnok & contributors
__authors__ = 'VarMonke', 'sudosnok'
__version__ = '1.2.1'
__license__ = 'MIT'
__copyright__ = 'Copyright (c) 2022-present VarMonke & sudosnok'
from .client import * Permission is hereby granted, free of charge, to any person obtaining a copy
from .exceptions import * of this software and associated documentation files (the "Software"), to deal
from .http import * in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
__title__ = "GitHub-API-Wrapper"
__authors__ = ("VarMonke", "sudosnok", "contributors")
__version__ = "2.0a"
__license__ = "MIT"
__copyright__ = "Copyright (c) 2022-present VarMonke, sudosnok & contributors"
from .errors import *
from .internals import *
from .objects import * from .objects import *
from .urls import *

View file

@ -1,69 +0,0 @@
# == cache.py ==#
from __future__ import annotations
from collections import deque
from typing import Any, Deque, Dict, Tuple, TypeVar
__all__: Tuple[str, ...] = ('ObjectCache',)
K = TypeVar('K')
V = TypeVar('V')
class _BaseCache(Dict[K, V]):
"""This is a rough implementation of an LRU Cache using a deque and a dict."""
__slots__: Tuple[str, ...] = ('_max_size', '_lru_keys')
def __init__(self, max_size: int, *args: Any) -> None:
self._max_size: int = max(min(max_size, 30), 0) # bounding max_size to 15 for now
self._lru_keys: Deque[K] = deque(maxlen=self._max_size)
super().__init__(*args)
def __getitem__(self, __k: K) -> V:
index = self._lru_keys.index(__k)
target = self._lru_keys[index]
del self._lru_keys[index]
self._lru_keys.appendleft(target)
return super().__getitem__(__k)
def __setitem__(self, __k: K, __v: V) -> None:
if len(self) == self._max_size:
self.__delitem__(self._lru_keys.pop())
self._lru_keys.appendleft(__k)
return super().__setitem__(__k, __v)
def update(self, **kwargs: Any) -> None:
for key, value in dict(**kwargs).items():
key: K
value: V
self.__setitem__(key, value)
class ObjectCache(_BaseCache[K, V]):
"""This adjusts the typehints to reflect Github objects."""
def __getitem__(self, __k: K) -> V:
index = self._lru_keys.index(__k)
target = self._lru_keys[index]
self._lru_keys.appendleft(target)
return super().__getitem__(__k)
def __setitem__(self, __k: K, __v: V) -> None:
if self.__len__() == self._max_size:
self.__delitem__(self._lru_keys.pop())
self._lru_keys.appendleft(__k)
return super().__setitem__(__k, __v)
def update(self, **kwargs: Any) -> None:
for key, value in dict(**kwargs).items():
key: K
value: V
self.__setitem__(key, value)

View file

@ -1,381 +0,0 @@
# == main.py ==#
from __future__ import annotations
import functools
from typing import (
Any,
Awaitable,
Callable,
Coroutine,
Dict,
Generator,
List,
Literal,
Optional,
Tuple,
TypeVar,
Union,
overload,
)
import aiohttp
from typing_extensions import Concatenate, ParamSpec, Self
from . import exceptions
from .cache import ObjectCache
from .http import http
from .objects import File, Gist, Issue, Organization, Repository, User
__all__: Tuple[str, ...] = ('GHClient', 'Client')
T = TypeVar('T')
P = ParamSpec('P')
class GHClient:
"""The main client, used to start most use-cases.
Parameters
----------
username: Optional[:class:`str`]
An optional username to be provided along with a token to make authenticated API calls.
If you provide a username, the token must be provided as well.
user_cache_size: Optional[:class:`int`]
Determines the maximum number of User objects that will be cached in memory.
Defaults to 30, must be between 30 and 0 inclusive.
repo_cache_size: Optional[:class:`int`]
Determines the maximum number of Repository objects that will be cached in memory.
Defaults to 15, must be between 30 and 0 inclusive.
custom_headers: Optional[:class:`dict`]
A way to pass custom headers into the client session that drives the client, eg. a user-agent.
Attributes
----------
username: Optional[:class:`str`]
The authenticated Client's username, if applicable.
__token: Optional[:class:`str`]
The authenticated Client's token, if applicable.
"""
has_started: bool = False
def __init__(
self,
*,
username: Optional[str] = None,
token: Optional[str] = None,
user_cache_size: int = 30,
repo_cache_size: int = 15,
custom_headers: Dict[str, Union[str, int]] = {},
):
self._headers = custom_headers
if username and token:
self.username = username
self.__token = token
self.__auth = aiohttp.BasicAuth(username, token)
else:
self.__auth = None
self.username = None
self.__token = None
self.http = http(headers=custom_headers, auth=self.__auth)
self._user_cache = ObjectCache[Any, User](user_cache_size)
self._repo_cache = ObjectCache[Any, Repository](repo_cache_size)
# Cache manegent
self._cache(type='user')(self.get_self) # type: ignore
self._cache(type='user')(self.get_user) # type: ignore
self._cache(type='repo')(self.get_repo) # type: ignore
def __call__(self, *args: Any, **kwargs: Any) -> Coroutine[Any, Any, Self]:
return self.start(*args, **kwargs)
def __await__(self) -> Generator[Any, Any, Self]:
return self.start().__await__()
async def __aenter__(self) -> Self:
await self.start()
return self
async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
try:
session = self.http.session
await session.close()
except Exception as exc:
raise Exception('HTTP Session doesn\'t exist') from exc
def __repr__(self) -> str:
return f'<Client has_auth={bool(self.__auth)}>'
@overload
def check_limits(self, as_dict: Literal[True] = True) -> Dict[str, Union[str, int]]:
...
@overload
def check_limits(self, as_dict: Literal[False] = False) -> List[str]:
...
def check_limits(self, as_dict: bool = False) -> Union[Dict[str, Union[str, int]], List[str]]:
"""Returns the remaining number of API calls per timeframe.
Parameters
----------
as_dict: Optional[:class:`bool`]
Set to True to return the remaining calls in a dictionary.
Set to False to return the remaining calls in a list.
Defaults to False
"""
if not self.has_started:
raise exceptions.NotStarted
if not as_dict:
output: List[str] = []
for key, value in self.http.session._rates._asdict().items(): # type: ignore
output.append(f"{key} : {value}")
return output
return self.http.session._rates # type: ignore
async def update_auth(self, *, username: str, token: str) -> None:
"""Allows you to input auth information after instantiating the client.
Parameters
----------
username: :class:`str`
The username to update the authentication to.
Must also be provided with the valid token.
token: :class:`str`
The token to update the authentication to.
Must also be providede with the valid username.
"""
# check if username and token is valid
await self.http.update_auth(username=username, token=token)
try:
await self.http.get_self()
except exceptions.InvalidToken as exc:
raise exceptions.InvalidToken from exc
async def start(self) -> Self:
"""Main entry point to the wrapper, this creates the ClientSession.
Parameters
----------
"""
if self.has_started:
raise exceptions.AlreadyStarted
if self.__auth:
self.http = await http(auth=self.__auth, headers=self._headers)
try:
await self.http.get_self()
except exceptions.InvalidToken as exc:
raise exceptions.InvalidToken from exc
else:
self.http = await http(auth=None, headers=self._headers)
self.has_started = True
return self
def _cache(
self: Self, *, type: str
) -> Callable[
[Callable[Concatenate[Self, P], Awaitable[T]]],
Callable[Concatenate[Self, P], Awaitable[Optional[Union[T, User, Repository]]]],
]:
def wrapper(
func: Callable[Concatenate[Self, P], Awaitable[T]]
) -> Callable[Concatenate[Self, P], Awaitable[Optional[Union[T, User, Repository]]]]:
@functools.wraps(func)
async def wrapped(self: Self, *args: P.args, **kwargs: P.kwargs) -> Optional[Union[T, User, Repository]]:
if type == 'user':
obj = self._user_cache.get(kwargs.get('user'))
if obj:
return obj
user: User = await func(self, *args, **kwargs) # type: ignore
self._user_cache[kwargs.get("user")] = user
return user
if type == 'repo':
obj = self._repo_cache.get(kwargs.get('repo'))
if obj:
return obj
repo: Repository = await func(self, *args, **kwargs) # type: ignore
self._repo_cache[kwargs.get('repo')] = repo
return repo
return wrapped
return wrapper
# @_cache(type='User')
async def get_self(self) -> User:
""":class:`User`: Returns the authenticated User object."""
if self.__auth:
return User(await self.http.get_self(), self.http)
else:
raise exceptions.NoAuthProvided
async def get_user(self, *, user: str) -> User:
""":class:`User`: Fetch a Github user from their username.
Parameters
----------
user: :class:`str`
The name of the user to fetch.
"""
return User(await self.http.get_user(user), self.http)
async def get_repo(self, *, owner: str, repo: str) -> Repository:
""":class:`Repository`: Fetch a Github repository from it's name.
Parameters
----------
owner: :class:`str`
The name of the owner of a given reposiory.
repo: :class:`str`
The name of the repository to fetch.
"""
return Repository(await self.http.get_repo(owner, repo), self.http) # type: ignore
async def get_issue(self, *, owner: str, repo: str, issue: int) -> Issue:
""":class:`Issue`: Fetch a Github Issue from it's name.
Parameters
----------
owner: :class:`str`
The name of the owner of the repository for which the issue relates to.
repo: :class:`str`
The name of the repository to which the issue is related to.
issue: :class:`int`
The ID of the issue to fetch.
"""
return Issue(await self.http.get_repo_issue(owner, repo, issue), self.http) # type: ignore #fwiw, this shouldn't error but pyright <3
async def create_repo(
self,
name: str,
description: str = 'Repository created using Github-Api-Wrapper.',
public: bool = False,
gitignore: Optional[str] = None,
license: Optional[str] = None,
) -> Repository:
"""Creates a Repository with supplied data.
Requires API authentication.
Parameters
----------
name: :class:`str`
The name of the repository to be created.
description: :class:`str`
A description of the repository to be created.
public: :class:`bool`
Determines whether only the repository will be visible to the public.
Defaults to False (private repository).
gitignore: Optional[:class:`str`]
.gitignore template to use.
See https://github.com/github/gitignore for GitHub's own templates.
Defaults to None.
license: Optional[:class:`str`]
TODO: Document this.
Returns
-------
:class:`Repository`
"""
return Repository(
await self.http.create_repo(name, description, public, gitignore, license),
self.http,
)
async def delete_repo(self, repo: str) -> Optional[str]:
"""Delete a Github repository, requires authorisation.
Parameters
----------
repo: :class:`str`
The name of the repository to delete.
Returns
-------
Optional[:class:`str`]
"""
return await self.http.delete_repo(self.username, repo)
async def get_gist(self, gist: str) -> Gist:
"""Fetch a Github gist from it's id.
Parameters
----------
gist: :class:`str`
The id of the gist to fetch.
Returns
-------
:class:`Gist`
"""
return Gist(await self.http.get_gist(gist), self.http)
async def create_gist(
self, *, files: List[File], description: str = 'Gist from Github-Api-Wrapper', public: bool = True
) -> Gist:
"""Creates a Gist with the given files, requires authorisation.
Parameters
----------
files: List[:class:`File`]
A list of File objects to upload to the gist.
description: :class:`str`
A description of the gist.
public: :class:`bool`
Determines whether the gist will be visible to the public.
Defaults to False (private).
Returns
-------
:class:`Gist`
"""
return Gist(
await self.http.create_gist(files=files, description=description, public=public),
self.http,
)
async def delete_gist(self, gist: int) -> Optional[str]:
"""Delete a Github gist, requires authorisation.
Parameters
----------
gist: :class:`int`
The ID of the gist to delete.
Returns
-------
Optional[:class:`str`]
"""
return await self.http.delete_gist(gist)
async def get_org(self, org: str) -> Organization:
"""Fetch a Github organization from it's name.
Parameters
----------
org: :class:`str`
The name of the organization to fetch.
Returns
-------
:class:`Organization`
"""
return Organization(await self.http.get_org(org), self.http)
async def latency(self) -> float:
""":class:`float`: Returns the latency of the client."""
return await self.http.latency()
async def close(self) -> None:
"""Close the session."""
await self.http.session.close()
class Client(GHClient):
pass

48
github/errors.py Normal file
View file

@ -0,0 +1,48 @@
from __future__ import annotations
__all__ = ("GitHubError", "BaseHTTPError", "HTTPError", "RatelimitReached")
from datetime import datetime, timezone
from typing import TYPE_CHECKING
from .utils import human_readable_time_until
if TYPE_CHECKING:
from aiohttp import ClientResponse
class GitHubError(Exception):
"""The base class for all errors raised in this library."""
class BaseHTTPError(GitHubError):
"""The base class for all HTTP related errors in this library."""
class HTTPError(BaseHTTPError):
"""Raised when an HTTP request doesn't respond with a successfull code."""
def __init__(self, response: ClientResponse, /) -> None:
self.method = response.method
self.code = response.status
self.url = response.url
self._response = response
def __str__(self) -> str:
return (
f"An HTTP error with the code {self.code} has occured while trying to do a"
f" {self.method} request to the URL {self.url}"
)
class RatelimitReached(GitHubError):
"""Raised when a ratelimit is reached."""
def __init__(self, reset_time: datetime, /) -> None:
self.reset_time = reset_time
def __str__(self) -> str:
return (
"The ratelimit has been reached. You can try again in"
f" {human_readable_time_until(datetime.now(timezone.utc) - self.reset_time)}"
)

View file

@ -1,175 +0,0 @@
# == exceptions.py ==#
import datetime
from typing import Optional, Tuple
from aiohttp import ClientResponse
__all__: Tuple[str, ...] = (
'APIError',
'AlreadyStarted',
'ClientException',
'ClientResponse',
'GistNotFound',
'HTTPException',
'InvalidAuthCombination',
'InvalidToken',
'IssueNotFound',
'LoginFailure',
'MissingPermissions',
'NoAuthProvided',
'NotStarted',
'OrganizationNotFound',
'Ratelimited',
'RepositoryAlreadyExists',
'RepositoryNotFound',
'ResourceAlreadyExists',
'ResourceNotFound',
'UserNotFound',
'WillExceedRatelimit',
)
class APIError(Exception):
"""Base level exceptions raised by errors related to any API request or call."""
pass
class HTTPException(Exception):
"""Base level exceptions raised by errors related to HTTP requests."""
pass
class ClientException(Exception):
"""Base level exceptions raised by errors related to the client."""
pass
class ResourceNotFound(Exception):
"""Base level exceptions raised when a resource is not found."""
pass
class ResourceAlreadyExists(Exception):
"""Base level exceptions raised when a resource already exists."""
pass
class Ratelimited(APIError):
"""Raised when the ratelimit from Github is reached or exceeded."""
def __init__(self, reset_time: datetime.datetime):
formatted = reset_time.strftime(r"%H:%M:%S %A, %d %b")
msg = f"We're being ratelimited, wait until {formatted}.\nAuthentication raises the ratelimit."
super().__init__(msg)
class WillExceedRatelimit(APIError):
"""Raised when the library predicts the call will exceed the ratelimit, will abort the call by default."""
def __init__(self, response: ClientResponse, count: int):
msg = 'Performing this action will exceed the ratelimit, aborting.\n{} remaining available calls, calls to make: {}.'
msg = msg.format(response.headers['X-RateLimit-Remaining'], count)
super().__init__(msg)
class NoAuthProvided(ClientException):
"""Raised when no authentication is provided."""
def __init__(self):
msg = 'This action required autentication. Pass username and token kwargs to your client instance.'
super().__init__(msg)
class InvalidToken(ClientException):
"""Raised when the token provided is invalid."""
def __init__(self):
msg = 'The token provided is invalid.'
super().__init__(msg)
class InvalidAuthCombination(ClientException):
"""Raised when the username and token are both provided."""
def __init__(self, msg: str):
# msg = 'The username and token cannot be used together.'
super().__init__(msg)
class LoginFailure(ClientException):
"""Raised when the login attempt fails."""
def __init__(self):
msg = 'The login attempt failed. Provide valid credentials.'
super().__init__(msg)
class NotStarted(ClientException):
"""Raised when the client is not started."""
def __init__(self):
msg = 'The client is not started. Run Github.GHClient() to start.'
super().__init__(msg)
class AlreadyStarted(ClientException):
"""Raised when the client is already started."""
def __init__(self):
msg = 'The client is already started.'
super().__init__(msg)
class MissingPermissions(APIError):
def __init__(self):
msg = 'You do not have permissions to perform this action.'
super().__init__(msg)
class UserNotFound(ResourceNotFound):
def __init__(self):
msg = 'The requested user was not found.'
super().__init__(msg)
class RepositoryNotFound(ResourceNotFound):
def __init__(self):
msg = 'The requested repository is either private or does not exist.'
super().__init__(msg)
class IssueNotFound(ResourceNotFound):
def __init__(self):
msg = 'The requested issue was not found.'
super().__init__(msg)
class OrganizationNotFound(ResourceNotFound):
def __init__(self):
msg = 'The requested organization was not found.'
super().__init__(msg)
class GistNotFound(ResourceNotFound):
def __init__(self):
msg = 'The requested gist was not found.'
super().__init__(msg)
class RepositoryAlreadyExists(ResourceAlreadyExists):
def __init__(self):
msg = 'The requested repository already exists.'
super().__init__(msg)
class FileAlreadyExists(ResourceAlreadyExists):
def __init__(self, msg: Optional[str] = None):
if msg is None:
msg = 'The requested file already exists.'
super().__init__(msg)

View file

@ -1,322 +0,0 @@
# == http.py ==#
from __future__ import annotations
import json
import platform
import re
from datetime import datetime
from types import SimpleNamespace
from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Type, Union
import aiohttp
from typing_extensions import TypeAlias
from . import __version__
from .exceptions import *
from .objects import File, Gist, Repository, User, bytes_to_b64
from .urls import *
__all__: Tuple[str, ...] = (
'Paginator',
'http',
)
LINK_PARSING_RE = re.compile(r"<(\S+(\S))>; rel=\"(\S+)\"")
class Rates(NamedTuple):
remaining: str
used: str
total: str
reset_when: Union[datetime, str]
last_request: Union[datetime, str]
# aiohttp request tracking / checking bits
async def on_req_start(
session: aiohttp.ClientSession, ctx: SimpleNamespace, params: aiohttp.TraceRequestStartParams
) -> None:
"""Before-request hook to make sure we don't overrun the ratelimit."""
# print(repr(session), repr(ctx), repr(params))
if session._rates.remaining in ('0', '1'): # type: ignore
raise Exception('Ratelimit exceeded')
async def on_req_end(session: aiohttp.ClientSession, ctx: SimpleNamespace, params: aiohttp.TraceRequestEndParams) -> None:
"""After-request hook to adjust remaining requests on this time frame."""
headers = params.response.headers
remaining = headers['X-RateLimit-Remaining']
used = headers['X-RateLimit-Used']
total = headers['X-RateLimit-Limit']
reset_when = datetime.fromtimestamp(int(headers['X-RateLimit-Reset']))
last_req = datetime.utcnow()
session._rates = Rates(remaining, used, total, reset_when, last_req)
trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_req_start)
trace_config.on_request_end.append(on_req_end)
APIType: TypeAlias = Union[User, Gist, Repository]
async def make_session(*, headers: Dict[str, str], authorization: Union[aiohttp.BasicAuth, None]) -> aiohttp.ClientSession:
"""This makes the ClientSession, attaching the trace config and ensuring a UA header is present."""
if not headers.get('User-Agent'):
headers['User-Agent'] = (
f'Github-API-Wrapper (https://github.com/VarMonke/Github-Api-Wrapper) @ {__version__} Python'
f' {platform.python_version()} aiohttp {aiohttp.__version__}'
)
session = aiohttp.ClientSession(auth=authorization, headers=headers, trace_configs=[trace_config])
session._rates = Rates('', '', '', '', '')
return session
# pagination
class Paginator:
"""This class handles pagination for objects like Repos and Orgs."""
def __init__(self, session: aiohttp.ClientSession, response: aiohttp.ClientResponse, target_type: str):
self.session = session
self.response = response
self.should_paginate = bool(self.response.headers.get('Link', False))
types: Dict[str, Type[APIType]] = { # note: the type checker doesnt see subclasses like that
'user': User,
'gist': Gist,
'repo': Repository,
}
self.target_type: Type[APIType] = types[target_type]
self.pages = {}
self.is_exhausted = False
self.current_page = 1
self.next_page = self.current_page + 1
self.parse_header(response)
async def fetch_page(self, link: str) -> Dict[str, Union[str, int]]:
"""Fetches a specific page and returns the JSON."""
return await (await self.session.get(link)).json()
async def early_return(self) -> List[APIType]:
# I don't rightly remember what this does differently, may have a good ol redesign later
return [self.target_type(data, self) for data in await self.response.json()] # type: ignore
async def exhaust(self) -> List[APIType]:
"""Iterates through all of the pages for the relevant object and creates them."""
if self.should_paginate:
return await self.early_return()
out: List[APIType] = []
for page in range(1, self.max_page + 1):
result = await self.session.get(self.bare_link + str(page))
out.extend([self.target_type(item, self) for item in await result.json()]) # type: ignore
self.is_exhausted = True
return out
def parse_header(self, response: aiohttp.ClientResponse) -> None:
"""Predicts wether a call will exceed the ratelimit ahead of the call."""
header = response.headers['Link']
groups = LINK_PARSING_RE.findall(header)
self.max_page = int(groups[1][1])
if int(response.headers['X-RateLimit-Remaining']) < self.max_page:
raise WillExceedRatelimit(response, self.max_page)
self.bare_link = groups[0][0][:-1]
# GithubUserData = GithubRepoData = GithubIssueData = GithubOrgData = GithubGistData = Dict[str, Union [str, int]]
# Commentnig this out for now, consider using TypeDict's instead in the future <3
class http:
def __init__(self, headers: Dict[str, Union[str, int]], auth: Union[aiohttp.BasicAuth, None]) -> None:
if not headers.get('User-Agent'):
headers['User-Agent'] = (
'Github-API-Wrapper (https://github.com/VarMonke/Github-Api-Wrapper) @'
f' {__version__} Python/{platform.python_version()} aiohttp/{aiohttp.__version__}'
)
self._rates = Rates('', '', '', '', '')
self.headers = headers
self.auth = auth
def __await__(self):
return self.start().__await__()
async def start(self):
self.session = aiohttp.ClientSession(
headers=self.headers, # type: ignore
auth=self.auth,
trace_configs=[trace_config],
)
if not hasattr(self.session, "_rates"):
self.session._rates = Rates('', '', '', '', '')
return self
def update_headers(self, *, flush: bool = False, new_headers: Dict[str, Union[str, int]]):
if flush:
from multidict import CIMultiDict
self.session._default_headers = CIMultiDict(**new_headers) # type: ignore
else:
self.session._default_headers = {**self.session.headers, **new_headers} # type: ignore
async def update_auth(self, *, username: str, token: str):
auth = aiohttp.BasicAuth(username, token)
headers = self.session.headers
config = self.session.trace_configs
await self.session.close()
self.session = aiohttp.ClientSession(headers=headers, auth=auth, trace_configs=config)
def data(self):
# return session headers and auth
headers = {**self.session.headers}
return {'headers': headers, 'auth': self.auth}
async def latency(self):
"""Returns the latency of the current session."""
start = datetime.utcnow()
await self.session.get(BASE_URL)
return (datetime.utcnow() - start).total_seconds()
async def get_self(self) -> Dict[str, Union[str, int]]:
"""Returns the authenticated User's data"""
result = await self.session.get(SELF_URL)
if 200 <= result.status <= 299:
return await result.json()
raise InvalidToken
async def get_user(self, username: str) -> Dict[str, Union[str, int]]:
"""Returns a user's public data in JSON format."""
result = await self.session.get(USERS_URL.format(username))
if 200 <= result.status <= 299:
return await result.json()
raise UserNotFound
async def get_user_repos(self, _user: User) -> List[Dict[str, Union[str, int]]]:
result = await self.session.get(USER_REPOS_URL.format(_user.login))
if 200 <= result.status <= 299:
return await result.json()
print('This shouldn\'t be reachable')
return []
async def get_user_gists(self, _user: User) -> List[Dict[str, Union[str, int]]]:
result = await self.session.get(USER_GISTS_URL.format(_user.login))
if 200 <= result.status <= 299:
return await result.json()
print('This shouldn\'t be reachable')
return []
async def get_user_orgs(self, _user: User) -> List[Dict[str, Union[str, int]]]:
result = await self.session.get(USER_ORGS_URL.format(_user.login))
if 200 <= result.status <= 299:
return await result.json()
print('This shouldn\'t be reachable')
return []
async def get_repo(self, owner: str, repo_name: str) -> Optional[Dict[str, Union[str, int]]]:
"""Returns a Repo's raw JSON from the given owner and repo name."""
result = await self.session.get(REPO_URL.format(owner, repo_name))
if 200 <= result.status <= 299:
return await result.json()
raise RepositoryNotFound
async def get_repo_issue(self, owner: str, repo_name: str, issue_number: int) -> Optional[Dict[str, Any]]:
"""Returns a single issue's JSON from the given owner and repo name."""
result = await self.session.get(REPO_ISSUE_URL.format(owner, repo_name, issue_number))
if 200 <= result.status <= 299:
return await result.json()
raise IssueNotFound
async def delete_repo(self, owner: Optional[str], repo_name: str) -> Optional[str]:
"""Deletes a Repo from the given owner and repo name."""
result = await self.session.delete(REPO_URL.format(owner, repo_name))
if 204 <= result.status <= 299:
return 'Successfully deleted repository.'
if result.status == 403: # type: ignore
raise MissingPermissions
raise RepositoryNotFound
async def delete_gist(self, gist_id: Union[str, int]) -> Optional[str]:
"""Deletes a Gist from the given gist id."""
result = await self.session.delete(GIST_URL.format(gist_id))
if result.status == 204:
return 'Successfully deleted gist.'
if result.status == 403:
raise MissingPermissions
raise GistNotFound
async def get_org(self, org_name: str) -> Dict[str, Union[str, int]]:
"""Returns an org's public data in JSON format.""" # type: ignore
result = await self.session.get(ORG_URL.format(org_name))
if 200 <= result.status <= 299:
return await result.json()
raise OrganizationNotFound
async def get_gist(self, gist_id: str) -> Dict[str, Union[str, int]]:
"""Returns a gist's raw JSON from the given gist id."""
result = await self.session.get(GIST_URL.format(gist_id))
if 200 <= result.status <= 299:
return await result.json()
raise GistNotFound
async def create_gist(
self, *, files: List['File'] = [], description: str = 'Default description', public: bool = False
) -> Dict[str, Union[str, int]]:
data = {}
data['description'] = description
data['public'] = public
data['files'] = {}
for file in files:
data['files'][file.filename] = {'filename': file.filename, 'content': file.read()} # helps editing the file
data = json.dumps(data)
_headers = dict(self.session.headers)
result = await self.session.post(CREATE_GIST_URL, data=data, headers=_headers)
if 201 == result.status:
return await result.json()
raise InvalidToken
async def create_repo(
self, name: str, description: str, public: bool, gitignore: Optional[str], license: Optional[str]
) -> Dict[str, Union[str, int]]:
"""Creates a repo for you with given data"""
data = {
'name': name,
'description': description,
'public': public,
'gitignore_template': gitignore,
'license': license,
}
result = await self.session.post(CREATE_REPO_URL, data=json.dumps(data))
if 200 <= result.status <= 299:
return await result.json()
if result.status == 401:
raise NoAuthProvided
raise RepositoryAlreadyExists
async def add_file(self, owner: str, repo_name: str, filename: str, content: str, message: str, branch: str):
"""Adds a file to the given repo."""
data = {
'content': bytes_to_b64(content=content),
'message': message,
'branch': branch,
}
result = await self.session.put(ADD_FILE_URL.format(owner, repo_name, filename), data=json.dumps(data))
if 200 <= result.status <= 299:
return await result.json()
if result.status == 401:
raise NoAuthProvided
if result.status == 409:
raise FileAlreadyExists
if result.status == 422:
raise FileAlreadyExists('This file exists, and can only be edited.')
return await result.json(), result.status

View file

@ -0,0 +1 @@
from .http import *

852
github/internals/http.py Normal file
View file

@ -0,0 +1,852 @@
from __future__ import annotations
__all__ = ("HTTPClient",)
import asyncio
import logging
import platform
import time
from datetime import datetime, timezone
from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Literal, NamedTuple, Optional, Union
from aiohttp import ClientSession, TraceConfig
from .. import __version__
from ..utils import error_from_request, human_readable_time_until
if TYPE_CHECKING:
from types import SimpleNamespace
from aiohttp import BasicAuth, TraceRequestEndParams, TraceRequestStartParams
from typing_extensions import Self
from ..objects import File
from ..types import SecurtiyAndAnalysis
log = logging.getLogger("github")
class Ratelimits(NamedTuple):
remaining: Optional[int]
used: Optional[int]
total: Optional[int]
reset_time: Optional[datetime]
last_request: Optional[datetime]
# ========= TODO ========= #
# Make a good paginator
# Make objects for all API Types
# Make the requests return TypedDicts with those objects
# Make specific errrors
# Make route /users/{username}/hovercard
# Make it so an error gets raised when the cooldown is reached
# === ROUTES CHECKLIST === #
# Actions
# Activity
# Apps
# Billing
# Branches
# Checks
# Codes of conduct
# Code Scanning
# Codespaces
# Collaborators
# Commits
# Dependabot
# Dependency Graph
# Deploy keys
# Deployments
# Emojis
# Enterprise administration
# Gists DONE
# Git database
# Gitignore
# Interactions
# Issues
# Licenses
# Markdown
# Meta
# Metrics
# Migrations
# OAuth authorizations
# Organizations
# Packages
# Pages
# Projects
# Pulls
# Rate limit
# Reactions
# Releases
# Repositories DONE
# SCIM
# Search
# Teams
# Users DONE
# Webhooks
class HTTPClient:
__session: ClientSession
_rates: Ratelimits
_last_ping: float
_latency: float
def __new__(
cls,
*,
headers: Optional[Dict[str, Union[str, int]]] = None,
auth: Optional[BasicAuth] = None,
) -> Awaitable[Self]:
# Basically async def __init__
return cls.__async_init()
@classmethod
async def __async_init(
cls,
*,
headers: Optional[Dict[str, str]] = None,
auth: Optional[BasicAuth] = None,
) -> Self:
self = super(cls, cls).__new__(cls)
if not headers:
headers = {}
headers.setdefault(
"User-Agent",
"GitHub-API-Wrapper (https://github.com/Varmonke/GitHub-API-Wrapper) @"
f" {__version__} CPython/{platform.python_version()} aiohttp/{__version__}",
)
self._rates = Ratelimits(None, None, None, None, None)
self.__headers = headers
self.__auth = auth
self._last_ping = 0
self._latency = 0
trace_config = TraceConfig()
@trace_config.on_request_start.append
async def on_request_start(
_: ClientSession, __: SimpleNamespace, params: TraceRequestStartParams
) -> None:
if self.ratelimited:
log.info(
"Ratelimit exceeded, trying again in"
f" {human_readable_time_until(self._rates.reset_time - datetime.now(timezone.utc))} (URL:"
f" {params.url}, method: {params.method})"
)
# TODO: I get about 3-4 hours of cooldown this might not be a good idea, might make this raise an error instead.
await asyncio.sleep(
max((self._rates.reset_time - datetime.now(timezone.utc)).total_seconds(), 0)
)
@trace_config.on_request_end.append
async def on_request_end(
_: ClientSession, __: SimpleNamespace, params: TraceRequestEndParams
) -> None:
"""After-request hook to adjust remaining requests on this time frame."""
headers = params.response.headers
self._rates = Ratelimits(
int(headers["X-RateLimit-Remaining"]),
int(headers["X-RateLimit-Used"]),
int(headers["X-RateLimit-Limit"]),
datetime.fromtimestamp(int(headers["X-RateLimit-Reset"])).replace(
tzinfo=timezone.utc
),
datetime.now(timezone.utc),
)
self.__session = ClientSession(
headers=headers,
auth=auth,
trace_configs=[trace_config],
)
return self
async def __aenter__(self) -> Self:
return self
async def __aexit__(self, *_) -> None:
await self.__session.close()
@property
def ratelimited(self) -> bool:
remaining = self._rates.remaining
return remaining is not None and remaining < 2
@property
def latency(self) -> Awaitable[float]:
async def inner() -> float:
last_ping = self._last_ping
# If there was no ping or the last ping was more than 5 seconds ago.
if not last_ping or time.monotonic() > last_ping + 5 or self.ratelimited:
self._last_ping = time.monotonic()
start = time.monotonic()
await self.request("GET", "/")
self._latency = time.monotonic() - start
return self._latency
return inner()
async def request(
self, method: Literal["GET", "POST", "PUT", "DELETE", "PATCH"], path: str, /, **kwargs: Any
):
async with self.__session.request(
method, f"https://api.github.com{path}", **kwargs
) as request:
if 200 <= request.status <= 299:
return await request.json()
raise error_from_request(request)
# === ROUTES === #
# === USERS === #
async def get_logged_in_user(self):
return await self.request("GET", "/user")
async def update_logged_in_user(
self,
*,
name: Optional[str] = None,
email: Optional[str] = None,
blog: Optional[str] = None,
twitter_username: Optional[str] = None,
company: Optional[str] = None,
location: Optional[str] = None,
hireable: Optional[str] = None,
bio: Optional[str] = None,
):
data = {}
if name:
data["name"] = name
if email:
data["email"] = email
if blog:
data["blog"] = blog
if twitter_username:
data["twitter_username"] = twitter_username
if company:
data["company"] = company
if location:
data["location"] = location
if hireable:
data["hireable"] = hireable
if bio:
data["bio"] = bio
return await self.request("PATCH", "/user", json=data)
async def list_users(self, *, since: Optional[int] = None, per_page: Optional[int] = None):
params = {}
if since:
params["since"] = since
if per_page:
params["per_page"] = per_page
return await self.request("GET", "/users", params=params)
async def get_user(self, *, username: str):
return await self.request("GET", f"/users/{username}")
# TODO: /users/{username}/hovercard
# IDK what to name it
# === REPOS === #
async def list_org_repos(
self,
*,
org: str,
type: Optional[
Literal["all", "public", "private", "forks", "sources", "member", "internal"]
] = None,
sort: Optional[Literal["created", "updated", "pushed", "full_name"]] = None,
direction: Optional[Literal["asc", "desc"]] = None,
per_page: Optional[int] = None,
page: Optional[int] = None,
):
params = {}
if type:
params["type"] = type
if sort:
params["sort"] = sort
if direction:
params["direction"] = direction
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", f"/orgs/{org}/repos", params=params)
async def create_org_repo(
self,
*,
org: str,
name: str,
description: Optional[str] = None,
homepage: Optional[str] = None,
private: Optional[bool] = None,
visibility: Optional[Literal["public", "private", "internal"]] = None,
has_issues: Optional[bool] = None,
has_projects: Optional[bool] = None,
has_wiki: Optional[bool] = None,
is_template: Optional[bool] = None,
team_id: Optional[int] = None,
auto_init: Optional[bool] = None,
gitignore_template: Optional[str] = None,
license_template: Optional[str] = None,
allow_squash_merge: Optional[bool] = None,
allow_merge_commit: Optional[bool] = None,
allow_rebase_merge: Optional[bool] = None,
allow_auto_merge: Optional[bool] = None,
delete_branch_on_merge: Optional[bool] = None,
use_squash_pr_title_as_default: Optional[bool] = None,
):
data: Dict[str, Union[str, bool, int]] = {
"name": name,
}
if description:
data["description"] = description
if homepage:
data["homepage"] = homepage
if private:
data["private"] = private
if visibility:
data["visibility"] = visibility
if has_issues:
data["has_issues"] = has_issues
if has_projects:
data["has_projects"] = has_projects
if has_wiki:
data["has_wiki"] = has_wiki
if is_template:
data["is_template"] = is_template
if team_id:
data["team_id"] = team_id
if auto_init:
data["auto_init"] = auto_init
if gitignore_template:
data["gitignore_template"] = gitignore_template
if license_template:
data["license_template"] = license_template
if allow_squash_merge:
data["allow_squash_merge"] = allow_squash_merge
if allow_merge_commit:
data["allow_merge_commit"] = allow_merge_commit
if allow_rebase_merge:
data["allow_rebase_merge "] = allow_rebase_merge
if allow_auto_merge:
data["allow_auto_merge"] = allow_auto_merge
if delete_branch_on_merge:
data["delete_branch_on_merge"] = delete_branch_on_merge
if use_squash_pr_title_as_default:
data["use_squash_pr_title_as_default"] = use_squash_pr_title_as_default
return await self.request("POST", f"/orgs/{org}/repos", json=data)
async def get_repo(self, *, owner: str, repo: str):
return await self.request("GET", f"/repos/{owner}/{repo}")
async def update_repo(
self,
*,
owner: str,
repo: str,
name: Optional[str] = None,
description: Optional[str] = None,
homepage: Optional[str] = None,
private: Optional[bool] = None,
visibility: Optional[Literal["public", "private", "internal"]] = None,
security_and_analysis: Optional[SecurtiyAndAnalysis] = None,
has_issues: Optional[bool] = None,
has_projects: Optional[bool] = None,
has_wiki: Optional[bool] = None,
is_template: Optional[bool] = None,
default_branch: Optional[str] = None,
allow_squash_merge: Optional[bool] = None,
allow_merge_commit: Optional[bool] = None,
allow_rebase_merge: Optional[bool] = None,
allow_auto_merge: Optional[bool] = None,
delete_branch_on_merge: Optional[bool] = None,
use_squash_pr_title_as_default: Optional[bool] = None,
archived: Optional[bool] = None,
allow_forking: Optional[bool] = None,
):
data = {}
if name:
data["name"] = name
if description:
data["description"] = description
if homepage:
data["homepage"] = homepage
if private:
data["private"] = private
if visibility:
data["visibility"] = visibility
if security_and_analysis:
data["security_and_analysis"] = security_and_analysis
if has_issues:
data["has_issues"] = has_issues
if has_projects:
data["has_projects"] = has_projects
if has_wiki:
data["has_wiki"] = has_wiki
if is_template:
data["is_template"] = is_template
if default_branch:
data["default_branch"] = default_branch
if allow_squash_merge:
data["allow_squash_merge"] = allow_squash_merge
if allow_merge_commit:
data["allow_merge_commit"] = allow_merge_commit
if allow_rebase_merge:
data["allow_rebase_merge "] = allow_rebase_merge
if allow_auto_merge:
data["allow_auto_merge "] = allow_auto_merge
if delete_branch_on_merge:
data["delete_branch_on_merge "] = delete_branch_on_merge
if use_squash_pr_title_as_default:
data["use_squash_pr_title_as_default"] = use_squash_pr_title_as_default
if archived:
data["archived"] = archived
if allow_forking:
data["allow_forking"] = allow_forking
return await self.request("PATCH", f"/repos/{owner}/{repo}", json=data)
async def delete_repo(self, *, owner: str, repo: str):
return await self.request("DELETE", f"/repos/{owner}/{repo}")
async def enable_automated_security_fixes_for_repo(self, *, owner: str, repo: str):
return await self.request("PUT", f"/repos/{owner}/{repo}/automated-security-fixes")
async def disable_automated_security_fixes_for_repo(self, *, owner: str, repo: str):
return await self.request("DELETE", f"/repos/{owner}/{repo}/automated-security-fixes")
async def list_codeowners_erros_for_repo(
self, *, owner: str, repo: str, ref: Optional[str] = None
):
params = {}
if ref:
params["ref"] = ref
return await self.request("GET", f"/repos/{owner}/{repo}/codeowners/errors", params=params)
async def list_contributors_for_repo(
self,
*,
owner: str,
repo: str,
anon: Optional[bool] = None,
per_page: Optional[int] = None,
page: Optional[int] = None,
):
params = {}
if anon:
params["anon"] = anon
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", f"/repos/{owner}/{repo}/contributors", params=params)
async def create_dispatch_event_for_repo(
self, *, owner: str, repo: str, event_name: str, client_payload: Optional[str] = None
):
data = {
"event_name": event_name,
}
if client_payload:
data["client_payload"] = client_payload
return await self.request("POST", f"/repos/{owner}/{repo}/dispatches", json=data)
async def list_repo_languages_for_repo(self, *, owner: str, repo: str):
return await self.request("GET", f"/repos/{owner}/{repo}/languages")
async def list_tags_for_repo(
self, *, owner: str, repo: str, per_page: Optional[int] = None, page: Optional[int] = None
):
params = {}
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", f"/repos/{owner}/{repo}/tags", params=params)
async def list_teams_for_repo(
self, *, owner: str, repo: str, per_page: Optional[int] = None, page: Optional[int] = None
):
params = {}
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", f"/repos/{owner}/{repo}/teams", params=params)
async def get_all_topic_for_repo(
self, *, owner: str, repo: str, per_page: Optional[int] = None, page: Optional[int] = None
):
params = {}
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", f"/repos/{owner}/{repo}/topics", params=params)
async def replace_all_topics_for_repo(self, *, owner: str, repo: str, names: List[str]):
return await self.request("PUT", f"/repos/{owner}/{repo}/topics", json={"names": names})
async def transfer_repo(
self, *, owner: str, repo: str, new_owner: str, team_ids: Optional[List[int]] = None
):
data: Dict[str, Union[str, List[int]]] = {
"new_owner": new_owner,
}
if team_ids:
data["team_ids"] = team_ids
return await self.request("POST", f"/repos/{owner}/{repo}/transfer", json=data)
async def check_vulnerability_alerts_enabled_for_repo(self, *, owner: str, repo: str):
return await self.request("GET", f"/repos/{owner}/{repo}/vulnerability-alerts")
async def enable_vulnerability_alerts_for_repo(self, *, owner: str, repo: str):
return await self.request("PUT", f"/repos/{owner}/{repo}/vulnerability-alerts")
async def disable_vulnerability_alerts_for_repo(self, *, owner: str, repo: str):
return await self.request("DELETE", f"/repos/{owner}/{repo}/vulnerability-alerts")
async def create_repo_using_template_repo(
self,
*,
template_owner: str,
template_repo: str,
owner: Optional[str] = None,
name: str,
include_all_branches: Optional[bool] = None,
private: Optional[bool] = None,
):
data: Dict[str, Union[str, bool]] = {
"name": name,
}
if owner:
data["owner"] = owner
if include_all_branches:
data["include_all_branches"] = include_all_branches
if private:
data["private"] = private
return await self.request(
"POST", f"/repos/{template_owner}/{template_repo}/generate", json=data
)
async def list_public_repos(self, *, since: Optional[int] = None):
params = {}
if since:
params["since"] = since
return await self.request("GET", "/repositories", params=params)
async def list_logged_in_user_repos(
self,
*,
visibility: Optional[Literal["all", "private", "public"]] = None,
affiliation: Optional[Literal["owner", "collaborator", "organization_member"]] = None,
type: Optional[Literal["all", "owner", "public", "private", "member"]] = None,
sort: Optional[Literal["created", "updated", "pushed", "full_name"]] = None,
direction: Optional[Literal["asc", "desc"]] = None,
per_page: Optional[int] = None,
page: Optional[int] = None,
since: Optional[str] = None,
before: Optional[str] = None,
):
data = {}
if visibility:
data["visibility"] = visibility
if affiliation:
data["affiliation"] = affiliation
if type:
data["type"] = type
if sort:
data["sort"] = sort
if direction:
data["direction"] = direction
if per_page:
data["per_page"] = per_page
if page:
data["page"] = page
if since:
data["since"] = since
if before:
data["before"] = before
return self.request("POST", "/user/repos", json=data)
async def create_repo(
self,
*,
name: str,
description: Optional[str] = None,
homepage: Optional[str] = None,
private: Optional[bool] = None,
has_issues: Optional[bool] = None,
has_projects: Optional[bool] = None,
has_wiki: Optional[bool] = None,
team_id: Optional[int] = None,
auto_init: Optional[bool] = None,
gitignore_template: Optional[str] = None,
license_template: Optional[str] = None,
allow_squash_merge: Optional[bool] = None,
allow_merge_commit: Optional[bool] = None,
allow_rebase_merge: Optional[bool] = None,
allow_auto_merge: Optional[bool] = None,
delete_branch_on_merge: Optional[bool] = None,
has_downloads: Optional[bool] = None,
is_template: Optional[bool] = None,
):
data: Dict[str, Union[str, bool, int]] = {
"name": name,
}
if description:
data["description"] = description
if homepage:
data["homepage"] = homepage
if private:
data["private"] = private
if has_issues:
data["has_issues"] = has_issues
if has_projects:
data["has_projects"] = has_projects
if has_wiki:
data["has_wiki"] = has_wiki
if team_id:
data["team_id"] = team_id
if auto_init:
data["auto_init"] = auto_init
if gitignore_template:
data["gitignore_template"] = gitignore_template
if license_template:
data["license_template"] = license_template
if allow_squash_merge:
data["allow_squash_merge"] = allow_squash_merge
if allow_merge_commit:
data["allow_merge_commit"] = allow_merge_commit
if allow_rebase_merge:
data["allow_rebase_merge"] = allow_rebase_merge
if allow_auto_merge:
data["allow_auto_merge"] = allow_auto_merge
if delete_branch_on_merge:
data["delete_branch_on_merge"] = delete_branch_on_merge
if has_downloads:
data["has_downloads"] = has_downloads
if is_template:
data["is_template"] = is_template
return await self.request("POST", "/user/repos", json=data)
async def list_user_repos(
self,
*,
username: str,
type: Optional[
Literal["all", "public", "private", "forks", "sources", "member", "internal"]
] = None,
sort: Optional[Literal["created", "updated", "pushed", "full_name"]] = None,
direction: Optional[Literal["asc", "desc"]] = None,
per_page: Optional[int] = None,
page: Optional[int] = None,
):
params = {}
if type:
params["type"] = type
if sort:
params["sort"] = sort
if direction:
params["direction"] = direction
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", f"/users/{username}/repos", params=params)
# === GISTS === #
async def list_logged_in_user_gists(
self,
*,
since: Optional[str] = None,
per_page: Optional[int] = None,
page: Optional[int] = None,
):
params = {}
if since:
params["since"] = since
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", "/gists", params=params)
async def create_gist(
self, *, description: Optional[str] = None, files: List[File], public: Optional[bool] = None
):
data: Dict[str, Union[str, bool, Dict[str, str]]] = {
"files": {f.name: f.read() for f in files},
}
if description:
data["description"] = description
if public:
data["public"] = public
return await self.request("POST", "/gists", json=data)
async def list_public_gists(
self,
*,
since: Optional[str] = None,
per_page: Optional[int] = None,
page: Optional[int] = None,
):
params = {}
if since:
params["since"] = since
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", "/gists/public", params=params)
async def list_starred_gists(
self,
*,
since: Optional[str] = None,
per_page: Optional[int] = None,
page: Optional[int] = None,
):
params = {}
if since:
params["since"] = since
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", "/gists/starred", params=params)
async def get_gist(self, *, gist_id: str):
return await self.request("GET", f"/gists/{gist_id}")
async def update_gist(
self, *, gist_id: str, description: Optional[str] = None, files: Optional[List[File]] = None
):
data = {}
if description:
data["description"] = description
if files:
data["files"] = {f.name: f.read() for f in files}
return await self.request("PATCH", f"/gists/{gist_id}")
async def delete_gist(self, *, gist_id: str):
return await self.request("DELETE", f"/gists/{gist_id}")
async def list_commits_for_gist(
self, *, gist_id: str, per_page: Optional[int] = None, page: Optional[int] = None
):
params = {}
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", f"/gists/{gist_id}/commits", params=params)
async def list_forks_for_gist(
self, *, gist_id: str, per_page: Optional[int] = None, page: Optional[int] = None
):
params = {}
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", f"/gists/{gist_id}/forks", params=params)
async def fork_gist(self, *, gist_id: str):
return await self.request("POST", f"/gists/{gist_id}/forks")
async def check_starred_for_gist(self, *, gist_id: str):
return await self.request("GET", f"/gists/{gist_id}/star")
async def star_gist(self, *, gist_id: str):
return await self.request("PUT", f"/gists/{gist_id}/star")
async def unstar_gist(self, *, gist_id: str):
return await self.request("DELETE", f"/gists/{gist_id}/star")
async def get_revision_for_gist(self, *, gist_id: str, sha: str):
return await self.request("GET", f"/gists/{gist_id}/{sha}")
async def list_user_gists(
self,
*,
username: str,
since: Optional[str] = None,
per_page: Optional[int] = None,
page: Optional[int] = None,
):
params = {}
if since:
params["since"] = since
if per_page:
params["per_page"] = per_page
if page:
params["page"] = page
return await self.request("GET", f"/users/{username}/gists", params=params)

View file

@ -1,534 +0,0 @@
# == objects.py ==#
from __future__ import annotations
from base64 import b64encode
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
if TYPE_CHECKING:
from .http import http
import io
import os
from datetime import datetime
__all__: Tuple[str, ...] = (
'APIObject',
'dt_formatter',
'repr_dt',
'PartialUser',
'User',
'Repository',
'Issue',
'File',
'Gist',
'Organization',
)
def dt_formatter(time_str: Optional[str]) -> Optional[datetime]:
if time_str is not None:
return datetime.strptime(time_str, r"%Y-%m-%dT%H:%M:%SZ")
return None
def repr_dt(_datetime: datetime) -> str:
return _datetime.strftime(r'%d-%m-%Y, %H:%M:%S')
def bytes_to_b64(content) -> str:
return b64encode(content.encode('utf-8')).decode('ascii')
class APIObject:
"""Top level class for objects created from the API"""
__slots__: Tuple[str, ...] = ('_response', '_http')
def __init__(self, response: Dict[str, Any], _http: http) -> None:
self._http = _http
self._response = response
def __repr__(self) -> str:
return f'<{self.__class__.__name__}>'
# === User stuff ===#
class _BaseUser(APIObject):
__slots__ = (
'login',
'id',
)
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http)
self._http = _http
self.login = response.get('login')
self.id = response.get('id')
def __repr__(self) -> str:
return f'<{self.__class__.__name__} id = {self.id}, login = {self.login!r}>'
async def repos(self) -> List[Repository]:
"""List[:class:`Repository`]: Returns a list of public repositories under the user."""
results = await self._http.get_user_repos(self) # type: ignore
return [Repository(data, self._http) for data in results]
async def gists(self) -> List[Gist]:
"""List[:class:`Gist`]: Returns a list of public gists under the user."""
results = await self._http.get_user_gists(self) # type: ignore
return [Gist(data, self._http) for data in results]
async def orgs(self) -> List[Organization]:
"""List[:class:`Organization`]: Returns a list of public orgs under the user."""
results = await self._http.get_user_orgs(self) # type: ignore
return [Organization(data, self._http) for data in results]
@property
def name(self):
"""Optional[str]: The name of the user, if available."""
return self._response.get('login')
class User(_BaseUser):
"""Representation of a user object on Github.
Attributes
----------
login: :class:`str`
The API name of the user.
id: :class:`int`
The ID of the user.
avatar_url: :class:`str`
The url of the user's Github avatar.
html_url: :class:`str`
The url of the user's Github page.
created_at: :class:`datetime.datetime`
The time of creation of the user.
"""
__slots__ = (
'login',
'id',
'avatar_url',
'html_url',
'public_repos',
'public_gists',
'followers',
'following',
'created_at',
)
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http)
tmp = self.__slots__ + _BaseUser.__slots__
keys = {key: value for key, value in self._response.items() if key in tmp}
for key, value in keys.items():
if '_at' in key and value is not None:
setattr(self, key, dt_formatter(value))
continue
else:
setattr(self, key, value)
continue
def __repr__(self) -> str:
return f'<{self.__class__.__name__} login: {self.login!r}, id: {self.id}, created_at: {self.created_at}>'
class PartialUser(_BaseUser):
__slots__ = (
'site_admin',
'html_url',
'avatar_url',
) + _BaseUser.__slots__
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http)
self.site_admin: Optional[str] = response.get('site_admin')
self.html_url: Optional[str] = response.get('html_url')
self.avatar_url: Optional[str] = response.get('avatar_url')
def __repr__(self) -> str:
return f'<{self.__class__.__name__} login: {self.login!r}, id: {self.id}, site_admin: {self.site_admin}'
async def _get_user(self) -> User:
"""Upgrades the PartialUser to a User object."""
response = await self._http.get_user(self.login)
return User(response, self._http)
# === Repository stuff ===#
class Repository(APIObject):
"""Representation of a repository on Github.
Attributes
----------
id: :class:`int`
The ID of the repository in the API.
name: :class:`str`
The name of the repository in the API.
owner: :class:`User`
The owner of the repository.
created_at: :class:`datetime.datetime`
The time the repository was created at.
updated_at: :class:`datetime.datetime`
The time the repository was last updated.
url: :class:`str`
The API url for the repository.
html_url: :class:`str`
The human-url of the repository.
archived: :class:`bool`
Whether the repository is archived or live.
open_issues_count: :class:`int`
The number of the open issues on the repository.
default_branch: :class:`str`
The name of the default branch of the repository.
"""
if TYPE_CHECKING:
id: int
name: str
owner: str
__slots__ = (
'id',
'name',
'owner',
'sizecreated_at',
'url',
'html_url',
'archived',
'disabled',
'updated_at',
'open_issues_count',
'clone_url',
'stargazers_count',
'watchers_count',
'license',
)
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http)
tmp = self.__slots__ + APIObject.__slots__
keys = {key: value for key, value in self._response.items() if key in tmp}
for key, value in keys.items():
if key == 'owner':
setattr(self, key, PartialUser(value, self._http))
continue
if key == 'name':
setattr(self, key, value)
continue
if '_at' in key and value is not None:
setattr(self, key, dt_formatter(value))
continue
if 'license' in key:
if value is not None:
setattr(self, key, value.get('name'))
continue
setattr(self, key, None)
else:
setattr(self, key, value)
continue
def __repr__(self) -> str:
return f'<{self.__class__.__name__} id: {self.id}, name: {self.name!r}, owner: {self.owner!r}>'
@property
def is_fork(self) -> bool:
""":class:`bool`: Whether the repository is a fork."""
return self._response.get('fork')
@property
def language(self) -> str:
""":class:`str`: Primary language of the repository."""
return self._response.get('language')
@property
def open_issues(self) -> int:
""":class:`int`: The number of open issues on the repository."""
return self._response.get('open_issues')
@property
def forks(self) -> int:
""":class:`int`: The number of forks of the repository."""
return self._response.get('forks')
@property
def default_branch(self) -> str:
""":class:`str`: The default branch of the repository."""
return self._response.get('default_branch')
async def delete(self) -> None:
"""Deletes the repository."""
return await self._http.delete_repo(
self.owner.name, # type: ignore this shit is not my fault
self.name,
) # type: ignore
async def add_file(self, filename: str, message: str, content: str, branch: Optional[str] = None) -> None:
"""Adds a file to the repository.
Parameters
----------
filename: :class:`str` The name of the file.
message: :class:`str` The commit message.
content: :class:`str` The content of the file.
branch: :class:`str` The branch to add the file to, defaults to the default branch.
"""
if branch is None:
branch = self.default_branch
return await self._http.add_file(owner=self.owner.name, repo_name=self.name, filename=filename, content=content, message=message, branch=branch) # type: ignore
class Issue(APIObject):
"""Representation of an issue on Github.
Attributes
----------
id: :class:`int`
The ID of the issue in the API.
title: :class:`str`
The title of the issue in the API.
user: :class:`User`
The user who opened the issue.
labels: List[:class:`str`]
TODO: document this.
state: :class:`str`
The current state of the issue.
created_at: :class:`datetime.datetime`
The time the issue was created.
closed_by: Optional[Union[:class:`PartialUser`, :class:`User`]]
The user the issue was closed by, if applicable.
"""
__slots__ = (
'id',
'title',
'user',
'labels',
'state',
'created_at',
'closed_by',
)
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http)
tmp = self.__slots__ + APIObject.__slots__
keys = {key: value for key, value in self._response.items() if key in tmp}
for key, value in keys.items():
if key == 'user':
setattr(self, key, PartialUser(value, self._http))
continue
if key == 'labels':
setattr(self, key, [label['name'] for label in value])
continue
if key == 'closed_by':
setattr(self, key, User(value, self._http))
continue
else:
setattr(self, key, value)
continue
def __repr__(self) -> str:
return (
f'<{self.__class__.__name__} id: {self.id}, title: {self.title}, user: {self.user}, created_at:'
f' {self.created_at}, state: {self.state}>'
)
@property
def updated_at(self) -> Optional[datetime]:
"""Optional[:class:`datetime.datetime`]: The time the issue was last updated, if applicable."""
return dt_formatter(self._response.get('updated_at'))
@property
def html_url(self) -> str:
""":class:`str`: The human-friendly url of the issue."""
return self._response.get('html_url')
# === Gist stuff ===#
class File:
"""A wrapper around files and in-memory file-like objects.
Parameters
----------
fp: Union[:class:`str`, :class:`io.StringIO`, :class:`io.BytesIO`]
The filepath or StringIO representing a file to upload.
If providing a StringIO instance, a filename shuold also be provided to the file.
filename: :class:`str`
An override to the file's name, encouraged to provide this if using a StringIO instance.
"""
def __init__(self, fp: Union[str, io.StringIO, io.BytesIO], filename: str = 'DefaultFilename.txt') -> None:
self.fp = fp
self.filename = filename
def read(self) -> str:
if isinstance(self.fp, str):
if os.path.exists(self.fp):
with open(self.fp) as fp:
data = fp.read()
return data
return self.fp
elif isinstance(self.fp, io.BytesIO):
return self.fp.read().decode('utf-8')
elif isinstance(self.fp, io.StringIO): # type: ignore
return self.fp.getvalue()
raise TypeError(f'Expected str, io.StringIO, or io.BytesIO, got {type(self.fp)}')
class Gist(APIObject):
"""Representation of a gist on Github.
Attributes
----------
id: :class:`int`
The ID of the gist in the API.
html_url: :class:`str`
The human-friendly url of the gist.
files: List[:class:`File`]
A list of the files in the gist, can be an empty list.
public: :class:`bool`
Whether the gist is public.
owner: Union[:class:`PartialUser`, :class:`User`]
The owner of the gist.
created_at: :class:`datetime.datetime`
The time the gist was created at.
"""
__slots__ = (
'id',
'html_url',
'node_id',
'files',
'public',
'owner',
'created_at',
'truncated',
)
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http)
tmp = self.__slots__ + APIObject.__slots__
keys = {key: value for key, value in self._response.items() if key in tmp}
for key, value in keys.items():
if key == 'owner':
setattr(self, key, PartialUser(value, self._http))
continue
if key == 'created_at':
setattr(self, key, dt_formatter(value))
continue
else:
setattr(self, key, value)
def __repr__(self) -> str:
return f'<{self.__class__.__name__} id: {self.id}, owner: {self.owner}, created_at: {self.created_at}>'
@property
def updated_at(self) -> Optional[datetime]:
"""Optional[:class:`datetime.datetime`]: The time the gist was last updated, if applicable."""
return dt_formatter(self._response.get('updated_at'))
@property
def comments(self) -> str:
"""TODO: document this."""
return self._response.get('comments')
@property
def discussion(self) -> str:
"""TODO: document this."""
return self._response.get('discussion')
@property
def raw(self) -> Dict[str, Any]:
"""TODO: document this."""
return self._response
@property
def url(self) -> str:
return self._response.get('html_url')
async def delete(self):
"""Delete the gist."""
await self._http.delete_gist(self.id)
# === Organization stuff ===#
class Organization(APIObject):
"""Representation of an organization in the API.
Attributes
----------
login: :class:`str`
TODO: document this
id: :class:`int`
The ID of the organization in the API.
is_verified: :class:`bool`
Whether or not the organization is verified.
created_at: :class:`datetime.datetime`
The time the organization was created at.
avatar_url: :class:`str`
The url of the organization's avatar.
"""
__slots__ = (
'login',
'id',
'is_verified',
'public_repos',
'public_gists',
'followers',
'following',
'created_at',
'avatar_url',
)
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http)
tmp = self.__slots__ + APIObject.__slots__
keys = {key: value for key, value in self._response.items() if key in tmp}
for key, value in keys.items():
if key == 'login':
setattr(self, key, value)
continue
if '_at' in key and value is not None:
setattr(self, key, dt_formatter(value))
continue
else:
setattr(self, key, value)
continue
def __repr__(self):
return (
f'<{self.__class__.__name__} login: {self.login!r}, id: {self.id}, is_verified: {self.is_verified},'
f' public_repos: {self.public_repos}, public_gists: {self.public_gists}, created_at: {self.created_at}>'
)
@property
def description(self):
""":class:`str`: The description of the organization."""
return self._response.get('description')
@property
def html_url(self):
""":class:`str`: The human-friendly url of the organization."""
return self._response.get('html_url')

View file

@ -0,0 +1,2 @@
from .file import *
from .object import *

26
github/objects/file.py Normal file
View file

@ -0,0 +1,26 @@
__all__ = ("File",)
import os
from io import BytesIO, StringIO
from pathlib import Path
from typing import Union
class File:
def __init__(self, file: Union[str, StringIO, BytesIO], /, *, filename: str) -> None:
self._file = file
self.name = filename
def read(self) -> str:
f = self._file
if isinstance(f, BytesIO):
return f.read().decode("utf-8")
if isinstance(f, StringIO):
return f.getvalue()
if os.path.exists(f):
return Path(f).read_text()
return f

18
github/objects/object.py Normal file
View file

@ -0,0 +1,18 @@
from __future__ import annotations
__all__ = ("Object",)
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from ..internals import HTTPClient
class Object:
__slots__ = ("__http",)
def __init__(self, *, http: HTTPClient) -> None:
self.__http = http
def __repr__(self) -> str:
return f"<{self.__class__.__name__}>"

1
github/types/__init__.py Normal file
View file

@ -0,0 +1 @@
from .security_and_analysis import *

View file

@ -0,0 +1,23 @@
from typing import Literal, TypedDict
from typing_extensions import NotRequired
__all__ = ("SecurtiyAndAnalysis",)
class AdvancedSecurity(TypedDict):
status: Literal["enabled", "disabled"]
class SecretScanning(TypedDict):
status: Literal["enabled", "disabled"]
class SecretScanningPushProtection(TypedDict):
status: Literal["enabled", "disabled"]
class SecurtiyAndAnalysis(TypedDict):
advanced_security: NotRequired[AdvancedSecurity]
secret_scanning: NotRequired[SecretScanning]
secret_scanning_push_protection: NotRequired[SecretScanningPushProtection]

View file

@ -1,43 +0,0 @@
# == urls.py ==#
BASE_URL = 'https://api.github.com'
# == user urls ==#
USERS_URL = f"{BASE_URL}/users/{{0}}"
USER_HTML_URL = 'https://github.com/users/{0}'
SELF_URL = f"{BASE_URL}/user"
USER_REPOS_URL = f"{USERS_URL}/repos"
USER_ORGS_URL = f"{USERS_URL}/orgs"
USER_GISTS_URL = f"{USERS_URL}/gists"
USER_FOLLOWERS_URL = f"{USERS_URL}/followers"
USER_FOLLOWING_URL = f"{USERS_URL}/following"
# == repo urls ==#
CREATE_REPO_URL = f"{BASE_URL}/user/repos" # _auth repo create
REPOS_URL = f"{BASE_URL}/repos/{{0}}" # repos of a user
REPO_URL = f"{BASE_URL}/repos/{{0}}/{{1}}" # a specific repo
ADD_FILE_URL = f"{BASE_URL}/repos/{{}}/{{}}/contents/{{}}"
ADD_FILE_BRANCH = f"{BASE_URL}"
REPO_ISSUE_URL = f"{REPO_URL}/issues/{{2}}" # a specific issue
# == gist urls ==#
GIST_URL = f"{BASE_URL}/gists/{{0}}" # specific gist
CREATE_GIST_URL = f"{BASE_URL}/gists" # create a gist
# == org urls ==#
ORG_URL = f"{BASE_URL}/orgs/{{0}}"

46
github/utils.py Normal file
View file

@ -0,0 +1,46 @@
from __future__ import annotations
__all__ = (
"human_readable_time_until",
"str_to_datetime",
"repr_dt",
"bytes_to_b64",
"error_from_request",
)
from base64 import b64encode
from typing import TYPE_CHECKING, Optional
from .errors import HTTPError
if TYPE_CHECKING:
from datetime import datetime, timedelta
from aiohttp import ClientResponse
from .errors import BaseHTTPError
def human_readable_time_until(td: timedelta, /) -> str:
seconds = int(td.total_seconds())
hours, seconds = divmod(seconds, 3600)
minutes, seconds = divmod(seconds, 60)
return f"{hours} hours, {minutes} minues, {seconds} seconds"
def str_to_datetime(time: Optional[str], /) -> Optional[datetime]:
return None if time is None else datetime.strptime(time, r"%Y-%m-%dT%H:%M:%SZ")
def repr_dt(time: datetime, /) -> str:
return time.strftime(r"%d-%m-%Y, %H:%M:%S")
def bytes_to_b64(content: str, /) -> str:
return b64encode(content.encode("utf-8")).decode("ascii")
def error_from_request(request: ClientResponse, /) -> BaseHTTPError:
# TODO: Make specific errrors
return HTTPError(request)