|
32 | 32 | )
|
33 | 33 | from hsfs.client import exceptions
|
34 | 34 | from hsfs.constructor.filter import Filter, Logic
|
| 35 | +from hsfs.constructor.join import Join |
| 36 | +from hsfs.constructor.query import Query |
35 | 37 | from hsfs.core import (
|
36 | 38 | feature_view_api,
|
37 | 39 | query_constructor_api,
|
@@ -1073,116 +1075,156 @@ def _get_feature_view_url(self, fv: "feature_view.FeatureView"):
|
1073 | 1075 | def generate_fully_qualied_feature_name(
|
1074 | 1076 | self, feature_group: feature_group.FeatureGroup, feature_name: str
|
1075 | 1077 | ):
|
| 1078 | + """ |
| 1079 | + Generate the fully qualified feature name for a feature. The fully qualified name is created by concatenating |
| 1080 | + the project name, feature group name, feature group version and feature name. |
| 1081 | + """ |
1076 | 1082 | return f"{feature_group._get_project_name()}_{feature_group.name}_{feature_group.version}_{feature_name}"
|
1077 | 1083 |
|
1078 |
| - def _primary_keys_from_join(self, joins, check_duplicate): |
1079 |
| - query_pks = set() |
| 1084 | + def _primary_keys_from_join( |
| 1085 | + self, joins: List[Join], check_duplicate: bool, pk_names: set[str] |
| 1086 | + ) -> set[str]: |
| 1087 | + """ |
| 1088 | + Recursive function that extracts the primary keys from the join objects and returns them as a set. |
| 1089 | + """ |
1080 | 1090 | for join in joins:
|
1081 |
| - query = join.query |
1082 |
| - select_features = [feature.name for feature in join.query._left_features] |
1083 |
| - for feature in query._left_feature_group.features: |
1084 |
| - if feature.primary: |
1085 |
| - if feature.name not in select_features: |
1086 |
| - f_name = self.generate_fully_qualied_feature_name( |
1087 |
| - query._left_feature_group, feature.name |
1088 |
| - ) |
1089 |
| - if check_duplicate: |
1090 |
| - # TODO: THis is not correct we must check the entire feature views selected features for duplicates need to adapt. |
1091 |
| - f_name = self._check_if_exists_with_prefix( |
1092 |
| - f_name, query_pks |
1093 |
| - ) |
1094 |
| - query_pks.add(f_name) |
1095 |
| - else: |
1096 |
| - f_name = ( |
1097 |
| - feature.name |
1098 |
| - if join.prefix is None |
1099 |
| - else join.prefix + feature.name |
1100 |
| - ) |
1101 |
| - if check_duplicate: |
1102 |
| - # TODO: THis is not correct we must check the entire feature views selected features for duplicates need to adapt. |
1103 |
| - f_name = self._check_if_exists_with_prefix( |
1104 |
| - f_name, query_pks |
1105 |
| - ) |
1106 |
| - query_pks.add(f_name) |
1107 |
| - if query._joins: |
| 1091 | + sub_query = join.query |
| 1092 | + join_prefix = join.prefix |
| 1093 | + |
| 1094 | + sub_query_selected_feature_names = [ |
| 1095 | + feature.name for feature in join.query._left_features |
| 1096 | + ] |
| 1097 | + sub_query_feature_group = sub_query._left_feature_group |
| 1098 | + sub_query_pk_names = { |
| 1099 | + feature.name |
| 1100 | + for feature in sub_query_feature_group.features |
| 1101 | + if feature.primary |
| 1102 | + } |
| 1103 | + |
| 1104 | + sub_query_pk_names = { |
| 1105 | + self.generate_fully_qualied_feature_name( |
| 1106 | + sub_query_feature_group, pk_name |
| 1107 | + ) |
| 1108 | + if pk_name not in sub_query_selected_feature_names |
| 1109 | + else (pk_name if not join_prefix else join_prefix + pk_name) |
| 1110 | + for pk_name in sub_query_pk_names |
| 1111 | + } |
| 1112 | + |
| 1113 | + if check_duplicate: |
| 1114 | + for sub_query_pk_name in sub_query_pk_names: |
| 1115 | + self._check_if_exists_with_prefix(sub_query_pk_name, pk_names) |
| 1116 | + |
| 1117 | + pk_names.update(sub_query_pk_names) |
| 1118 | + |
| 1119 | + if sub_query._joins: |
1108 | 1120 | sub_query_pks = self._primary_keys_from_join(
|
1109 |
| - query._joins, check_duplicate |
| 1121 | + sub_query._joins, check_duplicate, pk_names |
1110 | 1122 | )
|
1111 |
| - query_pks.update(sub_query_pks) |
1112 |
| - return query_pks |
| 1123 | + pk_names.update(sub_query_pks) |
| 1124 | + return pk_names |
1113 | 1125 |
|
1114 |
| - def _event_time_from_join(self, joins, check_duplicate): |
1115 |
| - query_ets = set() |
| 1126 | + def _event_time_from_join( |
| 1127 | + self, joins: List[Join], check_duplicate: bool, et_names: set[str] |
| 1128 | + ) -> set[str]: |
1116 | 1129 | for join in joins:
|
1117 |
| - query = join.query |
1118 |
| - select_features = [feature.name for feature in join.query._left_features] |
1119 |
| - if join.query._left_feature_group.event_time: |
1120 |
| - if join.query._left_feature_group.event_time not in select_features: |
1121 |
| - f_name = self.generate_fully_qualied_feature_name( |
1122 |
| - join.query._left_feature_group, |
1123 |
| - join.query._left_feature_group.event_time, |
1124 |
| - ) |
1125 |
| - if check_duplicate: |
1126 |
| - # TODO: THis is not correct we must check the entire feature views selected features for duplicates need to adapt. |
1127 |
| - f_name = self._check_if_exists_with_prefix(f_name, query_ets) |
1128 |
| - query_ets.add(f_name) |
1129 |
| - else: |
1130 |
| - f_name = ( |
1131 |
| - join.query._left_feature_group.event_time |
1132 |
| - if join.prefix is None |
1133 |
| - else join.prefix + join.query._left_feature_group.event_time |
1134 |
| - ) |
1135 |
| - if check_duplicate: |
1136 |
| - # TODO: THis is not correct we must check the entire feature views selected features for duplicates need to adapt. |
1137 |
| - f_name = self._check_if_exists_with_prefix(f_name, query_ets) |
1138 |
| - query_ets.add(f_name) |
1139 |
| - if query._joins: |
1140 |
| - sub_query_ets = self._event_time_from_join( |
1141 |
| - query._joins, check_duplicate |
1142 |
| - ) |
1143 |
| - query_ets.update(sub_query_ets) |
1144 |
| - return query_ets |
1145 |
| - |
1146 |
| - def _get_primary_keys_from_query(self, fv_query_obj, check_duplicate=True): |
1147 |
| - # Patch for now to check there is a need to use the fully qualified name |
1148 |
| - select_features = [feature.name for feature in fv_query_obj._left_features] |
1149 |
| - fv_pks = set( |
1150 |
| - [ |
| 1130 | + sub_query = join.query |
| 1131 | + join_prefix = join.prefix |
| 1132 | + |
| 1133 | + sub_query_selected_feature_names = [ |
| 1134 | + feature.name for feature in join.query._left_features |
| 1135 | + ] |
| 1136 | + sub_query_feature_group = sub_query._left_feature_group |
| 1137 | + sub_query_event_time = sub_query_feature_group.event_time |
| 1138 | + |
| 1139 | + sub_query_event_time = ( |
1151 | 1140 | self.generate_fully_qualied_feature_name(
|
1152 |
| - fv_query_obj._left_feature_group, feature.name |
| 1141 | + sub_query_feature_group, sub_query_event_time |
1153 | 1142 | )
|
1154 |
| - if feature.name not in select_features |
1155 |
| - else feature.name |
1156 |
| - for feature in fv_query_obj._left_feature_group.features |
1157 |
| - if feature.primary |
1158 |
| - ] |
1159 |
| - ) |
1160 |
| - fv_pks.update( |
1161 |
| - self._primary_keys_from_join(fv_query_obj._joins, check_duplicate) |
1162 |
| - ) |
| 1143 | + if sub_query_event_time not in sub_query_selected_feature_names |
| 1144 | + else ( |
| 1145 | + join_prefix + sub_query_event_time |
| 1146 | + if join_prefix |
| 1147 | + else sub_query_event_time |
| 1148 | + ) |
| 1149 | + ) |
1163 | 1150 |
|
1164 |
| - return list(fv_pks) |
| 1151 | + if check_duplicate: |
| 1152 | + self._check_if_exists_with_prefix(sub_query_event_time, et_names) |
1165 | 1153 |
|
1166 |
| - def _get_eventtimes_from_query(self, fv_query_obj, check_duplicate=True): |
1167 |
| - fv_events = set() |
1168 |
| - select_features = [feature.name for feature in fv_query_obj._left_features] |
| 1154 | + et_names.add(sub_query_event_time) |
1169 | 1155 |
|
1170 |
| - if fv_query_obj._left_feature_group.event_time: |
1171 |
| - if fv_query_obj._left_feature_group.event_time not in select_features: |
1172 |
| - fv_events.add( |
1173 |
| - self.generate_fully_qualied_feature_name( |
1174 |
| - fv_query_obj._left_feature_group, |
1175 |
| - fv_query_obj._left_feature_group.event_time, |
1176 |
| - ) |
| 1156 | + if sub_query._joins: |
| 1157 | + sub_query_ets = self._event_time_from_join( |
| 1158 | + sub_query._joins, check_duplicate, et_names |
1177 | 1159 | )
|
1178 |
| - else: |
1179 |
| - fv_events.add(fv_query_obj._left_feature_group.event_time) |
| 1160 | + et_names.update(sub_query_ets) |
| 1161 | + return et_names |
| 1162 | + |
| 1163 | + def _get_primary_keys_from_query( |
| 1164 | + self, query: Query, check_duplicate: bool = True |
| 1165 | + ) -> List[str]: |
| 1166 | + """ |
| 1167 | + Function that checks the primary keys from the query object and returns them as a list. |
| 1168 | +
|
| 1169 | + #Arguments: |
| 1170 | + fv_query_obj : `Query`. Query object from which the primary keys are extracted. |
| 1171 | + check_duplicate : `bool`. Flag to check if the primary keys are duplicated in the query. |
| 1172 | + """ |
| 1173 | + root_feature_group_selected_features_name = { |
| 1174 | + feature.name for feature in query._left_features |
| 1175 | + } |
| 1176 | + |
| 1177 | + root_feature_group = query._left_feature_group |
| 1178 | + |
| 1179 | + root_feature_group_primary_keys_names = { |
| 1180 | + feature.name for feature in root_feature_group.features if feature.primary |
| 1181 | + } |
| 1182 | + |
| 1183 | + pk_names = { |
| 1184 | + self.generate_fully_qualied_feature_name(root_feature_group, pk_name) |
| 1185 | + if pk_name not in root_feature_group_selected_features_name |
| 1186 | + else pk_name |
| 1187 | + for pk_name in root_feature_group_primary_keys_names |
| 1188 | + } |
| 1189 | + |
| 1190 | + pk_names.update( |
| 1191 | + self._primary_keys_from_join(query._joins, check_duplicate, pk_names) |
| 1192 | + ) |
| 1193 | + |
| 1194 | + return list(pk_names) |
| 1195 | + |
| 1196 | + def _get_eventtimes_from_query( |
| 1197 | + self, query: Query, check_duplicate: bool = True |
| 1198 | + ) -> List[str]: |
| 1199 | + """ |
| 1200 | + Function that checks the event times from the query object and returns them as a list. |
| 1201 | +
|
| 1202 | + #Arguments: |
| 1203 | + fv_query_obj : `Query`. Query object from which the event times are extracted. |
| 1204 | + check_duplicate : `bool`. Flag to check if the event times are duplicated in the query. |
| 1205 | + """ |
| 1206 | + root_feature_group_selected_features_name = { |
| 1207 | + feature.name for feature in query._left_features |
| 1208 | + } |
| 1209 | + |
| 1210 | + root_feature_group = query._left_feature_group |
| 1211 | + |
| 1212 | + root_feature_group_event_time = root_feature_group.event_time |
| 1213 | + |
| 1214 | + et_names = { |
| 1215 | + self.generate_fully_qualied_feature_name( |
| 1216 | + root_feature_group, root_feature_group_event_time |
| 1217 | + ) |
| 1218 | + if root_feature_group_event_time |
| 1219 | + not in root_feature_group_selected_features_name |
| 1220 | + else root_feature_group_event_time |
| 1221 | + } |
1180 | 1222 |
|
1181 |
| - fv_events.update( |
1182 |
| - self._event_time_from_join(fv_query_obj._joins, check_duplicate) |
| 1223 | + et_names.update( |
| 1224 | + self._event_time_from_join(query._joins, check_duplicate, et_names) |
1183 | 1225 | )
|
1184 | 1226 |
|
1185 |
| - return list(fv_events) |
| 1227 | + return list(et_names) |
1186 | 1228 |
|
1187 | 1229 | def _check_if_exists_with_prefix(self, f_name, f_set):
|
1188 | 1230 | if f_name in f_set:
|
|
0 commit comments