Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mxrch
GitHub Repository: mxrch/GHunt
Path: blob/master/ghunt/objects/apis.py
252 views
1
from ghunt.errors import GHuntCorruptedHeadersError
2
from ghunt.helpers.knowledge import get_origin_of_key, get_api_key
3
from ghunt.objects.base import GHuntCreds, SmartObj
4
from ghunt.helpers.utils import *
5
from ghunt.errors import *
6
from ghunt.helpers.auth import *
7
8
import httpx
9
import asyncio
10
11
from datetime import datetime, timezone
12
from typing import *
13
from dataclasses import dataclass
14
15
16
# API objects
17
18
@dataclass
class EndpointConfig(SmartObj):
    """Settings describing one API endpoint.

    The object stores the static configuration of an endpoint plus two
    private attributes, ``_computed_headers`` and ``_computed_cookies``,
    which ``GAPI._load_endpoint`` overwrites once the endpoint is loaded.

    NOTE(review): no class-level annotated fields are declared, so the
    methods generated by ``@dataclass`` see zero fields; the decorator is
    kept as-is so that behavior stays untouched.

    Args:
        name: Key under which the endpoint is registered in
            ``GAPI.loaded_endpoints``.
        headers: Endpoint-specific headers (API-wide headers override them
            when the endpoint is loaded).
        cookies: Endpoint-specific cookies.
        ext_metadata: Mapping of extension type -> {name: value}, expanded
            into ``X-Goog-Ext-<name>-<Type>`` headers at load time.
        verb: HTTP verb; ``GAPI._query`` recognizes "GET" and "POST".
        data_type: Body encoding for POST; ``GAPI._query`` recognizes
            "data" and "json".
        authentication_mode: ``GAPI`` checks for "oauth", "sapisidhash"
            and "cookies_only".
        require_key: Name of the API key the endpoint needs, if any.
        key_origin: Origin associated with the key; filled in at load time
            when left empty and ``require_key`` is set.
        _computed_headers: Final headers (normally set by ``GAPI``).
        _computed_cookies: Final cookies (normally set by ``GAPI``).

    Every dict parameter defaults to ``None`` and is replaced by a fresh
    empty dict, so instances never share a default dict. A dict supplied
    by the caller is stored by reference, not copied.
    """

    def __init__(self,
        name: str = "",
        headers: Dict[str, str] | None = None,
        cookies: Dict[str, str] | None = None,
        ext_metadata: Dict[str, Dict[str, str]] | None = None,
        verb: str = "",
        data_type: str | None = None,
        authentication_mode: str | None = None,
        require_key: str | None = None,
        key_origin: str | None = None,
        _computed_headers: Dict[str, str] | None = None,
        _computed_cookies: Dict[str, str] | None = None
    ):
        self.name = name
        # A literal `{}` default would be shared by every instance.
        self.headers = {} if headers is None else headers
        self.cookies = {} if cookies is None else cookies
        self.ext_metadata = {} if ext_metadata is None else ext_metadata
        self.verb = verb
        self.data_type = data_type
        self.authentication_mode = authentication_mode
        self.require_key = require_key
        self.key_origin = key_origin
        self._computed_headers = {} if _computed_headers is None else _computed_headers
        self._computed_cookies = {} if _computed_cookies is None else _computed_cookies
