Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mxrch
GitHub Repository: mxrch/GHunt
Path: blob/master/ghunt/helpers/utils.py
252 views
1
from pathlib import Path
2
from PIL import Image
3
import hashlib
4
from typing import *
5
from time import time
6
from datetime import datetime, timezone
7
from dateutil.parser import isoparse
8
from copy import deepcopy
9
import jsonpickle
10
import json
11
from packaging.version import parse as parse_version
12
13
import httpx
14
import imagehash
15
from io import BytesIO
16
17
from ghunt import globals as gb
18
from ghunt import version as current_version
19
from ghunt.lib.httpx import AsyncClient
20
21
22
def get_httpx_client() -> httpx.AsyncClient:
    """
        Build the asynchronous HTTP client used by GHunt.

        The client is GHunt's own AsyncClient (imported from ghunt.lib.httpx),
        created with HTTP/2 enabled and a timeout of 15.
    """
    # To inspect the traffic through a local intercepting proxy, build the client with:
    #   AsyncClient(http2=True, timeout=15, proxies="http://127.0.0.1:8282", verify=False)
    client = AsyncClient(http2=True, timeout=15)
    return client
28
29
def oprint(obj: Any) -> None:
    """
        Pretty-print an arbitrary object as indented JSON on stdout.

        The object is first serialized with jsonpickle, then re-parsed and
        dumped with a 2-space indentation. Nothing is returned: the function
        only prints.
    """
    serialized = jsonpickle.encode(obj)
    # Round-trip through json so the output is re-indented.
    pretty_output = json.dumps(json.loads(serialized), indent=2)
    print(pretty_output)
33
34
def chunkify(lst, n):
    """
        Split a sequence into n consecutive chunks of near-equal size.

        The first len(lst) % n chunks receive one extra item, so chunk sizes
        differ by at most one. Chunks are yielded lazily, in order, as slices
        of the input.
    """
    base_size, remainder = divmod(len(lst), n)
    start = 0
    for index in range(n):
        # The leading `remainder` chunks absorb the leftover items.
        stop = start + base_size + (1 if index < remainder else 0)
        yield lst[start:stop]
        start = stop
41
42
def within_docker() -> bool:
    """
        Tell whether the process appears to run inside a Docker container,
        by checking that the marker file /.dockerenv exists as a regular file.
    """
    marker = Path('/.dockerenv')
    return marker.is_file()
44
45
def gen_sapisidhash(sapisid: str, origin: str, timestamp: Optional[str] = None) -> str:
    """
        Build a SAPISIDHASH value of the form "<timestamp>_<sha1 hex digest>".

        The digest is the SHA-1 of "<timestamp> <sapisid> <origin>".

        Args:
            sapisid: Value of the SAPISID cookie.
            origin: Origin the hash is generated for.
            timestamp: Unix timestamp (in seconds) as a string. When omitted,
                the current time is used.

        Returns:
            The "<timestamp>_<digest>" string.
    """
    if timestamp is None:
        # Computed at call time: a default evaluated in the signature would be
        # frozen at import and reused (stale) by every later call.
        timestamp = str(int(time()))
    digest = hashlib.sha1(' '.join([timestamp, sapisid, origin]).encode()).hexdigest()
    return f"{timestamp}_{digest}"
47
48
def inject_osid(cookies: Dict[str, str], osids: Dict[str, str], service: str) -> Dict[str, str]:
    """
        Return a deep copy of the cookies with the "OSID" cookie set to the
        OSID registered for the given service.

        The input cookies are left untouched. A KeyError is raised if the
        service has no entry in osids.
    """
    patched = deepcopy(cookies)
    patched.update(OSID=osids[service])
    return patched
52
53
def is_headers_syntax_good(headers: Dict[str, str]) -> bool:
    """
        Tell whether the given headers can be used to build an httpx.Headers object.

        Returns True when the construction succeeds, False when it raises.
    """
    try:
        httpx.Headers(headers)
    except Exception:
        # Any construction error means the headers are not usable. Unlike a
        # bare `except:`, this lets KeyboardInterrupt / SystemExit propagate.
        return False
    return True
59
60
async def get_url_image_flathash(as_client: httpx.AsyncClient, image_url: str) -> str:
    """
        Download the image at image_url with the given client and return
        its average hash (imagehash.average_hash) as a string.
    """
    response = await as_client.get(image_url)
    # Decode the downloaded bytes in memory, without touching the disk.
    picture = Image.open(BytesIO(response.content))
    return str(imagehash.average_hash(picture))
65
66
async def is_default_profile_pic(as_client: httpx.AsyncClient, image_url: str) -> Tuple[bool, str]:
    """
        Check whether the picture at image_url looks like a default
        profile picture.

        The picture's flathash is compared with a reference hash; the picture
        is considered a default one when the difference between the two
        hashes is below 10.

        Returns:
            A (is_default, flathash) tuple, flathash being the hash string
            of the downloaded picture.
    """
    flathash = await get_url_image_flathash(as_client, image_url)

    candidate = imagehash.hex_to_flathash(flathash, 8)
    reference = imagehash.hex_to_flathash("000018183c3c0000", 8)
    is_default = bool(candidate - reference < 10)
    return is_default, str(flathash)
