-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathuattributesvalidator.py
447 lines (385 loc) · 16.9 KB
/
uattributesvalidator.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
"""
SPDX-FileCopyrightText: 2024 Contributors to the Eclipse Foundation
See the NOTICE file(s) distributed with this work for additional
information regarding copyright ownership.
This program and the accompanying materials are made available under the
terms of the Apache License Version 2.0 which is available at
http://www.apache.org/licenses/LICENSE-2.0
SPDX-License-Identifier: Apache-2.0
"""
import time
from abc import abstractmethod
from enum import Enum
from uprotocol.uri.validator.urivalidator import UriValidator
from uprotocol.uuid.factory.uuidutils import UUIDUtils
from uprotocol.v1.uattributes_pb2 import (
UAttributes,
UMessageType,
UPriority,
)
from uprotocol.v1.uri_pb2 import UUri
from uprotocol.v1.uuid_pb2 import UUID
from uprotocol.validation.validationresult import ValidationResult
class UAttributesValidator:
"""
UAttributes is the class that defines the Payload. It is the place
for configuring time to live, priority,
security tokens and more.<br><br>
Each UAttributes class defines a different type of message payload.
The payload can represent a simple published
payload with some state change,Payload representing an RPC
request or Payload representing an RPC response.<br><br>
UAttributesValidator is a base class for all UAttribute
validators, that can help validate that the
UAttributes object is correctly defined to define the Payload correctly.
"""
@staticmethod
def get_validator(attribute: UAttributes):
"""
Static factory method for getting a validator according to the
UMessageType defined in the
UAttributes.<br>
@param attribute: UAttributes containing the UMessageType.
@return: returns a UAttributesValidator according to the
UMessageType defined in the
UAttributes.
"""
if attribute.type is None:
return Validators.PUBLISH.validator()
elif attribute.type == UMessageType.UMESSAGE_TYPE_RESPONSE:
return Validators.RESPONSE.validator()
elif attribute.type == UMessageType.UMESSAGE_TYPE_REQUEST:
return Validators.REQUEST.validator()
elif attribute.type == UMessageType.UMESSAGE_TYPE_NOTIFICATION:
return Validators.NOTIFICATION.validator()
else:
return Validators.PUBLISH.validator()
def validate(self, attributes: UAttributes) -> ValidationResult:
"""
Take a UAttributes object and run validations.<br><br>
@param attributes:The UAttriubes to validate.
@return:Returns a ValidationResult that is success or failed
with a message containing all validation errors
for invalid configurations.
"""
error_messages = [
self.validate_type(attributes),
self.validate_ttl(attributes),
self.validate_sink(attributes),
self.validate_priority(attributes),
self.validate_permission_level(attributes),
self.validate_req_id(attributes),
self.validate_id(attributes),
]
error_messages = [status.get_message() for status in error_messages if status.is_failure()]
if error_messages:
return ValidationResult.failure(",".join(error_messages))
else:
return ValidationResult.success()
@staticmethod
def is_expired(u_attributes: UAttributes) -> bool:
"""
Check the time-to-live attribute to see if it has expired. <br>
The message has expired when the current time is greater
than the original UUID time
plus the ttl attribute.
@param u_attributes UAttributes with time to live value.
@return Returns a true if the original time plus the ttl
is less than the current time
"""
ttl = u_attributes.ttl
maybe_time = UUIDUtils.get_time(u_attributes.id)
if maybe_time is None or ttl <= 0:
return False
return (maybe_time + ttl) < int(time.time() * 1000)
@staticmethod
def validate_ttl(attr: UAttributes) -> ValidationResult:
"""
Validate the time to live configuration. If the UAttributes
does not contain a time to live then the
ValidationResult
is ok.<br><br>
@param attr:UAttributes object containing the message time
to live configuration to validate.
@return:Returns a ValidationResult that is success or
failed with a failure message.
"""
return ValidationResult.success()
@abstractmethod
def validate_sink(self, attr: UAttributes) -> ValidationResult:
"""
Validate the sink UriPart.
@param attr:UAttributes object containing the sink to validate.
@return:Returns a ValidationResult that is success or
failed with a failure message.
"""
pass
@staticmethod
def validate_priority(attr: UAttributes):
"""
Validate the priority value to ensure it is one of the known CS values
@param attr Attributes object containing the
Priority to validate.
@return Returns a ValidationResult that is success or
failed with a failure message.
"""
return (
ValidationResult.success()
if attr.priority >= UPriority.UPRIORITY_CS1
else ValidationResult.failure(f"Invalid UPriority [{UPriority.Name(attr.priority)}]")
)
@staticmethod
def validate_permission_level(attr: UAttributes) -> ValidationResult:
"""
Validate the permissionLevel for the default case. If the
UAttributes does not contain a permission level
then the ValidationResult is ok.<br><br>
@param attr:UAttributes object containing the permission
level to validate.
@return:Returns a ValidationResult indicating if the
permissionLevel is valid or not.
"""
if attr.HasField("permission_level") and attr.permission_level <= 0:
return ValidationResult.failure("Invalid Permission Level")
else:
return ValidationResult.success()
@staticmethod
def validate_req_id(attr: UAttributes) -> ValidationResult:
"""
Validate the correlationId for the default case. Only the response message should have a reqid.
@param attr:Attributes object containing the request id to validate.
@return:Returns a ValidationResult that is
success or failed with a failure message.
"""
return (
ValidationResult.failure("Message should not have a reqid")
if attr.HasField("reqid")
else ValidationResult.success()
)
@staticmethod
def validate_id(attr: UAttributes) -> ValidationResult:
"""
Validate the Id for the default case. If the UAttributes object does
not contain an Id, the ValidationResult is failed.
@param attr:Attributes object containing the Id to validate.
@return:Returns a ValidationResult that is success or failed
"""
if not attr.HasField("id"):
return ValidationResult.failure("Missing id")
if not UUIDUtils.is_uuid(attr.id):
return ValidationResult.failure("Attributes must contain valid uProtocol UUID in id property")
return ValidationResult.success()
@abstractmethod
def validate_type(self, attr: UAttributes):
"""
Validate the UMessageType attribute, it is required.<br><br>
@param attr:UAttributes object containing the
message type to validate.
@return:Returns a ValidationResult that is success
or failed with a failure message.
"""
raise NotImplementedError("Subclasses must implement this method.")
class Publish(UAttributesValidator):
"""
Implements validations for UAttributes that define a message
that is meant for publishing state changes.
"""
def validate_type(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validates that attributes for a message meant to publish
state changes has the correct type.<br><br>
@param attributes_value:UAttributes object containing
the message type to validate.
@return:Returns a ValidationResult that is success or
failed with a failure message.
"""
return (
ValidationResult.success()
if attributes_value.type == UMessageType.UMESSAGE_TYPE_PUBLISH
else (ValidationResult.failure(f"Wrong Attribute Type [{UMessageType.Name(attributes_value.type)}]"))
)
def validate_sink(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validate the sink UriPart for Publish events. Publish should not have a sink.
@param attributes_value UAttributes object containing the sink to validate.
@return Returns a ValidationResult that is success or failed with a failure message.
"""
return (
ValidationResult.failure("Sink should not be present")
if attributes_value.HasField("sink")
else ValidationResult.success()
)
def __str__(self):
return "UAttributesValidator.Publish"
class Request(UAttributesValidator):
"""
Implements validations for UAttributes that define a message
that is meant for an RPC request.
"""
def validate_type(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validates that attributes for a message meant for an
RPC request has the correct type.<br><br>
@param attributes_value:UAttributes object containing
the message type to validate.
@return:Returns a ValidationResult that is success
or failed with a failure message.
"""
return (
ValidationResult.success()
if attributes_value.type == UMessageType.UMESSAGE_TYPE_REQUEST
else (ValidationResult.failure(f"Wrong Attribute Type [{UMessageType.Name(attributes_value.type)}]"))
)
def validate_sink(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validates that attributes for a message meant for an RPC
request has a destination sink.<br><br> In the case
of an RPC request, the sink is required.
@param attributes_value:UAttributes object containing
the sink to validate.
@return:Returns a ValidationResult that is success or
failed with a failure message.
"""
return (
ValidationResult.success()
if UriValidator.is_rpc_method(attributes_value.sink)
else ValidationResult.failure("Invalid Sink Uri")
)
def validate_ttl(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validate the time to live configuration.<br>In the case of an RPC
request, the time to live is required.<br><br>
@param attributes_value:UAttributes object containing the
time to live to validate.
@return:Returns a ValidationResult that is success or
failed with a failure message.
"""
if not attributes_value.HasField("ttl"):
return ValidationResult.failure("Missing TTL")
return ValidationResult.success()
def validate_priority(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validate the priority value to ensure it is one of the known CS values
@param attributes_value Attributes object containing the Priority to validate.
@return Returns a {@link ValidationResult} that is success or failed with a failure message.
"""
return (
ValidationResult.success()
if attributes_value.priority >= UPriority.UPRIORITY_CS4
else ValidationResult.failure(f"Invalid UPriority [{UPriority.Name(attributes_value.priority)}]")
)
def __str__(self):
return "UAttributesValidator.Request"
class Response(UAttributesValidator):
"""
Implements validations for UAttributes that define a message that is
meant for an RPC response.
"""
def validate_type(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validates that attributes for a message meant for an RPC
response has the correct type.<br><br>
@param attributes_value:UAttributes object containing the
message type to validate.
@return:Returns a ValidationResult that is success or
failed with a failure message.
"""
return (
ValidationResult.success()
if attributes_value.type == UMessageType.UMESSAGE_TYPE_RESPONSE
else (ValidationResult.failure(f"Wrong Attribute Type [{UMessageType.Name(attributes_value.type)}]"))
)
def validate_sink(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validates that attributes for a message meant for an RPC response
has a destination sink.<br>In the case of
an RPC response, the sink is required.<br><br>
@param attributes_value:UAttributes object containing the sink
to validate.
@return:Returns a ValidationResult that is success or failed
with a failure message.
"""
if not attributes_value.HasField("sink") or attributes_value.sink == UUri():
return ValidationResult.failure("Missing Sink")
return (
ValidationResult.success()
if UriValidator.is_rpc_response(attributes_value.sink)
else ValidationResult.failure("Invalid Sink Uri")
)
def validate_req_id(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validate the correlationId. n the case of an RPC response, the
correlation id is required.<br><br>
@param attributes_value:UAttributes object containing the
correlation id to validate.
@return:Returns a ValidationResult that is success or
failed with a failure message.
"""
if not attributes_value.HasField("reqid") or attributes_value.reqid == UUID():
return ValidationResult.failure("Missing correlationId")
if not UUIDUtils.is_uuid(attributes_value.reqid):
return ValidationResult.failure("Invalid correlation UUID")
return ValidationResult.success()
def validate_priority(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validate the priority value to ensure it is one of the known CS values
@param attributes_value Attributes object containing the Priority to validate.
@return Returns a ValidationResult that is success or failed with a failure message.
"""
return (
ValidationResult.success()
if attributes_value.priority >= UPriority.UPRIORITY_CS4
else ValidationResult.failure(f"Invalid UPriority [{UPriority.Name(attributes_value.priority)}]")
)
def __str__(self):
return "UAttributesValidator.Response"
class Notification(UAttributesValidator):
"""
Implements validations for UAttributes that define a message that is
meant for notifications.
"""
def validate_type(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validates that attributes for a message meant for Notification
state changes has the correct type.<br><br>
@param attributes_value:UAttributes object containing the message
type to validate.
@return:Returns a ValidationResult that is success or failed with
a failure message.
"""
return (
ValidationResult.success()
if attributes_value.type == UMessageType.UMESSAGE_TYPE_NOTIFICATION
else (ValidationResult.failure(f"Wrong Attribute Type [{UMessageType.Name(attributes_value.type)}]"))
)
def validate_sink(self, attributes_value: UAttributes) -> ValidationResult:
"""
Validates that attributes for a message meant for an RPC response
has a destination sink.<br>In the case of
an RPC response, the sink is required.<br><br>
@param attributes_value:UAttributes object containing the
sink to validate.
@return:Returns a ValidationResult that is success or
failed with a failure message.
"""
if not attributes_value.HasField("sink") or attributes_value.sink == UUri():
return ValidationResult.failure("Missing Sink")
return (
ValidationResult.success()
if UriValidator.is_default_resource_id(attributes_value.sink)
else ValidationResult.failure("Invalid Sink Uri")
)
def __str__(self):
return "UAttributesValidator.Notification"
class Validators(Enum):
"""
Validators Factory. <br>Example: UAttributesValidator
validateForPublishMessageType =
UAttributesValidator.Validators.PUBLISH.validator()
"""
PUBLISH = Publish()
REQUEST = Request()
RESPONSE = Response()
NOTIFICATION = Notification()
def validator(self):
return self.value