Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mxrch
GitHub Repository: mxrch/GHunt
Path: blob/master/ghunt/apis/playgames.py
252 views
1
from ghunt.objects.base import GHuntCreds
2
from ghunt.errors import *
3
import ghunt.globals as gb
4
from ghunt.objects.apis import GAPI, EndpointConfig
5
from ghunt.parsers.playgames import PlayedGames, PlayerAchievements, PlayerProfile
6
7
import httpx
8
9
from typing import *
10
import inspect
11
import json
12
13
14
class PlayGames(GAPI):
15
def __init__(self, creds: GHuntCreds, headers: Dict[str, str] = {}):
16
super().__init__()
17
18
if not headers:
19
headers = gb.config.headers
20
21
base_headers = {}
22
23
headers = {**headers, **base_headers}
24
25
# Android OAuth fields
26
self.api_name = "playgames"
27
self.package_name = "com.google.android.play.games"
28
self.scopes = [
29
"https://www.googleapis.com/auth/games.firstparty",
30
"https://www.googleapis.com/auth/googleplay"
31
]
32
33
self.hostname = "www.googleapis.com"
34
self.scheme = "https"
35
36
self._load_api(creds, headers)
37
38
async def get_profile(self, as_client: httpx.AsyncClient, player_id: str) -> Tuple[bool, PlayerProfile]:
39
endpoint = EndpointConfig(
40
name = inspect.currentframe().f_code.co_name,
41
verb = "GET",
42
data_type = None, # json, data or None
43
authentication_mode = "oauth", # sapisidhash, cookies_only, oauth or None
44
require_key = None, # key name, or None
45
)
46
self._load_endpoint(endpoint)
47
48
base_url = f"/games/v1whitelisted/players/{player_id}"
49
50
req = await self._query(endpoint.name, as_client, base_url)
51
52
# Parsing
53
data = json.loads(req.text)
54
player_profile = PlayerProfile()
55
if not "displayPlayer" in data:
56
return False, player_profile
57
58
player_profile._scrape(data["displayPlayer"])
59
player_profile.id = player_id
60
61
return True, player_profile
62
63
async def get_played_games(self, as_client: httpx.AsyncClient, player_id: str, page_token: str="") -> Tuple[bool, str, PlayedGames]:
64
endpoint = EndpointConfig(
65
name = inspect.currentframe().f_code.co_name,
66
verb = "GET",
67
data_type = None, # json, data or None
68
authentication_mode = "oauth", # sapisidhash, cookies_only, oauth or None
69
require_key = None, # key name, or None
70
)
71
self._load_endpoint(endpoint)
72
73
base_url = f"/games/v1whitelisted/players/{player_id}/applications/played"
74
75
params = {}
76
if page_token:
77
params = {"pageToken": page_token}
78
79
req = await self._query(endpoint.name, as_client, base_url, params=params)
80
81
# Parsing
82
data = json.loads(req.text)
83
played_games = PlayedGames()
84
if not "items" in data:
85
return False, "", played_games
86
87
next_page_token = data.get("nextPageToken", "")
88
89
played_games._scrape(data["items"])
90
91
return True, next_page_token, played_games
92
93
async def get_achievements(self, as_client: httpx.AsyncClient, player_id: str, page_token: str="") -> Tuple[bool, str, PlayerAchievements]:
94
endpoint = EndpointConfig(
95
name = inspect.currentframe().f_code.co_name,
96
verb = "POST",
97
data_type = "json", # json, data or None
98
authentication_mode = "oauth", # sapisidhash, cookies_only, oauth or None
99
require_key = None, # key name, or None
100
)
101
self._load_endpoint(endpoint)
102
103
base_url = f"/games/v1whitelisted/players/{player_id}/achievements"
104
105
params = {
106
"state": "UNLOCKED",
107
"returnDefinitions": True,
108
"sortOrder": "RECENT_FIRST"
109
}
110
111
if page_token:
112
params["pageToken"] = page_token
113
114
req = await self._query(endpoint.name, as_client, base_url, params=params)
115
116
# Parsing
117
data = json.loads(req.text)
118
achievements = PlayerAchievements()
119
if not "items" in data:
120
return False, "", achievements
121
122
next_page_token = ""
123
if "nextPageToken" in data:
124
next_page_token = data["nextPageToken"]
125
126
achievements._scrape(data)
127
128
return True, next_page_token, achievements
129