Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mxrch
GitHub Repository: mxrch/GHunt
Path: blob/master/ghunt/apis/peoplepa.py
252 views
1
from ghunt.objects.base import GHuntCreds
2
from ghunt.errors import *
3
import ghunt.globals as gb
4
from ghunt.objects.apis import GAPI, EndpointConfig
5
from ghunt.parsers.people import Person
6
7
import httpx
8
9
from typing import *
10
import inspect
11
import json
12
13
14
class PeoplePaHttp(GAPI):
15
def __init__(self, creds: GHuntCreds, headers: Dict[str, str] = {}):
16
super().__init__()
17
18
if not headers:
19
headers = gb.config.headers
20
21
base_headers = {
22
"Host": "people-pa.clients6.google.com",
23
}
24
25
headers = {**headers, **base_headers}
26
27
self.hostname = "googleapis.com"
28
self.scheme = "https"
29
30
self._load_api(creds, headers)
31
32
async def people_lookup(self, as_client: httpx.AsyncClient, email: str, params_template="just_gaia_id") -> Tuple[bool, Person]:
33
endpoint = EndpointConfig(
34
name = inspect.currentframe().f_code.co_name,
35
verb = "GET",
36
data_type = None, # json, data or None
37
authentication_mode = "sapisidhash", # sapisidhash, cookies_only, oauth or None
38
require_key = "photos", # key name, or None
39
# key_origin="photos"
40
)
41
42
# Android OAuth fields
43
self.api_name = "people"
44
self.package_name = "com.google.android.gms"
45
self.scopes = [
46
"https://www.googleapis.com/auth/profile.agerange.read",
47
"https://www.googleapis.com/auth/profile.language.read",
48
"https://www.googleapis.com/auth/contacts",
49
"https://www.googleapis.com/auth/peopleapi.legacy.readwrite"
50
51
]
52
53
self._load_endpoint(endpoint)
54
55
base_url = "/v2/people/lookup"
56
57
params_templates = {
58
"just_gaia_id": {
59
"id": email,
60
"type": "EMAIL",
61
"matchType": "EXACT",
62
"requestMask.includeField.paths": "person.metadata"
63
},
64
"just_name": {
65
"id": email,
66
"type": "EMAIL",
67
"matchType": "EXACT",
68
"requestMask.includeField.paths": "person.name",
69
"core_id_params.enable_private_names": True
70
},
71
"max_details": {
72
"id": email,
73
"type": "EMAIL",
74
"match_type": "EXACT",
75
"extension_set.extension_names": [
76
"DYNAMITE_ADDITIONAL_DATA",
77
"DYNAMITE_ORGANIZATION_INFO"
78
],
79
"request_mask.include_field.paths": [
80
"person.metadata.best_display_name",
81
"person.photo",
82
"person.cover_photo",
83
"person.interaction_settings",
84
"person.legacy_fields",
85
"person.metadata",
86
"person.in_app_reachability",
87
"person.name",
88
"person.read_only_profile_info",
89
"person.sort_keys",
90
"person.email"
91
],
92
"request_mask.include_container": [
93
"AFFINITY",
94
"PROFILE",
95
"DOMAIN_PROFILE",
96
"ACCOUNT",
97
"EXTERNAL_ACCOUNT",
98
"CIRCLE",
99
"DOMAIN_CONTACT",
100
"DEVICE_CONTACT",
101
"GOOGLE_GROUP",
102
"CONTACT"
103
],
104
"core_id_params.enable_private_names": True
105
}
106
}
107
108
if not params_templates.get(params_template):
109
raise GHuntParamsTemplateError(f"The asked template {params_template} for the endpoint {endpoint.name} wasn't recognized by GHunt.")
110
params = params_templates[params_template]
111
112
req = await self._query(endpoint.name, as_client, base_url, params=params)
113
114
# Parsing
115
data = json.loads(req.text)
116
person = Person()
117
if not data:
118
return False, person
119
120
person_data = list(data["people"].values())[0]
121
await person._scrape(as_client, person_data)
122
123
return True, person
124
125
async def people(self, as_client: httpx.AsyncClient, gaia_id: str, params_template="just_name") -> Tuple[bool, Person]:
126
endpoint = EndpointConfig(
127
name = inspect.currentframe().f_code.co_name,
128
verb = "GET",
129
data_type = None, # json, data or None
130
authentication_mode = "sapisidhash", # sapisidhash, cookies_only, oauth or None
131
require_key = "photos", # key name, or None
132
# key_origin="photos"
133
)
134
self._load_endpoint(endpoint)
135
136
# Android OAuth fields
137
self.api_name = "people"
138
self.package_name = "com.google.android.gms"
139
self.scopes = [
140
"https://www.googleapis.com/auth/profile.agerange.read",
141
"https://www.googleapis.com/auth/profile.language.read",
142
"https://www.googleapis.com/auth/contacts",
143
"https://www.googleapis.com/auth/peopleapi.legacy.readwrite"
144
145
]
146
147
base_url = "/v2/people"
148
149
params_templates = {
150
"just_name": {
151
"person_id": gaia_id,
152
"requestMask.includeField.paths": "person.name",
153
"core_id_params.enable_private_names": True
154
},
155
"max_details": {
156
"person_id": gaia_id,
157
"extension_set.extension_names": [
158
"DYNAMITE_ADDITIONAL_DATA",
159
"DYNAMITE_ORGANIZATION_INFO"
160
],
161
"request_mask.include_field.paths": [
162
"person.metadata.best_display_name",
163
"person.photo",
164
"person.cover_photo",
165
"person.interaction_settings",
166
"person.legacy_fields",
167
"person.metadata",
168
"person.in_app_reachability",
169
"person.name",
170
"person.read_only_profile_info",
171
"person.sort_keys",
172
"person.email"
173
],
174
"request_mask.include_container": [
175
"AFFINITY",
176
"PROFILE",
177
"DOMAIN_PROFILE",
178
"ACCOUNT",
179
"EXTERNAL_ACCOUNT",
180
"CIRCLE",
181
"DOMAIN_CONTACT",
182
"DEVICE_CONTACT",
183
"GOOGLE_GROUP",
184
"CONTACT"
185
],
186
"core_id_params.enable_private_names": True
187
}
188
}
189
190
if not params_templates.get(params_template):
191
raise GHuntParamsTemplateError(f"The asked template {params_template} for the endpoint {endpoint.name} wasn't recognized by GHunt.")
192
params = params_templates[params_template]
193
194
req = await self._query(endpoint.name, as_client, base_url, params=params)
195
196
# Parsing
197
data = json.loads(req.text)
198
person = Person()
199
if data["personResponse"][0]["status"] == "NOT_FOUND":
200
return False, person
201
202
person_data = data["personResponse"][0]["person"]
203
await person._scrape(as_client, person_data)
204
205
return True, person
206