Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mxrch
GitHub Repository: mxrch/GHunt
Path: blob/master/ghunt/helpers/auth.py
252 views
1
import asyncio
2
import json
3
import base64
4
import os
5
from typing import *
6
7
import httpx
8
from bs4 import BeautifulSoup as bs
9
10
from ghunt import globals as gb
11
from ghunt.objects.base import GHuntCreds
12
from ghunt.errors import *
13
from ghunt.helpers.utils import *
14
from ghunt.helpers import listener
15
from ghunt.helpers.knowledge import get_domain_of_service, get_package_sig
16
from ghunt.knowledge.services import services_baseurls
17
from ghunt.helpers.auth import *
18
19
20
async def android_master_auth(as_client: httpx.AsyncClient, oauth_token: str) -> Tuple[str, List[str], str, str]:
    """
    Exchange an oauth_token for an Android master token.

    The token is posted to the Android auth endpoint and the reply is
    parsed with parse_oauth_flow_response.

    Returns the master token, the connected services (the "services" field
    split on commas), the account email and the account full name
    ("<firstName> <lastName>").

    Raises GHuntAndroidMasterAuthError when one of the expected fields
    is absent from the parsed response.
    """
    payload = {
        "Token": oauth_token,
        "service": "ac2dm",
        "get_accountid": 1,
        "ACCESS_TOKEN": 1,
        "add_account": 1,
        "callerSig": "38918a453d07199354f8b19af05ec6562ced5788"
    }

    response = await as_client.post("https://android.googleapis.com/auth", data=payload)
    fields = parse_oauth_flow_response(response.text)

    # Report the first expected field that the response does not contain.
    expected = ("Token", "Email", "services", "firstName", "lastName")
    missing = next((name for name in expected if name not in fields), None)
    if missing is not None:
        raise GHuntAndroidMasterAuthError(f'Expected "{missing}" in the response of the Android Master Authentication.\nThe oauth_token may be expired.')

    master_token = fields["Token"]
    services = fields["services"].split(",")
    email = fields["Email"]
    full_name = f'{fields["firstName"]} {fields["lastName"]}'
    return master_token, services, email, full_name
42
43
async def android_oauth_app(as_client: httpx.AsyncClient, master_token: str,
                package_name: str, scopes: List[str]) -> Tuple[str, List[str], int]:
    """
    Request an authorization token from a master token,
    for a given app package name and list of scopes.

    The scopes are sent space-joined behind an "oauth2:" prefix, together
    with the signature returned by get_package_sig for the package.

    Returns the authorization token, granted scopes and expiry UTC timestamp.

    Raises GHuntAndroidAppOAuth2Error when one of the expected fields
    is absent from the parsed response.
    """
    signature = get_package_sig(package_name)

    payload = {
        "app": package_name,
        "service": "oauth2:" + " ".join(scopes),
        "client_sig": signature,
        "Token": master_token
    }

    response = await as_client.post("https://android.googleapis.com/auth", data=payload)
    fields = parse_oauth_flow_response(response.text)

    # Report the first expected field that the response does not contain.
    expected = ("Expiry", "grantedScopes", "Auth")
    missing = next((name for name in expected if name not in fields), None)
    if missing is not None:
        raise GHuntAndroidAppOAuth2Error(f'Expected "{missing}" in the response of the Android App OAuth2 Authentication.\nThe master token may be revoked.')

    auth_token = fields["Auth"]
    granted_scopes = fields["grantedScopes"].split(" ")
    expiry = int(fields["Expiry"])
    return auth_token, granted_scopes, expiry
66
67
async def gen_osid(as_client: httpx.AsyncClient, cookies: Dict[str, str], generated_osids: dict[str, str], service: str) -> None:
    """
    Generate the OSID of one service and store it in "generated_osids".

    Fetches the ServiceLogin page for the service, re-posts the hidden form
    fields found in it to the service's SetOSID endpoint, and reads the
    "OSID" cookie from that second response.

    Raises GHuntOSIDAuthError when the SetOSID response has no "OSID" cookie.
    """
    service_domain = get_domain_of_service(service)
    landing_url = f"https://{service_domain}/"

    query = {
        "service": service,
        "osid": 1,
        "continue": landing_url,
        "followup": landing_url,
        "authuser": 0
    }
    login_page = await as_client.get("https://accounts.google.com/ServiceLogin", params=query, cookies=cookies, headers=gb.config.headers)

    # Collect every hidden <input> of the login page as form data.
    form_data = {}
    for field in bs(login_page.text, 'html.parser').find_all("input", {"type": "hidden"}):
        field_name = field.attrs["name"]
        form_data[field_name] = field.attrs["value"]

    post_headers = {**gb.config.headers, "Content-Type": "application/x-www-form-urlencoded"}
    osid_response = await as_client.post(f"https://{service_domain}/accounts/SetOSID", cookies=cookies, data=form_data, headers=post_headers)

    if "OSID" not in osid_response.cookies:
        raise GHuntOSIDAuthError("[-] No OSID header detected, exiting...")

    generated_osids[service] = osid_response.cookies["OSID"]
