1
Fork 0
mirror of https://github.com/RGBCube/GitHubWrapper synced 2025-07-24 23:17:43 +00:00

Merge pull request #8 from NextChai/refactoring

Refactoring for optimizations, Thank you Chai.
This commit is contained in:
Var 2022-04-30 20:07:55 +05:30 committed by GitHub
commit 82112dd23c
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 397 additions and 290 deletions

3
.gitignore vendored
View file

@ -1,3 +1,6 @@
# Used for developer debugging
shell.py
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/
*.py[cod] *.py[cod]

View file

@ -1,4 +1,4 @@
#== __init__.py ==# # == __init__.py ==#
__title__ = 'Github-Api-Wrapper' __title__ = 'Github-Api-Wrapper'
__authors__ = 'VarMonke', 'sudosnok' __authors__ = 'VarMonke', 'sudosnok'
@ -10,4 +10,4 @@ from .main import *
from .objects import * from .objects import *
from .http import * from .http import *
from .urls import * from .urls import *
from .exceptions import * from .exceptions import *

View file

@ -1,92 +1,69 @@
#== cache.py ==# # == cache.py ==#
from __future__ import annotations from __future__ import annotations
__all__ = ( from collections import deque, UserDict
'UserCache', from typing import Any, Deque, Tuple, TypeVar
'RepoCache',
'OrgCache',
)
from collections import deque __all__: Tuple[str, ...] = ('ObjectCache',)
from .objects import APIObject, User, Repository, Organization
class _BaseCache(dict): K = TypeVar('K')
V = TypeVar('V')
class _BaseCache(UserDict[K, V]):
"""This is a rough implementation of an LRU Cache using a deque and a dict.""" """This is a rough implementation of an LRU Cache using a deque and a dict."""
_max_size: int
_lru_keys: deque __slots__: Tuple[str, ...] = ('_max_size', '_lru_keys')
def __init__(self, max_size: int, *args):
self._max_size = max(min(max_size, 15), 0) # bounding max_size to 15 for now def __init__(self, max_size: int, *args: Any) -> None:
self._lru_keys = deque(maxlen=self._max_size) self._max_size: int = max(min(max_size, 15), 0) # bounding max_size to 15 for now
super().__init__(args) self._lru_keys: Deque[K] = deque[K](maxlen=self._max_size)
super().__init__(*args)
def __getitem__(self, __k: str) -> APIObject:
target = self._lru_keys.pop(self._lru_keys.index(__k)) def __getitem__(self, __k: K) -> V:
index = self._lru_keys.index(__k)
target = self._lru_keys[index]
del self._lru_keys[index]
self._lru_keys.appendleft(target) self._lru_keys.appendleft(target)
return super().__getitem__(__k) return super().__getitem__(__k)
def __setitem__(self, __k: str, __v: APIObject) -> None: def __setitem__(self, __k: K, __v: V) -> None:
if len(self) == self._max_size: if len(self) == self._max_size:
to_pop = self._lru_keys.pop(-1) self.__delitem__(self._lru_keys.pop())
del self[to_pop]
self._lru_keys.appendleft(__k) self._lru_keys.appendleft(__k)
return super().__setitem__(__k, __v) return super().__setitem__(__k, __v)
def update(self, *args, **kwargs) -> None: def update(self, **kwargs: Any) -> None:
for key, value in dict(*args, **kwargs).iteritems(): for key, value in dict(**kwargs).items():
self[key] = value key: K
value: V
class UserCache(_BaseCache): self.__setitem__(key, value)
"""This adjusts the typehints to reflect User objects"""
def __getitem__(self, __k: str) -> 'User':
target = self._lru_keys.pop(self._lru_keys.index(__k)) class ObjectCache(_BaseCache[K, V]):
"""This adjusts the typehints to reflect Github objects."""
def __getitem__(self, __k: K) -> V:
index = self._lru_keys.index(__k)
target = self._lru_keys[index]
self._lru_keys.appendleft(target) self._lru_keys.appendleft(target)
return super().__getitem__(__k) return super().__getitem__(__k)
def __setitem__(self, __k: str, __v: 'User') -> None: def __setitem__(self, __k: K, __v: V) -> None:
if len(self) == self._max_size: if self.__len__() == self._max_size:
to_pop = self._lru_keys.pop(-1) self.__delitem__(self._lru_keys.pop())
del self[to_pop]
self._lru_keys.appendleft(__k) self._lru_keys.appendleft(__k)
return super().__setitem__(__k, __v) return super().__setitem__(__k, __v)
def update(self, *args, **kwargs) -> None: def update(self, **kwargs: Any) -> None:
for key, value in dict(*args, **kwargs).iteritems(): for key, value in dict(**kwargs).items():
self[key] = value key: K
value: V
class RepoCache(_BaseCache): self.__setitem__(key, value)
"""This adjusts the typehints to reflect Repo objects"""
def __getitem__(self, __k: str) -> 'Repository':
target = self._lru_keys.pop(self._lru_keys.index(__k))
self._lru_keys.appendleft(target)
return super().__getitem__(__k)
def __setitem__(self, __k: str, __v: 'Repository') -> None:
if len(self) == self._max_size:
to_pop = self._lru_keys.pop(-1)
del self[to_pop]
self._lru_keys.appendleft(__k)
return super().__setitem__(__k, __v)
def update(self, *args, **kwargs) -> None:
for key, value in dict(*args, **kwargs).iteritems():
self[key] = value
class OrgCache(_BaseCache):
def __getitem__(self, __k: str) -> 'Organization':
target = self._lru_keys.pop(self._lru_keys.index(__k))
self._lru_keys.appendleft(target)
return super().__getitem__(__k)
def __setitem__(self, __k: str, __v: 'Organization') -> None:
if len(self) == self._max_size:
to_pop = self._lru_keys.pop(-1)
del self[to_pop]
self._lru_keys.appendleft(__k)
return super().__setitem__(__k, __v)
def update(self, *args, **kwargs) -> None:
for key, value in dict(*args, **kwargs).iteritems():
self[key] = value

View file

