File tree 2 files changed +23
-2
lines changed
java/src/main/java/com/logicalclocks/utils
2 files changed +23
-2
lines changed Original file line number Diff line number Diff line change @@ -115,13 +115,22 @@ public static void main(String[] args) throws Exception {
115
115
}
116
116
LOGGER .info ("Hsfs utils write options: {}" , writeOptions );
117
117
118
+ boolean success = false ;
118
119
try {
119
120
if (op .equals ("offline_fg_materialization" ) || op .equals ("offline_fg_backfill" )) {
120
121
SparkEngine .getInstance ().streamToHudiTable (streamFeatureGroup , writeOptions );
121
122
}
123
+ success = true ;
122
124
} finally {
123
125
LOGGER .info ("Closing spark session..." );
124
- SparkEngine .getInstance ().closeSparkSession ();
126
+ try {
127
+ SparkEngine .getInstance ().closeSparkSession ();
128
+ } catch (Exception e ) {
129
+ LOGGER .error ("Error closing spark session" , e );
130
+ }
131
+ if (!success ) {
132
+ System .exit (1 );
133
+ }
125
134
}
126
135
System .exit (0 );
127
136
}
Original file line number Diff line number Diff line change 1
1
from __future__ import annotations
2
2
3
+ import sys
3
4
import os
4
5
import argparse
5
6
import json
@@ -285,6 +286,7 @@ def parse_isoformat_date(da: str) -> datetime:
285
286
args = parser .parse_args ()
286
287
job_conf = read_job_conf (args .path )
287
288
289
+ success = False
288
290
try :
289
291
if args .op == "insert_fg" :
290
292
insert_fg (spark , job_conf )
@@ -300,6 +302,16 @@ def parse_isoformat_date(da: str) -> datetime:
300
302
import_fg (job_conf )
301
303
elif args .op == "run_feature_monitoring" :
302
304
run_feature_monitoring (job_conf )
305
+
306
+ success = True
303
307
finally :
304
308
if spark is not None :
305
- spark .stop ()
309
+ try :
310
+ spark .stop ()
311
+ except Exception as e :
312
+ print (f"Error stopping spark session: { e } " )
313
+ if not success :
314
+ sys .exit (1 )
315
+
316
+ sys .exit (0 )
317
+
You can’t perform that action at this time.
0 commit comments