diff --git a/github/__init__.py b/github/__init__.py index 9f7931d..b7f96d9 100644 --- a/github/__init__.py +++ b/github/__init__.py @@ -1,13 +1,34 @@ -# == __init__.py ==# +""" +MIT License -__title__ = 'Github-Api-Wrapper' -__authors__ = 'VarMonke', 'sudosnok' -__version__ = '1.2.1' -__license__ = 'MIT' -__copyright__ = 'Copyright (c) 2022-present VarMonke & sudosnok' +Copyright (c) 2022-present VarMonke, sudosnok & contributors -from .client import * -from .exceptions import * -from .http import * +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. + +""" + +__title__ = "GitHub-API-Wrapper" +__authors__ = ("VarMonke", "sudosnok", "contributors") +__version__ = "2.0a" +__license__ = "MIT" +__copyright__ = "Copyright (c) 2022-present VarMonke, sudosnok & contributors" + +from .errors import * +from .internals import * from .objects import * -from .urls import * diff --git a/github/cache.py b/github/cache.py deleted file mode 100644 index 29d236e..0000000 --- a/github/cache.py +++ /dev/null @@ -1,69 +0,0 @@ -# == cache.py ==# - -from __future__ import annotations - -from collections import deque -from typing import Any, Deque, Dict, Tuple, TypeVar - -__all__: Tuple[str, ...] = ('ObjectCache',) - - -K = TypeVar('K') -V = TypeVar('V') - - -class _BaseCache(Dict[K, V]): - """This is a rough implementation of an LRU Cache using a deque and a dict.""" - - __slots__: Tuple[str, ...] = ('_max_size', '_lru_keys') - - def __init__(self, max_size: int, *args: Any) -> None: - self._max_size: int = max(min(max_size, 30), 0) # bounding max_size to 15 for now - self._lru_keys: Deque[K] = deque(maxlen=self._max_size) - super().__init__(*args) - - def __getitem__(self, __k: K) -> V: - index = self._lru_keys.index(__k) - target = self._lru_keys[index] - del self._lru_keys[index] - - self._lru_keys.appendleft(target) - return super().__getitem__(__k) - - def __setitem__(self, __k: K, __v: V) -> None: - if len(self) == self._max_size: - self.__delitem__(self._lru_keys.pop()) - - self._lru_keys.appendleft(__k) - return super().__setitem__(__k, __v) - - def update(self, **kwargs: Any) -> None: - for key, value in dict(**kwargs).items(): - key: K - value: V - - self.__setitem__(key, value) - - -class ObjectCache(_BaseCache[K, V]): - """This adjusts the typehints to reflect Github objects.""" - - def __getitem__(self, __k: K) -> V: - index = self._lru_keys.index(__k) - target = self._lru_keys[index] - self._lru_keys.appendleft(target) - return super().__getitem__(__k) - - def __setitem__(self, __k: K, __v: V) -> None: - if self.__len__() == self._max_size: - self.__delitem__(self._lru_keys.pop()) - - self._lru_keys.appendleft(__k) - return super().__setitem__(__k, __v) - - def update(self, **kwargs: Any) -> None: - for key, value in dict(**kwargs).items(): - key: K - value: V - - self.__setitem__(key, value) diff --git a/github/client.py b/github/client.py deleted file mode 100644 index 293122a..0000000 --- a/github/client.py +++ /dev/null @@ -1,381 +0,0 @@ -# == main.py ==# -from __future__ import annotations - -import functools -from typing import ( - Any, - Awaitable, - Callable, - Coroutine, - Dict, - Generator, - List, - Literal, - Optional, - Tuple, - TypeVar, - Union, - overload, -) - -import aiohttp -from typing_extensions import Concatenate, ParamSpec, Self - -from . import exceptions -from .cache import ObjectCache -from .http import http -from .objects import File, Gist, Issue, Organization, Repository, User - -__all__: Tuple[str, ...] = ('GHClient', 'Client') - -T = TypeVar('T') -P = ParamSpec('P') - - -class GHClient: - """The main client, used to start most use-cases. - - Parameters - ---------- - username: Optional[:class:`str`] - An optional username to be provided along with a token to make authenticated API calls. - If you provide a username, the token must be provided as well. - user_cache_size: Optional[:class:`int`] - Determines the maximum number of User objects that will be cached in memory. - Defaults to 30, must be between 30 and 0 inclusive. - repo_cache_size: Optional[:class:`int`] - Determines the maximum number of Repository objects that will be cached in memory. - Defaults to 15, must be between 30 and 0 inclusive. - custom_headers: Optional[:class:`dict`] - A way to pass custom headers into the client session that drives the client, eg. a user-agent. - - Attributes - ---------- - username: Optional[:class:`str`] - The authenticated Client's username, if applicable. - __token: Optional[:class:`str`] - The authenticated Client's token, if applicable. - """ - - has_started: bool = False - - def __init__( - self, - *, - username: Optional[str] = None, - token: Optional[str] = None, - user_cache_size: int = 30, - repo_cache_size: int = 15, - custom_headers: Dict[str, Union[str, int]] = {}, - ): - self._headers = custom_headers - - if username and token: - self.username = username - self.__token = token - self.__auth = aiohttp.BasicAuth(username, token) - else: - self.__auth = None - self.username = None - self.__token = None - - self.http = http(headers=custom_headers, auth=self.__auth) - - self._user_cache = ObjectCache[Any, User](user_cache_size) - self._repo_cache = ObjectCache[Any, Repository](repo_cache_size) - - # Cache manegent - self._cache(type='user')(self.get_self) # type: ignore - self._cache(type='user')(self.get_user) # type: ignore - self._cache(type='repo')(self.get_repo) # type: ignore - - def __call__(self, *args: Any, **kwargs: Any) -> Coroutine[Any, Any, Self]: - return self.start(*args, **kwargs) - - def __await__(self) -> Generator[Any, Any, Self]: - return self.start().__await__() - - async def __aenter__(self) -> Self: - await self.start() - return self - - async def __aexit__(self, *args: Any, **kwargs: Any) -> None: - try: - session = self.http.session - await session.close() - except Exception as exc: - raise Exception('HTTP Session doesn\'t exist') from exc - - def __repr__(self) -> str: - return f'' - - @overload - def check_limits(self, as_dict: Literal[True] = True) -> Dict[str, Union[str, int]]: - ... - - @overload - def check_limits(self, as_dict: Literal[False] = False) -> List[str]: - ... - - def check_limits(self, as_dict: bool = False) -> Union[Dict[str, Union[str, int]], List[str]]: - """Returns the remaining number of API calls per timeframe. - - Parameters - ---------- - as_dict: Optional[:class:`bool`] - Set to True to return the remaining calls in a dictionary. - Set to False to return the remaining calls in a list. - Defaults to False - """ - if not self.has_started: - raise exceptions.NotStarted - if not as_dict: - output: List[str] = [] - for key, value in self.http.session._rates._asdict().items(): # type: ignore - output.append(f"{key} : {value}") - - return output - - return self.http.session._rates # type: ignore - - async def update_auth(self, *, username: str, token: str) -> None: - """Allows you to input auth information after instantiating the client. - - Parameters - ---------- - username: :class:`str` - The username to update the authentication to. - Must also be provided with the valid token. - token: :class:`str` - The token to update the authentication to. - Must also be providede with the valid username. - """ - # check if username and token is valid - await self.http.update_auth(username=username, token=token) - try: - await self.http.get_self() - except exceptions.InvalidToken as exc: - raise exceptions.InvalidToken from exc - - async def start(self) -> Self: - """Main entry point to the wrapper, this creates the ClientSession. - - Parameters - ---------- - """ - if self.has_started: - raise exceptions.AlreadyStarted - if self.__auth: - self.http = await http(auth=self.__auth, headers=self._headers) - try: - await self.http.get_self() - except exceptions.InvalidToken as exc: - raise exceptions.InvalidToken from exc - else: - self.http = await http(auth=None, headers=self._headers) - self.has_started = True - return self - - def _cache( - self: Self, *, type: str - ) -> Callable[ - [Callable[Concatenate[Self, P], Awaitable[T]]], - Callable[Concatenate[Self, P], Awaitable[Optional[Union[T, User, Repository]]]], - ]: - def wrapper( - func: Callable[Concatenate[Self, P], Awaitable[T]] - ) -> Callable[Concatenate[Self, P], Awaitable[Optional[Union[T, User, Repository]]]]: - @functools.wraps(func) - async def wrapped(self: Self, *args: P.args, **kwargs: P.kwargs) -> Optional[Union[T, User, Repository]]: - if type == 'user': - obj = self._user_cache.get(kwargs.get('user')) - if obj: - return obj - - user: User = await func(self, *args, **kwargs) # type: ignore - self._user_cache[kwargs.get("user")] = user - return user - if type == 'repo': - obj = self._repo_cache.get(kwargs.get('repo')) - if obj: - return obj - - repo: Repository = await func(self, *args, **kwargs) # type: ignore - self._repo_cache[kwargs.get('repo')] = repo - return repo - - return wrapped - - return wrapper - - # @_cache(type='User') - async def get_self(self) -> User: - """:class:`User`: Returns the authenticated User object.""" - if self.__auth: - return User(await self.http.get_self(), self.http) - else: - raise exceptions.NoAuthProvided - - async def get_user(self, *, user: str) -> User: - """:class:`User`: Fetch a Github user from their username. - - Parameters - ---------- - user: :class:`str` - The name of the user to fetch. - """ - return User(await self.http.get_user(user), self.http) - - async def get_repo(self, *, owner: str, repo: str) -> Repository: - """:class:`Repository`: Fetch a Github repository from it's name. - - Parameters - ---------- - owner: :class:`str` - The name of the owner of a given reposiory. - repo: :class:`str` - The name of the repository to fetch. - """ - return Repository(await self.http.get_repo(owner, repo), self.http) # type: ignore - - async def get_issue(self, *, owner: str, repo: str, issue: int) -> Issue: - """:class:`Issue`: Fetch a Github Issue from it's name. - - Parameters - ---------- - owner: :class:`str` - The name of the owner of the repository for which the issue relates to. - repo: :class:`str` - The name of the repository to which the issue is related to. - issue: :class:`int` - The ID of the issue to fetch. - """ - return Issue(await self.http.get_repo_issue(owner, repo, issue), self.http) # type: ignore #fwiw, this shouldn't error but pyright <3 - - async def create_repo( - self, - name: str, - description: str = 'Repository created using Github-Api-Wrapper.', - public: bool = False, - gitignore: Optional[str] = None, - license: Optional[str] = None, - ) -> Repository: - """Creates a Repository with supplied data. - Requires API authentication. - - Parameters - ---------- - name: :class:`str` - The name of the repository to be created. - description: :class:`str` - A description of the repository to be created. - public: :class:`bool` - Determines whether only the repository will be visible to the public. - Defaults to False (private repository). - gitignore: Optional[:class:`str`] - .gitignore template to use. - See https://github.com/github/gitignore for GitHub's own templates. - Defaults to None. - license: Optional[:class:`str`] - TODO: Document this. - - Returns - ------- - :class:`Repository` - """ - return Repository( - await self.http.create_repo(name, description, public, gitignore, license), - self.http, - ) - - async def delete_repo(self, repo: str) -> Optional[str]: - """Delete a Github repository, requires authorisation. - - Parameters - ---------- - repo: :class:`str` - The name of the repository to delete. - - Returns - ------- - Optional[:class:`str`] - """ - return await self.http.delete_repo(self.username, repo) - - async def get_gist(self, gist: str) -> Gist: - """Fetch a Github gist from it's id. - - Parameters - ---------- - gist: :class:`str` - The id of the gist to fetch. - - Returns - ------- - :class:`Gist` - """ - return Gist(await self.http.get_gist(gist), self.http) - - async def create_gist( - self, *, files: List[File], description: str = 'Gist from Github-Api-Wrapper', public: bool = True - ) -> Gist: - """Creates a Gist with the given files, requires authorisation. - - Parameters - ---------- - files: List[:class:`File`] - A list of File objects to upload to the gist. - description: :class:`str` - A description of the gist. - public: :class:`bool` - Determines whether the gist will be visible to the public. - Defaults to False (private). - - Returns - ------- - :class:`Gist` - """ - return Gist( - await self.http.create_gist(files=files, description=description, public=public), - self.http, - ) - - async def delete_gist(self, gist: int) -> Optional[str]: - """Delete a Github gist, requires authorisation. - - Parameters - ---------- - gist: :class:`int` - The ID of the gist to delete. - - Returns - ------- - Optional[:class:`str`] - """ - return await self.http.delete_gist(gist) - - async def get_org(self, org: str) -> Organization: - """Fetch a Github organization from it's name. - - Parameters - ---------- - org: :class:`str` - The name of the organization to fetch. - - Returns - ------- - :class:`Organization` - """ - return Organization(await self.http.get_org(org), self.http) - - async def latency(self) -> float: - """:class:`float`: Returns the latency of the client.""" - return await self.http.latency() - - async def close(self) -> None: - """Close the session.""" - await self.http.session.close() - - -class Client(GHClient): - pass diff --git a/github/errors.py b/github/errors.py new file mode 100644 index 0000000..4662601 --- /dev/null +++ b/github/errors.py @@ -0,0 +1,48 @@ +from __future__ import annotations + +__all__ = ("GitHubError", "BaseHTTPError", "HTTPError", "RatelimitReached") + +from datetime import datetime, timezone +from typing import TYPE_CHECKING + +from .utils import human_readable_time_until + +if TYPE_CHECKING: + from aiohttp import ClientResponse + + +class GitHubError(Exception): + """The base class for all errors raised in this library.""" + + +class BaseHTTPError(GitHubError): + """The base class for all HTTP related errors in this library.""" + + +class HTTPError(BaseHTTPError): + """Raised when an HTTP request doesn't respond with a successfull code.""" + + def __init__(self, response: ClientResponse, /) -> None: + self.method = response.method + self.code = response.status + self.url = response.url + self._response = response + + def __str__(self) -> str: + return ( + f"An HTTP error with the code {self.code} has occured while trying to do a" + f" {self.method} request to the URL {self.url}" + ) + + +class RatelimitReached(GitHubError): + """Raised when a ratelimit is reached.""" + + def __init__(self, reset_time: datetime, /) -> None: + self.reset_time = reset_time + + def __str__(self) -> str: + return ( + "The ratelimit has been reached. You can try again in" + f" {human_readable_time_until(datetime.now(timezone.utc) - self.reset_time)}" + ) diff --git a/github/exceptions.py b/github/exceptions.py deleted file mode 100644 index 60f791f..0000000 --- a/github/exceptions.py +++ /dev/null @@ -1,175 +0,0 @@ -# == exceptions.py ==# - -import datetime -from typing import Optional, Tuple - -from aiohttp import ClientResponse - -__all__: Tuple[str, ...] = ( - 'APIError', - 'AlreadyStarted', - 'ClientException', - 'ClientResponse', - 'GistNotFound', - 'HTTPException', - 'InvalidAuthCombination', - 'InvalidToken', - 'IssueNotFound', - 'LoginFailure', - 'MissingPermissions', - 'NoAuthProvided', - 'NotStarted', - 'OrganizationNotFound', - 'Ratelimited', - 'RepositoryAlreadyExists', - 'RepositoryNotFound', - 'ResourceAlreadyExists', - 'ResourceNotFound', - 'UserNotFound', - 'WillExceedRatelimit', -) - - -class APIError(Exception): - """Base level exceptions raised by errors related to any API request or call.""" - - pass - - -class HTTPException(Exception): - """Base level exceptions raised by errors related to HTTP requests.""" - - pass - - -class ClientException(Exception): - """Base level exceptions raised by errors related to the client.""" - - pass - - -class ResourceNotFound(Exception): - """Base level exceptions raised when a resource is not found.""" - - pass - - -class ResourceAlreadyExists(Exception): - """Base level exceptions raised when a resource already exists.""" - - pass - - -class Ratelimited(APIError): - """Raised when the ratelimit from Github is reached or exceeded.""" - - def __init__(self, reset_time: datetime.datetime): - formatted = reset_time.strftime(r"%H:%M:%S %A, %d %b") - msg = f"We're being ratelimited, wait until {formatted}.\nAuthentication raises the ratelimit." - super().__init__(msg) - - -class WillExceedRatelimit(APIError): - """Raised when the library predicts the call will exceed the ratelimit, will abort the call by default.""" - - def __init__(self, response: ClientResponse, count: int): - msg = 'Performing this action will exceed the ratelimit, aborting.\n{} remaining available calls, calls to make: {}.' - msg = msg.format(response.headers['X-RateLimit-Remaining'], count) - super().__init__(msg) - - -class NoAuthProvided(ClientException): - """Raised when no authentication is provided.""" - - def __init__(self): - msg = 'This action required autentication. Pass username and token kwargs to your client instance.' - super().__init__(msg) - - -class InvalidToken(ClientException): - """Raised when the token provided is invalid.""" - - def __init__(self): - msg = 'The token provided is invalid.' - super().__init__(msg) - - -class InvalidAuthCombination(ClientException): - """Raised when the username and token are both provided.""" - - def __init__(self, msg: str): - # msg = 'The username and token cannot be used together.' - super().__init__(msg) - - -class LoginFailure(ClientException): - """Raised when the login attempt fails.""" - - def __init__(self): - msg = 'The login attempt failed. Provide valid credentials.' - super().__init__(msg) - - -class NotStarted(ClientException): - """Raised when the client is not started.""" - - def __init__(self): - msg = 'The client is not started. Run Github.GHClient() to start.' - super().__init__(msg) - - -class AlreadyStarted(ClientException): - """Raised when the client is already started.""" - - def __init__(self): - msg = 'The client is already started.' - super().__init__(msg) - - -class MissingPermissions(APIError): - def __init__(self): - msg = 'You do not have permissions to perform this action.' - super().__init__(msg) - - -class UserNotFound(ResourceNotFound): - def __init__(self): - msg = 'The requested user was not found.' - super().__init__(msg) - - -class RepositoryNotFound(ResourceNotFound): - def __init__(self): - msg = 'The requested repository is either private or does not exist.' - super().__init__(msg) - - -class IssueNotFound(ResourceNotFound): - def __init__(self): - msg = 'The requested issue was not found.' - super().__init__(msg) - - -class OrganizationNotFound(ResourceNotFound): - def __init__(self): - msg = 'The requested organization was not found.' - super().__init__(msg) - - -class GistNotFound(ResourceNotFound): - def __init__(self): - msg = 'The requested gist was not found.' - super().__init__(msg) - - -class RepositoryAlreadyExists(ResourceAlreadyExists): - def __init__(self): - msg = 'The requested repository already exists.' - super().__init__(msg) - - -class FileAlreadyExists(ResourceAlreadyExists): - def __init__(self, msg: Optional[str] = None): - if msg is None: - msg = 'The requested file already exists.' - super().__init__(msg) diff --git a/github/http.py b/github/http.py deleted file mode 100644 index 8d54103..0000000 --- a/github/http.py +++ /dev/null @@ -1,322 +0,0 @@ -# == http.py ==# - -from __future__ import annotations - -import json -import platform -import re -from datetime import datetime -from types import SimpleNamespace -from typing import Any, Dict, List, NamedTuple, Optional, Tuple, Type, Union - -import aiohttp -from typing_extensions import TypeAlias - -from . import __version__ -from .exceptions import * -from .objects import File, Gist, Repository, User, bytes_to_b64 -from .urls import * - -__all__: Tuple[str, ...] = ( - 'Paginator', - 'http', -) - - -LINK_PARSING_RE = re.compile(r"<(\S+(\S))>; rel=\"(\S+)\"") - - -class Rates(NamedTuple): - remaining: str - used: str - total: str - reset_when: Union[datetime, str] - last_request: Union[datetime, str] - - -# aiohttp request tracking / checking bits -async def on_req_start( - session: aiohttp.ClientSession, ctx: SimpleNamespace, params: aiohttp.TraceRequestStartParams -) -> None: - """Before-request hook to make sure we don't overrun the ratelimit.""" - # print(repr(session), repr(ctx), repr(params)) - if session._rates.remaining in ('0', '1'): # type: ignore - raise Exception('Ratelimit exceeded') - - -async def on_req_end(session: aiohttp.ClientSession, ctx: SimpleNamespace, params: aiohttp.TraceRequestEndParams) -> None: - """After-request hook to adjust remaining requests on this time frame.""" - headers = params.response.headers - - remaining = headers['X-RateLimit-Remaining'] - used = headers['X-RateLimit-Used'] - total = headers['X-RateLimit-Limit'] - reset_when = datetime.fromtimestamp(int(headers['X-RateLimit-Reset'])) - last_req = datetime.utcnow() - - session._rates = Rates(remaining, used, total, reset_when, last_req) - - -trace_config = aiohttp.TraceConfig() -trace_config.on_request_start.append(on_req_start) -trace_config.on_request_end.append(on_req_end) - -APIType: TypeAlias = Union[User, Gist, Repository] - - -async def make_session(*, headers: Dict[str, str], authorization: Union[aiohttp.BasicAuth, None]) -> aiohttp.ClientSession: - """This makes the ClientSession, attaching the trace config and ensuring a UA header is present.""" - if not headers.get('User-Agent'): - headers['User-Agent'] = ( - f'Github-API-Wrapper (https://github.com/VarMonke/Github-Api-Wrapper) @ {__version__} Python' - f' {platform.python_version()} aiohttp {aiohttp.__version__}' - ) - - session = aiohttp.ClientSession(auth=authorization, headers=headers, trace_configs=[trace_config]) - session._rates = Rates('', '', '', '', '') - return session - - -# pagination -class Paginator: - """This class handles pagination for objects like Repos and Orgs.""" - - def __init__(self, session: aiohttp.ClientSession, response: aiohttp.ClientResponse, target_type: str): - self.session = session - self.response = response - self.should_paginate = bool(self.response.headers.get('Link', False)) - types: Dict[str, Type[APIType]] = { # note: the type checker doesnt see subclasses like that - 'user': User, - 'gist': Gist, - 'repo': Repository, - } - self.target_type: Type[APIType] = types[target_type] - self.pages = {} - self.is_exhausted = False - self.current_page = 1 - self.next_page = self.current_page + 1 - self.parse_header(response) - - async def fetch_page(self, link: str) -> Dict[str, Union[str, int]]: - """Fetches a specific page and returns the JSON.""" - return await (await self.session.get(link)).json() - - async def early_return(self) -> List[APIType]: - # I don't rightly remember what this does differently, may have a good ol redesign later - return [self.target_type(data, self) for data in await self.response.json()] # type: ignore - - async def exhaust(self) -> List[APIType]: - """Iterates through all of the pages for the relevant object and creates them.""" - if self.should_paginate: - return await self.early_return() - - out: List[APIType] = [] - for page in range(1, self.max_page + 1): - result = await self.session.get(self.bare_link + str(page)) - out.extend([self.target_type(item, self) for item in await result.json()]) # type: ignore - - self.is_exhausted = True - return out - - def parse_header(self, response: aiohttp.ClientResponse) -> None: - """Predicts wether a call will exceed the ratelimit ahead of the call.""" - header = response.headers['Link'] - groups = LINK_PARSING_RE.findall(header) - self.max_page = int(groups[1][1]) - if int(response.headers['X-RateLimit-Remaining']) < self.max_page: - raise WillExceedRatelimit(response, self.max_page) - self.bare_link = groups[0][0][:-1] - - -# GithubUserData = GithubRepoData = GithubIssueData = GithubOrgData = GithubGistData = Dict[str, Union [str, int]] -# Commentnig this out for now, consider using TypeDict's instead in the future <3 - - -class http: - def __init__(self, headers: Dict[str, Union[str, int]], auth: Union[aiohttp.BasicAuth, None]) -> None: - if not headers.get('User-Agent'): - headers['User-Agent'] = ( - 'Github-API-Wrapper (https://github.com/VarMonke/Github-Api-Wrapper) @' - f' {__version__} Python/{platform.python_version()} aiohttp/{aiohttp.__version__}' - ) - - self._rates = Rates('', '', '', '', '') - self.headers = headers - self.auth = auth - - def __await__(self): - return self.start().__await__() - - async def start(self): - self.session = aiohttp.ClientSession( - headers=self.headers, # type: ignore - auth=self.auth, - trace_configs=[trace_config], - ) - if not hasattr(self.session, "_rates"): - self.session._rates = Rates('', '', '', '', '') - return self - - def update_headers(self, *, flush: bool = False, new_headers: Dict[str, Union[str, int]]): - if flush: - from multidict import CIMultiDict - - self.session._default_headers = CIMultiDict(**new_headers) # type: ignore - else: - self.session._default_headers = {**self.session.headers, **new_headers} # type: ignore - - async def update_auth(self, *, username: str, token: str): - auth = aiohttp.BasicAuth(username, token) - headers = self.session.headers - config = self.session.trace_configs - await self.session.close() - self.session = aiohttp.ClientSession(headers=headers, auth=auth, trace_configs=config) - - def data(self): - # return session headers and auth - headers = {**self.session.headers} - return {'headers': headers, 'auth': self.auth} - - async def latency(self): - """Returns the latency of the current session.""" - start = datetime.utcnow() - await self.session.get(BASE_URL) - return (datetime.utcnow() - start).total_seconds() - - async def get_self(self) -> Dict[str, Union[str, int]]: - """Returns the authenticated User's data""" - result = await self.session.get(SELF_URL) - if 200 <= result.status <= 299: - return await result.json() - raise InvalidToken - - async def get_user(self, username: str) -> Dict[str, Union[str, int]]: - """Returns a user's public data in JSON format.""" - result = await self.session.get(USERS_URL.format(username)) - if 200 <= result.status <= 299: - return await result.json() - raise UserNotFound - - async def get_user_repos(self, _user: User) -> List[Dict[str, Union[str, int]]]: - result = await self.session.get(USER_REPOS_URL.format(_user.login)) - if 200 <= result.status <= 299: - return await result.json() - - print('This shouldn\'t be reachable') - return [] - - async def get_user_gists(self, _user: User) -> List[Dict[str, Union[str, int]]]: - result = await self.session.get(USER_GISTS_URL.format(_user.login)) - if 200 <= result.status <= 299: - return await result.json() - - print('This shouldn\'t be reachable') - return [] - - async def get_user_orgs(self, _user: User) -> List[Dict[str, Union[str, int]]]: - result = await self.session.get(USER_ORGS_URL.format(_user.login)) - if 200 <= result.status <= 299: - return await result.json() - - print('This shouldn\'t be reachable') - return [] - - async def get_repo(self, owner: str, repo_name: str) -> Optional[Dict[str, Union[str, int]]]: - """Returns a Repo's raw JSON from the given owner and repo name.""" - result = await self.session.get(REPO_URL.format(owner, repo_name)) - if 200 <= result.status <= 299: - return await result.json() - raise RepositoryNotFound - - async def get_repo_issue(self, owner: str, repo_name: str, issue_number: int) -> Optional[Dict[str, Any]]: - """Returns a single issue's JSON from the given owner and repo name.""" - result = await self.session.get(REPO_ISSUE_URL.format(owner, repo_name, issue_number)) - if 200 <= result.status <= 299: - return await result.json() - raise IssueNotFound - - async def delete_repo(self, owner: Optional[str], repo_name: str) -> Optional[str]: - """Deletes a Repo from the given owner and repo name.""" - result = await self.session.delete(REPO_URL.format(owner, repo_name)) - if 204 <= result.status <= 299: - return 'Successfully deleted repository.' - if result.status == 403: # type: ignore - raise MissingPermissions - raise RepositoryNotFound - - async def delete_gist(self, gist_id: Union[str, int]) -> Optional[str]: - """Deletes a Gist from the given gist id.""" - result = await self.session.delete(GIST_URL.format(gist_id)) - if result.status == 204: - return 'Successfully deleted gist.' - if result.status == 403: - raise MissingPermissions - raise GistNotFound - - async def get_org(self, org_name: str) -> Dict[str, Union[str, int]]: - """Returns an org's public data in JSON format.""" # type: ignore - result = await self.session.get(ORG_URL.format(org_name)) - if 200 <= result.status <= 299: - return await result.json() - raise OrganizationNotFound - - async def get_gist(self, gist_id: str) -> Dict[str, Union[str, int]]: - """Returns a gist's raw JSON from the given gist id.""" - result = await self.session.get(GIST_URL.format(gist_id)) - if 200 <= result.status <= 299: - return await result.json() - raise GistNotFound - - async def create_gist( - self, *, files: List['File'] = [], description: str = 'Default description', public: bool = False - ) -> Dict[str, Union[str, int]]: - data = {} - data['description'] = description - data['public'] = public - data['files'] = {} - for file in files: - data['files'][file.filename] = {'filename': file.filename, 'content': file.read()} # helps editing the file - data = json.dumps(data) - _headers = dict(self.session.headers) - result = await self.session.post(CREATE_GIST_URL, data=data, headers=_headers) - if 201 == result.status: - return await result.json() - raise InvalidToken - - async def create_repo( - self, name: str, description: str, public: bool, gitignore: Optional[str], license: Optional[str] - ) -> Dict[str, Union[str, int]]: - """Creates a repo for you with given data""" - data = { - 'name': name, - 'description': description, - 'public': public, - 'gitignore_template': gitignore, - 'license': license, - } - result = await self.session.post(CREATE_REPO_URL, data=json.dumps(data)) - if 200 <= result.status <= 299: - return await result.json() - if result.status == 401: - raise NoAuthProvided - raise RepositoryAlreadyExists - - async def add_file(self, owner: str, repo_name: str, filename: str, content: str, message: str, branch: str): - """Adds a file to the given repo.""" - - data = { - 'content': bytes_to_b64(content=content), - 'message': message, - 'branch': branch, - } - - result = await self.session.put(ADD_FILE_URL.format(owner, repo_name, filename), data=json.dumps(data)) - if 200 <= result.status <= 299: - return await result.json() - if result.status == 401: - raise NoAuthProvided - if result.status == 409: - raise FileAlreadyExists - if result.status == 422: - raise FileAlreadyExists('This file exists, and can only be edited.') - return await result.json(), result.status diff --git a/github/internals/__init__.py b/github/internals/__init__.py new file mode 100644 index 0000000..50b0188 --- /dev/null +++ b/github/internals/__init__.py @@ -0,0 +1 @@ +from .http import * diff --git a/github/internals/http.py b/github/internals/http.py new file mode 100644 index 0000000..0be6316 --- /dev/null +++ b/github/internals/http.py @@ -0,0 +1,852 @@ +from __future__ import annotations + +__all__ = ("HTTPClient",) + +import asyncio +import logging +import platform +import time +from datetime import datetime, timezone +from typing import TYPE_CHECKING, Any, Awaitable, Dict, List, Literal, NamedTuple, Optional, Union + +from aiohttp import ClientSession, TraceConfig + +from .. import __version__ +from ..utils import error_from_request, human_readable_time_until + +if TYPE_CHECKING: + from types import SimpleNamespace + + from aiohttp import BasicAuth, TraceRequestEndParams, TraceRequestStartParams + from typing_extensions import Self + + from ..objects import File + from ..types import SecurtiyAndAnalysis + +log = logging.getLogger("github") + + +class Ratelimits(NamedTuple): + remaining: Optional[int] + used: Optional[int] + total: Optional[int] + reset_time: Optional[datetime] + last_request: Optional[datetime] + + +# ========= TODO ========= # +# Make a good paginator +# Make objects for all API Types +# Make the requests return TypedDicts with those objects +# Make specific errrors +# Make route /users/{username}/hovercard +# Make it so an error gets raised when the cooldown is reached + +# === ROUTES CHECKLIST === # +# Actions +# Activity +# Apps +# Billing +# Branches +# Checks +# Codes of conduct +# Code Scanning +# Codespaces +# Collaborators +# Commits +# Dependabot +# Dependency Graph +# Deploy keys +# Deployments +# Emojis +# Enterprise administration +# Gists DONE +# Git database +# Gitignore +# Interactions +# Issues +# Licenses +# Markdown +# Meta +# Metrics +# Migrations +# OAuth authorizations +# Organizations +# Packages +# Pages +# Projects +# Pulls +# Rate limit +# Reactions +# Releases +# Repositories DONE +# SCIM +# Search +# Teams +# Users DONE +# Webhooks + + +class HTTPClient: + __session: ClientSession + _rates: Ratelimits + _last_ping: float + _latency: float + + def __new__( + cls, + *, + headers: Optional[Dict[str, Union[str, int]]] = None, + auth: Optional[BasicAuth] = None, + ) -> Awaitable[Self]: + # Basically async def __init__ + return cls.__async_init() + + @classmethod + async def __async_init( + cls, + *, + headers: Optional[Dict[str, str]] = None, + auth: Optional[BasicAuth] = None, + ) -> Self: + self = super(cls, cls).__new__(cls) + + if not headers: + headers = {} + + headers.setdefault( + "User-Agent", + "GitHub-API-Wrapper (https://github.com/Varmonke/GitHub-API-Wrapper) @" + f" {__version__} CPython/{platform.python_version()} aiohttp/{__version__}", + ) + + self._rates = Ratelimits(None, None, None, None, None) + self.__headers = headers + self.__auth = auth + + self._last_ping = 0 + self._latency = 0 + + trace_config = TraceConfig() + + @trace_config.on_request_start.append + async def on_request_start( + _: ClientSession, __: SimpleNamespace, params: TraceRequestStartParams + ) -> None: + if self.ratelimited: + log.info( + "Ratelimit exceeded, trying again in" + f" {human_readable_time_until(self._rates.reset_time - datetime.now(timezone.utc))} (URL:" + f" {params.url}, method: {params.method})" + ) + + # TODO: I get about 3-4 hours of cooldown this might not be a good idea, might make this raise an error instead. + await asyncio.sleep( + max((self._rates.reset_time - datetime.now(timezone.utc)).total_seconds(), 0) + ) + + @trace_config.on_request_end.append + async def on_request_end( + _: ClientSession, __: SimpleNamespace, params: TraceRequestEndParams + ) -> None: + """After-request hook to adjust remaining requests on this time frame.""" + headers = params.response.headers + + self._rates = Ratelimits( + int(headers["X-RateLimit-Remaining"]), + int(headers["X-RateLimit-Used"]), + int(headers["X-RateLimit-Limit"]), + datetime.fromtimestamp(int(headers["X-RateLimit-Reset"])).replace( + tzinfo=timezone.utc + ), + datetime.now(timezone.utc), + ) + + self.__session = ClientSession( + headers=headers, + auth=auth, + trace_configs=[trace_config], + ) + + return self + + async def __aenter__(self) -> Self: + return self + + async def __aexit__(self, *_) -> None: + await self.__session.close() + + @property + def ratelimited(self) -> bool: + remaining = self._rates.remaining + return remaining is not None and remaining < 2 + + @property + def latency(self) -> Awaitable[float]: + async def inner() -> float: + last_ping = self._last_ping + + # If there was no ping or the last ping was more than 5 seconds ago. + if not last_ping or time.monotonic() > last_ping + 5 or self.ratelimited: + self._last_ping = time.monotonic() + + start = time.monotonic() + await self.request("GET", "/") + self._latency = time.monotonic() - start + + return self._latency + + return inner() + + async def request( + self, method: Literal["GET", "POST", "PUT", "DELETE", "PATCH"], path: str, /, **kwargs: Any + ): + async with self.__session.request( + method, f"https://api.github.com{path}", **kwargs + ) as request: + if 200 <= request.status <= 299: + return await request.json() + + raise error_from_request(request) + + # === ROUTES === # + + # === USERS === # + + async def get_logged_in_user(self): + return await self.request("GET", "/user") + + async def update_logged_in_user( + self, + *, + name: Optional[str] = None, + email: Optional[str] = None, + blog: Optional[str] = None, + twitter_username: Optional[str] = None, + company: Optional[str] = None, + location: Optional[str] = None, + hireable: Optional[str] = None, + bio: Optional[str] = None, + ): + data = {} + + if name: + data["name"] = name + if email: + data["email"] = email + if blog: + data["blog"] = blog + if twitter_username: + data["twitter_username"] = twitter_username + if company: + data["company"] = company + if location: + data["location"] = location + if hireable: + data["hireable"] = hireable + if bio: + data["bio"] = bio + + return await self.request("PATCH", "/user", json=data) + + async def list_users(self, *, since: Optional[int] = None, per_page: Optional[int] = None): + params = {} + + if since: + params["since"] = since + if per_page: + params["per_page"] = per_page + + return await self.request("GET", "/users", params=params) + + async def get_user(self, *, username: str): + return await self.request("GET", f"/users/{username}") + + # TODO: /users/{username}/hovercard + # IDK what to name it + + # === REPOS === # + + async def list_org_repos( + self, + *, + org: str, + type: Optional[ + Literal["all", "public", "private", "forks", "sources", "member", "internal"] + ] = None, + sort: Optional[Literal["created", "updated", "pushed", "full_name"]] = None, + direction: Optional[Literal["asc", "desc"]] = None, + per_page: Optional[int] = None, + page: Optional[int] = None, + ): + params = {} + + if type: + params["type"] = type + if sort: + params["sort"] = sort + if direction: + params["direction"] = direction + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", f"/orgs/{org}/repos", params=params) + + async def create_org_repo( + self, + *, + org: str, + name: str, + description: Optional[str] = None, + homepage: Optional[str] = None, + private: Optional[bool] = None, + visibility: Optional[Literal["public", "private", "internal"]] = None, + has_issues: Optional[bool] = None, + has_projects: Optional[bool] = None, + has_wiki: Optional[bool] = None, + is_template: Optional[bool] = None, + team_id: Optional[int] = None, + auto_init: Optional[bool] = None, + gitignore_template: Optional[str] = None, + license_template: Optional[str] = None, + allow_squash_merge: Optional[bool] = None, + allow_merge_commit: Optional[bool] = None, + allow_rebase_merge: Optional[bool] = None, + allow_auto_merge: Optional[bool] = None, + delete_branch_on_merge: Optional[bool] = None, + use_squash_pr_title_as_default: Optional[bool] = None, + ): + data: Dict[str, Union[str, bool, int]] = { + "name": name, + } + + if description: + data["description"] = description + if homepage: + data["homepage"] = homepage + if private: + data["private"] = private + if visibility: + data["visibility"] = visibility + if has_issues: + data["has_issues"] = has_issues + if has_projects: + data["has_projects"] = has_projects + if has_wiki: + data["has_wiki"] = has_wiki + if is_template: + data["is_template"] = is_template + if team_id: + data["team_id"] = team_id + if auto_init: + data["auto_init"] = auto_init + if gitignore_template: + data["gitignore_template"] = gitignore_template + if license_template: + data["license_template"] = license_template + if allow_squash_merge: + data["allow_squash_merge"] = allow_squash_merge + if allow_merge_commit: + data["allow_merge_commit"] = allow_merge_commit + if allow_rebase_merge: + data["allow_rebase_merge "] = allow_rebase_merge + if allow_auto_merge: + data["allow_auto_merge"] = allow_auto_merge + if delete_branch_on_merge: + data["delete_branch_on_merge"] = delete_branch_on_merge + if use_squash_pr_title_as_default: + data["use_squash_pr_title_as_default"] = use_squash_pr_title_as_default + + return await self.request("POST", f"/orgs/{org}/repos", json=data) + + async def get_repo(self, *, owner: str, repo: str): + return await self.request("GET", f"/repos/{owner}/{repo}") + + async def update_repo( + self, + *, + owner: str, + repo: str, + name: Optional[str] = None, + description: Optional[str] = None, + homepage: Optional[str] = None, + private: Optional[bool] = None, + visibility: Optional[Literal["public", "private", "internal"]] = None, + security_and_analysis: Optional[SecurtiyAndAnalysis] = None, + has_issues: Optional[bool] = None, + has_projects: Optional[bool] = None, + has_wiki: Optional[bool] = None, + is_template: Optional[bool] = None, + default_branch: Optional[str] = None, + allow_squash_merge: Optional[bool] = None, + allow_merge_commit: Optional[bool] = None, + allow_rebase_merge: Optional[bool] = None, + allow_auto_merge: Optional[bool] = None, + delete_branch_on_merge: Optional[bool] = None, + use_squash_pr_title_as_default: Optional[bool] = None, + archived: Optional[bool] = None, + allow_forking: Optional[bool] = None, + ): + data = {} + + if name: + data["name"] = name + if description: + data["description"] = description + if homepage: + data["homepage"] = homepage + if private: + data["private"] = private + if visibility: + data["visibility"] = visibility + if security_and_analysis: + data["security_and_analysis"] = security_and_analysis + if has_issues: + data["has_issues"] = has_issues + if has_projects: + data["has_projects"] = has_projects + if has_wiki: + data["has_wiki"] = has_wiki + if is_template: + data["is_template"] = is_template + if default_branch: + data["default_branch"] = default_branch + if allow_squash_merge: + data["allow_squash_merge"] = allow_squash_merge + if allow_merge_commit: + data["allow_merge_commit"] = allow_merge_commit + if allow_rebase_merge: + data["allow_rebase_merge "] = allow_rebase_merge + if allow_auto_merge: + data["allow_auto_merge "] = allow_auto_merge + if delete_branch_on_merge: + data["delete_branch_on_merge "] = delete_branch_on_merge + if use_squash_pr_title_as_default: + data["use_squash_pr_title_as_default"] = use_squash_pr_title_as_default + if archived: + data["archived"] = archived + if allow_forking: + data["allow_forking"] = allow_forking + + return await self.request("PATCH", f"/repos/{owner}/{repo}", json=data) + + async def delete_repo(self, *, owner: str, repo: str): + return await self.request("DELETE", f"/repos/{owner}/{repo}") + + async def enable_automated_security_fixes_for_repo(self, *, owner: str, repo: str): + return await self.request("PUT", f"/repos/{owner}/{repo}/automated-security-fixes") + + async def disable_automated_security_fixes_for_repo(self, *, owner: str, repo: str): + return await self.request("DELETE", f"/repos/{owner}/{repo}/automated-security-fixes") + + async def list_codeowners_erros_for_repo( + self, *, owner: str, repo: str, ref: Optional[str] = None + ): + params = {} + + if ref: + params["ref"] = ref + + return await self.request("GET", f"/repos/{owner}/{repo}/codeowners/errors", params=params) + + async def list_contributors_for_repo( + self, + *, + owner: str, + repo: str, + anon: Optional[bool] = None, + per_page: Optional[int] = None, + page: Optional[int] = None, + ): + params = {} + + if anon: + params["anon"] = anon + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", f"/repos/{owner}/{repo}/contributors", params=params) + + async def create_dispatch_event_for_repo( + self, *, owner: str, repo: str, event_name: str, client_payload: Optional[str] = None + ): + data = { + "event_name": event_name, + } + + if client_payload: + data["client_payload"] = client_payload + + return await self.request("POST", f"/repos/{owner}/{repo}/dispatches", json=data) + + async def list_repo_languages_for_repo(self, *, owner: str, repo: str): + return await self.request("GET", f"/repos/{owner}/{repo}/languages") + + async def list_tags_for_repo( + self, *, owner: str, repo: str, per_page: Optional[int] = None, page: Optional[int] = None + ): + params = {} + + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", f"/repos/{owner}/{repo}/tags", params=params) + + async def list_teams_for_repo( + self, *, owner: str, repo: str, per_page: Optional[int] = None, page: Optional[int] = None + ): + params = {} + + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", f"/repos/{owner}/{repo}/teams", params=params) + + async def get_all_topic_for_repo( + self, *, owner: str, repo: str, per_page: Optional[int] = None, page: Optional[int] = None + ): + params = {} + + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", f"/repos/{owner}/{repo}/topics", params=params) + + async def replace_all_topics_for_repo(self, *, owner: str, repo: str, names: List[str]): + return await self.request("PUT", f"/repos/{owner}/{repo}/topics", json={"names": names}) + + async def transfer_repo( + self, *, owner: str, repo: str, new_owner: str, team_ids: Optional[List[int]] = None + ): + data: Dict[str, Union[str, List[int]]] = { + "new_owner": new_owner, + } + + if team_ids: + data["team_ids"] = team_ids + + return await self.request("POST", f"/repos/{owner}/{repo}/transfer", json=data) + + async def check_vulnerability_alerts_enabled_for_repo(self, *, owner: str, repo: str): + return await self.request("GET", f"/repos/{owner}/{repo}/vulnerability-alerts") + + async def enable_vulnerability_alerts_for_repo(self, *, owner: str, repo: str): + return await self.request("PUT", f"/repos/{owner}/{repo}/vulnerability-alerts") + + async def disable_vulnerability_alerts_for_repo(self, *, owner: str, repo: str): + return await self.request("DELETE", f"/repos/{owner}/{repo}/vulnerability-alerts") + + async def create_repo_using_template_repo( + self, + *, + template_owner: str, + template_repo: str, + owner: Optional[str] = None, + name: str, + include_all_branches: Optional[bool] = None, + private: Optional[bool] = None, + ): + data: Dict[str, Union[str, bool]] = { + "name": name, + } + + if owner: + data["owner"] = owner + if include_all_branches: + data["include_all_branches"] = include_all_branches + if private: + data["private"] = private + + return await self.request( + "POST", f"/repos/{template_owner}/{template_repo}/generate", json=data + ) + + async def list_public_repos(self, *, since: Optional[int] = None): + params = {} + + if since: + params["since"] = since + + return await self.request("GET", "/repositories", params=params) + + async def list_logged_in_user_repos( + self, + *, + visibility: Optional[Literal["all", "private", "public"]] = None, + affiliation: Optional[Literal["owner", "collaborator", "organization_member"]] = None, + type: Optional[Literal["all", "owner", "public", "private", "member"]] = None, + sort: Optional[Literal["created", "updated", "pushed", "full_name"]] = None, + direction: Optional[Literal["asc", "desc"]] = None, + per_page: Optional[int] = None, + page: Optional[int] = None, + since: Optional[str] = None, + before: Optional[str] = None, + ): + data = {} + + if visibility: + data["visibility"] = visibility + if affiliation: + data["affiliation"] = affiliation + if type: + data["type"] = type + if sort: + data["sort"] = sort + if direction: + data["direction"] = direction + if per_page: + data["per_page"] = per_page + if page: + data["page"] = page + if since: + data["since"] = since + if before: + data["before"] = before + + return self.request("POST", "/user/repos", json=data) + + async def create_repo( + self, + *, + name: str, + description: Optional[str] = None, + homepage: Optional[str] = None, + private: Optional[bool] = None, + has_issues: Optional[bool] = None, + has_projects: Optional[bool] = None, + has_wiki: Optional[bool] = None, + team_id: Optional[int] = None, + auto_init: Optional[bool] = None, + gitignore_template: Optional[str] = None, + license_template: Optional[str] = None, + allow_squash_merge: Optional[bool] = None, + allow_merge_commit: Optional[bool] = None, + allow_rebase_merge: Optional[bool] = None, + allow_auto_merge: Optional[bool] = None, + delete_branch_on_merge: Optional[bool] = None, + has_downloads: Optional[bool] = None, + is_template: Optional[bool] = None, + ): + data: Dict[str, Union[str, bool, int]] = { + "name": name, + } + + if description: + data["description"] = description + if homepage: + data["homepage"] = homepage + if private: + data["private"] = private + if has_issues: + data["has_issues"] = has_issues + if has_projects: + data["has_projects"] = has_projects + if has_wiki: + data["has_wiki"] = has_wiki + if team_id: + data["team_id"] = team_id + if auto_init: + data["auto_init"] = auto_init + if gitignore_template: + data["gitignore_template"] = gitignore_template + if license_template: + data["license_template"] = license_template + if allow_squash_merge: + data["allow_squash_merge"] = allow_squash_merge + if allow_merge_commit: + data["allow_merge_commit"] = allow_merge_commit + if allow_rebase_merge: + data["allow_rebase_merge"] = allow_rebase_merge + if allow_auto_merge: + data["allow_auto_merge"] = allow_auto_merge + if delete_branch_on_merge: + data["delete_branch_on_merge"] = delete_branch_on_merge + if has_downloads: + data["has_downloads"] = has_downloads + if is_template: + data["is_template"] = is_template + + return await self.request("POST", "/user/repos", json=data) + + async def list_user_repos( + self, + *, + username: str, + type: Optional[ + Literal["all", "public", "private", "forks", "sources", "member", "internal"] + ] = None, + sort: Optional[Literal["created", "updated", "pushed", "full_name"]] = None, + direction: Optional[Literal["asc", "desc"]] = None, + per_page: Optional[int] = None, + page: Optional[int] = None, + ): + params = {} + + if type: + params["type"] = type + if sort: + params["sort"] = sort + if direction: + params["direction"] = direction + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", f"/users/{username}/repos", params=params) + + # === GISTS === # + + async def list_logged_in_user_gists( + self, + *, + since: Optional[str] = None, + per_page: Optional[int] = None, + page: Optional[int] = None, + ): + params = {} + + if since: + params["since"] = since + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", "/gists", params=params) + + async def create_gist( + self, *, description: Optional[str] = None, files: List[File], public: Optional[bool] = None + ): + data: Dict[str, Union[str, bool, Dict[str, str]]] = { + "files": {f.name: f.read() for f in files}, + } + + if description: + data["description"] = description + if public: + data["public"] = public + + return await self.request("POST", "/gists", json=data) + + async def list_public_gists( + self, + *, + since: Optional[str] = None, + per_page: Optional[int] = None, + page: Optional[int] = None, + ): + params = {} + + if since: + params["since"] = since + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", "/gists/public", params=params) + + async def list_starred_gists( + self, + *, + since: Optional[str] = None, + per_page: Optional[int] = None, + page: Optional[int] = None, + ): + params = {} + + if since: + params["since"] = since + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", "/gists/starred", params=params) + + async def get_gist(self, *, gist_id: str): + return await self.request("GET", f"/gists/{gist_id}") + + async def update_gist( + self, *, gist_id: str, description: Optional[str] = None, files: Optional[List[File]] = None + ): + data = {} + + if description: + data["description"] = description + if files: + data["files"] = {f.name: f.read() for f in files} + + return await self.request("PATCH", f"/gists/{gist_id}") + + async def delete_gist(self, *, gist_id: str): + return await self.request("DELETE", f"/gists/{gist_id}") + + async def list_commits_for_gist( + self, *, gist_id: str, per_page: Optional[int] = None, page: Optional[int] = None + ): + params = {} + + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", f"/gists/{gist_id}/commits", params=params) + + async def list_forks_for_gist( + self, *, gist_id: str, per_page: Optional[int] = None, page: Optional[int] = None + ): + params = {} + + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", f"/gists/{gist_id}/forks", params=params) + + async def fork_gist(self, *, gist_id: str): + return await self.request("POST", f"/gists/{gist_id}/forks") + + async def check_starred_for_gist(self, *, gist_id: str): + return await self.request("GET", f"/gists/{gist_id}/star") + + async def star_gist(self, *, gist_id: str): + return await self.request("PUT", f"/gists/{gist_id}/star") + + async def unstar_gist(self, *, gist_id: str): + return await self.request("DELETE", f"/gists/{gist_id}/star") + + async def get_revision_for_gist(self, *, gist_id: str, sha: str): + return await self.request("GET", f"/gists/{gist_id}/{sha}") + + async def list_user_gists( + self, + *, + username: str, + since: Optional[str] = None, + per_page: Optional[int] = None, + page: Optional[int] = None, + ): + params = {} + + if since: + params["since"] = since + if per_page: + params["per_page"] = per_page + if page: + params["page"] = page + + return await self.request("GET", f"/users/{username}/gists", params=params) diff --git a/github/objects.py b/github/objects.py deleted file mode 100644 index 8f0978e..0000000 --- a/github/objects.py +++ /dev/null @@ -1,534 +0,0 @@ -# == objects.py ==# -from __future__ import annotations - -from base64 import b64encode -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union - -if TYPE_CHECKING: - from .http import http - -import io -import os -from datetime import datetime - -__all__: Tuple[str, ...] = ( - 'APIObject', - 'dt_formatter', - 'repr_dt', - 'PartialUser', - 'User', - 'Repository', - 'Issue', - 'File', - 'Gist', - 'Organization', -) - - -def dt_formatter(time_str: Optional[str]) -> Optional[datetime]: - if time_str is not None: - return datetime.strptime(time_str, r"%Y-%m-%dT%H:%M:%SZ") - - return None - - -def repr_dt(_datetime: datetime) -> str: - return _datetime.strftime(r'%d-%m-%Y, %H:%M:%S') - - -def bytes_to_b64(content) -> str: - return b64encode(content.encode('utf-8')).decode('ascii') - - -class APIObject: - """Top level class for objects created from the API""" - - __slots__: Tuple[str, ...] = ('_response', '_http') - - def __init__(self, response: Dict[str, Any], _http: http) -> None: - self._http = _http - self._response = response - - def __repr__(self) -> str: - return f'<{self.__class__.__name__}>' - - -# === User stuff ===# - - -class _BaseUser(APIObject): - __slots__ = ( - 'login', - 'id', - ) - - def __init__(self, response: Dict[str, Any], _http: http) -> None: - super().__init__(response, _http) - self._http = _http - self.login = response.get('login') - self.id = response.get('id') - - def __repr__(self) -> str: - return f'<{self.__class__.__name__} id = {self.id}, login = {self.login!r}>' - - async def repos(self) -> List[Repository]: - """List[:class:`Repository`]: Returns a list of public repositories under the user.""" - results = await self._http.get_user_repos(self) # type: ignore - return [Repository(data, self._http) for data in results] - - async def gists(self) -> List[Gist]: - """List[:class:`Gist`]: Returns a list of public gists under the user.""" - results = await self._http.get_user_gists(self) # type: ignore - return [Gist(data, self._http) for data in results] - - async def orgs(self) -> List[Organization]: - """List[:class:`Organization`]: Returns a list of public orgs under the user.""" - results = await self._http.get_user_orgs(self) # type: ignore - return [Organization(data, self._http) for data in results] - - @property - def name(self): - """Optional[str]: The name of the user, if available.""" - return self._response.get('login') - - -class User(_BaseUser): - """Representation of a user object on Github. - - Attributes - ---------- - login: :class:`str` - The API name of the user. - id: :class:`int` - The ID of the user. - avatar_url: :class:`str` - The url of the user's Github avatar. - html_url: :class:`str` - The url of the user's Github page. - created_at: :class:`datetime.datetime` - The time of creation of the user. - """ - - __slots__ = ( - 'login', - 'id', - 'avatar_url', - 'html_url', - 'public_repos', - 'public_gists', - 'followers', - 'following', - 'created_at', - ) - - def __init__(self, response: Dict[str, Any], _http: http) -> None: - super().__init__(response, _http) - tmp = self.__slots__ + _BaseUser.__slots__ - keys = {key: value for key, value in self._response.items() if key in tmp} - for key, value in keys.items(): - if '_at' in key and value is not None: - setattr(self, key, dt_formatter(value)) - continue - else: - setattr(self, key, value) - continue - - def __repr__(self) -> str: - return f'<{self.__class__.__name__} login: {self.login!r}, id: {self.id}, created_at: {self.created_at}>' - - -class PartialUser(_BaseUser): - __slots__ = ( - 'site_admin', - 'html_url', - 'avatar_url', - ) + _BaseUser.__slots__ - - def __init__(self, response: Dict[str, Any], _http: http) -> None: - super().__init__(response, _http) - self.site_admin: Optional[str] = response.get('site_admin') - self.html_url: Optional[str] = response.get('html_url') - self.avatar_url: Optional[str] = response.get('avatar_url') - - def __repr__(self) -> str: - return f'<{self.__class__.__name__} login: {self.login!r}, id: {self.id}, site_admin: {self.site_admin}' - - async def _get_user(self) -> User: - """Upgrades the PartialUser to a User object.""" - response = await self._http.get_user(self.login) - return User(response, self._http) - - -# === Repository stuff ===# - - -class Repository(APIObject): - """Representation of a repository on Github. - - Attributes - ---------- - id: :class:`int` - The ID of the repository in the API. - name: :class:`str` - The name of the repository in the API. - owner: :class:`User` - The owner of the repository. - created_at: :class:`datetime.datetime` - The time the repository was created at. - updated_at: :class:`datetime.datetime` - The time the repository was last updated. - url: :class:`str` - The API url for the repository. - html_url: :class:`str` - The human-url of the repository. - archived: :class:`bool` - Whether the repository is archived or live. - open_issues_count: :class:`int` - The number of the open issues on the repository. - default_branch: :class:`str` - The name of the default branch of the repository. - """ - - if TYPE_CHECKING: - id: int - name: str - owner: str - - __slots__ = ( - 'id', - 'name', - 'owner', - 'sizecreated_at', - 'url', - 'html_url', - 'archived', - 'disabled', - 'updated_at', - 'open_issues_count', - 'clone_url', - 'stargazers_count', - 'watchers_count', - 'license', - ) - - def __init__(self, response: Dict[str, Any], _http: http) -> None: - super().__init__(response, _http) - tmp = self.__slots__ + APIObject.__slots__ - keys = {key: value for key, value in self._response.items() if key in tmp} - for key, value in keys.items(): - if key == 'owner': - setattr(self, key, PartialUser(value, self._http)) - continue - - if key == 'name': - setattr(self, key, value) - continue - - if '_at' in key and value is not None: - setattr(self, key, dt_formatter(value)) - continue - - if 'license' in key: - if value is not None: - setattr(self, key, value.get('name')) - continue - setattr(self, key, None) - - else: - setattr(self, key, value) - continue - - def __repr__(self) -> str: - return f'<{self.__class__.__name__} id: {self.id}, name: {self.name!r}, owner: {self.owner!r}>' - - @property - def is_fork(self) -> bool: - """:class:`bool`: Whether the repository is a fork.""" - return self._response.get('fork') - - @property - def language(self) -> str: - """:class:`str`: Primary language of the repository.""" - return self._response.get('language') - - @property - def open_issues(self) -> int: - """:class:`int`: The number of open issues on the repository.""" - return self._response.get('open_issues') - - @property - def forks(self) -> int: - """:class:`int`: The number of forks of the repository.""" - return self._response.get('forks') - - @property - def default_branch(self) -> str: - """:class:`str`: The default branch of the repository.""" - return self._response.get('default_branch') - - async def delete(self) -> None: - """Deletes the repository.""" - return await self._http.delete_repo( - self.owner.name, # type: ignore this shit is not my fault - self.name, - ) # type: ignore - - async def add_file(self, filename: str, message: str, content: str, branch: Optional[str] = None) -> None: - """Adds a file to the repository. - - Parameters - ---------- - filename: :class:`str` The name of the file. - message: :class:`str` The commit message. - content: :class:`str` The content of the file. - branch: :class:`str` The branch to add the file to, defaults to the default branch. - """ - - if branch is None: - branch = self.default_branch - - return await self._http.add_file(owner=self.owner.name, repo_name=self.name, filename=filename, content=content, message=message, branch=branch) # type: ignore - - -class Issue(APIObject): - """Representation of an issue on Github. - - Attributes - ---------- - id: :class:`int` - The ID of the issue in the API. - title: :class:`str` - The title of the issue in the API. - user: :class:`User` - The user who opened the issue. - labels: List[:class:`str`] - TODO: document this. - state: :class:`str` - The current state of the issue. - created_at: :class:`datetime.datetime` - The time the issue was created. - closed_by: Optional[Union[:class:`PartialUser`, :class:`User`]] - The user the issue was closed by, if applicable. - """ - - __slots__ = ( - 'id', - 'title', - 'user', - 'labels', - 'state', - 'created_at', - 'closed_by', - ) - - def __init__(self, response: Dict[str, Any], _http: http) -> None: - super().__init__(response, _http) - tmp = self.__slots__ + APIObject.__slots__ - keys = {key: value for key, value in self._response.items() if key in tmp} - for key, value in keys.items(): - if key == 'user': - setattr(self, key, PartialUser(value, self._http)) - continue - - if key == 'labels': - setattr(self, key, [label['name'] for label in value]) - continue - - if key == 'closed_by': - setattr(self, key, User(value, self._http)) - continue - - else: - setattr(self, key, value) - continue - - def __repr__(self) -> str: - return ( - f'<{self.__class__.__name__} id: {self.id}, title: {self.title}, user: {self.user}, created_at:' - f' {self.created_at}, state: {self.state}>' - ) - - @property - def updated_at(self) -> Optional[datetime]: - """Optional[:class:`datetime.datetime`]: The time the issue was last updated, if applicable.""" - return dt_formatter(self._response.get('updated_at')) - - @property - def html_url(self) -> str: - """:class:`str`: The human-friendly url of the issue.""" - return self._response.get('html_url') - - -# === Gist stuff ===# - - -class File: - """A wrapper around files and in-memory file-like objects. - - Parameters - ---------- - fp: Union[:class:`str`, :class:`io.StringIO`, :class:`io.BytesIO`] - The filepath or StringIO representing a file to upload. - If providing a StringIO instance, a filename shuold also be provided to the file. - filename: :class:`str` - An override to the file's name, encouraged to provide this if using a StringIO instance. - """ - - def __init__(self, fp: Union[str, io.StringIO, io.BytesIO], filename: str = 'DefaultFilename.txt') -> None: - self.fp = fp - self.filename = filename - - def read(self) -> str: - if isinstance(self.fp, str): - if os.path.exists(self.fp): - with open(self.fp) as fp: - data = fp.read() - return data - return self.fp - elif isinstance(self.fp, io.BytesIO): - return self.fp.read().decode('utf-8') - elif isinstance(self.fp, io.StringIO): # type: ignore - return self.fp.getvalue() - - raise TypeError(f'Expected str, io.StringIO, or io.BytesIO, got {type(self.fp)}') - - -class Gist(APIObject): - """Representation of a gist on Github. - - Attributes - ---------- - id: :class:`int` - The ID of the gist in the API. - html_url: :class:`str` - The human-friendly url of the gist. - files: List[:class:`File`] - A list of the files in the gist, can be an empty list. - public: :class:`bool` - Whether the gist is public. - owner: Union[:class:`PartialUser`, :class:`User`] - The owner of the gist. - created_at: :class:`datetime.datetime` - The time the gist was created at. - """ - - __slots__ = ( - 'id', - 'html_url', - 'node_id', - 'files', - 'public', - 'owner', - 'created_at', - 'truncated', - ) - - def __init__(self, response: Dict[str, Any], _http: http) -> None: - super().__init__(response, _http) - tmp = self.__slots__ + APIObject.__slots__ - keys = {key: value for key, value in self._response.items() if key in tmp} - for key, value in keys.items(): - if key == 'owner': - setattr(self, key, PartialUser(value, self._http)) - continue - if key == 'created_at': - setattr(self, key, dt_formatter(value)) - continue - else: - setattr(self, key, value) - - def __repr__(self) -> str: - return f'<{self.__class__.__name__} id: {self.id}, owner: {self.owner}, created_at: {self.created_at}>' - - @property - def updated_at(self) -> Optional[datetime]: - """Optional[:class:`datetime.datetime`]: The time the gist was last updated, if applicable.""" - return dt_formatter(self._response.get('updated_at')) - - @property - def comments(self) -> str: - """TODO: document this.""" - return self._response.get('comments') - - @property - def discussion(self) -> str: - """TODO: document this.""" - return self._response.get('discussion') - - @property - def raw(self) -> Dict[str, Any]: - """TODO: document this.""" - return self._response - - @property - def url(self) -> str: - return self._response.get('html_url') - - async def delete(self): - """Delete the gist.""" - await self._http.delete_gist(self.id) - - -# === Organization stuff ===# - - -class Organization(APIObject): - """Representation of an organization in the API. - - Attributes - ---------- - login: :class:`str` - TODO: document this - id: :class:`int` - The ID of the organization in the API. - is_verified: :class:`bool` - Whether or not the organization is verified. - created_at: :class:`datetime.datetime` - The time the organization was created at. - avatar_url: :class:`str` - The url of the organization's avatar. - """ - - __slots__ = ( - 'login', - 'id', - 'is_verified', - 'public_repos', - 'public_gists', - 'followers', - 'following', - 'created_at', - 'avatar_url', - ) - - def __init__(self, response: Dict[str, Any], _http: http) -> None: - super().__init__(response, _http) - tmp = self.__slots__ + APIObject.__slots__ - keys = {key: value for key, value in self._response.items() if key in tmp} - for key, value in keys.items(): - if key == 'login': - setattr(self, key, value) - continue - if '_at' in key and value is not None: - setattr(self, key, dt_formatter(value)) - continue - - else: - setattr(self, key, value) - continue - - def __repr__(self): - return ( - f'<{self.__class__.__name__} login: {self.login!r}, id: {self.id}, is_verified: {self.is_verified},' - f' public_repos: {self.public_repos}, public_gists: {self.public_gists}, created_at: {self.created_at}>' - ) - - @property - def description(self): - """:class:`str`: The description of the organization.""" - return self._response.get('description') - - @property - def html_url(self): - """:class:`str`: The human-friendly url of the organization.""" - return self._response.get('html_url') diff --git a/github/objects/__init__.py b/github/objects/__init__.py new file mode 100644 index 0000000..0ad8041 --- /dev/null +++ b/github/objects/__init__.py @@ -0,0 +1,2 @@ +from .file import * +from .object import * diff --git a/github/objects/file.py b/github/objects/file.py new file mode 100644 index 0000000..c2580d9 --- /dev/null +++ b/github/objects/file.py @@ -0,0 +1,26 @@ +__all__ = ("File",) + +import os +from io import BytesIO, StringIO +from pathlib import Path +from typing import Union + + +class File: + def __init__(self, file: Union[str, StringIO, BytesIO], /, *, filename: str) -> None: + self._file = file + self.name = filename + + def read(self) -> str: + f = self._file + + if isinstance(f, BytesIO): + return f.read().decode("utf-8") + + if isinstance(f, StringIO): + return f.getvalue() + + if os.path.exists(f): + return Path(f).read_text() + + return f diff --git a/github/objects/object.py b/github/objects/object.py new file mode 100644 index 0000000..ba66795 --- /dev/null +++ b/github/objects/object.py @@ -0,0 +1,18 @@ +from __future__ import annotations + +__all__ = ("Object",) + +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from ..internals import HTTPClient + + +class Object: + __slots__ = ("__http",) + + def __init__(self, *, http: HTTPClient) -> None: + self.__http = http + + def __repr__(self) -> str: + return f"<{self.__class__.__name__}>" diff --git a/github/types/__init__.py b/github/types/__init__.py new file mode 100644 index 0000000..135a667 --- /dev/null +++ b/github/types/__init__.py @@ -0,0 +1 @@ +from .security_and_analysis import * diff --git a/github/types/security_and_analysis.py b/github/types/security_and_analysis.py new file mode 100644 index 0000000..cff0709 --- /dev/null +++ b/github/types/security_and_analysis.py @@ -0,0 +1,23 @@ +from typing import Literal, TypedDict + +from typing_extensions import NotRequired + +__all__ = ("SecurtiyAndAnalysis",) + + +class AdvancedSecurity(TypedDict): + status: Literal["enabled", "disabled"] + + +class SecretScanning(TypedDict): + status: Literal["enabled", "disabled"] + + +class SecretScanningPushProtection(TypedDict): + status: Literal["enabled", "disabled"] + + +class SecurtiyAndAnalysis(TypedDict): + advanced_security: NotRequired[AdvancedSecurity] + secret_scanning: NotRequired[SecretScanning] + secret_scanning_push_protection: NotRequired[SecretScanningPushProtection] diff --git a/github/urls.py b/github/urls.py deleted file mode 100644 index dc6c639..0000000 --- a/github/urls.py +++ /dev/null @@ -1,43 +0,0 @@ -# == urls.py ==# - -BASE_URL = 'https://api.github.com' - - -# == user urls ==# -USERS_URL = f"{BASE_URL}/users/{{0}}" - -USER_HTML_URL = 'https://github.com/users/{0}' - -SELF_URL = f"{BASE_URL}/user" - -USER_REPOS_URL = f"{USERS_URL}/repos" - -USER_ORGS_URL = f"{USERS_URL}/orgs" - -USER_GISTS_URL = f"{USERS_URL}/gists" - -USER_FOLLOWERS_URL = f"{USERS_URL}/followers" - -USER_FOLLOWING_URL = f"{USERS_URL}/following" - - -# == repo urls ==# -CREATE_REPO_URL = f"{BASE_URL}/user/repos" # _auth repo create - -REPOS_URL = f"{BASE_URL}/repos/{{0}}" # repos of a user - -REPO_URL = f"{BASE_URL}/repos/{{0}}/{{1}}" # a specific repo - -ADD_FILE_URL = f"{BASE_URL}/repos/{{}}/{{}}/contents/{{}}" - -ADD_FILE_BRANCH = f"{BASE_URL}" - -REPO_ISSUE_URL = f"{REPO_URL}/issues/{{2}}" # a specific issue - -# == gist urls ==# -GIST_URL = f"{BASE_URL}/gists/{{0}}" # specific gist - -CREATE_GIST_URL = f"{BASE_URL}/gists" # create a gist - -# == org urls ==# -ORG_URL = f"{BASE_URL}/orgs/{{0}}" diff --git a/github/utils.py b/github/utils.py new file mode 100644 index 0000000..e9f5ff4 --- /dev/null +++ b/github/utils.py @@ -0,0 +1,46 @@ +from __future__ import annotations + +__all__ = ( + "human_readable_time_until", + "str_to_datetime", + "repr_dt", + "bytes_to_b64", + "error_from_request", +) + +from base64 import b64encode +from typing import TYPE_CHECKING, Optional + +from .errors import HTTPError + +if TYPE_CHECKING: + from datetime import datetime, timedelta + + from aiohttp import ClientResponse + + from .errors import BaseHTTPError + + +def human_readable_time_until(td: timedelta, /) -> str: + seconds = int(td.total_seconds()) + hours, seconds = divmod(seconds, 3600) + minutes, seconds = divmod(seconds, 60) + + return f"{hours} hours, {minutes} minues, {seconds} seconds" + + +def str_to_datetime(time: Optional[str], /) -> Optional[datetime]: + return None if time is None else datetime.strptime(time, r"%Y-%m-%dT%H:%M:%SZ") + + +def repr_dt(time: datetime, /) -> str: + return time.strftime(r"%d-%m-%Y, %H:%M:%S") + + +def bytes_to_b64(content: str, /) -> str: + return b64encode(content.encode("utf-8")).decode("ascii") + + +def error_from_request(request: ClientResponse, /) -> BaseHTTPError: + # TODO: Make specific errrors + return HTTPError(request)