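# s7_client.py -- 290 lines (240 loc), 9.71 KB
# The text captured around this file ("Notifications", the notice that you
# must be signed in to change notification settings, "Fork 0", "Copy path")
# and the bare line numbers that follow are repository-page chrome, not
# program content.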
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
import snap7
from snap7.util import get_real, get_int, get_bool, get_dword
from snap7.type import Area
import re
class PLCMemoryMapper:
    """
    Translate textual DB addresses into reads on a connected PLC client.
    Addresses look like %DB<db>:DB<area><index>:<TYPE>, e.g. %DB42:DBD0:REAL.
    The index is scaled by the width of the area to obtain a byte offset:
    - D (double word): index * 4
    - W (word): index * 2
    - B (byte): index unchanged
    - X (bit): index unchanged; read_address treats it as a bit index
    NOTE(review): Siemens absolute addresses usually number DBW/DBD by their
    starting byte (DBD4 = bytes 4..7); this class scales the number by the
    element width instead - confirm that is the intended convention.
    """

    # Compiled once and applied with fullmatch() so trailing junk is rejected.
    _ADDRESS_PATTERN = re.compile(r'%DB(\d+):DB([BDWX])(\d+):(\w+)')

    # Bytes covered by one element of the areas whose index is scaled.
    _AREA_WIDTH = {'D': 4, 'W': 2}

    def __init__(self, client):
        """
        Keep the client and build the per-type lookup tables.
        Parameters:
        - client: Object providing read_area(area, db_number, start, size);
          the discovery functions in this file pass a snap7.client.Client.
        """
        self.client = client
        # data type -> (decoder from snap7.util, number of bytes to read)
        self.memory_types = {
            'REAL': (get_real, 4),
            'INT': (get_int, 2),
            'BOOL': (get_bool, 1),
            'DWORD': (get_dword, 4)
        }
        # data type -> area prefix used when composing address strings
        self.data_area_mapping = {
            'BOOL': 'DBX',
            'BYTE': 'DBB',
            'WORD': 'DBW',
            'INT': 'DBW',
            'REAL': 'DBD',
            'DWORD': 'DBD'
        }

    def parse_address(self, address_str):
        """
        Parse address string like %DB42:DBD0:REAL or %DB42:DBW2:INT
        Surrounding whitespace is ignored; anything else before or after the
        address makes it invalid.
        Returns:
        - DB Number
        - Offset (index scaled by 4 for D and by 2 for W, unchanged for B and X)
        - Data Type
        Raises:
        - ValueError: if the string is not a complete, well-formed address
        """
        # fullmatch (not match) so "%DB1:DBD0:REAL-extra" is not silently
        # accepted as a REAL address.
        match = self._ADDRESS_PATTERN.fullmatch(address_str.strip())
        if match is None:
            raise ValueError(f"Invalid address format: {address_str}")
        db_num = int(match.group(1))
        data_area = match.group(2)
        offset = int(match.group(3))
        data_type = match.group(4)
        # B and X are absent from the width table and keep their index as is.
        offset *= self._AREA_WIDTH.get(data_area, 1)
        return db_num, offset, data_type

    def read_address(self, address_str):
        """
        Read value from a specific PLC memory address and determine the correct data type.
        For BOOL the parsed offset is a bit index: the byte at offset // 8 is
        read and bit offset % 8 is extracted, which is the same split
        format_opcua_style_output applies when it renders BOOL entries.
        Returns:
        - A dict with 'address', 'value', 'db', 'offset' and 'type', or None
          when the value is None or 0, or when the read fails (the error is
          printed, not raised)
        Raises:
        - ValueError: if the address is malformed or its data type is not in
          memory_types
        """
        db_num, offset, data_type = self.parse_address(address_str)
        if data_type not in self.memory_types:
            raise ValueError(f"Unsupported data type: {data_type}")
        read_func, size = self.memory_types[data_type]
        if data_type == 'BOOL':
            start, bit_index = divmod(offset, 8)
        else:
            start, bit_index = offset, None
        try:
            data = self.client.read_area(Area.DB, db_num, start, size)
            if bit_index is None:
                value = read_func(data, 0)
            else:
                # snap7's get_bool needs the bit position as a third
                # argument; calling it with two always raised TypeError.
                value = read_func(data, 0, bit_index)
            # Skip empty or default values
            if value is None or value == 0:
                return None
            return {
                'address': address_str,
                'value': value,
                'db': db_num,
                'offset': offset,
                'type': data_type
            }
        except Exception as e:
            # Best effort: a failed read is reported and treated as "nothing here".
            print(f"Error reading address {address_str}: {e}")
            return None
def discover_rack_and_slot(ip, max_rack=0, max_slot=3, port=102):
    """
    Probe rack/slot pairs on a PLC and report the ones that accept a connection.
    Parameters:
    - ip: PLC IP address
    - max_rack: Highest rack number to try, inclusive (default 0)
    - max_slot: Highest slot number to try, inclusive (default 3)
    - port: TCP port passed to connect (default 102)
    Returns:
    - A list of (rack, slot) tuples for which the client reported a connection
    """
    plc = snap7.client.Client()
    reachable = []
    candidates = [
        (rack_no, slot_no)
        for rack_no in range(max_rack + 1)
        for slot_no in range(max_slot + 1)
    ]
    for rack_no, slot_no in candidates:
        try:
            plc.connect(ip, rack_no, slot_no, port)
            if not plc.get_connected():
                continue
            print(f"Valid connection found at Rack {rack_no}, Slot {slot_no}")
            reachable.append((rack_no, slot_no))
        except Exception as err:
            # A refused pair is reported and the probe moves on to the next one.
            print(f"Failed to connect at Rack {rack_no}, Slot {slot_no}: {err}")
        finally:
            # Disconnect after every attempt, successful or not.
            plc.disconnect()
    return reachable
def discover_plc_memory(ip, rack, slot, max_db=100, max_offset=100, step=4, port=102):
    """
    Scan a PLC's data blocks and collect every address that yields a reading.
    Parameters:
    - ip: PLC IP address
    - rack: Rack number
    - slot: Slot number
    - max_db: Highest DB number to scan, inclusive (scan starts at DB 1)
    - max_offset: Highest offset to try within each DB, inclusive
    - step: Distance between successive offsets (default 4)
    - port: TCP port passed to connect (default 102)
    Returns:
    - A list of the result dicts produced by PLCMemoryMapper.read_address;
      empty when the connection fails
    """
    plc = snap7.client.Client()
    found = []
    try:
        plc.connect(ip, rack, slot, port)
        if not plc.get_connected():
            print(f"Failed to connect to PLC at {ip}, Rack {rack}, Slot {slot}")
            return found
        print(f"Connected to PLC at {ip}, Rack {rack}, Slot {slot}")
        mapper = PLCMemoryMapper(plc)
        for db_no in range(1, max_db + 1):
            for position in range(0, max_offset + 1, step):
                for kind in mapper.memory_types:
                    prefix = mapper.data_area_mapping.get(kind)
                    if not prefix:
                        continue  # Skip unsupported data types
                    address_str = f"%DB{db_no}:{prefix}{position}:{kind}"
                    try:
                        hit = mapper.read_address(address_str)
                        if not hit:
                            continue
                        found.append(hit)
                        print(f"Discovered: {hit['address']} Value: {hit['value']}")
                    except ValueError as bad_address:
                        print(f"ValueError for address {address_str}: {bad_address}")
                    except Exception as err:
                        print(f"Error reading address {address_str}: {err}")
    except Exception as err:
        print(f"Connection error: {err}")
    finally:
        # Runs on every exit path, including the early return above.
        plc.disconnect()
    return found
def format_opcua_style_output(discovered_memory):
    """
    Render discovered memory entries as OPC UA-style address lines.
    Parameters:
    - discovered_memory: Iterable of dicts with 'db', 'offset', 'type' and 'value' keys.
    Returns:
    - One line per entry joined with newlines, all BOOL lines first and the
      remaining types after them; entries whose value is None or 0 are left out.
    """
    bool_lines = []
    other_lines = []
    for record in discovered_memory:
        db_no, position = record['db'], record['offset']
        kind, current = record['type'], record['value']
        if current is None or current == 0:
            continue
        if kind != 'BOOL':
            other_lines.append(f"DB{db_no}B{position}:{kind} Value: {current}")
            continue
        # A BOOL offset is a bit position: split it into byte and bit-in-byte.
        byte_index, bit_index = divmod(position, 8)
        bool_lines.append(f"DB{db_no}B{byte_index}.{bit_index}:BOOL Value: {current}")
    return "\n".join([*bool_lines, *other_lines])
def discover_and_format_plc_memory(ip, rack, slot, max_db=100, max_offset=100, step=4, port=102):
    """
    Scan a PLC's data blocks and return the readings as a nested dictionary.
    Parameters:
    - ip: PLC IP address
    - rack: Rack number
    - slot: Slot number
    - max_db: Highest DB number to scan, inclusive (scan starts at DB 1)
    - max_offset: Highest offset to try within each DB, inclusive
    - step: Distance between successive offsets (default 4)
    - port: TCP port passed to connect (default 102)
    Returns:
    - A dict keyed by "DB<n>". Each value holds "NamespaceIndex",
      "QualifiedName", "NodeId" and a "children" dict keyed by address string.
      DBs that produced no readings are removed; the dict is empty when the
      connection fails.
    """
    plc = snap7.client.Client()
    memory_map = {}
    try:
        plc.connect(ip, rack, slot, port)
        if not plc.get_connected():
            print(f"Failed to connect to PLC at {ip}, Rack {rack}, Slot {slot}")
            return memory_map
        print(f"Connected to PLC at {ip}, Rack {rack}, Slot {slot}")
        mapper = PLCMemoryMapper(plc)
        for db_no in range(1, max_db + 1):
            db_key = f"DB{db_no}"
            # Registered up front and filled in place; dropped below if it stays empty.
            children = {}
            memory_map[db_key] = {
                "NamespaceIndex": 0,
                "QualifiedName": f"i={2253 + db_no}",
                "NodeId": db_key,
                "children": children,
            }
            for position in range(0, max_offset + 1, step):
                for kind in mapper.memory_types:
                    prefix = mapper.data_area_mapping.get(kind)
                    if not prefix:
                        continue
                    address_str = f"%DB{db_no}:{prefix}{position}:{kind}"
                    try:
                        reading = mapper.read_address(address_str)
                        if not reading:
                            continue
                        node_id = reading['address']
                        reading_type = reading['type']
                        reading_value = reading['value']
                        children[node_id] = {
                            "NamespaceIndex": 0,
                            "QualifiedName": f"Sensor{11492 + db_no * 100 + position * 10}_{reading_type}",
                            "NodeId": node_id,
                            "Value": reading_value,
                            "Type": reading_type,
                        }
                        print(f"{node_id} Value: {reading_value} Type: {reading_type}")
                    except ValueError as bad_address:
                        print(f"ValueError for address {address_str}: {bad_address}")
                    except Exception as err:
                        print(f"Error reading address {address_str}: {err}")
            if not children:
                del memory_map[db_key]
    except Exception as err:
        print(f"Connection error: {err}")
    finally:
        # Runs on every exit path, including the early return above.
        plc.disconnect()
    return memory_map
def run(ip):
    """
    Find the PLC's usable rack/slot pairs and scan memory through each of them.
    Parameters:
    - ip: PLC IP address
    Returns:
    - The merged memory dictionary from every scanned pair (later pairs
      overwrite earlier ones on the same DB key); empty when no pair connects
    """
    print("Discovering rack and slot...")
    pairs = discover_rack_and_slot(ip)
    combined = {}
    if not pairs:
        print("No valid rack and slot combinations found.")
        return combined
    print("Discovered valid rack and slot combinations:")
    for rack_no, slot_no in pairs:
        print(f"Rack: {rack_no}, Slot: {slot_no}")
    # Scan memory and return the variable dictionary for each valid combination
    for rack_no, slot_no in pairs:
        print(f"\nScanning memory for Rack {rack_no}, Slot {slot_no}...\n")
        scanned = discover_and_format_plc_memory(ip, rack_no, slot_no)
        combined.update(scanned)
    return combined
if __name__ == "__main__":
    # Script entry point: scan the PLC at a fixed address and dump what was found.
    plc_ip = "10.1.10.210"
    final_map = run(plc_ip)
    print("\nFinal Memory Map:")
    for db_key in final_map:
        print(f"{db_key} => {final_map[db_key]}")