Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
mxrch
GitHub Repository: mxrch/GHunt
Path: blob/master/ghunt/modules/spiderdal.py
252 views
1
import asyncio
2
from dataclasses import dataclass
3
from pathlib import Path
4
5
from ghunt import globals as gb
6
from ghunt.objects.base import GHuntCreds
7
from ghunt.objects.utils import TMPrinter
8
from ghunt.helpers.utils import get_httpx_client
9
from ghunt.apis.digitalassetslinks import DigitalAssetsLinksHttp
10
from ghunt.helpers.playstore import app_exists
11
12
import httpx
13
14
15
@dataclass
class Asset:
    """A single node of the Digital Asset Links graph.

    An asset is either a website (only ``site`` is set) or an Android
    application (``package_name`` and ``certificate`` are set).
    """

    # NOTE: the fields are annotated as ``str`` but the code in this module
    # passes ``None`` for whichever side (web / Android) does not apply.

    # URL of the website, or None for an Android asset.
    site: str
    # Android package name, or None for a web asset.
    package_name: str
    # Certificate fingerprint of the Android app, or None for a web asset.
    certificate: str
20
21
async def identify_public_pkgs(as_client: httpx.AsyncClient, pkg_name: str, pkgs: dict[str, str], limiter: asyncio.Semaphore):
    """Label one package name as "public" or "private" in ``pkgs``.

    The label is "public" when ``app_exists`` reports the package as
    existing, "private" otherwise. The result is written in place into
    ``pkgs[pkg_name]``; nothing is returned.

    Args:
        as_client: HTTP client forwarded to ``app_exists``.
        pkg_name: Package name to look up.
        pkgs: Mapping updated in place with the label for ``pkg_name``.
        limiter: Semaphore held for the duration of the lookup, so the caller
            can bound how many lookups run at once.
    """
    async with limiter:
        is_listed = await app_exists(as_client, pkg_name)
        pkgs[pkg_name] = "public" if is_listed else "private"
27
28
async def analyze_single(as_client: httpx.AsyncClient, dal: DigitalAssetsLinksHttp, current_target: Asset, sites: dict[str, dict], pkgs: dict[str, dict], visited: set, limiter: asyncio.Semaphore):
    """Query the statements of one asset and record every asset they point to.

    The statements are fetched with ``dal.list_statements`` (by website when
    the target has a site, otherwise by package name + fingerprint). Each web
    or Android target found in the statements is registered in ``sites`` /
    ``pkgs`` together with the asset that first revealed it and the set of
    all assets that reference it. Finally the queried target is marked as
    visited and, if it returned statements, registered itself as a root entry
    (``first_origin`` set to None).

    Args:
        as_client: HTTP client forwarded to ``dal.list_statements``.
        dal: Digital Asset Links API wrapper.
        current_target: Asset to query.
        sites: Discovered sites keyed by URL; updated in place.
        pkgs: Discovered apps keyed by "<package>$<fingerprint>"; updated in place.
        visited: Keys of the assets already queried; updated in place.
        limiter: Semaphore held while the API request is in flight.
    """
    # Nothing can be queried for an asset with neither a site nor a package;
    # without this guard the response variable below would be left unbound.
    if not current_target.site and not current_target.package_name:
        return

    short_pkg_name = f"{current_target.package_name}${current_target.certificate}"
    # Label under which this target is recorded as the origin of its findings.
    origin_label = current_target.site if current_target.site else short_pkg_name

    # Only the network request is throttled; the bookkeeping below is synchronous.
    async with limiter:
        if current_target.site:
            _, res = await dal.list_statements(as_client, website=current_target.site)
        else:
            _, res = await dal.list_statements(
                as_client,
                android_package_name=current_target.package_name,
                android_cert_fingerprint=current_target.certificate,
            )

    for item in res.statements:
        web_site = item.target.web.site
        if web_site:
            # Strip leading/trailing dots so the same site always maps to one key.
            clean_site = web_site.strip('.')
            if clean_site not in sites:
                sites[clean_site] = {
                    "asset": Asset(site=clean_site, package_name=None, certificate=None),
                    "first_origin": current_target,
                    "origins": set(),
                }
            sites[clean_site]["origins"].add(origin_label)

        app = item.target.android_app
        if app.package_name:
            app_fingerprint = app.certificate.sha_fingerprint
            temp_name = f"{app.package_name}${app_fingerprint}"

            if temp_name not in pkgs:
                pkgs[temp_name] = {
                    "asset": Asset(site=None, package_name=app.package_name, certificate=app_fingerprint),
                    "first_origin": current_target,
                    "origins": set(),
                }
            pkgs[temp_name]["origins"].add(origin_label)

    # Mark the target as processed, and keep it in the results as a root entry
    # (no origin) when it produced statements but nothing pointed back to it.
    if current_target.site:
        visited.add(current_target.site)
        if res.statements and current_target.site not in sites:
            sites[current_target.site] = {
                "asset": current_target,
                "first_origin": None,
                "origins": set(),
            }
    if current_target.package_name:
        visited.add(short_pkg_name)
        if res.statements and short_pkg_name not in pkgs:
            pkgs[short_pkg_name] = {
                "asset": current_target,
                "first_origin": None,
                "origins": set(),
            }
75
76
def _build_initial_targets(url: str, package: str, fingerprint: str, strict: bool) -> list:
    """Build the first round of assets to query from the user-supplied inputs."""
    targets = []

    if url:
        # Only an explicit scheme counts: a bare domain such as "httpbin.org"
        # must not be treated as a URL just because it starts with "http".
        has_scheme = url.startswith(("http://", "https://"))
        host = url.split("//", 1)[1] if has_scheme else url
        # Keep the host[:port] part only, a path is not part of a web asset.
        host = host.split("/", 1)[0]

        candidates = [f"https://{host}"]
        if has_scheme:
            candidates.append(f"http://{host}")
        if not strict:
            candidates.append(f"https://www.{host}")
            if has_scheme:
                candidates.append(f"http://www.{host}")

        targets.extend(Asset(site=candidate, package_name=None, certificate=None) for candidate in candidates)

    if package and fingerprint:
        targets.append(Asset(site=None, package_name=package, certificate=fingerprint))

    return targets


