-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy pathcheck.py
88 lines (68 loc) · 3.03 KB
/
check.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
import asyncio
import logging
import json
from pathlib import Path
from sys import argv
import httpx
# Configure the root logger with its defaults so the logging.error / logging.warning
# calls made during the scan are emitted.
logging.basicConfig()

# Upper bound on how many hosts are probed at the same time; every probe
# acquires this semaphore before issuing its first request.
MAX_CONCURRENT_CHECKS = 50
semaphore = asyncio.Semaphore(value=MAX_CONCURRENT_CHECKS)
async def check_vulnerable(client: httpx.AsyncClient, url: str):
    """Probe one host for an open Orthanc REST API and test its export endpoint.

    The host is contacted over plain HTTP first; if that answers 400 the same
    host is retried over HTTPS.  If the server does not answer 401/400, the
    first entry returned by ``/instances`` is used to POST to
    ``/instances/<id>/export`` and the status code of that request decides
    the verdict.  The whole probe runs under the module-level ``semaphore``.

    Args:
        client: Shared HTTP client used for every request.
        url: Target given as ``host:port`` without a scheme; the scheme is
            prepended here.

    Returns:
        True when the export request is answered with anything other than
        403.  False otherwise: unauthorized, HTTPS fallback unreachable,
        unexpected API answer, no instances, patched (403), or all retries
        used up.
    """
    # Errors that make the HTTPS fallback give up on the host immediately.
    fallback_errors = (
        httpx.ConnectTimeout,
        httpx.PoolTimeout,
        httpx.ReadTimeout,
        httpx.RemoteProtocolError,
        httpx.ConnectError,
    )
    # Errors that consume one retry of the whole probe.
    retry_errors = fallback_errors + (httpx.ReadError,)

    # Keep the bare "host:port" untouched so each attempt (and the HTTPS
    # fallback) builds its URL from scratch instead of stacking schemes
    # like "https://http://host:port".
    hostport = url

    async with semaphore:
        retries = 3
        while retries > 0:
            try:
                base = f"http://{hostport}"
                resp = await client.get(base)
                if resp.status_code == 400:
                    # A 400 on plain HTTP usually means the port speaks TLS.
                    try:
                        base = f"https://{hostport}"
                        resp = await client.get(base)
                    except fallback_errors:
                        return False
                if resp.status_code == 401:
                    return False
                if resp.status_code == 400:
                    logging.error(f"{base}: 400. Maybe is it an HTTPS host? {resp.text}")
                    return False

                print(f"[+] Found an open orthanc at address {base}")
                resp = await client.get(f"{base}/instances")
                if resp.status_code != 200:
                    logging.error(f"{base}: {resp.status_code} while contacting the API: {resp.text}")
                    return False
                try:
                    res = resp.json()
                except json.decoder.JSONDecodeError:
                    return False
                if not isinstance(res, list):
                    # Anything but a JSON array cannot be indexed for an
                    # instance id, so the host is not usable for the check.
                    return False
                if not res:
                    logging.error(f"{base}: no instances. Not uploading new DICOMs here (yet).")
                    return False

                first = res[0]
                resp = await client.post(f"{base}/instances/{first}/export")
                if resp.status_code == 403:
                    print(f"[-] {base}: PATCHED")
                    return False
                if resp.status_code != 500:
                    # Unexpected answer: still reported below, but flag it.
                    logging.warning(f"{base}: {resp}: {resp.text}?")
                print(f"[+] {base}: MAY BE VULNERABLE!")
                return True
            except retry_errors:
                retries -= 1

    # Every attempt hit a network error.
    return False
async def main():
    """Scan every ``host,port`` pair listed in the file named on the command line.

    Expects exactly one argument: the path of a text file with one
    ``host,port`` entry per line.  Blank lines are ignored and lines that do
    not contain exactly two non-empty comma-separated fields are logged and
    skipped.  Each remaining target is passed to ``check_vulnerable`` and the
    number of targets that returned True is printed at the end.
    """
    if len(argv) != 2:
        print("Usage: python3 check.py <hosts.csv> (format: host,port)")
        return

    targets = []
    for raw_line in Path(argv[1]).read_text().splitlines():
        line = raw_line.strip()
        if not line:
            continue
        fields = [field.strip() for field in line.split(",")]
        if len(fields) != 2 or not all(fields):
            logging.error("Skipping malformed line (expected host,port): %r", line)
            continue
        host, port = fields
        targets.append(f"{host}:{port}")
    print(f"Loaded {len(targets)} hosts.")

    # Basic auth header; the token is base64 for "orthanc:orthanc".
    headers = {"Authorization": "Basic b3J0aGFuYzpvcnRoYW5j"}
    # NOTE: verify=False disables TLS certificate verification for all requests.
    async with httpx.AsyncClient(follow_redirects=True, headers=headers, verify=False, timeout=15) as client:
        # return_exceptions=True keeps one failing host from aborting the
        # whole scan; failures come back as exception objects in the results.
        scan_result = await asyncio.gather(
            *(check_vulnerable(client, target) for target in targets),
            return_exceptions=True,
        )

    for target, outcome in zip(targets, scan_result):
        if isinstance(outcome, BaseException):
            logging.error("%s: check failed: %r", target, outcome)
    print(f"Number of vulnerable hosts: {scan_result.count(True)}")
# Start the scan only when executed as a script, so importing this module
# does not trigger network activity.
if __name__ == "__main__":
    asyncio.run(main())