21
21
from enum import Enum
22
22
from typing import TYPE_CHECKING , Any , Dict , List , Optional , Union
23
23
24
+ import humps
25
+ from hopsworks import execution as execution_mod
24
26
from hopsworks_common import user as user_mod
25
27
from hopsworks_common import util
28
+ from hsfs import expectation_suite as es_mod
29
+ from hsfs import validation_report as vr_mod
26
30
from hsfs .core .constants import HAS_GREAT_EXPECTATIONS
27
31
28
32
31
35
ExpectationSuite ,
32
36
ExpectationSuiteValidationResult ,
33
37
)
34
- from hsfs import expectation_suite as es_mod
35
- from hsfs import validation_report as vr_mod
36
38
37
39
38
40
class FeatureStoreActivityType (Enum ):
@@ -44,69 +46,67 @@ class FeatureStoreActivityType(Enum):
44
46
COMMIT = "COMMIT"
45
47
46
48
47
- @dataclass (frozen = True , init = False , repr = False )
49
+ @dataclass (init = False , repr = False )
48
50
class FeatureStoreActivity :
49
51
type : FeatureStoreActivityType
50
- metadata : str
51
52
timestamp : int
52
- user : user_mod .User
53
+ metadata : str
54
+ user : Optional [user_mod .User ]
53
55
# optional fields depending on the activity type
54
56
validation_report : Optional [
55
57
Union [vr_mod .ValidationReport , ExpectationSuiteValidationResult ]
56
58
] = None
57
59
expectation_suite : Optional [Union [es_mod .ExpectationSuite , ExpectationSuite ]] = None
58
60
commit : Optional [Dict [str , Union [str , int , float ]]] = None
59
61
statistics : Optional [Dict [str , Union [str , int , float ]]] = None
62
+ execution : Optional [execution_mod .Execution ] = None
63
+ execution_last_event_time : Optional [int ] = None
60
64
# internal fields
61
65
id : int
62
66
href : str
63
67
64
68
def __init__(
    self,
    type: str,
    timestamp: int,
    metadata: Optional[str] = None,
    user: Optional[Dict[str, Any]] = None,
    expectation_suite: Optional[Dict[str, Any]] = None,
    validation_report: Optional[Dict[str, Any]] = None,
    commit: Optional[Dict[str, Union[str, int, float]]] = None,
    statistics: Optional[Dict[str, Union[str, int, float]]] = None,
    execution: Optional[Dict[str, Any]] = None,
    execution_last_event_time: Optional[int] = None,
    **kwargs,
):
    """Populate an activity from already-decamelized response fields.

    # Arguments
        type: Activity type; a string is converted to `FeatureStoreActivityType`,
            any other value is stored as given.
        timestamp: Stored unchanged.
        metadata: Stored unchanged.
        user: When truthy, parsed with `user_mod.User.from_response_json`.
        expectation_suite: Only parsed when the type is `EXPECTATIONS` and the
            payload is truthy.
        validation_report: Only parsed when the type is `VALIDATIONS` and the
            payload is truthy.
        commit: Stored unchanged.
        statistics: Stored unchanged.
        execution: When truthy, parsed with
            `execution_mod.Execution.from_response_json`.
        execution_last_event_time: Stored unchanged.
        **kwargs: Only `id` and `href` are read; both default to `None`.
    """
    if isinstance(type, str):
        self.type = FeatureStoreActivityType(type)
    else:
        self.type = type
    self.timestamp = timestamp

    # internal fields arrive through kwargs
    self.id = kwargs.get("id")
    self.href = kwargs.get("href")

    if user:
        self.user = user_mod.User.from_response_json(user)
    else:
        self.user = None
    self.metadata = metadata
    self.commit = commit
    self.statistics = statistics
    if execution:
        self.execution = execution_mod.Execution.from_response_json(execution)
    else:
        self.execution = None
    self.execution_last_event_time = execution_last_event_time

    # validation_report / expectation_suite are only assigned on the instance
    # when the activity type matches and a payload was supplied.
    is_validation = self.type == FeatureStoreActivityType.VALIDATIONS
    if is_validation and validation_report:
        report = vr_mod.ValidationReport.from_response_json(validation_report)
        if HAS_GREAT_EXPECTATIONS:
            report = report.to_ge_type()
        self.validation_report = report

    is_expectation = self.type == FeatureStoreActivityType.EXPECTATIONS
    if is_expectation and expectation_suite:
        suite = es_mod.ExpectationSuite.from_response_json(expectation_suite)
        if HAS_GREAT_EXPECTATIONS:
            suite = suite.to_ge_type()
        self.expectation_suite = suite