def _origin_name(origin) -> str:
    """Return the display name of the asset that leaked another one."""
    return origin.site if origin.site else origin.package_name


def _print_sites(sites: dict):
    """Print every discovered site, with the asset that first leaked it if any."""
    if not sites:
        gb.rc.print("\nNo sites found.", style="italic bright_black")
        return

    gb.rc.print(f"\n🌐 {len(sites)} site{'s' if len(sites) > 1 else ''} found !", style="white")
    for site_url, site in sites.items():
        line = f"- [deep_sky_blue1][link={site_url}]{site_url}[/link][/deep_sky_blue1]"
        origin = site["first_origin"]
        # Root targets have no origin: print them without the "leaked by" part.
        if origin is not None:
            line += f" [steel_blue italic](leaked by : {_origin_name(origin)})[/steel_blue italic]"
        gb.rc.print(line)


def _print_packages(pkgs: dict, pkgs_names: dict):
    """Print every discovered package with its fingerprints and their origins."""
    if not pkgs:
        gb.rc.print("\nNo packages found.", style="bright_black italic")
        return

    gb.rc.print(f"\n📦 {len(pkgs_names)} Android package{'s' if len(pkgs_names) > 1 else ''} found !", style="white")
    for pkg_name, state in pkgs_names.items():
        icon = "🏪" if state == "public" else "🥷"
        gb.rc.print(f"- {icon} {pkg_name}", style="light_steel_blue")
        gb.rc.print("\tFingerprints (SHA256) :", style="steel_blue")

        # One cache per package name, so a fingerprint is listed only once.
        seen_fingerprints = set()
        for pkg in pkgs.values():
            asset = pkg["asset"]
            if asset.package_name != pkg_name or asset.certificate in seen_fingerprints:
                continue
            seen_fingerprints.add(asset.certificate)

            origin = pkg["first_origin"]
            if origin is None:
                # Root target supplied by the user: nothing leaked it.
                gb.rc.print(f"\t\t- {asset.certificate}", style="steel_blue italic", emoji=False)
            else:
                gb.rc.print(f"\t\t- {asset.certificate} (leaked by : {_origin_name(origin)})", style="steel_blue italic", emoji=False)


async def main(url: str, package: str, fingerprint: str, strict: bool, json_file: Path):
    """Spider Digital Asset Links statements from a site and/or an Android app.

    Starting from the given URL and/or package + fingerprint, every asset
    referenced by a statement is queried in turn, round after round, until no
    unvisited asset is left. The discovered sites and packages are then
    printed and, when requested, dumped to a JSON file.

    Args:
        url: Site to start from, with or without scheme. May be empty.
        package: Android package name to start from. Only used together
            with ``fingerprint``.
        fingerprint: Certificate fingerprint of ``package``.
        strict: When false, the "www." variant of the site is queried too.
        json_file: Path of the JSON export, or a falsy value to skip it.
    """
    ghunt_creds = GHuntCreds()
    ghunt_creds.load_creds()

    as_client = get_httpx_client()
    digitalassetslink = DigitalAssetsLinksHttp(ghunt_creds)

    tmprinter = TMPrinter()

    sites: dict = {}
    pkgs: dict = {}
    visited = set()

    # At most 10 API requests in flight at any time.
    limiter = asyncio.Semaphore(10)

    current_targets = _build_initial_targets(url, package, fingerprint, strict)

    round_no = 0
    total_scanned = 0
    print()
    try:
        while current_targets:
            round_no += 1
            total_scanned += len(current_targets)

            tmprinter.out(f"🕷️ [R{round_no}]: Investigating {len(current_targets)} targets...", style="bold magenta")

            await asyncio.gather(
                *[
                    analyze_single(as_client, digitalassetslink, target, sites, pkgs, visited, limiter)
                    for target in current_targets
                ]
            )

            # Next candidates: everything discovered so far that was not queried yet
            next_sites = [site["asset"] for name, site in sites.items() if name not in visited]
            next_pkgs = [pkg["asset"] for name, pkg in pkgs.items() if name not in visited]
            current_targets = next_sites + next_pkgs

        tmprinter.clear()
        gb.rc.print(f"🕷️ [R{round_no}]: Investigation done ! {total_scanned} assets scanned.", style="bold magenta")

        # Sort: one entry per distinct package name, labelled public / private
        pkgs_names = dict.fromkeys(pkg["asset"].package_name for pkg in pkgs.values())
        await asyncio.gather(
            *[
                identify_public_pkgs(as_client, pkg_name, pkgs_names, limiter)
                for pkg_name in pkgs_names
            ]
        )
    finally:
        # Release the HTTP connections even if a request failed.
        # NOTE(review): assumes get_httpx_client() returns a fresh
        # httpx.AsyncClient owned by this function — confirm.
        await as_client.aclose()

    # Print results
    _print_sites(sites)
    _print_packages(pkgs, pkgs_names)

    if json_file:
        import json
        from ghunt.objects.encoders import GHuntEncoder
        with open(json_file, "w", encoding="utf-8") as f:
            f.write(json.dumps({
                "sites": sites,
                "packages": pkgs
            }, cls=GHuntEncoder, indent=4))
        gb.rc.print(f"\n[+] JSON output wrote to {json_file} !\n", style="italic")
    else:
        print()
196