-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathprotocols.py
183 lines (151 loc) · 6.62 KB
/
protocols.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
#!/usr/bin/env python3
# https://github.com/HectorTa1989/HecPy3-Packet-Sniffer
__author__ = 'Hector Ta @ https://github.com/HectorTa1989'
from ctypes import BigEndianStructure, create_string_buffer, c_ubyte, c_uint8, \
c_uint16, c_uint32, sizeof
from socket import inet_ntop, AF_INET, AF_INET6
class Protocol(BigEndianStructure):
_pack_ = 1
def __new__(cls, packet):
return cls.from_buffer_copy(packet)
def __init__(self, *args):
super().__init__()
self.encapsulated_proto = None
def __str__(self):
return create_string_buffer(sizeof(self))[:]
@staticmethod
def addr_array_to_hdwr(address: str) -> str:
"""
Converts a c_ubyte array of 6 bytes to IEEE 802 MAC address.
Ex: From b'\xceP\x9a\xcc\x8c\x9d' to 'ce:50:9a:cc:8c:9d'
"""
return ':'.join(format(octet, '02x') for octet in bytes(address))
@staticmethod
def hex_format(value: int, str_length: int) -> str:
"""
Fills a hex value with zeroes to the left for compliance with
the presentation of codes used in Internet protocols.
Ex: From '0x800' to '0x0800'
"""
return format(value, '#0{}x'.format(str_length))
class Ethernet(Protocol): # IEEE 802.3 standard
_fields_ = [
('dst', c_ubyte * 6), # Destination hardware address
('src', c_ubyte * 6), # Source hardware address
('eth', c_uint16) # Ethertype
]
header_len = 14
ethertypes = {'0x0806': 'ARP', '0x0800': 'IPv4', '0x86dd': 'IPv6'}
def __init__(self, packet: bytes):
super().__init__(packet)
self.dest = self.addr_array_to_hdwr(self.dst)
self.source = self.addr_array_to_hdwr(self.src)
self.ethertype = self.hex_format(self.eth, 6)
# Limit implementation to common protocols
self.encapsulated_proto = self.ethertypes.get(self.ethertype, None)
class IPv4(Protocol): # IETF RFC 791
_fields_ = [
("version", c_uint8, 4), # Protocol version
("ihl", c_uint8, 4), # Internet header length
("dscp", c_uint8, 6), # Differentiated services code point
("ecp", c_uint8, 2), # Explicit congestion notification
("len", c_uint16), # Total packet length
("id", c_uint16), # Identification
("flags", c_uint16, 3), # Fragmentation control flags
("offset", c_uint16, 13), # Fragment offset
("ttl", c_uint8), # Time to live
("proto", c_uint8), # Encapsulated protocol
("chksum", c_uint16), # Header checksum
("src", c_ubyte * 4), # Source address
("dst", c_ubyte * 4) # Destination address
]
header_len = 20
proto_numbers = {1: 'ICMP', 6: 'TCP', 17: 'UDP'}
def __init__(self, packet: bytes):
super().__init__(packet)
self.source = inet_ntop(AF_INET, self.src)
self.dest = inet_ntop(AF_INET, self.dst)
# Limit implementation to common protocols
self.encapsulated_proto = self.proto_numbers.get(self.proto, None)
class IPv6(Protocol): # IETF RFC 2460 / 8200
_fields_ = [
("version", c_uint32, 4), # Protocol version
("tclass", c_uint32, 8), # Traffic class
("flabel", c_uint32, 20), # Flow label
("payload_len", c_uint16), # Payload length
("next_header", c_uint8), # Type of next header
("hop_limit", c_uint8), # Hop limit (replaces IPv4 TTL)
("src", c_ubyte * 16), # Source address
("dst", c_ubyte * 16) # Destination address
]
header_len = 40
def __init__(self, packet: bytes):
super().__init__(packet)
self.source = inet_ntop(AF_INET6, self.src)
self.dest = inet_ntop(AF_INET6, self.dst)
class ARP(Protocol): # IETF RFC 826
_fields_ = [
("htype", c_uint16), # Hardware type
("ptype", c_uint16), # Protocol type
("hlen", c_uint8), # Hardware length
("plen", c_uint8), # Protocol length
("oper", c_uint16), # Operation
("sha", c_ubyte * 6), # Sender hardware address
("spa", c_ubyte * 4), # Sender protocol address
("tha", c_ubyte * 6), # Target hardware address
("tpa", c_ubyte * 4), # Target protocol address
]
header_len = 28
def __init__(self, packet: bytes):
super().__init__(packet)
self.protocol = self.hex_format(self.ptype, 6)
self.source_hdwr = self.addr_array_to_hdwr(self.sha)
self.target_hdwr = self.addr_array_to_hdwr(self.tha)
self.source_proto = inet_ntop(AF_INET, bytes(self.spa))
self.target_proto = inet_ntop(AF_INET, bytes(self.tpa))
class TCP(Protocol): # IETF RFC 793
_fields_ = [
("sport", c_uint16), # Source port
("dport", c_uint16), # Destination port
("seq", c_uint32), # Sequence number
("ack", c_uint32), # Acknowledgement number
("offset", c_uint16, 4), # Data offset
("reserved", c_uint16, 3), # Reserved field
("flags", c_uint16, 9), # TCP flag codes
("window", c_uint16), # Size of the receive window
("chksum", c_uint16), # TCP header checksum
("urg", c_uint16), # Urgent pointer
]
header_len = 32
def __init__(self, packet: bytes):
super().__init__(packet)
self.flag_hex = self.hex_format(self.flags, 5)
self.flag_txt = self.translate_flags()
def translate_flags(self):
f_names = 'NS', 'CWR', 'ECE', 'URG', 'ACK', 'PSH', 'RST', 'SYN', 'FIN'
f_bits = format(self.flags, '09b')
return ' '.join(flag_name for flag_name, flag_bit in
zip(f_names, f_bits) if flag_bit == '1')
class UDP(Protocol): # IETF RFC 768
_fields_ = [
("sport", c_uint16), # Source port
("dport", c_uint16), # Destination port
("len", c_uint16), # Header length
("chksum", c_uint16) # Header checksum
]
header_len = 8
def __init__(self, packet: bytes):
super().__init__(packet)
class ICMP(Protocol): # IETF RFC 792
_fields_ = [
("type", c_uint8), # Control message type
("code", c_uint8), # Control message subtype
("chksum", c_uint16), # Header checksum
("rest", c_ubyte * 4) # Rest of header (contents vary)
]
header_len = 8
icmp_types = {0: 'REPLY', 8: 'REQUEST'}
def __init__(self, packet: bytes):
super().__init__(packet)
# Limit implementation to ICMP ECHO
self.type_txt = self.icmp_types.get(self.type, 'OTHER')