GitHub Repository: mxrch/GHunt
Path: blob/master/ghunt/helpers/gmaps.py
from dateutil.relativedelta import relativedelta
from datetime import datetime
import json
from geopy import distance
from geopy.geocoders import Nominatim
from typing import *

import httpx
from alive_progress import alive_bar

from ghunt import globals as gb
from ghunt.objects.base import *
from ghunt.helpers.utils import *
from ghunt.objects.utils import *
from ghunt.helpers.knowledge import get_gmaps_type_translation


def get_datetime(datepublished: str):
    """
    Get an approximate date from the Maps relative review date.
    Examples: 'last 2 days', 'an hour ago', '3 years ago'
    """
    if datepublished.split()[0] in ["a", "an"]:
        nb = 1
    else:
        if datepublished.startswith("last"):
            nb = int(datepublished.split()[1])
        else:
            nb = int(datepublished.split()[0])

    if "minute" in datepublished:
        delta = relativedelta(minutes=nb)
    elif "hour" in datepublished:
        delta = relativedelta(hours=nb)
    elif "day" in datepublished:
        delta = relativedelta(days=nb)
    elif "week" in datepublished:
        delta = relativedelta(weeks=nb)
    elif "month" in datepublished:
        delta = relativedelta(months=nb)
    elif "year" in datepublished:
        delta = relativedelta(years=nb)
    else:
        delta = relativedelta()

    return (datetime.today() - delta).replace(microsecond=0, second=0)

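# Illustrative examples (not part of the original module): how the relative-date
# parser above behaves. The exact return values depend on the current date, so
# the outputs shown here are hypothetical.
#
#   >>> get_datetime("3 years ago")    # ~ today minus 3 years, seconds/microseconds zeroed
#   >>> get_datetime("an hour ago")    # ~ today minus 1 hour
#   >>> get_datetime("last 2 days")    # ~ today minus 2 days
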
async def get_reviews(as_client: httpx.AsyncClient, gaia_id: str) -> Tuple[str, Dict[str, int], List[MapsReview], List[MapsPhoto]]:
    """Extracts the target's statistics, reviews and photos."""
    next_page_token = ""
    agg_reviews = []
    agg_photos = []
    stats = {}

    req = await as_client.get(f"https://www.google.com/locationhistory/preview/mas?authuser=0&hl=en&gl=us&pb={gb.config.templates['gmaps_pb']['stats'].format(gaia_id)}")
    if req.status_code == 302 and req.headers["Location"].startswith("https://www.google.com/sorry/index"):
        return "failed", stats, [], []

    data = json.loads(req.text[5:]) # The response starts with a 5-character anti-JSON-hijacking prefix, hence the slice
    if not data[16][8]:
        return "empty", stats, [], []
    stats = {sec[6]:sec[7] for sec in data[16][8][0]}
    total_reviews = stats["Reviews"] + stats["Ratings"] + stats["Photos"]
    if not total_reviews:
        return "empty", stats, [], []

    with alive_bar(total_reviews, receipt=False) as bar:
        for category in ["reviews", "photos"]:
            first = True
            while True:
                if first:
                    req = await as_client.get(f"https://www.google.com/locationhistory/preview/mas?authuser=0&hl=en&gl=us&pb={gb.config.templates['gmaps_pb'][category]['first'].format(gaia_id)}")
                    first = False
                else:
                    req = await as_client.get(f"https://www.google.com/locationhistory/preview/mas?authuser=0&hl=en&gl=us&pb={gb.config.templates['gmaps_pb'][category]['page'].format(gaia_id, next_page_token)}")
                data = json.loads(req.text[5:])

                new_reviews = []
                new_photos = []
                next_page_token = ""

                # Reviews
                if category == "reviews":
                    if not data[24]:
                        return "private", stats, [], []
                    reviews_data = data[24][0]
                    if not reviews_data:
                        break
                    for review_data in reviews_data:
                        review = MapsReview()
                        review.id = review_data[6][0]
                        review.date = datetime.utcfromtimestamp(review_data[6][1][3] / 1000000)
                        if len(review_data[6][2]) > 15 and review_data[6][2][15]:
                            review.comment = review_data[6][2][15][0][0]
                        review.rating = review_data[6][2][0][0]

                        review.location.id = review_data[1][14][0]
                        review.location.name = review_data[1][2]
                        review.location.address = review_data[1][3]
                        review.location.tags = review_data[1][4] if review_data[1][4] else []
                        review.location.types = [x for x in review_data[1][8] if x]
                        if review_data[1][0]:
                            review.location.position.latitude = review_data[1][0][2]
                            review.location.position.longitude = review_data[1][0][3]
                        # if len(review_data[1]) > 31 and review_data[1][31]:
                        #     print(f"Cost level : {review_data[1][31]}")
                        #     review.location.cost_level = len(review_data[1][31])
                        new_reviews.append(review)
                        bar()

                    agg_reviews += new_reviews

                    if not new_reviews or len(data[24]) < 4 or not data[24][3]:
                        break
                    next_page_token = data[24][3].strip("=")

                # Photos
                elif category == "photos":
                    if not data[22]:
                        return "private", stats, [], []
                    photos_data = data[22][1]
                    if not photos_data:
                        break
                    for photo_data in photos_data:
                        photos = MapsPhoto()
                        photos.id = photo_data[0][10]
                        photos.url = photo_data[0][6][0].split("=")[0]
                        date = photo_data[0][21][6][8]
                        photos.date = datetime(date[0], date[1], date[2], date[3]) # UTC
                        # photos.approximative_date = get_datetime(date[8][0]) # UTC

                        if len(photo_data) > 1:
                            photos.location.id = photo_data[1][14][0]
                            photos.location.name = photo_data[1][2]
                            photos.location.address = photo_data[1][3]
                            photos.location.tags = photo_data[1][4] if photo_data[1][4] else []
                            photos.location.types = [x for x in photo_data[1][8] if x] if photo_data[1][8] else []
                            if photo_data[1][0]:
                                photos.location.position.latitude = photo_data[1][0][2]
                                photos.location.position.longitude = photo_data[1][0][3]
                            if len(photo_data[1]) > 31 and photo_data[1][31]:
                                photos.location.cost_level = len(photo_data[1][31])
                        new_photos.append(photos)
                        bar()

                    agg_photos += new_photos

                    if not new_photos or len(data[22]) < 4 or not data[22][3]:
                        break
                    next_page_token = data[22][3].strip("=")

    return "", stats, agg_reviews, agg_photos

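# Minimal usage sketch (an assumption, not part of the original file): get_reviews()
# expects an httpx.AsyncClient that already carries valid Google cookies/headers, as
# prepared elsewhere by GHunt, plus the target's Gaia ID. The error handling below is
# purely illustrative.
#
#   import asyncio
#
#   async def _demo(as_client: httpx.AsyncClient, gaia_id: str):
#       err, stats, reviews, photos = await get_reviews(as_client, gaia_id)
#       if err in ("failed", "private", "empty"):
#           print(f"Nothing usable: {err}")
#           return
#       print(f"{len(reviews)} reviews, {len(photos)} photos, stats: {stats}")
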
def avg_location(locs: List[Tuple[float, float]]):
    """
    Calculates the average location
    from a list of (latitude, longitude) tuples.
    """
    latitude = []
    longitude = []
    for loc in locs:
        latitude.append(loc[0])
        longitude.append(loc[1])

    latitude = sum(latitude) / len(latitude)
    longitude = sum(longitude) / len(longitude)
    return latitude, longitude

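# Worked example (illustrative): the mean of (48.85, 2.35) and (48.87, 2.37) is
# roughly (48.86, 2.36). A plain arithmetic mean is acceptable here because the
# points being averaged already lie within gmaps_radius km of each other; it would
# not be suitable for points spread worldwide or straddling the antimeridian.
#
#   >>> avg_location([(48.85, 2.35), (48.87, 2.37)])
#   (48.86, 2.36)   # approximately, up to floating-point rounding
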
def translate_confidence(percents: int):
    """Translates the percents number to a more human-friendly text"""
    if percents >= 100:
        return "Extremely high"
    elif percents >= 80:
        return "Very high"
    elif percents >= 60:
        return "Little high"
    elif percents >= 40:
        return "Okay"
    elif percents >= 20:
        return "Low"
    elif percents >= 10:
        return "Very low"
    else:
        return "Extremely low"

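# Quick reference (illustrative values): 100 -> "Extremely high", 85 -> "Very high",
# 65 -> "Little high", 45 -> "Okay", 25 -> "Low", 12 -> "Very low", 3 -> "Extremely low".
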
def sanitize_location(location: Dict[str, str]):
    """Returns the nearest place from a Nominatim location response."""
    not_country = False
    not_town = False
    town = "?"
    country = "?"
    if "city" in location:
        town = location["city"]
    elif "village" in location:
        town = location["village"]
    elif "town" in location:
        town = location["town"]
    elif "municipality" in location:
        town = location["municipality"]
    else:
        not_town = True
    if "country" not in location:
        not_country = True
        location["country"] = country
    if not_country and not_town:
        return False
    location["town"] = town
    return location

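# Example input/output (illustrative), based on the shape of a Nominatim
# reverse-geocoding "address" dict:
#
#   >>> sanitize_location({"city": "Paris", "country": "France"})
#   {'city': 'Paris', 'country': 'France', 'town': 'Paris'}
#   >>> sanitize_location({"road": "Some road"})   # no town-like key and no country
#   False
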
def calculate_probable_location(geolocator: Nominatim, reviews_and_photos: List[MapsReview|MapsPhoto], gmaps_radius: int):
    """Calculates the probable location from a list of reviews and the max radius."""
    tmprinter = TMPrinter()
    radius = gmaps_radius

    locations = {}
    tmprinter.out(f"Calculation of the distance of each review...")
    for nb, review in enumerate(reviews_and_photos):
        if not review.location.position.latitude or not review.location.position.longitude:
            continue
        if review.location.id not in locations:
            locations[review.location.id] = {"dates": [], "locations": [], "range": None, "score": 0}
        location = (review.location.position.latitude, review.location.position.longitude)
        for review2 in reviews_and_photos:
            location2 = (review2.location.position.latitude, review2.location.position.longitude)
            dis = distance.distance(location, location2).km

            if dis <= radius:
                locations[review.location.id]["dates"].append(review2.date)
                locations[review.location.id]["locations"].append(location2)

        maxdate = max(locations[review.location.id]["dates"])
        mindate = min(locations[review.location.id]["dates"])
        locations[review.location.id]["range"] = maxdate - mindate
        tmprinter.out(f"Calculation of the distance of each review ({nb}/{len(reviews_and_photos)})...")

    tmprinter.clear()

    locations = {k: v for k, v in
                 sorted(locations.items(), key=lambda k: len(k[1]["locations"]), reverse=True)} # We sort it

    tmprinter.out("Identification of redundant areas...")
    to_del = []
    for id in locations:
        if id in to_del:
            continue
        for id2 in locations:
            if id2 in to_del or id == id2:
                continue
            if all([loc in locations[id]["locations"] for loc in locations[id2]["locations"]]):
                to_del.append(id2)
    for hash in to_del:
        del locations[hash]

    tmprinter.out("Calculating confidence...")

    maxrange = max([locations[hash]["range"] for hash in locations])
    maxlen = max([len(locations[hash]["locations"]) for hash in locations])
    minreq = 3
    mingroups = 3

    score_steps = 4
    for hash, loc in locations.items():
        if len(loc["locations"]) == maxlen:
            locations[hash]["score"] += score_steps * 4
        if loc["range"] == maxrange:
            locations[hash]["score"] += score_steps * 3
        if len(locations) >= mingroups:
            others = sum([len(locations[h]["locations"]) for h in locations if h != hash])
            if len(loc["locations"]) > others:
                locations[hash]["score"] += score_steps * 2
        if len(loc["locations"]) >= minreq:
            locations[hash]["score"] += score_steps

    panels = sorted(set([loc["score"] for loc in locations.values()]), reverse=True)

    maxscore = sum([p * score_steps for p in range(1, score_steps + 1)])
    for panel in panels:
        locs = [loc for loc in locations.values() if loc["score"] == panel]
        if len(locs[0]["locations"]) == 1:
            panel /= 2
        if len(reviews_and_photos) < 4:
            panel /= 2
        confidence = translate_confidence(panel / maxscore * 100)
        for nb, loc in enumerate(locs):
            avg = avg_location(loc["locations"])
            while True:
                try:
                    location = geolocator.reverse(f"{avg[0]}, {avg[1]}", timeout=10).raw["address"]
                    break
                except Exception: # Retry on transient Nominatim errors (timeouts, rate limiting)
                    pass
            location = sanitize_location(location)
            locs[nb]["avg"] = location
            del locs[nb]["locations"]
            del locs[nb]["score"]
            del locs[nb]["range"]
            del locs[nb]["dates"]

        tmprinter.clear()

        return confidence, locs

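# Scoring note (derived from the code above): with score_steps = 4, a location group
# can earn 16 (largest group) + 12 (widest date range) + 8 (more points than all other
# groups combined, when there are at least mingroups groups) + 4 (at least minreq
# points) = 40, which is exactly maxscore = sum(p * 4 for p in 1..4). A group that hits
# every criterion therefore maps to translate_confidence(100) -> "Extremely high"; the
# score is halved once if the group contains a single point and once more if fewer than
# 4 reviews/photos were available overall.
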
def output(err: str, stats: Dict[str, int], reviews: List[MapsReview], photos: List[MapsPhoto], gaia_id: str):
    """Pretty-prints the Maps results and makes some guesses."""

    print(f"\nProfile page : https://www.google.com/maps/contrib/{gaia_id}/reviews")

    if err == "failed":
        print("\n[-] Your IP has been blocked by Google. Try again later.")

    reviews_and_photos: List[MapsReview|MapsPhoto] = reviews + photos
    if err != "private" and (err == "empty" or not reviews_and_photos):
        print("\n[-] No review.")
        return

    print("\n[Statistics]")
    for section, number in stats.items():
        if number:
            print(f"{section} : {number}")

    if err == "private":
        print("\n[-] Reviews are private.")
        return

    print("\n[Reviews]")
    avg_ratings = round(sum([x.rating for x in reviews]) / len(reviews), 1) # Assumes at least one review at this point
    print(f"[+] Average rating : {ppnb(avg_ratings)}/5\n")

    # I removed the costs calculation because of a Google update : https://github.com/mxrch/GHunt/issues/529

    # costs_table = {
    #     1: "Inexpensive",
    #     2: "Moderately expensive",
    #     3: "Expensive",
    #     4: "Very expensive"
    # }

    # total_costs = 0
    # costs_stats = {x:0 for x in range(1,5)}
    # for review in reviews_and_photos:
    #     if review.location.cost_level:
    #         costs_stats[review.location.cost_level] += 1
    #         total_costs += 1
    # costs_stats = dict(sorted(costs_stats.items(), key=lambda item: item[1], reverse=True)) # We sort the dict by cost popularity

    # if total_costs:
    #     print("[Costs]")
    #     for cost, desc in costs_table.items():
    #         line = f"> {ppnb(round(costs_stats[cost]/total_costs*100, 1))}% {desc} ({costs_stats[cost]})"
    #         style = ""
    #         if not costs_stats[cost]:
    #             style = "bright_black"
    #         elif costs_stats[cost] == list(costs_stats.values())[0]:
    #             style = "spring_green1"
    #         gb.rc.print(line, style=style)

    #     avg_costs = round(sum([x*y for x,y in costs_stats.items()]) / total_costs)
    #     print(f"\n[+] Average costs : {costs_table[avg_costs]}")
    # else:
    #     print("[-] No costs data.")

    types = {}
    for review in reviews_and_photos:
        for type in review.location.types:
            if type not in types:
                types[type] = 0
            types[type] += 1
    types = dict(sorted(types.items(), key=lambda item: item[1], reverse=True))

    types_and_tags = {}
    for review in reviews_and_photos:
        for type in review.location.types:
            if type not in types_and_tags:
                types_and_tags[type] = {}
            for tag in review.location.tags:
                if tag not in types_and_tags[type]:
                    types_and_tags[type][tag] = 0
                types_and_tags[type][tag] += 1
            types_and_tags[type] = dict(sorted(types_and_tags[type].items(), key=lambda item: item[1], reverse=True))
    types_and_tags = dict(sorted(types_and_tags.items()))

    if types_and_tags:
        print("\nTarget's locations preferences :")

        unknown_trads = []
        for type, type_count in types.items():
            tags_counts = types_and_tags[type]
            translation = get_gmaps_type_translation(type)
            if not translation:
                unknown_trads.append(type)
            gb.rc.print(f"\n🏨 [underline]{translation if translation else type.title()} [{type_count}]", style="bold")
            nb = 0
            for tag, tag_count in list(tags_counts.items()):
                if nb >= 7:
                    break
                elif tag.lower() == type:
                    continue
                print(f"- {tag} ({tag_count})")
                nb += 1

        if unknown_trads:
            print(f"\n⚠️ The following gmaps types haven't been found in GHunt's knowledge.")
            for type in unknown_trads:
                print(f"- {type}")
            print("Please open an issue on the GHunt GitHub or submit a PR to add it!")

    geolocator = Nominatim(user_agent="nominatim")

    confidence, locations = calculate_probable_location(geolocator, reviews_and_photos, gb.config.gmaps_radius)
    print(f"\n[+] Probable location (confidence => {confidence}) :")

    loc_names = []
    for loc in locations:
        loc_names.append(
            f"- {loc['avg']['town']}, {loc['avg']['country']}"
        )

    loc_names = set(loc_names) # delete duplicates
    for loc in loc_names:
        print(loc)
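# End-to-end sketch (an assumption, not part of the original module): how these helpers
# fit together once an authenticated httpx.AsyncClient and a target Gaia ID are
# available, both of which are obtained elsewhere in GHunt.
#
#   async def _maps_module(as_client: httpx.AsyncClient, gaia_id: str):
#       err, stats, reviews, photos = await get_reviews(as_client, gaia_id)
#       output(err, stats, reviews, photos, gaia_id)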