110
110
111
111
@classmethod
112
112
def from_response_json (
@@ -117,35 +117,67 @@ def from_response_json(
117
117
cls .from_response_json (activity ) for activity in response_json ["items" ]
118
118
]
119
119
else :
120
- return cls (** response_json )
120
+ return cls (** humps . decamelize ( response_json ) )
121
121
122
122
def to_dict(self) -> Dict[str, Any]:
    """Build a plain-dict representation of this activity.

    `id`, `type` (as the enum value), `metadata` and `timestamp` are always
    present. `user`, `validation_report`, `expectation_suite`, `commit`,
    `statistics` and `execution` are added only when truthy.
    `execution_last_event_time` is added whenever it is not `None`, so the
    value accepted by the constructor is not lost on serialization.

    # Returns
        `Dict[str, Any]`: The activity as a dictionary with snake_case keys.
    """
    activity_dict = {
        "id": self.id,
        "type": self.type.value,
        "metadata": self.metadata,
        "timestamp": self.timestamp,
    }
    if self.user:
        activity_dict["user"] = self.user.to_dict()
    if self.validation_report:
        # Objects exposing `_id` are serialized with to_dict(); anything else
        # with to_json_dict(). Presumably the latter are the Great Expectations
        # native types -- TODO confirm.
        activity_dict["validation_report"] = (
            self.validation_report.to_dict()
            if hasattr(self.validation_report, "_id")
            else self.validation_report.to_json_dict()
        )
    if self.expectation_suite:
        # Same `_id`-based dispatch as for the validation report.
        activity_dict["expectation_suite"] = (
            self.expectation_suite.to_dict()
            if hasattr(self.expectation_suite, "_id")
            else self.expectation_suite.to_json_dict()
        )
    if self.commit:
        activity_dict["commit"] = self.commit
    if self.statistics:
        activity_dict["statistics"] = self.statistics
    if self.execution:
        # Round-trip through the execution's JSON string, then decamelize keys
        # so they match the snake_case keys used in the rest of the dict.
        activity_dict["execution"] = humps.decamelize(
            json.loads(self.execution.json())
        )
    if self.execution_last_event_time is not None:
        activity_dict["execution_last_event_time"] = self.execution_last_event_time

    return activity_dict
140
153
141
154
def json(self) -> str:
    """Return the `to_dict()` representation encoded as a JSON string.

    Encoding is delegated to `json.dumps` with `util.Encoder` as the encoder
    class.
    """
    payload = self.to_dict()
    return json.dumps(payload, cls=util.Encoder)
143
156
144
157
def __repr__(self):
    """Return a multi-line, human-readable summary of the activity.

    The first line carries the activity type, the timestamp rendered in UTC
    and, when a user is set, the user's email. Every further detail that is
    present (metadata, execution URL, validation outcome, expectation count,
    statistics, commit) is placed on its own line so that consecutive details
    never run into each other.
    """
    # The timestamp is divided by 1000 before conversion, i.e. it is treated
    # as milliseconds since the epoch.
    utc_human_readable = (
        datetime.datetime.fromtimestamp(
            self.timestamp / 1000, datetime.timezone.utc
        ).strftime(r"%Y-%m-%d %H:%M:%S")
        + " UTC"
    )
    header = f"Activity {self.type.value}, at: {utc_human_readable}"
    if self.user:
        header += f", by: {self.user.email}"

    lines = [header]
    if self.metadata:
        lines.append(f"\t{self.metadata}")
    if self.execution:
        lines.append(f"{self.execution.get_url()}")
    if self.validation_report:
        outcome = "succeeded" if self.validation_report.success else "failed"
        lines.append(f"Validation {outcome}.")
    if self.expectation_suite:
        lines.append(
            f"It has {len(self.expectation_suite.expectations)} expectations."
        )
    if self.statistics:
        lines.append(
            f"Computed following statistics:\n{json.dumps(self.statistics, indent=2)}"
        )
    if self.commit:
        lines.append(f"Data ingestion:\n{json.dumps(self.commit, indent=2)}")

    return "\n".join(lines)
0 commit comments