Skip to content

Commit cf1c325

Browse files
committed
add filter to kafka
1 parent 0e92376 commit cf1c325

File tree

1 file changed

+5
-1
lines changed

1 file changed

+5
-1
lines changed

utils/python/hsfs_utils.py

+5-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
hopsfs = pfs.HadoopFileSystem("default", user=os.environ["HADOOP_USER_NAME"])
1414
from pyspark.sql import SparkSession
1515
from pyspark.sql.types import StructField, StructType, _parse_datatype_string
16-
from pyspark.sql.functions import max
16+
from pyspark.sql.functions import max, expr
1717

1818
import hopsworks
1919

@@ -300,6 +300,10 @@ def offline_fg_materialization(spark: SparkSession, job_conf: Dict[Any, Any], in
300300
.load()
301301
)
302302

303+
# filter only the necessary entries
304+
df = df.filter(expr("CAST(filter(headers, header -> header.key = 'featureGroupId')[0].value AS STRING)") == str(entity._id))
305+
df = df.filter(expr("CAST(filter(headers, header -> header.key = 'subjectId')[0].value AS STRING)") == str(entity.subject["id"]))
306+
303307
# deserialize dataframe so that it can be properly saved
304308
deserialized_df = engine.get_instance()._deserialize_from_avro(entity, df)
305309

0 commit comments

Comments
 (0)