@ -1,6 +1,9 @@
#== exceptions.py ==# # == exceptions.py ==#
import datetime import datetime
from aiohttp import ClientResponse
__all__ = ( __all__ = (
'APIError', 'APIError',
'HTTPException', 'HTTPException',
@ -19,108 +22,141 @@ __all__ = (
'IssueNotFound', 'IssueNotFound',
'OrganizationNotFound', 'OrganizationNotFound',
'RepositoryAlreadyExists', 'RepositoryAlreadyExists',
) )
class APIError(Exception): class APIError(Exception):
"""Base level exceptions raised by errors related to any API request or call.""" """Base level exceptions raised by errors related to any API request or call."""
pass pass
class HTTPException(Exception): class HTTPException(Exception):
"""Base level exceptions raised by errors related to HTTP requests.""" """Base level exceptions raised by errors related to HTTP requests."""
pass pass
class ClientException(Exception): class ClientException(Exception):
"""Base level exceptions raised by errors related to the client.""" """Base level exceptions raised by errors related to the client."""
pass pass
class ResourceNotFound(Exception): class ResourceNotFound(Exception):
"""Base level exceptions raised when a resource is not found.""" """Base level exceptions raised when a resource is not found."""
pass pass
class ResourceAlreadyExists(Exception): class ResourceAlreadyExists(Exception):
"""Base level exceptions raised when a resource already exists.""" """Base level exceptions raised when a resource already exists."""
pass pass
class Ratelimited(APIError): class Ratelimited(APIError):
"""Raised when the ratelimit from Github is reached or exceeded.""" """Raised when the ratelimit from Github is reached or exceeded."""
def __init__(self, reset_time: datetime.datetime): def __init__(self, reset_time: datetime.datetime):
formatted = reset_time.strftime(r"%H:%M:%S %A, %d %b") formatted = reset_time.strftime(r"%H:%M:%S %A, %d %b")
msg = "We're being ratelimited, wait until {}.\nAuthentication raises the ratelimit.".format(formatted) msg = "We're being ratelimited, wait until {}.\nAuthentication raises the ratelimit.".format(formatted)
super().__init__(msg) super().__init__(msg)
class WillExceedRatelimit(APIError): class WillExceedRatelimit(APIError):
"""Raised when the library predicts the call will exceed the ratelimit, will abort the call by default.""" """Raised when the library predicts the call will exceed the ratelimit, will abort the call by default."""
def __init__(self, response, count):
def __init__(self, response: ClientResponse, count: int):
msg = 'Performing this action will exceed the ratelimit, aborting.\n{} remaining available calls, calls to make: {}.' msg = 'Performing this action will exceed the ratelimit, aborting.\n{} remaining available calls, calls to make: {}.'
msg = msg.format(response.header['X-RateLimit-Remaining'], count) msg = msg.format(response.headers['X-RateLimit-Remaining'], count)
super().__init__(msg) super().__init__(msg)
class NoAuthProvided(ClientException): class NoAuthProvided(ClientException):
"""Raised when no authentication is provided.""" """Raised when no authentication is provided."""
def __init__(self): def __init__(self):
msg = 'This action required autentication. Pass username and token kwargs to your client instance.' msg = 'This action required autentication. Pass username and token kwargs to your client instance.'
super().__init__(msg) super().__init__(msg)
class InvalidToken(ClientException): class InvalidToken(ClientException):
"""Raised when the token provided is invalid.""" """Raised when the token provided is invalid."""
def __init__(self): def __init__(self):
msg = 'The token provided is invalid.' msg = 'The token provided is invalid.'
super().__init__(msg) super().__init__(msg)
class InvalidAuthCombination(ClientException): class InvalidAuthCombination(ClientException):
"""Raised when the username and token are both provided.""" """Raised when the username and token are both provided."""
def __init__(self): def __init__(self):
msg = 'The username and token cannot be used together.' msg = 'The username and token cannot be used together.'
super().__init__(msg) super().__init__(msg)
class LoginFailure(ClientException): class LoginFailure(ClientException):
"""Raised when the login attempt fails.""" """Raised when the login attempt fails."""
def __init__(self): def __init__(self):
msg = 'The login attempt failed. Provide valid credentials.' msg = 'The login attempt failed. Provide valid credentials.'
super().__init__(msg) super().__init__(msg)
class NotStarted(ClientException): class NotStarted(ClientException):
"""Raised when the client is not started.""" """Raised when the client is not started."""
def __init__(self): def __init__(self):
msg = 'The client is not started. Run Github.GHClient() to start.' msg = 'The client is not started. Run Github.GHClient() to start.'
super().__init__(msg) super().__init__(msg)
class AlreadyStarted(ClientException): class AlreadyStarted(ClientException):
"""Raised when the client is already started.""" """Raised when the client is already started."""
def __init__(self): def __init__(self):
msg = 'The client is already started.' msg = 'The client is already started.'
super().__init__(msg) super().__init__(msg)
class MissingPermissions(APIError): class MissingPermissions(APIError):
def __init__(self): def __init__(self):
msg = 'You do not have permissions to perform this action.' msg = 'You do not have permissions to perform this action.'
super().__init__(msg) super().__init__(msg)
class UserNotFound(ResourceNotFound): class UserNotFound(ResourceNotFound):
def __init__(self): def __init__(self):
msg = 'The requested user was not found.' msg = 'The requested user was not found.'
super().__init__(msg) super().__init__(msg)
class RepositoryNotFound(ResourceNotFound): class RepositoryNotFound(ResourceNotFound):
def __init__(self): def __init__(self):
msg = 'The requested repository is either private or does not exist.' msg = 'The requested repository is either private or does not exist.'
super().__init__(msg) super().__init__(msg)
class IssueNotFound(ResourceNotFound): class IssueNotFound(ResourceNotFound):
def __init__(self): def __init__(self):
msg = 'The requested issue was not found.' msg = 'The requested issue was not found.'
super().__init__(msg) super().__init__(msg)
class OrganizationNotFound(ResourceNotFound): class OrganizationNotFound(ResourceNotFound):
def __init__(self): def __init__(self):
msg = 'The requested organization was not found.' msg = 'The requested organization was not found.'
super().__init__(msg) super().__init__(msg)
class GistNotFound(ResourceNotFound): class GistNotFound(ResourceNotFound):
def __init__(self): def __init__(self):
msg = 'The requested gist was not found.' msg = 'The requested gist was not found.'
super().__init__(msg) super().__init__(msg)
class RepositoryAlreadyExists(ResourceAlreadyExists): class RepositoryAlreadyExists(ResourceAlreadyExists):
def __init__(self): def __init__(self):
msg = 'The requested repository already exists.' msg = 'The requested repository already exists.'

View file

@ -1,21 +1,20 @@
#== http.py ==# # == http.py ==#
from __future__ import annotations from __future__ import annotations
import io
import json import json
import re import re
from collections import namedtuple
from datetime import datetime from datetime import datetime
from types import SimpleNamespace from types import SimpleNamespace
from typing import TYPE_CHECKING, Dict, Union, List from typing import Dict, NamedTuple, Optional, Type, Union, List
from typing_extensions import TypeAlias
import platform import platform
import aiohttp import aiohttp
from .exceptions import * from .exceptions import *
from .exceptions import GistNotFound, RepositoryAlreadyExists, MissingPermissions from .exceptions import GistNotFound, RepositoryAlreadyExists, MissingPermissions
from .objects import APIObject, User, Gist, Repository, Organization, File from .objects import User, Gist, Repository, File
from .urls import * from .urls import *
from . import __version__ from . import __version__
@ -26,104 +25,119 @@ __all__ = (
LINK_PARSING_RE = re.compile(r"<(\S+(\S))>; rel=\"(\S+)\"") LINK_PARSING_RE = re.compile(r"<(\S+(\S))>; rel=\"(\S+)\"")
Rates = namedtuple('Rates', ('remaining', 'used', 'total', 'reset_when', 'last_request'))
class Rates(NamedTuple):
remaining: str
used: str
total: str
reset_when: Union[datetime, str]
last_request: Union[datetime, str]
# aiohttp request tracking / checking bits # aiohttp request tracking / checking bits
async def on_req_start( async def on_req_start(
session: aiohttp.ClientSession, session: aiohttp.ClientSession, ctx: SimpleNamespace, params: aiohttp.TraceRequestStartParams
ctx: SimpleNamespace,
params: aiohttp.TraceRequestStartParams
) -> None: ) -> None:
"""Before-request hook to make sure we don't overrun the ratelimit.""" """Before-request hook to make sure we don't overrun the ratelimit."""
#print(repr(session), repr(ctx), repr(params)) # print(repr(session), repr(ctx), repr(params))
pass pass
async def on_req_end(
session: aiohttp.ClientSession, async def on_req_end(session: aiohttp.ClientSession, ctx: SimpleNamespace, params: aiohttp.TraceRequestEndParams) -> None:
ctx: SimpleNamespace,
params: aiohttp.TraceRequestEndParams
) -> None:
"""After-request hook to adjust remaining requests on this time frame.""" """After-request hook to adjust remaining requests on this time frame."""
headers = params.response.headers headers = params.response.headers
remaining = headers['X-RateLimit-Remaining'] remaining = headers['X-RateLimit-Remaining']
used = headers['X-RateLimit-Used'] used = headers['X-RateLimit-Used']
total = headers['X-RateLimit-Limit'] total = headers['X-RateLimit-Limit']
reset_when = datetime.fromtimestamp(int(headers['X-RateLimit-Reset'])) reset_when = datetime.fromtimestamp(int(headers['X-RateLimit-Reset']))
last_req = datetime.utcnow() last_req = datetime.utcnow()
session._rates = Rates(remaining, used, total, reset_when, last_req) session._rates = Rates(remaining, used, total, reset_when, last_req)
trace_config = aiohttp.TraceConfig() trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_req_start) trace_config.on_request_start.append(on_req_start)
trace_config.on_request_end.append(on_req_end) trace_config.on_request_end.append(on_req_end)
APIType: TypeAlias = Union[User, Gist, Repository]
async def make_session(*, headers: Dict[str, str], authorization: Union[aiohttp.BasicAuth, None]) -> aiohttp.ClientSession: async def make_session(*, headers: Dict[str, str], authorization: Union[aiohttp.BasicAuth, None]) -> aiohttp.ClientSession:
"""This makes the ClientSession, attaching the trace config and ensuring a UA header is present.""" """This makes the ClientSession, attaching the trace config and ensuring a UA header is present."""
if not headers.get('User-Agent'): if not headers.get('User-Agent'):
headers['User-Agent'] = f'Github-API-Wrapper (https://github.com/VarMonke/Github-Api-Wrapper) @ {__version__} Python {platform.python_version()} aiohttp {aiohttp.__version__}' headers[
'User-Agent'
] = f'Github-API-Wrapper (https://github.com/VarMonke/Github-Api-Wrapper) @ {__version__} Python {platform.python_version()} aiohttp {aiohttp.__version__}'
session = aiohttp.ClientSession( session = aiohttp.ClientSession(auth=authorization, headers=headers, trace_configs=[trace_config])
auth=authorization, session._rates = Rates('', '', '', '', '')
headers=headers,
trace_configs=[trace_config]
)
session._rates = Rates('', '' , '', '', '')
return session return session
# pagination # pagination
class Paginator: class Paginator:
"""This class handles pagination for objects like Repos and Orgs.""" """This class handles pagination for objects like Repos and Orgs."""
def __init__(self, session: aiohttp.ClientSession, response: aiohttp.ClientResponse, target_type: str): def __init__(self, session: aiohttp.ClientSession, response: aiohttp.ClientResponse, target_type: str):
self.session = session self.session = session
self.response = response self.response = response
self.should_paginate = bool(self.response.headers.get('Link', False)) self.should_paginate = bool(self.response.headers.get('Link', False))
types: Dict[str, APIObject] = { types: Dict[str, Type[APIType]] = { # note: the type checker doesnt see subclasses like that
'user': User, 'user': User,
'gist' : Gist, 'gist': Gist,
'repo' : Repository 'repo': Repository,
} }
self.target_type = types[target_type] self.target_type: Type[APIType] = types[target_type]
self.pages = {} self.pages = {}
self.is_exhausted = False self.is_exhausted = False
self.current_page = 1 self.current_page = 1
self.next_page = self.current_page + 1 self.next_page = self.current_page + 1
self.parse_header(response) self.parse_header(response)
async def fetch_page(self, link) -> Dict[str, Union[str, int]]: async def fetch_page(self, link: str) -> Dict[str, Union[str, int]]:
"""Fetches a specific page and returns the JSON.""" """Fetches a specific page and returns the JSON."""
return await (await self.session.get(link)).json() return await (await self.session.get(link)).json()
async def early_return(self) -> List[APIObject]: async def early_return(self) -> List[APIType]:
# I don't rightly remember what this does differently, may have a good ol redesign later # I don't rightly remember what this does differently, may have a good ol redesign later
return [self.target_type(data, self.session) for data in await self.response.json()] return [self.target_type(data, self) for data in await self.response.json()] # type: ignore
async def exhaust(self) -> List[APIObject]: async def exhaust(self) -> List[APIType]:
"""Iterates through all of the pages for the relevant object and creates them.""" """Iterates through all of the pages for the relevant object and creates them."""
if self.should_paginate: if self.should_paginate:
return await self.early_return() return await self.early_return()
out = []
for page in range(1, self.max_page+1): out: List[APIType] = []
for page in range(1, self.max_page + 1):
result = await self.session.get(self.bare_link + str(page)) result = await self.session.get(self.bare_link + str(page))
out.extend([self.target_type(item, self.session) for item in await result.json()]) out.extend([self.target_type(item, self) for item in await result.json()]) # type: ignore
self.is_exhausted = True self.is_exhausted = True
return out return out
def parse_header(self, response: aiohttp.ClientResponse) -> None: def parse_header(self, response: aiohttp.ClientResponse) -> None:
"""Predicts wether a call will exceed the ratelimit ahead of the call.""" """Predicts wether a call will exceed the ratelimit ahead of the call."""
header = response.headers.get('Link') header = response.headers['Link']
groups = LINK_PARSING_RE.findall(header) groups = LINK_PARSING_RE.findall(header)
self.max_page = int(groups[1][1]) self.max_page = int(groups[1][1])
if int(response.headers['X-RateLimit-Remaining']) < self.max_page: if int(response.headers['X-RateLimit-Remaining']) < self.max_page:
raise WillExceedRatelimit(response, self.max_page) raise WillExceedRatelimit(response, self.max_page)
self.bare_link = groups[0][0][:-1] self.bare_link = groups[0][0][:-1]
GithubUserData = GithubRepoData = GithubIssueData = GithubOrgData = GithubGistData = Dict[str, Union [str, int]]
# GithubUserData = GithubRepoData = GithubIssueData = GithubOrgData = GithubGistData = Dict[str, Union [str, int]]
# Commentnig this out for now, consider using TypeDict's instead in the future <3
class http: class http:
def __init__(self, headers: Dict[str, Union[str, int]], auth: Union[aiohttp.BasicAuth, None]): def __init__(self, headers: Dict[str, Union[str, int]], auth: Union[aiohttp.BasicAuth, None]) -> None:
if not headers.get('User-Agent'): if not headers.get('User-Agent'):
headers['User-Agent'] = f'Github-API-Wrapper (https://github.com/VarMonke/Github-Api-Wrapper) @ {__version__} Python/{platform.python_version()} aiohttp/{aiohttp.__version__}' headers[
'User-Agent'
] = f'Github-API-Wrapper (https://github.com/VarMonke/Github-Api-Wrapper) @ {__version__} Python/{platform.python_version()} aiohttp/{aiohttp.__version__}'
self._rates = Rates('', '', '', '', '') self._rates = Rates('', '', '', '', '')
self.headers = headers self.headers = headers
self.auth = auth self.auth = auth
@ -133,7 +147,7 @@ class http:
async def start(self): async def start(self):
self.session = aiohttp.ClientSession( self.session = aiohttp.ClientSession(
headers=self.headers, headers=self.headers, # type: ignore
auth=self.auth, auth=self.auth,
trace_configs=[trace_config], trace_configs=[trace_config],
) )
@ -144,23 +158,20 @@ class http:
def update_headers(self, *, flush: bool = False, new_headers: Dict[str, Union[str, int]]): def update_headers(self, *, flush: bool = False, new_headers: Dict[str, Union[str, int]]):
if flush: if flush:
from multidict import CIMultiDict from multidict import CIMultiDict
self.session.headers = CIMultiDict(**new_headers)
self.session._default_headers = CIMultiDict(**new_headers) # type: ignore
else: else:
self.session.headers = {**self.session.headers, **new_headers} self.session._default_headers = {**self.session.headers, **new_headers} # type: ignore
async def update_auth(self, *, username: str, token: str): async def update_auth(self, *, username: str, token: str):
auth = aiohttp.BasicAuth(username, token) auth = aiohttp.BasicAuth(username, token)
headers = self.session.headers headers = self.session.headers
config = self.session.trace_configs config = self.session.trace_configs
await self.session.close() await self.session.close()
self.session = aiohttp.ClientSession( self.session = aiohttp.ClientSession(headers=headers, auth=auth, trace_configs=config)
headers=headers,
auth=auth,
trace_configs=config
)
def data(self): def data(self):
#return session headers and auth # return session headers and auth
headers = {**self.session.headers} headers = {**self.session.headers}
return {'headers': headers, 'auth': self.auth} return {'headers': headers, 'auth': self.auth}
@ -170,56 +181,59 @@ class http:
await self.session.get(BASE_URL) await self.session.get(BASE_URL)
return (datetime.utcnow() - start).total_seconds() return (datetime.utcnow() - start).total_seconds()
async def get_self(self) -> GithubUserData: async def get_self(self) -> Dict[str, Union[str, int]]:
"""Returns the authenticated User's data""" """Returns the authenticated User's data"""
result = await self.session.get(SELF_URL) result = await self.session.get(SELF_URL)
if 200 <= result.status <= 299: if 200 <= result.status <= 299:
return await result.json() return await result.json()
raise InvalidToken raise InvalidToken
async def get_user(self, username: str) -> GithubUserData: async def get_user(self, username: str) -> Dict[str, Union[str, int]]:
"""Returns a user's public data in JSON format.""" """Returns a user's public data in JSON format."""
result = await self.session.get(USERS_URL.format(username)) result = await self.session.get(USERS_URL.format(username))
if 200 <= result.status <= 299: if 200 <= result.status <= 299:
return await result.json() return await result.json()
raise UserNotFound raise UserNotFound
async def get_user_repos(self, _user: User) -> List[GithubRepoData]: async def get_user_repos(self, _user: User) -> List[Dict[str, Union[str, int]]]:
result = await self.session.get(USER_REPOS_URL.format(_user.login)) result = await self.session.get(USER_REPOS_URL.format(_user.login))
if 200 <= result.status <= 299: if 200 <= result.status <= 299:
return await result.json() return await result.json()
else:
print('This shouldn\'t be reachable')
async def get_user_gists(self, _user: User) -> List[GithubGistData]: print('This shouldn\'t be reachable')
return []
async def get_user_gists(self, _user: User) -> List[Dict[str, Union[str, int]]]:
result = await self.session.get(USER_GISTS_URL.format(_user.login)) result = await self.session.get(USER_GISTS_URL.format(_user.login))
if 200 <= result.status <= 299: if 200 <= result.status <= 299:
return await result.json() return await result.json()
else:
print('This shouldn\'t be reachable')
async def get_user_orgs(self, _user: User) -> List[GithubOrgData]: print('This shouldn\'t be reachable')
return []
async def get_user_orgs(self, _user: User) -> List[Dict[str, Union[str, int]]]:
result = await self.session.get(USER_ORGS_URL.format(_user.login)) result = await self.session.get(USER_ORGS_URL.format(_user.login))
if 200 <= result.status <= 299: if 200 <= result.status <= 299:
return await result.json() return await result.json()
else:
print('This shouldn\'t be reachable')
async def get_repo(self, owner: str, repo_name: str) -> GithubRepoData: print('This shouldn\'t be reachable')
return []
async def get_repo(self, owner: str, repo_name: str) -> Dict[str, Union[str, int]]:
"""Returns a Repo's raw JSON from the given owner and repo name.""" """Returns a Repo's raw JSON from the given owner and repo name."""
result = await self.session.get(REPO_URL.format(owner, repo_name)) result = await self.session.get(REPO_URL.format(owner, repo_name))
if 200 <= result.status <= 299: if 200 <= result.status <= 299:
return await result.json() return await result.json()
raise RepositoryNotFound raise RepositoryNotFound
async def get_repo_issue(self, owner: str, repo_name: str, issue_number: int) -> GithubIssueData: async def get_repo_issue(self, owner: str, repo_name: str, issue_number: int) -> Dict[str, Union[str, int]]:
"""Returns a single issue's JSON from the given owner and repo name.""" """Returns a single issue's JSON from the given owner and repo name."""
result = await self.session.get(REPO_ISSUE_URL.format(owner, repo_name, issue_number)) result = await self.session.get(REPO_ISSUE_URL.format(owner, repo_name, issue_number))
if 200 <= result.status <= 299: if 200 <= result.status <= 299:
return await result.json() return await result.json()
raise IssueNotFound raise IssueNotFound
async def delete_repo(self, owner: str, repo_name: str) -> None: async def delete_repo(self, owner: str, repo_name: str) -> Optional[str]:
"""Deletes a Repo from the given owner and repo name.""" """Deletes a Repo from the given owner and repo name."""
result = await self.session.delete(REPO_URL.format(owner, repo_name)) result = await self.session.delete(REPO_URL.format(owner, repo_name))
if 204 <= result.status <= 299: if 204 <= result.status <= 299:
@ -228,7 +242,7 @@ class http:
raise MissingPermissions raise MissingPermissions
raise RepositoryNotFound raise RepositoryNotFound
async def delete_gist(self, gist_id: int) -> None: async def delete_gist(self, gist_id: int) -> Optional[str]:
"""Deletes a Gist from the given gist id.""" """Deletes a Gist from the given gist id."""
result = await self.session.delete(GIST_URL.format(gist_id)) result = await self.session.delete(GIST_URL.format(gist_id))
if result.status == 204: if result.status == 204:
@ -237,14 +251,14 @@ class http:
raise MissingPermissions raise MissingPermissions
raise GistNotFound raise GistNotFound
async def get_org(self, org_name: str) -> GithubOrgData: async def get_org(self, org_name: str) -> Dict[str, Union[str, int]]:
"""Returns an org's public data in JSON format.""" """Returns an org's public data in JSON format."""
result = await self.session.get(ORG_URL.format(org_name)) result = await self.session.get(ORG_URL.format(org_name))
if 200 <= result.status <= 299: if 200 <= result.status <= 299:
return await result.json() return await result.json()
raise OrganizationNotFound raise OrganizationNotFound
async def get_gist(self, gist_id: int) -> GithubGistData: async def get_gist(self, gist_id: int) -> Dict[str, Union[str, int]]:
"""Returns a gist's raw JSON from the given gist id.""" """Returns a gist's raw JSON from the given gist id."""
result = await self.session.get(GIST_URL.format(gist_id)) result = await self.session.get(GIST_URL.format(gist_id))
if 200 <= result.status <= 299: if 200 <= result.status <= 299:
@ -252,29 +266,26 @@ class http:
raise GistNotFound raise GistNotFound
async def create_gist( async def create_gist(
self, self, *, files: List['File'] = [], description: str = 'Default description', public: bool = False
*, ) -> Dict[str, Union[str, int]]:
files: List['File'] = [],
description: str = 'Default description',
public: bool = False
) -> GithubGistData:
data = {} data = {}
data['description'] = description data['description'] = description
data['public'] = public data['public'] = public
data['files'] = {} data['files'] = {}
for file in files: for file in files:
data['files'][file.filename] = { data['files'][file.filename] = {'filename': file.filename, 'content': file.read()} # helps editing the file
'filename' : file.filename, # helps editing the file
'content': file.read()
}
data = json.dumps(data) data = json.dumps(data)
_headers = dict(self.session.headers) _headers = dict(self.session.headers)
result = await self.session.post(CREATE_GIST_URL, data=data, headers=_headers|{'Accept': 'application/vnd.github.v3+json'}) result = await self.session.post(
CREATE_GIST_URL, data=data, headers=_headers | {'Accept': 'application/vnd.github.v3+json'}
)
if 201 == result.status: if 201 == result.status:
return await result.json() return await result.json()
raise InvalidToken raise InvalidToken
async def create_repo(self, name: str, description: str, public: bool, gitignore: str, license: str) -> GithubRepoData: async def create_repo(
self, name: str, description: str, public: bool, gitignore: Optional[str], license: Optional[str]
) -> Dict[str, Union[str, int]]:
"""Creates a repo for you with given data""" """Creates a repo for you with given data"""
data = { data = {
'name': name, 'name': name,

View file

@ -1,26 +1,40 @@
#== main.py ==# # == main.py ==#
from __future__ import annotations from __future__ import annotations
from datetime import datetime
__all__ = ( __all__ = ("GHClient",)
'GHClient',
)
import asyncio
import functools import functools
from typing import Union, List, Dict
import aiohttp import aiohttp
from typing import (
Awaitable,
Callable,
Literal,
Any,
Coroutine,
Dict,
Generator,
Optional,
Union,
List,
overload,
TypeVar,
)
from typing_extensions import Self, ParamSpec, Concatenate
from . import exceptions from . import exceptions
from .cache import RepoCache, UserCache from .cache import ObjectCache
from .http import http from .http import http
from .objects import Gist, Issue, Organization, Repository, User, File from .objects import Gist, Issue, Organization, Repository, User, File
T = TypeVar('T')
P = ParamSpec('P')
class GHClient: class GHClient:
_auth = None has_started: bool = False
has_started = False
http: http
def __init__( def __init__(
self, self,
*, *,
@ -28,47 +42,77 @@ class GHClient:
token: Union[str, None] = None, token: Union[str, None] = None,
user_cache_size: int = 30, user_cache_size: int = 30,
repo_cache_size: int = 15, repo_cache_size: int = 15,
custom_headers: dict[str, Union[str, int]] = {} custom_headers: dict[str, Union[str, int]] = {},
): ):
"""The main client, used to start most use-cases.""" """The main client, used to start most use-cases."""
self._headers = custom_headers self._headers = custom_headers
bound = lambda hi, lo, value: max(min(value, hi), lo)
self._user_cache = UserCache(bound(50, 0, user_cache_size))
self._repo_cache = RepoCache(bound(50, 0, repo_cache_size))
if username and token: if username and token:
self.username = username self.username = username
self.token = token self.token = token
self._auth = aiohttp.BasicAuth(username, token) self._auth = aiohttp.BasicAuth(username, token)
else:
self._auth = None
self.username = None
self.token = None
def __await__(self) -> 'GHClient': self.http = http(headers=custom_headers, auth=self._auth)
self._user_cache = ObjectCache[Any, User](user_cache_size)
self._repo_cache = ObjectCache[Any, Repository](repo_cache_size)
# Cache manegent
self._cache(type='user')(self.get_self) # type: ignore
self._cache(type='user')(self.get_user) # type: ignore
self._cache(type='repo')(self.get_repo) # type: ignore
def __call__(self, *args: Any, **kwargs: Any) -> Coroutine[Any, Any, Self]:
return self.start(*args, **kwargs)
def __await__(self) -> Generator[Any, Any, Self]:
return self.start().__await__() return self.start().__await__()
async def __aenter__(self) -> Self:
await self.start()
return self
async def __aexit__(self, *args: Any, **kwargs: Any) -> None:
if session := getattr(self.http, 'session', None):
await session.close()
def __repr__(self) -> str: def __repr__(self) -> str:
return f'<{self.__class__.__name__} has_auth={bool(self._auth)}>' return f'<{self.__class__.__name__} has_auth={bool(self._auth)}>'
def __del__(self): @overload
asyncio.create_task(self.http.session.close()) def check_limits(self, as_dict: Literal[True] = True) -> Dict[str, Union[str, int]]:
...
def check_limits(self, as_dict: bool = False) -> dict[str, str | int] | list[str]: @overload
def check_limits(self, as_dict: Literal[False] = False) -> List[str]:
...
def check_limits(self, as_dict: bool = False) -> Union[Dict[str, Union[str, int]], List[str]]:
if not self.has_started: if not self.has_started:
raise exceptions.NotStarted raise exceptions.NotStarted
if not as_dict: if not as_dict:
output = [] output: List[str] = []
for key, value in self.http.session._rates._asdict().items(): for key, value in self.http.session._rates._asdict().items(): # type: ignore
output.append(f'{key} : {value}') output.append(f"{key} : {value}")
return output return output
return self.http.session._rates
return self.http.session._rates # type: ignore
async def update_auth(self, username: str, token: str) -> None: async def update_auth(self, username: str, token: str) -> None:
"""Allows you to input auth information after instantiating the client.""" """Allows you to input auth information after instantiating the client."""
#check if username and token is valid # check if username and token is valid
await self.http.update_auth(username=username, token=token) await self.http.update_auth(username=username, token=token)
try: try:
await self.http.get_self() await self.http.get_self()
except exceptions.InvalidToken as exc: except exceptions.InvalidToken as exc:
raise exceptions.InvalidToken from exc raise exceptions.InvalidToken from exc
async def start(self) -> 'GHClient': async def start(self) -> Self:
"""Main entry point to the wrapper, this creates the ClientSession.""" """Main entry point to the wrapper, this creates the ClientSession."""
if self.has_started: if self.has_started:
raise exceptions.AlreadyStarted raise exceptions.AlreadyStarted
@ -83,78 +127,93 @@ class GHClient:
self.has_started = True self.has_started = True
return self return self
def _cache(*args, **kwargs): def _cache(
target_type = kwargs.get('type') self: Self, *, type: str
def wrapper(func): ) -> Callable[
[Callable[Concatenate[Self, P], Awaitable[T]]],
Callable[Concatenate[Self, P], Awaitable[Optional[Union[T, User, Repository]]]],
]:
def wrapper(
func: Callable[Concatenate[Self, P], Awaitable[T]]
) -> Callable[Concatenate[Self, P], Awaitable[Optional[Union[T, User, Repository]]]]:
@functools.wraps(func) @functools.wraps(func)
async def wrapped(self, *args, **kwargs): async def wrapped(self: Self, *args: P.args, **kwargs: P.kwargs) -> Optional[Union[T, User, Repository]]:
if target_type == 'User': if type == 'user':
if (obj := self._user_cache.get(kwargs.get('user'))): if obj := self._user_cache.get(kwargs.get('user')):
return obj return obj
else:
res = await func(self, *args, **kwargs) user: User = await func(self, *args, **kwargs) # type: ignore
self._user_cache[kwargs.get('user')] = res self._user_cache[kwargs.get("user")] = user
return res return user
if target_type == 'Repo': if type == 'repo':
if (obj := self._repo_cache.get(kwargs.get('repo'))): if obj := self._repo_cache.get(kwargs.get('repo')):
return obj return obj
else:
res = await func(self, *args, **kwargs) repo: Repository = await func(self, *args, **kwargs) # type: ignore
self._repo_cache[kwargs.get('repo')] = res self._repo_cache[kwargs.get('repo')] = repo
return res return repo
return wrapped return wrapped
return wrapper return wrapper
#@_cache(type='User') # @_cache(type='User')
async def get_self(self) -> User: async def get_self(self) -> User:
"""Returns the authenticated User object.""" """Returns the authenticated User object."""
if self._auth: if self._auth:
return User(await self.http.get_self(), self.http.session) return User(await self.http.get_self(), self.http)
else: else:
raise exceptions.NoAuthProvided raise exceptions.NoAuthProvided
@_cache(type='User')
async def get_user(self, *, user: str) -> User: async def get_user(self, *, user: str) -> User:
"""Fetch a Github user from their username.""" """Fetch a Github user from their username."""
return User(await self.http.get_user(user), self.http.session) return User(await self.http.get_user(user), self.http)
@_cache(type='Repo')
async def get_repo(self, *, owner: str, repo: str) -> Repository: async def get_repo(self, *, owner: str, repo: str) -> Repository:
"""Fetch a Github repository from it's name.""" """Fetch a Github repository from it's name."""
return Repository(await self.http.get_repo(owner, repo), self.http.session) return Repository(await self.http.get_repo(owner, repo), self.http)
async def get_issue(self, *, owner: str, repo: str, issue: int) -> Issue: async def get_issue(self, *, owner: str, repo: str, issue: int) -> Issue:
"""Fetch a Github Issue from it's name.""" """Fetch a Github Issue from it's name."""
return Issue(await self.http.get_repo_issue(owner, repo, issue), self.http.session) return Issue(await self.http.get_repo_issue(owner, repo, issue), self.http)
async def create_repo(self, name: str, description: str = 'Repository created using Github-Api-Wrapper.', public: bool = False,gitignore: str = None, license: str = None) -> Repository: async def create_repo(
return Repository(await self.http.create_repo(name,description,public,gitignore,license), self.http.session) self,
name: str,
description: str = 'Repository created using Github-Api-Wrapper.',
public: bool = False,
gitignore: Optional[str] = None,
license: Optional[str] = None,
) -> Repository:
return Repository(
await self.http.create_repo(name, description, public, gitignore, license),
self.http,
)
async def delete_repo(self, repo: str= None, owner: str = None) -> None: async def delete_repo(self, repo: str, owner: str) -> Optional[str]:
"""Delete a Github repository, requires authorisation.""" """Delete a Github repository, requires authorisation."""
owner = owner or self.username owner = owner or self.username # type: ignore
return await self.http.delete_repo(owner, repo) return await self.http.delete_repo(owner, repo)
async def get_gist(self, gist: int) -> Gist: async def get_gist(self, gist: int) -> Gist:
"""Fetch a Github gist from it's id.""" """Fetch a Github gist from it's id."""
return Gist(await self.http.get_gist(gist), self.http.session) return Gist(await self.http.get_gist(gist), self.http)
async def create_gist(self, *, files: List[File], description: str, public: bool) -> Gist: async def create_gist(self, *, files: List[File], description: str, public: bool) -> Gist:
"""Creates a Gist with the given files, requires authorisation.""" """Creates a Gist with the given files, requires authorisation."""
return Gist(await self.http.create_gist(files=files, description=description, public=public), self.http.session) return Gist(
await self.http.create_gist(files=files, description=description, public=public),
self.http,
)
async def delete_gist(self, gist: int) -> None: async def delete_gist(self, gist: int) -> Optional[str]:
"""Delete a Github gist, requires authorisation.""" """Delete a Github gist, requires authorisation."""
return await self.http.delete_gist(gist) return await self.http.delete_gist(gist)
async def get_org(self, org: str) -> Organization: async def get_org(self, org: str) -> Organization:
"""Fetch a Github organization from it's name.""" """Fetch a Github organization from it's name."""
return Organization(await self.http.get_org(org), self.http.session) return Organization(await self.http.get_org(org), self.http)
async def latency(self) -> float: async def latency(self) -> float:
"""Returns the latency of the client.""" """Returns the latency of the client."""
return await self.http.latency() return await self.http.latency()

View file

@ -1,7 +1,7 @@
#== objects.py ==# # == objects.py ==#
from __future__ import annotations from __future__ import annotations
from typing import TYPE_CHECKING, Any, Union, List, Dict from typing import TYPE_CHECKING, Any, Optional, Tuple, Union, Dict
if TYPE_CHECKING: if TYPE_CHECKING:
from .http import http from .http import http
@ -23,21 +23,22 @@ __all__ = (
'Organization', 'Organization',
) )
def dt_formatter(time_str: str) -> datetime:
def dt_formatter(time_str: str) -> Optional[datetime]:
if time_str is not None: if time_str is not None:
return datetime.strptime(time_str, r"%Y-%m-%dT%H:%M:%SZ") return datetime.strptime(time_str, r"%Y-%m-%dT%H:%M:%SZ")
return None return None
def repr_dt(_datetime: datetime) -> str: def repr_dt(_datetime: datetime) -> str:
return _datetime.strftime(r'%d-%m-%Y, %H:%M:%S') return _datetime.strftime(r'%d-%m-%Y, %H:%M:%S')
class APIObject:
__slots__ = (
'_response',
'_http'
)
def __init__(self, response: Dict[str, Any] , _http: http) -> None: class APIObject:
__slots__: Tuple[str, ...] = ('_response', '_http')
def __init__(self, response: Dict[str, Any], _http: http) -> None:
self._http = _http self._http = _http
self._response = response self._response = response
@ -45,14 +46,16 @@ class APIObject:
return f'<{self.__class__.__name__}>' return f'<{self.__class__.__name__}>'
#=== User stuff ===# # === User stuff ===#
class _BaseUser(APIObject): class _BaseUser(APIObject):
__slots__ = ( __slots__ = (
'login', 'login',
'id', 'id',
) )
def __init__(self, response: dict, _http: http) -> None:
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http) super().__init__(response, _http)
self._http = _http self._http = _http
self.login = response.get('login') self.login = response.get('login')
@ -62,19 +65,19 @@ class _BaseUser(APIObject):
return f'<{self.__class__.__name__} id = {self.id}, login = {self.login!r}>' return f'<{self.__class__.__name__} id = {self.id}, login = {self.login!r}>'
async def repos(self) -> list[Repository]: async def repos(self) -> list[Repository]:
results = await self._http.get_user_repos(self) results = await self._http.get_user_repos(self) # type: ignore
return [Repository(data, self._http) for data in results] return [Repository(data, self._http) for data in results]
async def gists(self) -> list[Gist]: async def gists(self) -> list[Gist]:
results = await self._http.get_user_gists(self) results = await self._http.get_user_gists(self) # type: ignore
return [Gist(data, self._http) for data in results] return [Gist(data, self._http) for data in results]
async def orgs(self) -> list[Organization]: async def orgs(self) -> list[Organization]:
results = await self._http.get_user_orgs(self) results = await self._http.get_user_orgs(self) # type: ignore
return [Organization(data, self._http) for data in results] return [Organization(data, self._http) for data in results]
class User(_BaseUser): class User(_BaseUser):
__slots__ = ( __slots__ = (
'login', 'login',
'id', 'id',
@ -85,11 +88,12 @@ class User(_BaseUser):
'followers', 'followers',
'following', 'following',
'created_at', 'created_at',
) )
def __init__(self, response: dict, _http: http) -> None:
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http) super().__init__(response, _http)
tmp = self.__slots__ + _BaseUser.__slots__ tmp = self.__slots__ + _BaseUser.__slots__
keys = {key: value for key,value in self._response.items() if key in tmp} keys = {key: value for key, value in self._response.items() if key in tmp}
for key, value in keys.items(): for key, value in keys.items():
if '_at' in key and value is not None: if '_at' in key and value is not None:
setattr(self, key, dt_formatter(value)) setattr(self, key, dt_formatter(value))
@ -107,32 +111,37 @@ class PartialUser(_BaseUser):
'site_admin', 'site_admin',
'html_url', 'html_url',
'avatar_url', 'avatar_url',
) + _BaseUser.__slots__ ) + _BaseUser.__slots__
def __init__(self, response: dict, _http: http) -> None: def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http) super().__init__(response, _http)
self.site_admin = response.get('site_admin') self.site_admin: Optional[str] = response.get('site_admin')
self.html_url = response.get('html_url') self.html_url: Optional[str] = response.get('html_url')
self.avatar_url = response.get('avatar_url') self.avatar_url: Optional[str] = response.get('avatar_url')
def __repr__(self) -> str: def __repr__(self) -> str:
return f'<{self.__class__.__name__} login: {self.login!r}, id: {self.id}, site_admin: {self.site_admin}, html_url: {self.html_url}>' return f'<{self.__class__.__name__} login: {self.login!r}, id: {self.id}, site_admin: {self.site_admin}, html_url: {self.html_url}>'
async def _get_user(self) -> User: async def _get_user(self) -> User:
"""Upgrades the PartialUser to a User object.""" """Upgrades the PartialUser to a User object."""
response = await self._http.get_user(self.login) response = await self._http.get_user(self.login)
return User(response, self._http) return User(response, self._http)
#=== Repository stuff ===# # === Repository stuff ===#
class Repository(APIObject): class Repository(APIObject):
if TYPE_CHECKING:
id: int
name: str
owner: str
__slots__ = ( __slots__ = (
'id', 'id',
'name', 'name',
'owner', 'owner',
'size' 'size' 'created_at',
'created_at',
'url', 'url',
'html_url', 'html_url',
'archived', 'archived',
@ -144,11 +153,12 @@ class Repository(APIObject):
'stargazers_count', 'stargazers_count',
'watchers_count', 'watchers_count',
'license', 'license',
) )
def __init__(self, response: dict, _http: http) -> None:
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http) super().__init__(response, _http)
tmp = self.__slots__ + APIObject.__slots__ tmp = self.__slots__ + APIObject.__slots__
keys = {key: value for key,value in self._response.items() if key in tmp} keys = {key: value for key, value in self._response.items() if key in tmp}
for key, value in keys.items(): for key, value in keys.items():
if key == 'owner': if key == 'owner':
setattr(self, key, PartialUser(value, self._http)) setattr(self, key, PartialUser(value, self._http))
@ -191,6 +201,7 @@ class Repository(APIObject):
def forks(self) -> int: def forks(self) -> int:
return self._response.get('forks') return self._response.get('forks')
class Issue(APIObject): class Issue(APIObject):
__slots__ = ( __slots__ = (
'id', 'id',
@ -202,10 +213,10 @@ class Issue(APIObject):
'closed_by', 'closed_by',
) )
def __init__(self, response: dict, _http: http) -> None: def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http) super().__init__(response, _http)
tmp = self.__slots__ + APIObject.__slots__ tmp = self.__slots__ + APIObject.__slots__
keys = {key: value for key,value in self._response.items() if key in tmp} keys = {key: value for key, value in self._response.items() if key in tmp}
for key, value in keys.items(): for key, value in keys.items():
if key == 'user': if key == 'user':
setattr(self, key, PartialUser(value, self._http)) setattr(self, key, PartialUser(value, self._http))
@ -227,17 +238,19 @@ class Issue(APIObject):
return f'<{self.__class__.__name__} id: {self.id}, title: {self.title}, user: {self.user}, created_at: {self.created_at}, state: {self.state}>' return f'<{self.__class__.__name__} id: {self.id}, title: {self.title}, user: {self.user}, created_at: {self.created_at}, state: {self.state}>'
@property @property
def updated_at(self) -> str: def updated_at(self) -> Optional[datetime]:
return dt_formatter(self._response.get('updated_at')) return dt_formatter(self._response.get('updated_at'))
@property @property
def html_url(self) -> str: def html_url(self) -> str:
return self._response.get('html_url') return self._response.get('html_url')
#=== Gist stuff ===#
# === Gist stuff ===#
class File: class File:
def __init__(self, fp: Union[str, io.StringIO], filename: str = 'DefaultFilename.txt'): def __init__(self, fp: Union[str, io.StringIO], filename: str = 'DefaultFilename.txt') -> None:
self.fp = fp self.fp = fp
self.filename = filename self.filename = filename
@ -246,14 +259,17 @@ class File:
if os.path.exists(self.fp): if os.path.exists(self.fp):
with open(self.fp) as fp: with open(self.fp) as fp:
data = fp.read() data = fp.read()
return data return data
return self.fp return self.fp
elif isinstance(self.fp, io.BytesIO): elif isinstance(self.fp, io.BytesIO):
return self.fp.read().decode('utf-8') return self.fp.read()
elif isinstance(self.fp, io.StringIO): elif isinstance(self.fp, io.StringIO): # type: ignore
return self.fp.getvalue() return self.fp.getvalue()
else:
raise TypeError(f'Expected str, io.StringIO, or io.BytesIO, got {type(self.fp)}') raise TypeError(f'Expected str, io.StringIO, or io.BytesIO, got {type(self.fp)}')
class Gist(APIObject): class Gist(APIObject):
__slots__ = ( __slots__ = (
@ -265,11 +281,12 @@ class Gist(APIObject):
'owner', 'owner',
'created_at', 'created_at',
'truncated', 'truncated',
) )
def __init__(self, response: dict, _http: http) -> None:
def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http) super().__init__(response, _http)
tmp = self.__slots__ + APIObject.__slots__ tmp = self.__slots__ + APIObject.__slots__
keys = {key: value for key,value in self._response.items() if key in tmp} keys = {key: value for key, value in self._response.items() if key in tmp}
for key, value in keys.items(): for key, value in keys.items():
if key == 'owner': if key == 'owner':
setattr(self, key, PartialUser(value, self._http)) setattr(self, key, PartialUser(value, self._http))
@ -284,7 +301,7 @@ class Gist(APIObject):
return f'<{self.__class__.__name__} id: {self.id}, owner: {self.owner}, created_at: {self.created_at}>' return f'<{self.__class__.__name__} id: {self.id}, owner: {self.owner}, created_at: {self.created_at}>'
@property @property
def updated_at(self) -> str: def updated_at(self) -> Optional[datetime]:
return dt_formatter(self._response.get('updated_at')) return dt_formatter(self._response.get('updated_at'))
@property @property
@ -296,29 +313,30 @@ class Gist(APIObject):
return self._response.get('discussion') return self._response.get('discussion')
@property @property
def raw(self) -> str: def raw(self) -> Dict[str, Any]:
return self._response return self._response
#=== Organization stuff ===# # === Organization stuff ===#
class Organization(APIObject): class Organization(APIObject):
__slots__ = ( __slots__ = (
'login', 'login',
'id', 'id',
'is_verified', 'is_verified',
'public_repos', 'public_repos',
'public_gists', 'public_gists',
'followers', 'followers',
'following', 'following',
'created_at', 'created_at',
'avatar_url', 'avatar_url',
) )
def __init__(self, response: dict, _http: http) -> None: def __init__(self, response: Dict[str, Any], _http: http) -> None:
super().__init__(response, _http) super().__init__(response, _http)
tmp = self.__slots__ + APIObject.__slots__ tmp = self.__slots__ + APIObject.__slots__
keys = {key: value for key,value in self._response.items() if key in tmp} keys = {key: value for key, value in self._response.items() if key in tmp}
for key, value in keys.items(): for key, value in keys.items():
if key == 'login': if key == 'login':
setattr(self, key, value) setattr(self, key, value)
@ -340,4 +358,4 @@ class Organization(APIObject):
@property @property
def html_url(self): def html_url(self):
return self._response.get('html_url') return self._response.get('html_url')