91
92
async def gen_osids(as_client: httpx.AsyncClient, cookies: Dict[str, str], osids: List[str]) -> Dict[str, str]:
    """
    Generate the OSIDs of the services named in the "osids" list.

    One gen_osid call per service is run through asyncio.gather, each
    writing its result into a shared dict.

    Returns that dict, keyed by service name.
    """
    collected = {}
    await asyncio.gather(*(gen_osid(as_client, cookies, collected, name) for name in osids))
    return collected
102
103
async def check_cookies(as_client: httpx.AsyncClient, cookies: Dict[str, str]) -> bool:
    """
    Checks the validity of given cookies.

    Calls the CheckCookie endpoint; the cookies are considered valid when the
    answer is a 302 whose Location does not start with the support page or
    the CookieMismatch page.
    """
    rejected_prefixes = ("https://support.google.com", "https://accounts.google.com/CookieMismatch")
    query = {"continue": "https://www.google.com/robots.txt"}

    response = await as_client.get("https://accounts.google.com/CheckCookie", params=query, cookies=cookies)
    if response.status_code != 302:
        return False

    location = response.headers.get("Location", "")
    return not location.startswith(rejected_prefixes)
109
110
async def check_osid(as_client: httpx.AsyncClient, cookies: Dict[str, str], service: str) -> bool:
    """
    Checks the validity of given OSID.

    Fetches the ServiceLogin page for the service and returns True only when
    every hidden input on that page has one of the expected names.
    """
    service_domain = get_domain_of_service(service)
    allowed_names = ["authuser", "continue", "osidt", "ifkv"]

    response = await as_client.get(f"https://accounts.google.com/ServiceLogin?service={service}&osid=1&continue=https://{service_domain}/&followup=https://{service_domain}/&authuser=0",
                                   cookies=cookies, headers=gb.config.headers)

    page = bs(response.text, 'html.parser')
    hidden_names = [field.attrs["name"] for field in page.find_all("input", {"type": "hidden"})]
    return all(name in allowed_names for name in hidden_names)
123
124
async def check_osids(as_client: httpx.AsyncClient, cookies: Dict[str, str], osids: Dict[str, str]) -> bool:
    """
    Checks the validity of given OSIDs.

    Runs check_osid for every service key of "osids" through asyncio.gather
    and returns True only if all of them succeed.
    """
    outcomes = await asyncio.gather(*(check_osid(as_client, cookies, name) for name in osids))
    return all(outcomes)
129
130
async def check_master_token(as_client: httpx.AsyncClient, master_token: str) -> bool:
    """
    Checks the validity of the android master token.

    Tries to obtain an app authorization token from the master token
    through android_oauth_app.

    Returns False when that attempt raises GHuntAndroidAppOAuth2Error,
    True otherwise.
    """
    try:
        await android_oauth_app(as_client, master_token, "com.google.android.play.games", ["https://www.googleapis.com/auth/games.firstparty"])
    except GHuntAndroidAppOAuth2Error:
        return False
    return True
