From 7d26561c2e76ddcd2cffca0466b995db18675d72 Mon Sep 17 00:00:00 2001 From: bubriks Date: Wed, 4 Oct 2023 14:52:07 +0200 Subject: [PATCH 1/2] init --- .../logicalclocks/hsfs/beam/engine/BeamProducer.java | 4 ++-- .../hsfs/flink/engine/KafkaRecordSerializer.java | 4 ++-- .../com/logicalclocks/hsfs/metadata/Subject.java | 10 +++++++--- .../logicalclocks/hsfs/spark/engine/SparkEngine.java | 12 ++++++------ .../spark/engine/hudi/DeltaStreamerKafkaSource.java | 6 +++--- .../hsfs/spark/engine/hudi/HudiEngine.java | 4 ++-- python/hsfs/engine/python.py | 4 +++- python/hsfs/engine/spark.py | 10 ++++------ 8 files changed, 29 insertions(+), 25 deletions(-) diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java index e2b13e074d..f93b3fea3d 100644 --- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java +++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/engine/BeamProducer.java @@ -79,8 +79,8 @@ public BeamProducer(String topic, Map properties, Schema schema, headerMap.put("projectId", String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId()).getBytes(StandardCharsets.UTF_8)); headerMap.put("featureGroupId", String.valueOf(streamFeatureGroup.getId()).getBytes(StandardCharsets.UTF_8)); - headerMap.put("subjectId", - String.valueOf(streamFeatureGroup.getSubject().getId()).getBytes(StandardCharsets.UTF_8)); + headerMap.put("schemaId", + String.valueOf(streamFeatureGroup.getSubject().getSchemaId()).getBytes(StandardCharsets.UTF_8)); } @Override diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java index b1729f75da..42ca2ea3e2 100644 --- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java +++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/engine/KafkaRecordSerializer.java @@ -49,8 +49,8 @@ public class KafkaRecordSerializer implements KafkaRecordSerializationSchema byte[] projectId = String.valueOf(featureGroupBase.getFeatureStore().getProjectId()) .getBytes(StandardCharsets.UTF_8); byte[] featureGroupId = String.valueOf(featureGroupBase.getId()).getBytes(StandardCharsets.UTF_8); - byte[] subjectId = String.valueOf(featureGroupBase.getSubject().getId()).getBytes(StandardCharsets.UTF_8); + byte[] schemaId = String.valueOf(featureGroupBase.getSubject().getSchemaId()).getBytes(StandardCharsets.UTF_8); onlineFeatureGroupToAvro(featureGroupBase, encodeComplexFeatures(featureGroupBase, dataset)) .withColumn("headers", array( @@ -554,8 +554,8 @@ public void writeOnlineDataframe(FeatureGroupBase featureGroupBase, Dataset lit(featureGroupId).as("value") ), struct( - lit("subjectId").as("key"), - lit(subjectId).as("value") + lit("schemaId").as("key"), + lit(schemaId).as("value") ) )) .write() @@ -573,7 +573,7 @@ public StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase byte[] projectId = String.valueOf(featureGroupBase.getFeatureStore().getProjectId()) .getBytes(StandardCharsets.UTF_8); byte[] featureGroupId = String.valueOf(featureGroupBase.getId()).getBytes(StandardCharsets.UTF_8); - byte[] subjectId = String.valueOf(featureGroupBase.getSubject().getId()).getBytes(StandardCharsets.UTF_8); + byte[] schemaId = String.valueOf(featureGroupBase.getSubject().getSchemaId()).getBytes(StandardCharsets.UTF_8); DataStreamWriter writer = onlineFeatureGroupToAvro(featureGroupBase, encodeComplexFeatures(featureGroupBase, dataset)) @@ -587,8 +587,8 @@ public StreamingQuery writeStreamDataframe(FeatureGroupBase featureGroupBase lit(featureGroupId).as("value") ), struct( - lit("subjectId").as("key"), - lit(subjectId).as("value") + lit("schemaId").as("key"), + lit(schemaId).as("value") ) )) .writeStream() diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerKafkaSource.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerKafkaSource.java index 3e6a6cdffd..e243714da4 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerKafkaSource.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerKafkaSource.java @@ -91,15 +91,15 @@ protected InputBatch> fetchNewData(Option lastChe if (totalNewMsgs <= 0L) { return new InputBatch(Option.empty(), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); } else { - JavaRDD newDataRdd = this.toRdd(offsetRanges, props.getString(HudiEngine.SUBJECT_ID)); + JavaRDD newDataRdd = this.toRdd(offsetRanges, props.getString(HudiEngine.SCHEMA_ID)); return new InputBatch(Option.of(newDataRdd), KafkaOffsetGen.CheckpointUtils.offsetsToStr(offsetRanges)); } } - private JavaRDD toRdd(OffsetRange[] offsetRanges, String subjectId) { + private JavaRDD toRdd(OffsetRange[] offsetRanges, String schemaId) { return KafkaUtils.createRDD(this.sparkContext, this.offsetGen.getKafkaParams(), offsetRanges, LocationStrategies.PreferConsistent()) - .filter(obj -> subjectId.equals(getHeader(obj.headers(), "subjectId"))) + .filter(obj -> schemaId.equals(getHeader(obj.headers(), "schemaId"))) .map(obj -> (GenericRecord) obj.value()); } diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java index 690b2948ca..1845896252 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/HudiEngine.java @@ -126,7 +126,7 @@ public class HudiEngine { protected static final String SPARK_MASTER = "yarn"; protected static final String PROJECT_ID = "projectId"; protected static final String FEATURE_STORE_NAME = "featureStoreName"; - protected static final String SUBJECT_ID = "subjectId"; + protected static final String SCHEMA_ID = "schemaId"; protected static final String FEATURE_GROUP_NAME = "featureGroupName"; protected static final String FEATURE_GROUP_VERSION = "featureGroupVersion"; protected static final String FUNCTION_TYPE = "functionType"; @@ -367,7 +367,7 @@ public void streamToHoodieTable(SparkSession sparkSession, StreamFeatureGroup st writeOptions); hudiWriteOpts.put(PROJECT_ID, String.valueOf(streamFeatureGroup.getFeatureStore().getProjectId())); hudiWriteOpts.put(FEATURE_STORE_NAME, streamFeatureGroup.getFeatureStore().getName()); - hudiWriteOpts.put(SUBJECT_ID, String.valueOf(streamFeatureGroup.getSubject().getId())); + hudiWriteOpts.put(SCHEMA_ID, String.valueOf(streamFeatureGroup.getSubject().getSchemaId())); hudiWriteOpts.put(FEATURE_GROUP_NAME, streamFeatureGroup.getName()); hudiWriteOpts.put(FEATURE_GROUP_VERSION, String.valueOf(streamFeatureGroup.getVersion())); hudiWriteOpts.put(HUDI_TABLE_NAME, utils.getFgName(streamFeatureGroup)); diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py index 5b63dc47de..611888e71e 100644 --- a/python/hsfs/engine/python.py +++ b/python/hsfs/engine/python.py @@ -1091,7 +1091,9 @@ def _kafka_produce( "utf8" ), "featureGroupId": str(feature_group._id).encode("utf8"), - "subjectId": str(feature_group.subject["id"]).encode("utf8"), + "schemaId": str(feature_group.subject["schemaId"]).encode( + "utf8" + ), }, ) diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index c6dce83b7e..256c824855 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -341,7 +341,7 @@ def save_stream_dataframe( project_id = str(feature_group.feature_store.project_id).encode("utf8") feature_group_id = str(feature_group._id).encode("utf8") - subject_id = str(feature_group.subject["id"]).encode("utf8") + schema_id = str(feature_group.subject["schemaId"]).encode("utf8") query = ( serialized_df.withColumn( @@ -354,9 +354,7 @@ def save_stream_dataframe( lit("featureGroupId").alias("key"), lit(feature_group_id).alias("value"), ), - struct( - lit("subjectId").alias("key"), lit(subject_id).alias("value") - ), + struct(lit("schemaId").alias("key"), lit(schema_id).alias("value")), ), ) .writeStream.outputMode(output_mode) @@ -422,7 +420,7 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options): project_id = str(feature_group.feature_store.project_id).encode("utf8") feature_group_id = str(feature_group._id).encode("utf8") - subject_id = str(feature_group.subject["id"]).encode("utf8") + schema_id = str(feature_group.subject["schemaId"]).encode("utf8") serialized_df.withColumn( "headers", @@ -432,7 +430,7 @@ def _save_online_dataframe(self, feature_group, dataframe, write_options): lit("featureGroupId").alias("key"), lit(feature_group_id).alias("value"), ), - struct(lit("subjectId").alias("key"), lit(subject_id).alias("value")), + struct(lit("schemaId").alias("key"), lit(schema_id).alias("value")), ), ).write.format(self.KAFKA_FORMAT).options(**write_options).option( "topic", feature_group._online_topic_name From e627a7687d63e8f5bcecc73861fd7b0a7f0d2fd8 Mon Sep 17 00:00:00 2001 From: bubriks Date: Thu, 25 Jan 2024 10:15:12 +0100 Subject: [PATCH 2/2] merge fix --- .../hsfs/spark/engine/hudi/DeltaStreamerAvroDeserializer.java | 2 +- python/hsfs/engine/spark.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerAvroDeserializer.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerAvroDeserializer.java index 581735dca0..812702ba98 100644 --- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerAvroDeserializer.java +++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/hudi/DeltaStreamerAvroDeserializer.java @@ -68,7 +68,7 @@ public DeltaStreamerAvroDeserializer() { @SneakyThrows @Override public void configure(Map configs, boolean isKey) { - this.subjectId = (String) configs.get(HudiEngine.SUBJECT_ID); + this.subjectId = (String) configs.get(HudiEngine.SCHEMA_ID); GenericData.get().addLogicalTypeConversion(new Conversions.DecimalConversion()); String featureGroupSchema = (String) configs.get(HudiEngine.FEATURE_GROUP_SCHEMA); String encodedFeatureGroupSchema = configs.get(HudiEngine.FEATURE_GROUP_ENCODED_SCHEMA).toString() diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py index e3472f3bd6..868283a37f 100644 --- a/python/hsfs/engine/spark.py +++ b/python/hsfs/engine/spark.py @@ -354,8 +354,8 @@ def save_stream_dataframe( feature_group, self._encode_complex_features(feature_group, dataframe) ) - project_id = str(feature_group.feature_store.project_id).encode("utf8") - feature_group_id = str(feature_group._id).encode("utf8") + project_id = str(feature_group.feature_store.project_id) + feature_group_id = str(feature_group._id) schema_id = str(feature_group.subject["schemaId"]).encode("utf8") if query_name is None: