27
27
import io .hops .hopsworks .common .featurestore .storageconnectors .FeaturestoreStorageConnectorDTO ;
28
28
import io .hops .hopsworks .common .hdfs .DistributedFileSystemOps ;
29
29
import io .hops .hopsworks .common .hdfs .DistributedFsService ;
30
- import io .hops .hopsworks .common .hdfs .HdfsUsersController ;
31
30
import io .hops .hopsworks .common .hdfs .Utils ;
32
- import io .hops .hopsworks .common .hdfs .inode .InodeController ;
33
31
import io .hops .hopsworks .exceptions .FeaturestoreException ;
34
32
import io .hops .hopsworks .exceptions .HopsSecurityException ;
35
33
import io .hops .hopsworks .exceptions .KafkaException ;
45
43
import io .hops .hopsworks .persistence .entity .featurestore .featuregroup .ondemand .OnDemandOption ;
46
44
import io .hops .hopsworks .persistence .entity .featurestore .storageconnector .FeaturestoreConnector ;
47
45
import io .hops .hopsworks .persistence .entity .featurestore .storageconnector .FeaturestoreConnectorType ;
48
- import io .hops .hopsworks .persistence .entity .hdfs .inode .Inode ;
49
46
import io .hops .hopsworks .persistence .entity .project .Project ;
50
47
import io .hops .hopsworks .persistence .entity .user .Users ;
51
48
import io .hops .hopsworks .restutils .RESTCodes ;
@@ -82,10 +79,6 @@ public class OnDemandFeaturegroupController {
82
79
@ EJB
83
80
private DistributedFsService distributedFsService ;
84
81
@ EJB
85
- private HdfsUsersController hdfsUsersController ;
86
- @ EJB
87
- private InodeController inodeController ;
88
- @ EJB
89
82
private OnlineFeaturegroupController onlineFeatureGroupController ;
90
83
@ EJB
91
84
private FeatureGroupInputValidation featureGroupInputValidation ;
@@ -132,13 +125,14 @@ public OnDemandFeaturegroup createOnDemandFeaturegroup(Featurestore featurestore
132
125
"Data format required when specifying " + connector .getConnectorType () + " storage connectors" );
133
126
}
134
127
128
+ createFile (project , user , featurestore , onDemandFeaturegroupDTO );
129
+
135
130
//Persist on-demand featuregroup
136
131
OnDemandFeaturegroup onDemandFeaturegroup = new OnDemandFeaturegroup ();
137
132
onDemandFeaturegroup .setDescription (onDemandFeaturegroupDTO .getDescription ());
138
133
onDemandFeaturegroup .setFeaturestoreConnector (connector );
139
134
onDemandFeaturegroup .setQuery (onDemandFeaturegroupDTO .getQuery ());
140
135
onDemandFeaturegroup .setFeatures (convertOnDemandFeatures (onDemandFeaturegroupDTO , onDemandFeaturegroup ));
141
- onDemandFeaturegroup .setInode (createFile (project , user , featurestore , onDemandFeaturegroupDTO ));
142
136
onDemandFeaturegroup .setDataFormat (onDemandFeaturegroupDTO .getDataFormat ());
143
137
onDemandFeaturegroup .setPath (onDemandFeaturegroupDTO .getPath ());
144
138
@@ -154,13 +148,15 @@ public OnDemandFeaturegroup createOnDemandFeaturegroup(Featurestore featurestore
154
148
}
155
149
156
150
public OnDemandFeaturegroup createSpineGroup (Featurestore featurestore ,
157
- OnDemandFeaturegroupDTO onDemandFeaturegroupDTO , Project project , Users user ) throws FeaturestoreException {
151
+ OnDemandFeaturegroupDTO onDemandFeaturegroupDTO ,
152
+ Project project , Users user ) throws FeaturestoreException {
153
+ createFile (project , user , featurestore , onDemandFeaturegroupDTO );
154
+
158
155
OnDemandFeaturegroup onDemandFeaturegroup = new OnDemandFeaturegroup ();
159
156
onDemandFeaturegroup .setDescription (onDemandFeaturegroupDTO .getDescription ());
160
157
onDemandFeaturegroup .setFeatures (convertOnDemandFeatures (onDemandFeaturegroupDTO , onDemandFeaturegroup ));
161
158
onDemandFeaturegroup .setSpine (onDemandFeaturegroupDTO .getSpine ());
162
- onDemandFeaturegroup .setInode (createFile (project , user , featurestore , onDemandFeaturegroupDTO ));
163
-
159
+
164
160
onDemandFeaturegroupFacade .persist (onDemandFeaturegroup );
165
161
return onDemandFeaturegroup ;
166
162
}
@@ -224,8 +220,6 @@ public void updateOnDemandFeaturegroupMetadata(Project project, Users user, Feat
224
220
}
225
221
// finally merge in database
226
222
onDemandFeaturegroupFacade .updateMetadata (onDemandFeaturegroup );
227
-
228
-
229
223
}
230
224
231
225
private void updateOnDemandFeatures (OnDemandFeaturegroup onDemandFeaturegroup ,
@@ -276,14 +270,11 @@ public void verifySchemaUnchangedAndValid(Collection<OnDemandFeature> previousSc
276
270
*/
277
271
public void removeOnDemandFeaturegroup (Featurestore featurestore , Featuregroup featuregroup ,
278
272
Project project , Users user ) throws FeaturestoreException {
279
- String username = hdfsUsersController .getHdfsUserName (project , user );
280
- DistributedFileSystemOps udfso = null ;
281
-
282
- // this is here for old feature groups that don't have a file
283
273
onDemandFeaturegroupFacade .remove (featuregroup .getOnDemandFeaturegroup ());
284
274
275
+ DistributedFileSystemOps udfso = null ;
285
276
try {
286
- udfso = distributedFsService .getDfsOps (username );
277
+ udfso = distributedFsService .getDfsOps (project , user );
287
278
udfso .rm (getFilePath (featurestore , featuregroup .getName (), featuregroup .getVersion ()), false );
288
279
} catch (IOException | URISyntaxException e ) {
289
280
throw new FeaturestoreException (RESTCodes .FeaturestoreErrorCode .COULD_NOT_DELETE_ON_DEMAND_FEATUREGROUP ,
@@ -307,25 +298,29 @@ private List<OnDemandFeature> convertOnDemandFeatures(OnDemandFeaturegroupDTO on
307
298
return features ;
308
299
}
309
300
310
- private Inode createFile (Project project , Users user , Featurestore featurestore ,
301
+ private void createFile (Project project , Users user , Featurestore featurestore ,
311
302
OnDemandFeaturegroupDTO onDemandFeaturegroupDTO ) throws FeaturestoreException {
312
- String username = hdfsUsersController .getHdfsUserName (project , user );
313
-
314
- Path path = null ;
315
303
DistributedFileSystemOps udfso = null ;
316
304
try {
317
- path = getFilePath (featurestore , onDemandFeaturegroupDTO .getName (), onDemandFeaturegroupDTO .getVersion ());
305
+ Path path = getFilePath (featurestore , onDemandFeaturegroupDTO .getName (), onDemandFeaturegroupDTO .getVersion ());
318
306
319
- udfso = distributedFsService .getDfsOps (username );
307
+ udfso = distributedFsService .getDfsOps (project , user );
320
308
udfso .touchz (path );
321
309
} catch (IOException | URISyntaxException e ) {
322
310
throw new FeaturestoreException (RESTCodes .FeaturestoreErrorCode .COULD_NOT_CREATE_ON_DEMAND_FEATUREGROUP ,
323
311
Level .SEVERE , "Error creating the placeholder file" , e .getMessage (), e );
324
312
} finally {
325
313
distributedFsService .closeDfsClient (udfso );
326
314
}
315
+ }
327
316
328
- return inodeController .getInodeAtPath (path .toString ());
317
+ public String getFeatureGroupLocation (Featuregroup featuregroup ) throws FeaturestoreException {
318
+ try {
319
+ return getFilePath (featuregroup .getFeaturestore (), featuregroup .getName (), featuregroup .getVersion ()).toString ();
320
+ } catch (URISyntaxException e ) {
321
+ throw new FeaturestoreException (RESTCodes .FeaturestoreErrorCode .COULD_NOT_GET_FEATURE_GROUP_METADATA ,
322
+ Level .SEVERE , "" , e .getMessage (), e );
323
+ }
329
324
}
330
325
331
326
private Path getFilePath (Featurestore featurestore , String name , Integer version ) throws URISyntaxException {
0 commit comments