137
138
async def gen_cookies_and_osids(as_client: httpx.AsyncClient, ghunt_creds: GHuntCreds, osids: Optional[List[str]] = None) -> None:
    """
    Generate fresh cookies and OSIDs and store them on "ghunt_creds".

    Logs in through the Accounts API's OAuthLogin, calls the MergeSession
    endpoint with the returned uberauth value, saves the response cookies in
    ghunt_creds.cookies, then generates the OSIDs of the requested services
    and saves them in ghunt_creds.osids.

    "osids" is the list of service names to generate OSIDs for; when omitted,
    every key of services_baseurls is used.

    Raises GHuntLoginError when OAuthLogin reports that we are not logged in.
    """
    # Imported here rather than at module level, presumably to avoid a
    # circular import -- TODO confirm.
    from ghunt.apis.accounts import Accounts

    # Resolve the default at call time instead of using a mutable list
    # built once at function definition.
    if osids is None:
        osids = [*services_baseurls.keys()]

    accounts_api = Accounts(ghunt_creds)
    is_logged_in, uber_auth = await accounts_api.OAuthLogin(as_client)
    if not is_logged_in:
        raise GHuntLoginError("[-] Not logged in.")

    params = {
        "uberauth": uber_auth,
        "continue": "https://www.google.com",
        "source": "ChromiumAccountReconcilor",
        "externalCcResult": "doubleclick:null,youtube:null"
    }

    req = await as_client.get("https://accounts.google.com/MergeSession", params=params)
    cookies = dict(req.cookies)
    ghunt_creds.cookies = cookies

    # Keep the generated mapping under its own name instead of rebinding the
    # "osids" parameter from a list of names to a dict of results.
    generated_osids = await gen_osids(as_client, cookies, osids)
    ghunt_creds.osids = generated_osids
157
158
async def check_and_gen(as_client: httpx.AsyncClient, ghunt_creds: GHuntCreds):
    """
    Checks the validity of the cookies and generate new ones if needed.

    When the stored cookies are rejected, cookies and OSIDs are regenerated
    once and checked again; a second rejection raises GHuntLoginError.
    On success the credentials are saved and a confirmation is printed.
    """
    cookies_valid = await check_cookies(as_client, ghunt_creds.cookies)
    if not cookies_valid:
        await gen_cookies_and_osids(as_client, ghunt_creds)
        regenerated_valid = await check_cookies(as_client, ghunt_creds.cookies)
        if not regenerated_valid:
            raise GHuntLoginError("[-] Can't generate cookies after multiple retries. Exiting...")

    ghunt_creds.save_creds(silent=True)
    gb.rc.print("[+] Authenticated !\n", style="sea_green3")
167
168
def auth_dialog() -> Tuple[str, str]:
    """
    Launch the dialog that asks the user
    how they want to provide their credentials.

    Choices 1 and 2 read base64-encoded JSON (from the listener or pasted)
    and take its "oauth_token" field; choice 3 reads an oauth_token directly;
    choice 4 reads a master token directly.

    Returns a tuple (oauth_token, master_token). Only the one matching the
    chosen method is filled, the other is an empty string.

    Exits the process (SystemExit) when the choice is not one of the above.
    """
    choices = ("You can facilitate configuring GHunt by using the GHunt Companion extension on Firefox, Chrome, Edge and Opera here :\n"
               "=> https://github.com/mxrch/ghunt_companion\n\n"
               "[1] (Companion) Put GHunt on listening mode (currently not compatible with docker)\n"
               "[2] (Companion) Paste base64-encoded authentication\n"
               "[3] Enter the oauth_token (starts with \"oauth2_4/\")\n"
               "[4] Enter the master token (starts with \"aas_et/\")\n"
               "Choice => ")

    oauth_token = ""
    master_token = ""

    # Tolerate stray whitespace around the menu choice ("4 " is still 4).
    choice = input(choices).strip()

    if choice in ("1", "2"):
        if choice == "1":
            received_data = listener.run()
        else:
            received_data = input("Paste the encoded credentials here => ")
        data = json.loads(base64.b64decode(received_data))
        oauth_token = data["oauth_token"]

    elif choice == "3":
        oauth_token = input("OAuth token => ").strip('" ')

    elif choice == "4":
        master_token = input("Master token => ").strip('" ')

    else:
        print("Please choose a valid choice. Exiting...")
        # os.EX_IOERR only exists on Unix, so fall back to a generic failure
        # code elsewhere. SystemExit is raised directly because the exit()
        # helper is injected by the site module and is not always available.
        raise SystemExit(getattr(os, "EX_IOERR", 1))

    return oauth_token, master_token
203
204
async def load_and_auth(as_client: httpx.AsyncClient, help=True) -> GHuntCreds:
    """
    Returns an authenticated GHuntCreds object.

    Loads the stored credentials, then validates them (regenerating cookies
    if needed) through check_and_gen.

    When loading raises GHuntInvalidSession, the error is re-raised as is if
    "help" is false; otherwise a GHuntInvalidSession telling the user how to
    log in again is raised, chained to the original error.
    """
    session = GHuntCreds()
    try:
        session.load_creds()
    except GHuntInvalidSession as err:
        if not help:
            raise err
        raise GHuntInvalidSession("Please generate a new session by doing => ghunt login") from err

    await check_and_gen(as_client, session)
    return session
218
219