diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java index e2b13e074d..f93b3fea3d 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java @@ -79,8 +79,8 @@ public BeamProducer(String topic, Map properties, Schema schema, headerMap.put("projectId", String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8)); headerMap.put("featureGroupId", String.valueOf(streamFeatureGroup.getId()).getBytes(StandardCharsets.UTF_8)); - headerMap.put("subjectId", - String.valueOf(streamFeatureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8)); + headerMap.put("schemaId", + String.valueOf(streamFeatureGroup.getSubject().getSchemaId()).getBytes(StandardCharsets.UTF_8)); } @Override diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java index b1729f75da..42ca2ea3e2 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java @@ -49,8 +49,8 @@ public class KafkaRecordSerializer implements KafkaRecordSerializationSchema byte[] projectId = String.valueOf(featureGroupBase.getFeatureStore().getProjectId()) .getBytes(StandardCharsets.UTF_8); byte[] featureGroupId = String.valueOf(featureGroupBase.getId()).getBytes(StandardCharsets.UTF_8); - byte[] subjectId = String.valueOf(featureGroupBase.getSubject().getId()).getBytes(StandardCharsets.UTF_8); + byte[] schemaId = String.valueOf(featureGroupBase.getSubject().getSchemaId()).getBytes(StandardCharsets.UTF_8); onlineFeatureGroupToAvro(featureGroupBase, encodeComplexFeatures(featureGroupBase, dataset)) .withColumn("headers", array( @@ -554,8 +554,8 @@ public void writeOnlineDataframe(FeatureGroupBase featureGroupBase, Dataset lit(featureGroupId).as("value") ), struct( - lit("subjectId").as("key"), - lit(subjectId).as("value") + lit("schemaId").as("key"), + lit(schemaId).as("value") ) )) .write() @@ -573,7 +573,7 @@ public StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase byte[] projectId = String.valueOf(featureGroupBase.getFeatureStore().getProjectId()) .getBytes(StandardCharsets.UTF_8); byte[] featureGroupId = String.valueOf(featureGroupBase.getId()).getBytes(StandardCharsets.UTF_8); - byte[] subjectId = String.valueOf(featureGroupBase.getSubject().getId()).getBytes(StandardCharsets.UTF_8); + byte[] schemaId = String.valueOf(featureGroupBase.getSubject().getSchemaId()).getBytes(StandardCharsets.UTF_8); queryName = makeQueryName(queryName, featureGroupBase); DataStreamWriter writer = @@ -588,8 +588,8 @@ public StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase lit(featureGroupId).as("value") ), struct( - lit("subjectId").as("key"), - lit(subjectId).as("value") + lit("schemaId").as("key"), + lit(schemaId).as("value") ) )) .writeStream() diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerAvroDeserializer.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerAvroDeserializer.java index 581735dca0..812702ba98 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerAvroDeserializer.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerAvroDeserializer.java @@ -68,7 +68,7 @@ public DeltaStreamerAvroDeserializer() { @SneakyThrows @Override public void configure(Map configs, boolean isKey) { - this.subjectId = (String) configs.get(HudiEngine.SUBJECT_ID); + this.subjectId = (String) configs.get(HudiEngine.SCHEMA_ID); GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion()); String featureGroupSchema = (String) configs.get(HudiEngine.FEATURE_GROUP_SCHEMA); String encodedFeatureGroupSchema = configs.get(HudiEngine.FEATURE_GROUP_ENCODED_SCHEMA).toString() diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java index b574f14ac3..233ee89faa 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java @@ -126,7 +126,7 @@ public class HudiEngine { protected static final String SPARK_MASTER = "yarn"; protected static final String PROJECT_ID = "projectId"; protected static final String FEATURE_STORE_NAME = "featureStoreName"; - protected static final String SUBJECT_ID = "subjectId"; + protected static final String SCHEMA_ID = "schemaId"; protected static final String FEATURE_GROUP_NAME = "featureGroupName"; protected static final String FEATURE_GROUP_VERSION = "featureGroupVersion"; protected static final String FUNCTION_TYPE = "functionType"; @@ -368,7 +368,7 @@ public void streamToHoodieTable(SparkSession sparkSession, StreamFeatureGroup st writeOptions); hudiWriteOpts.put(PROJECT_ID, String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId())); hudiWriteOpts.put(FEATURE_STORE_NAME, streamFeatureGroup.getFeatureStore().getName()); - hudiWriteOpts.put(SUBJECT_ID, String.valueOf(streamFeatureGroup.getSubject().getId())); + hudiWriteOpts.put(SCHEMA_ID, String.valueOf(streamFeatureGroup.getSubject().getSchemaId())); hudiWriteOpts.put(FEATURE_GROUP_NAME, streamFeatureGroup.getName()); hudiWriteOpts.put(FEATURE_GROUP_VERSION, String.valueOf(streamFeatureGroup.getVersion())); hudiWriteOpts.put(HUDI_TABLE_NAME, utils.getFgName(streamFeatureGroup)); diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 1cf68ea138..bab0c12dae 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1137,7 +1137,7 @@ def _kafka_produce( "utf8" ), "featureGroupId": str(feature_group._id).encode("utf8"), - "subjectId": str(feature_group.subject["id"]).encode("utf8"), + "schemaId": str(feature_group.subject["schemaId"]).encode("utf8"), } producer.produce( diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index fd16bb73ac..bd23d4ef22 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -356,7 +356,7 @@ def save_stream_dataframe( project_id = str(feature_group.feature_store.project_id) feature_group_id = str(feature_group._id) - subject_id = str(feature_group.subject["id"]).encode("utf8") + schema_id = str(feature_group.subject["schemaId"]).encode("utf8") if query_name is None: query_name = ( @@ -376,9 +376,7 @@ def save_stream_dataframe( lit("featureGroupId").alias("key"), lit(feature_group_id.encode("utf8")).alias("value"), ), - struct( - lit("subjectId").alias("key"), lit(subject_id).alias("value") - ), + struct(lit("schemaId").alias("key"), lit(schema_id).alias("value")), ), ) .writeStream.outputMode(output_mode) @@ -453,7 +451,7 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options): project_id = str(feature_group.feature_store.project_id).encode("utf8") feature_group_id = str(feature_group._id).encode("utf8") - subject_id = str(feature_group.subject["id"]).encode("utf8") + schema_id = str(feature_group.subject["schemaId"]).encode("utf8") serialized_df.withColumn( "headers", @@ -463,7 +461,7 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options): lit("featureGroupId").alias("key"), lit(feature_group_id).alias("value"), ), - struct(lit("subjectId").alias("key"), lit(subject_id).alias("value")), + struct(lit("schemaId").alias("key"), lit(schema_id).alias("value")), ), ).write.format(self.KAFKA_FORMAT).options(**write_options).option( "topic", feature_group._online_topic_name