View file

@ -1,9 +1,9 @@
#== urls.py ==# # == urls.py ==#
BASE_URL = 'https://api.github.com' BASE_URL = 'https://api.github.com'
#== user urls ==# # == user urls ==#
USERS_URL = BASE_URL + '/users/{0}' USERS_URL = BASE_URL + '/users/{0}'
USER_HTML_URL = 'https://github.com/users/{0}' USER_HTML_URL = 'https://github.com/users/{0}'
@ -21,19 +21,19 @@ USER_FOLLOWERS_URL = USERS_URL + '/followers'
USER_FOLLOWING_URL = USERS_URL + '/following' USER_FOLLOWING_URL = USERS_URL + '/following'
#== repo urls ==# # == repo urls ==#
CREATE_REPO_URL = BASE_URL + '/user/repos' #_auth repo create CREATE_REPO_URL = BASE_URL + '/user/repos' # _auth repo create
REPOS_URL = BASE_URL + '/repos/{0}' # repos of a user REPOS_URL = BASE_URL + '/repos/{0}' # repos of a user
REPO_URL = BASE_URL + '/repos/{0}/{1}' # a specific repo REPO_URL = BASE_URL + '/repos/{0}/{1}' # a specific repo
REPO_ISSUE_URL = REPO_URL + '/issues/{2}' # a specific issue REPO_ISSUE_URL = REPO_URL + '/issues/{2}' # a specific issue
#== gist urls ==# # == gist urls ==#
GIST_URL = BASE_URL + '/gists/{0}' # specific gist GIST_URL = BASE_URL + '/gists/{0}' # specific gist
CREATE_GIST_URL = BASE_URL + '/gists' # create a gist CREATE_GIST_URL = BASE_URL + '/gists' # create a gist
#== org urls ==# # == org urls ==#
ORG_URL = BASE_URL + '/orgs/{0}' ORG_URL = BASE_URL + '/orgs/{0}'

3
pyproject.toml Normal file
View file

@ -0,0 +1,3 @@
[tool.black]
line-length = 125
skip-string-normalization = true

View file

@ -26,4 +26,4 @@ setup(
long_description=readme, long_description=readme,
install_requires=requirements, install_requires=requirements,
python_requires='>=3.8.0', python_requires='>=3.8.0',
) )