76
77
def get_class_name(obj) -> str:
    """
        Return the first space-delimited token of str(obj), once the
        surrounding "<" and ">" characters have been stripped.

        Ex : "<Foo object at 0x...>" -> "Foo"
    """
    text = str(obj).strip("<>")
    first_token, _, _ = text.partition(" ")
    return first_token
79
80
def get_datetime_utc(date_str: str) -> datetime:
    """
        Convert an ISO 8601 string to a timezone-aware datetime in UTC.

        Strings carrying a UTC offset are converted to the same instant
        expressed in UTC. Strings without any offset are interpreted as
        already being in UTC (previously they raised a TypeError, because
        utcoffset() returns None for naive datetimes).
    """
    date = isoparse(date_str)
    if date.utcoffset() is None:
        # Naive datetime: there is no offset to subtract, just tag it as UTC.
        return date.replace(tzinfo=timezone.utc)
    return date.astimezone(timezone.utc)
85
86
def ppnb(nb: float|int) -> float:
87
"""
88
Pretty print float number
89
Ex: 3.9 -> 3.9
90
4.0 -> 4
91
4.1 -> 4.1
92
"""
93
try:
94
return int(nb) if nb % int(nb) == 0.0 else nb
95
except ZeroDivisionError:
96
if nb == 0.0:
97
return 0
98
else:
99
return nb
100
101
def parse_oauth_flow_response(body: str) -> dict[str, str]:
    """
        Correctly format the response sent by android.googleapis.com
        during the Android OAuth2 Login Flow.

        The body is read as one "key=value" pair per line. Only the first "="
        separates the key from the value, so values may contain "=" themselves.
        A line without "=" yields the whole line as key with an empty value.
        Empty lines (e.g. produced by a trailing newline) are skipped.
    """
    parsed = {}
    for line in body.split("\n"):
        if not line:
            # Avoid registering a meaningless '' key for blank lines.
            continue
        key, _, value = line.partition("=")
        parsed[key] = value
    return parsed
107
108
def humanize_list(array: List[Any]) -> str:
    """
        Transforms a list to a human sentence.
        Ex : ["reader", "writer", "owner"] -> "reader, writer and owner".

        Items are converted to text, so non-string items are accepted whatever
        the length of the list. An empty list gives an empty string and a
        single item is returned alone.
    """
    # Stringify up front so the single-item case accepts non-string items too.
    words = [f"{item}" for item in array]
    if len(words) <= 1:
        return ''.join(words)

    *leading, last = words
    return f"{', '.join(leading)} and {last}"
125
126
def unicode_patch(txt: str) -> str:
    """
        Replace a few accented characters with their unaccented equivalent.

        Handled characters: é -> e, è -> e, ç -> c, à -> a.
        Every occurrence of each character is replaced individually; all other
        characters are left untouched.
    """
    bad_chars = {
        "é": "e",
        "è": "e",
        "ç": "c",
        "à": "a"
    }
    # translate() maps character by character, whereas replace() with the
    # joined keys would only match the literal substring "éèçà".
    return txt.translate(str.maketrans(bad_chars))
134
135
def show_version():
    """
        Print the GHunt version banner, followed by either an upgrade
        notice when a newer version is available, or a confirmation
        that the installation is up to date.
    """
    update_available, latest = check_new_version()

    print()
    installed = current_version.metadata
    banner = f"> GHunt {installed.get('version', '')} ({installed.get('name', '')}) <"
    gb.rc.print(banner.center(53), style="bold")
    print()

    if not update_available:
        gb.rc.print("🎉 You are up to date !", style="light_pink3")
        return

    gb.rc.print(f"🥳 New version {latest.get('version', '')} ({latest.get('name', '')}) is available !", style="bold red")
    gb.rc.print("🤗 Run 'pipx upgrade ghunt' to update.", style="bold light_pink3")
145
146
147
def check_new_version() -> tuple[bool, dict[str, str]]:
    """
        Checks if there is a new version of GHunt available.

        The version metadata published on the master branch of the GitHub
        repository is downloaded and compared with the installed version.

        The check is best-effort: a network error, a non-200 response, an
        unparsable payload or an invalid version string all result in
        (False, {}) instead of an exception.

        Returns:
            (True, {"version": ..., "name": ...}) when a newer version exists,
            (False, {}) otherwise.
    """
    try:
        req = httpx.get("https://raw.githubusercontent.com/mxrch/GHunt/master/ghunt/version.py")
    except httpx.HTTPError:
        # Offline, DNS failure, timeout...: the update check must not break the CLI.
        return False, {}
    if req.status_code != 200:
        return False, {}

    # The remote file is expected to look like `metadata = {...}`, the right-hand
    # side being JSON-compatible.
    raw = req.text.strip().removeprefix("metadata = ")
    try:
        data = json.loads(raw)
    except ValueError:
        return False, {}
    if not isinstance(data, dict):
        return False, {}

    new_version = data.get("version", "")
    new_name = data.get("name", "")

    try:
        is_newer = parse_version(new_version) > parse_version(current_version.metadata.get("version", ""))
    except (ValueError, TypeError):
        # packaging raises InvalidVersion (a ValueError) on malformed version strings.
        return False, {}

    if is_newer:
        return True, {"version": new_version, "name": new_name}
    return False, {}
163