44
45
class GAPI(SmartObj):
    """Base object for an API client.

    It keeps the API-wide settings (host, scheme, headers, credentials),
    a registry of loaded ``EndpointConfig`` objects with their final
    headers/cookies, and the helpers used to send a request to a loaded
    endpoint.
    """

    def __init__(self):
        self.api_name: str = ""
        self.package_name: str = ""
        self.scopes: List[str] = []

        self.hostname: str = ""
        self.scheme: str = ""
        self.loaded_endpoints: Dict[str, EndpointConfig] = {}
        self.creds: GHuntCreds | None = None
        self.headers: Dict[str, str] = {}
        self.cookies: Dict[str, str] = {}
        # Created when the first "oauth" endpoint is loaded.
        self.gen_token_lock: asyncio.Lock | None = None

    def _load_api(self, creds: GHuntCreds, headers: Dict[str, str]):
        """Attach credentials and API-wide headers to this API object.

        Args:
            creds: Credentials object; must report itself as loaded.
            headers: Headers applied to every endpoint of this API.

        Raises:
            GHuntInsufficientCreds: ``creds.are_creds_loaded()`` is falsy.
            GHuntCorruptedHeadersError: ``headers`` fails
                ``is_headers_syntax_good``.
        """
        if not creds.are_creds_loaded():
            raise GHuntInsufficientCreds("This API requires a loaded GHuntCreds object, but it is not.")

        if not is_headers_syntax_good(headers):
            raise GHuntCorruptedHeadersError(f"The provided headers when loading the endpoint seems corrupted, please check it : {headers}")

        self.creds = creds
        self.headers = headers

    def _load_endpoint(self, endpoint: EndpointConfig):
        """Compute the final headers/cookies of ``endpoint`` and register it.

        Does nothing when an endpoint with the same name is already
        registered. On success the endpoint's ``_computed_headers`` and
        ``_computed_cookies`` are set (and ``key_origin`` may be filled in),
        and the endpoint is stored in ``self.loaded_endpoints``.

        Raises:
            GHuntInsufficientCreds: required cookies, API key or SAPISID
                cookie are missing.
            GHuntCorruptedHeadersError: the resulting headers fail
                ``is_headers_syntax_good``.
        """
        if endpoint.name in self.loaded_endpoints:
            return

        # Fresh local dict; API-wide headers override endpoint-specific ones.
        headers = {**endpoint.headers, **self.headers}

        # Create the lock only once: replacing it on every OAuth endpoint
        # load could swap out a lock that is currently held.
        if endpoint.authentication_mode == "oauth" and self.gen_token_lock is None:
            self.gen_token_lock = asyncio.Lock()

        cookies = {}
        if endpoint.authentication_mode in ("sapisidhash", "cookies_only"):
            cookies = self.creds.cookies
            if not cookies:
                raise GHuntInsufficientCreds("This endpoint requires the cookies in the GHuntCreds object, but they aren't loaded.")

        key_name = endpoint.require_key
        if key_name:
            api_key = get_api_key(key_name)
            if not api_key:
                raise GHuntInsufficientCreds(f"This endpoint requires the {key_name} API key in the GHuntCreds object, but it isn't loaded.")
            if not endpoint.key_origin:
                endpoint.key_origin = get_origin_of_key(key_name)
            # An X-Goog-Api-Key header already configured wins over the
            # looked-up key; Origin and Referer are always overwritten.
            headers.setdefault("X-Goog-Api-Key", api_key)
            headers["Origin"] = endpoint.key_origin
            headers["Referer"] = endpoint.key_origin

        if endpoint.authentication_mode == "sapisidhash":
            sapisid = cookies.get("SAPISID")
            if not sapisid:
                raise GHuntInsufficientCreds("This endpoint requires the SAPISID cookie in the GHuntCreds object, but it isn't loaded.")

            # NOTE(review): key_origin is only auto-filled when require_key
            # is set; confirm gen_sapisidhash copes with an empty origin.
            headers["Authorization"] = f"SAPISIDHASH {gen_sapisidhash(sapisid, endpoint.key_origin)}"

        # https://github.com/googleapis/googleapis/blob/f8a290120b3a67e652742a221f73778626dc3081/google/api/context.proto#L43
        for ext_type, ext_value in endpoint.ext_metadata.items():
            type_suffix = ext_type.title()
            headers.update({f"X-Goog-Ext-{k}-{type_suffix}": v for k, v in ext_value.items()})

        if not is_headers_syntax_good(headers):
            raise GHuntCorruptedHeadersError(f"The provided headers when loading the endpoint seems corrupted, please check it : {headers}")

        endpoint._computed_headers = headers
        endpoint._computed_cookies = cookies
        self.loaded_endpoints[endpoint.name] = endpoint

    async def _check_and_gen_authorization_token(self, as_client: httpx.AsyncClient, creds: GHuntCreds):
        """Return an authorization token for this API, generating one if needed.

        The token stored in ``creds.android.authorization_tokens`` under
        ``self.api_name`` is returned while its expiry timestamp is not in
        the past. Otherwise a new token is requested through
        ``android_oauth_app``, stored with its expiry, saved with
        ``creds.save_creds(silent=True)`` and returned. The whole check runs
        under ``self.gen_token_lock``.
        """
        async with self.gen_token_lock:
            if self.api_name in creds.android.authorization_tokens:
                stored = creds.android.authorization_tokens[self.api_name]
                # Timezone-aware UTC datetime; replaces the deprecated
                # datetime.utcfromtimestamp(...).replace(tzinfo=timezone.utc).
                expiry_date = datetime.fromtimestamp(stored["expiry"], tz=timezone.utc)
                if datetime.now(timezone.utc) <= expiry_date:
                    return stored["token"]

            # No registered token for the current API, or the token has expired.
            token, _, expiry_timestamp = await android_oauth_app(as_client, creds.android.master_token, self.package_name, self.scopes)
            creds.android.authorization_tokens[self.api_name] = {
                "token": token,
                "expiry": expiry_timestamp
            }
            creds.save_creds(silent=True)
            gb.rc.print(f"\n[+] New token for {self.api_name} has been generated", style="italic")
            return token

    async def _query(self, endpoint_name: str, as_client: httpx.AsyncClient, base_url: str, params: Dict[str, Any] | None = None, data: Any = None) -> httpx.Response:
        """Send a request to a loaded endpoint and return the response.

        Args:
            endpoint_name: Name of an endpoint present in
                ``self.loaded_endpoints`` (a missing name raises KeyError).
            as_client: Client used to send the request.
            base_url: Appended to ``<scheme>://<hostname>`` to form the URL.
            params: Query parameters; ``None`` is treated as an empty dict.
            data: Request body for POST, sent as ``data=`` or ``json=``
                depending on the endpoint's ``data_type``.

        Raises:
            GHuntUnknownRequestDataTypeError: POST endpoint whose
                ``data_type`` is neither "data" nor "json".
            GHuntUnknownVerbError: ``verb`` is neither "GET" nor "POST".
        """
        # Avoid a shared mutable default while still sending `{}` as before.
        if params is None:
            params = {}

        endpoint = self.loaded_endpoints[endpoint_name]
        headers = endpoint._computed_headers
        if endpoint.authentication_mode == "oauth":
            token = await self._check_and_gen_authorization_token(as_client, self.creds)
            # Build a new dict so the endpoint's stored headers stay token-free.
            headers = {**headers, "Authorization": f"OAuth {token}"}

        url = f"{self.scheme}://{self.hostname}{base_url}"
        common = {"params": params, "headers": headers, "cookies": endpoint._computed_cookies}

        if endpoint.verb == "GET":
            return await as_client.get(url, **common)

        if endpoint.verb == "POST":
            if endpoint.data_type == "data":
                return await as_client.post(url, data=data, **common)
            if endpoint.data_type == "json":
                return await as_client.post(url, json=data, **common)
            raise GHuntUnknownRequestDataTypeError(f"The provided data type {endpoint.data_type} wasn't recognized by GHunt.")

        raise GHuntUnknownVerbError(f"The provided verb {endpoint.verb} wasn't recognized by GHunt.")
150
151
# Others
152
153
class Parser(SmartObj):
    """
    Common base class from which every parser class is initialized.

    The __slots__ attribute is managed automatically (presumably by the
    SmartObj base, which is not visible in this file).
    """
159