-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathReadWriteMemory.py
368 lines (315 loc) · 15.6 KB
/
ReadWriteMemory.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
from typing import Any, List
import os.path
import ctypes
import ctypes.wintypes
# Process access rights requested from OpenProcess (single-bit flags).
PROCESS_VM_OPERATION = 1 << 3
PROCESS_VM_READ = 1 << 4
PROCESS_VM_WRITE = 1 << 5
PROCESS_QUERY_INFORMATION = 1 << 10
# Combined mask: STANDARD_RIGHTS_REQUIRED | SYNCHRONIZE | 0xFFF.
PROCESS_ALL_ACCESS = 0x000F0000 | 0x00100000 | 0x0FFF

# Element count of the buffers used for image file names and module lists.
MAX_PATH = 260
class ReadWriteMemoryError(Exception):
    """Error type raised by this module when a process or memory operation fails."""
class Process(object):
    """
    The Process class holds the information about the requested process (name, PID, handle, last error code)
    and wraps the Win32 calls used to read from and write to that process's memory.
    """

    def __init__(self, name: str = '', pid: int = -1, handle: int = -1, error_code: int = None):
        """
        :param name: The name of the executable file for the specified process.
        :param pid: The process ID.
        :param handle: The process handle.
        :param error_code: The error code from a process failure.
        """
        self.name = name
        self.pid = pid
        self.handle = handle
        self.error_code = error_code

    def __repr__(self) -> str:
        return f'{self.__class__.__name__}: "{self.name}"'

    def open(self):
        """
        Open the process with the Query, Operation, Read and Write permissions and store the handle
        in ``self.handle``.

        :raises ReadWriteMemoryError: If OpenProcess does not return a handle.
        """
        dw_desired_access = (PROCESS_QUERY_INFORMATION | PROCESS_VM_OPERATION | PROCESS_VM_READ | PROCESS_VM_WRITE)
        b_inherit_handle = True
        self.handle = ctypes.windll.kernel32.OpenProcess(dw_desired_access, b_inherit_handle, self.pid)
        if not self.handle:
            raise ReadWriteMemoryError(f'Unable to open process <{self.name}>')

    def close(self) -> int:
        """
        Closes the handle of the process.

        :return: The last error code from the result after an attempt to close the handle.
        """
        ctypes.windll.kernel32.CloseHandle(self.handle)
        return self.get_last_error()

    def get_all_access_handle(self):
        """
        Gets full access handle of the process and stores it in ``self.handle``.

        :return: handle of the process (falsy if OpenProcess failed).
        """
        b_inherit_handle = True
        self.handle = ctypes.windll.kernel32.OpenProcess(PROCESS_ALL_ACCESS, b_inherit_handle, self.pid)
        return self.handle

    @staticmethod
    def get_last_error() -> int:
        """
        Get the last error code.

        :return: The last error code.
        """
        return ctypes.windll.kernel32.GetLastError()

    def _raise_memory_error(self, error: Exception):
        """
        Record the failure on the instance, close the handle and raise ReadWriteMemoryError.

        :param error: The exception caught while preparing or performing the memory operation.
        :raises ReadWriteMemoryError: Always; carries a dict describing the failure.
        """
        # Capture the code before closing: CloseHandle may overwrite the thread's last error.
        self.error_code = self.get_last_error()
        if self.handle:
            self.close()
        details = {'msg': str(error), 'Handle': self.handle, 'PID': self.pid,
                   'Name': self.name, 'ErrorCode': self.error_code}
        raise ReadWriteMemoryError(details) from error

    def get_pointer(self, lp_base_address: int, offsets: List[int] = ()) -> int:
        """
        Get the pointer of a given address by following a chain of offsets.

        :param lp_base_address: The address from where you want to get the pointer.
        :param offsets: a list of offsets (ints, or strings such as ``'0x10'``).
        :return: ``lp_base_address`` itself when no offsets are given, otherwise the address reached after
                 applying every offset.
        """
        pointer = lp_base_address
        for offset in offsets:
            # Dereference the current address, then step by the offset. Memory is only read when there
            # is an offset left to apply, so an empty chain needs no open handle.
            pointer = self.read(pointer) + int(str(offset), 0)
        return pointer

    def get_modules(self) -> List[int]:
        """
        Get the process's modules.

        :return: A list of the process's modules addresses in decimal.
        :return: An empty list if the process is not open.
        """
        modules = (ctypes.wintypes.HMODULE * MAX_PATH)()
        cb_needed = ctypes.wintypes.DWORD()
        # lpcbNeeded is an out-parameter of EnumProcessModules, so hand it real storage instead of NULL.
        ctypes.windll.psapi.EnumProcessModules(self.handle, modules, ctypes.sizeof(modules),
                                               ctypes.byref(cb_needed))
        # Unused HMODULE slots come back from ctypes as None.
        return [module for module in modules if module is not None]

    def thread(self, address: int):
        """
        Create a remote thread to the address.
        If you don't know what you're doing, the process can crash.

        :param address: The start address of the thread inside the target process.
        """
        # c_void_p keeps the address pointer-sized; a bare int is marshalled as a C int.
        thread_handle = ctypes.windll.kernel32.CreateRemoteThread(self.handle, 0, 0, ctypes.c_void_p(address),
                                                                  0, 0, 0)
        if thread_handle:
            # Closing our handle does not stop the thread; it only releases the reference we hold.
            ctypes.windll.kernel32.CloseHandle(thread_handle)
        self.close()  # the thread stays in the process
        self.open()  # just for better code understanding

    def read(self, lp_base_address: int) -> Any:
        """
        Read an unsigned int from the process's memory.

        :param lp_base_address: The process's pointer
        :return: The value read. The result of ReadProcessMemory is not checked, so a failed read
                 yields the buffer's initial value, 0.
        :raises ReadWriteMemoryError: If the arguments cannot be converted for the call.
        """
        try:
            read_buffer = ctypes.c_uint()
            # None is passed as NULL for the optional lpNumberOfBytesRead argument.
            ctypes.windll.kernel32.ReadProcessMemory(self.handle, ctypes.c_void_p(lp_base_address),
                                                     ctypes.byref(read_buffer), ctypes.sizeof(read_buffer), None)
            return read_buffer.value
        except (BufferError, ValueError, TypeError) as error:
            self._raise_memory_error(error)

    def readString(self, lp_base_address: int, length: int) -> Any:
        """
        Read a null-terminated UTF-8 string from the process's memory.

        :param lp_base_address: The process's pointer
        :param length: The length of string
        :return: The decoded text up to the first null byte, or "" when no terminator is found
                 within ``length`` bytes.
        :raises ReadWriteMemoryError: If the arguments cannot be converted or the bytes are not valid UTF-8.
        """
        try:
            read_buffer = ctypes.create_string_buffer(length)
            ctypes.windll.kernel32.ReadProcessMemory(self.handle, ctypes.c_void_p(lp_base_address),
                                                     read_buffer, length, None)
            raw = read_buffer.raw
            found_terminator = raw.find(b'\x00')
            if found_terminator != -1:
                return raw[:found_terminator].decode('utf-8')
            print("[ReadMemory/Error]: terminator not found.\naddress: %s" % hex(lp_base_address))
            return ""
        except (BufferError, ValueError, TypeError) as error:
            self._raise_memory_error(error)

    def readByte(self, lp_base_address: int, length: int = 1) -> bytes:
        """
        Read raw bytes from the process's memory, one byte per call.

        :param lp_base_address: The process's pointer {don't use offsets}
        :param length: The length of the bytes to read
        :return: The bytes read. Bytes whose read fails are skipped, so the result can be
                 shorter than ``length``.
        :raises ReadWriteMemoryError: If the arguments cannot be converted for the call.
        """
        try:
            read_buffer = ctypes.c_ubyte()
            lp_buffer = ctypes.byref(read_buffer)
            collected = bytearray()
            for index in range(length):
                if ctypes.windll.kernel32.ReadProcessMemory(self.handle, ctypes.c_void_p(lp_base_address + index),
                                                            lp_buffer, 1, None):
                    collected.append(read_buffer.value)
            return bytes(collected)
        except (BufferError, ValueError, TypeError) as error:
            self._raise_memory_error(error)

    def readU64(self, lp_base_address: int) -> int:
        """
        Read an unsigned 64-bit integer from the process's memory.

        :param lp_base_address: The process's pointer
        :return: The value read. The result of ReadProcessMemory is not checked, so a failed read
                 yields the buffer's initial value, 0.
        :raises ReadWriteMemoryError: If the arguments cannot be converted for the call.
        """
        try:
            read_buffer = ctypes.c_uint64()
            ctypes.windll.kernel32.ReadProcessMemory(self.handle, ctypes.c_void_p(lp_base_address),
                                                     ctypes.byref(read_buffer), ctypes.sizeof(read_buffer), None)
            return read_buffer.value
        except (BufferError, ValueError, TypeError) as error:
            self._raise_memory_error(error)

    def write(self, lp_base_address: int, value: int) -> bool:
        """
        Write an unsigned int to the process's memory.

        :param lp_base_address: The process' pointer.
        :param value: The data to be written to the process's memory
        :return: True if WriteProcessMemory reported success, False otherwise.
        :raises ReadWriteMemoryError: If the arguments cannot be converted for the call.
        """
        try:
            write_buffer = ctypes.c_uint(value)
            written = ctypes.windll.kernel32.WriteProcessMemory(self.handle, ctypes.c_void_p(lp_base_address),
                                                                ctypes.byref(write_buffer),
                                                                ctypes.sizeof(write_buffer), None)
            return bool(written)
        except (BufferError, ValueError, TypeError) as error:
            self._raise_memory_error(error)

    def writeString(self, lp_base_address: int, string: str) -> bool:
        """
        Write a string, followed by a null terminator, to the process's memory.

        :param lp_base_address: The process' pointer.
        :param string: The string to be written to the process's memory
        :return: True if WriteProcessMemory reported success, False otherwise.
        :raises ReadWriteMemoryError: If the arguments cannot be converted for the call.
        """
        try:
            # create_string_buffer appends the terminating null; sizeof() includes it.
            write_buffer = ctypes.create_string_buffer(string.encode())
            written = ctypes.windll.kernel32.WriteProcessMemory(self.handle, ctypes.c_void_p(lp_base_address),
                                                                ctypes.byref(write_buffer),
                                                                ctypes.sizeof(write_buffer), None)
            return bool(written)
        except (BufferError, ValueError, TypeError) as error:
            self._raise_memory_error(error)

    def writeByte(self, lp_base_address: int, bytes: List[int]) -> bool:
        """
        Write bytes to the process's memory, one byte per call.

        :param lp_base_address: The process' pointer {don't use offsets}.
        :param bytes: The byte(s) to be written to the process's memory
        :return: True if every byte was written (or there was nothing to write), False otherwise.
        :raises ReadWriteMemoryError: If the arguments cannot be converted for the call.
        """
        try:
            all_written = True
            for index, value in enumerate(bytes):
                write_buffer = ctypes.c_ubyte(value)
                written = ctypes.windll.kernel32.WriteProcessMemory(self.handle,
                                                                    ctypes.c_void_p(lp_base_address + index),
                                                                    ctypes.byref(write_buffer), 1, None)
                # Keep writing the remaining bytes even after a failure.
                if not written:
                    all_written = False
            return all_written
        except (BufferError, ValueError, TypeError) as error:
            self._raise_memory_error(error)
class ReadWriteMemory:
    """
    The ReadWriteMemory Class is used to read and write to the memory of a running process.

    A single Process object is kept in ``self.process``; the lookup methods fill it in and return it.
    """

    def __init__(self):
        self.process = Process()

    @staticmethod
    def set_privileges():
        """
        Enable SeBackupPrivilege, SeDebugPrivilege and SeSecurityPrivilege on the current process token.

        Every other privilege in the token keeps its current attributes. Requires the pywin32 modules,
        which are imported here so that the rest of the module works without them.
        """
        import win32con
        import win32api
        import win32security
        from ntsecuritycon import TokenPrivileges

        remote_server = None
        token = win32security.OpenProcessToken(win32api.GetCurrentProcess(),
                                               win32con.TOKEN_ADJUST_PRIVILEGES | win32con.TOKEN_QUERY)
        # Resolve each privilege name once instead of once per token entry.
        wanted = {win32security.LookupPrivilegeValue(remote_server, privilege)
                  for privilege in ("SeBackupPrivilege", "SeDebugPrivilege", "SeSecurityPrivilege")}
        # Attribute value 2 is SE_PRIVILEGE_ENABLED.
        new_state = tuple((p[0], 2) if p[0] in wanted else (p[0], p[1])
                          for p in win32security.GetTokenInformation(token, TokenPrivileges))
        win32security.AdjustTokenPrivileges(token, False, new_state)

    def get_process_by_name(self, process_name: str) -> "Process":
        """
        :description: Get the process by the process executable's name and return a Process object.
        :param process_name: The name of the executable file for the specified process for example, my_program.exe.
                             ``.exe`` is appended when missing.
        :return: A Process object containing the information from the requested Process. The handle used
                 for the lookup is closed before returning; call ``open()`` on the result to use it.
        :raises ReadWriteMemoryError: If no running process has that executable name.
        """
        if not process_name.endswith('.exe'):
            process_name = process_name + '.exe'

        for process_id in self.enumerate_processes():
            self.process.handle = ctypes.windll.kernel32.OpenProcess(PROCESS_QUERY_INFORMATION, False, process_id)
            if not self.process.handle:
                continue
            image_file_name = (ctypes.c_char * MAX_PATH)()
            matched = False
            if ctypes.windll.psapi.GetProcessImageFileNameA(self.process.handle, image_file_name, MAX_PATH) > 0:
                filename = os.path.basename(image_file_name.value)
                # errors='replace': an undecodable name on an unrelated process must not abort the search.
                matched = filename.decode('utf-8', errors='replace') == process_name
            # Always release the query-only handle, as get_process_by_id does.
            self.process.close()
            if matched:
                self.process.pid = process_id
                self.process.name = process_name
                return self.process

        raise ReadWriteMemoryError(f'Process "{process_name}" not found!')

    def get_process_by_id(self, process_id: int) -> "Process":
        """
        :description: Get the process by the process ID and return a Process object.
        :param process_id: The process ID.
        :return: A Process object containing the information from the requested Process. The handle used
                 for the lookup is closed before returning; call ``open()`` on the result to use it.
        :raises ReadWriteMemoryError: If the process cannot be opened or its executable name cannot be read.
        """
        self.process.handle = ctypes.windll.kernel32.OpenProcess(PROCESS_QUERY_INFORMATION, False, process_id)
        if not self.process.handle:
            raise ReadWriteMemoryError(f'Process "{process_id}" not found!')

        image_file_name = (ctypes.c_char * MAX_PATH)()
        name_length = ctypes.windll.psapi.GetProcessImageFileNameA(self.process.handle, image_file_name, MAX_PATH)
        # The name is already copied into our buffer, so the handle can be released on both paths.
        self.process.close()
        if name_length <= 0:
            raise ReadWriteMemoryError(f'Unable to get the executable\'s name for PID={process_id}!')

        filename = os.path.basename(image_file_name.value)
        self.process.pid = process_id
        self.process.name = filename.decode('utf-8', errors='replace')
        return self.process

    @staticmethod
    def enumerate_processes() -> list:
        """
        Get the list of running processes ID's from the current system.

        :return: A list of processes ID's
        :raises ReadWriteMemoryError: If EnumProcesses reports a failure.
        """
        count = 32
        while True:
            process_ids = (ctypes.wintypes.DWORD * count)()
            cb = ctypes.sizeof(process_ids)
            bytes_returned = ctypes.wintypes.DWORD()
            if not ctypes.windll.Psapi.EnumProcesses(ctypes.byref(process_ids), cb, ctypes.byref(bytes_returned)):
                # Retrying a failed call unchanged would loop forever.
                raise ReadWriteMemoryError('Unable to enumerate the running processes!')
            if bytes_returned.value < cb:
                # Only the first bytes_returned bytes were filled in; the rest is zero padding.
                used = bytes_returned.value // ctypes.sizeof(ctypes.wintypes.DWORD)
                return list(process_ids[:used])
            # The buffer was filled completely, so the list may be truncated: retry with a larger one.
            count *= 2