Add delta lake to guides #355

Merged
merged 1 commit on Mar 4, 2024
25 changes: 16 additions & 9 deletions docs/user_guides/fs/feature_group/create.md
@@ -32,6 +32,7 @@ The first step to create a feature group is to create the API metadata object re
primary_key=['location_id'],
partition_key=['day'],
event_time='event_time',
time_travel_format='DELTA',
)
```

@@ -47,7 +48,7 @@ The last parameter used in the examples above is `stream`. The `stream` paramete

##### Primary key

A primary key is required when using the default Hudi file format to store offline feature data. When inserting data in a feature group on the offline feature store, the DataFrame you are writing is checked against the existing data in the feature group. If a row with the same primary key is found in the feature group, the row will be updated. If the primary key is not found, the row is appended to the feature group.
A primary key is required when using the default table format (Hudi) to store offline feature data. When inserting data in a feature group on the offline feature store, the DataFrame you are writing is checked against the existing data in the feature group. If a row with the same primary key is found in the feature group, the row will be updated. If the primary key is not found, the row is appended to the feature group.
When writing data on the online feature store, existing rows with the same primary key will be overwritten by new rows with the same primary key.
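
As a minimal sketch of this upsert behaviour (assuming an existing feature group handle `fg` created with `primary_key=['location_id']` and a Spark or Pandas DataFrame `df` containing a `location_id` column):

```python
# Offline: rows in `df` whose location_id already exists in the feature group are updated,
# and unseen location_id values are appended.
# Online: existing rows with the same location_id are overwritten.
fg.insert(df)
```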

##### Event time
@@ -80,6 +81,11 @@ MaxDirectoryItemsExceededException - The directory item limit is exceeded: limit

By using partitioning, the system writes the feature data in different subdirectories, thus allowing you to write 10240 files per partition.

##### Table format

When you create a feature group, you can specify the table format you want to use to store the data in your feature group by setting the `time_travel_format` parameter. The currently supported values are "HUDI", "DELTA", and "NONE" (which defaults to Parquet).
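
For example, a hedged sketch of creating a Delta-backed feature group (the name and features below are illustrative assumptions, not taken from this guide):

```python
fg = feature_store.create_feature_group(
    name='weather_measurements',      # illustrative name
    version=1,
    primary_key=['location_id'],
    partition_key=['day'],
    event_time='event_time',
    time_travel_format='DELTA',       # or 'HUDI' (the default), or 'NONE' for plain Parquet
)
```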


#### Streaming Write API

As explained above, the stream parameter controls whether to enable the streaming write APIs to the online and offline feature store.
@@ -95,6 +101,7 @@ For Python environments, only the stream API is supported (stream=True).
primary_key=['location_id'],
partition_key=['day'],
event_time='event_time',
time_travel_format='HUDI',
)
```

@@ -108,6 +115,7 @@ For Python environments, only the stream API is supported (stream=True).
primary_key=['location_id'],
partition_key=['day'],
event_time='event_time',
time_travel_format='HUDI',
stream=True
)
```
@@ -132,8 +140,8 @@ By default, feature groups in hopsworks will share a project-wide topic.
#### Best Practices for Writing

When designing a feature group, it is worth taking a look at how this feature group will be queried in the future, in order to optimize it for those query patterns.
At the same time, Spark and Hudi tend to overpartition writes, creatingtoo many small parquet files, which is inefficient and slowing down the write.
But they also slow down queries, because file listings are taking more time, but also reading many small files is usually slower.
At the same time, Spark and Hudi tend to overpartition writes, creating too many small parquet files, which is inefficient and slows down writes.
But they also slow down queries, because file listings take more time and reading many small files is slower than reading fewer, larger files.
The best practices described in this section hold both for the Streaming API and the Batch API.

Four main considerations influence the write and the query performance:
@@ -145,8 +153,7 @@ Four main considerations influence the write and the query performance:

##### Partitioning on a feature group level

**Partitioning on the feature group level** allows Hopsworks and Hudi to push down filters to the filesystem during training dataset or batch data generation.
In practice that means, less directories need to be listed and less files need to be read, speeding up queries.
**Partitioning on the feature group level** allows Hopsworks and the table format (Hudi or Delta) to push down filters to the filesystem when reading from feature groups. In practice, that means fewer directories need to be listed and fewer files need to be read, speeding up queries.

For example, most commonly, filtering is done on the event time column of a feature group when generating training data or batches of data:
```python
@@ -199,13 +206,13 @@ fg = feature_store.create_feature_group(...
##### Parquet file size within a feature group partition

Once you have decided on the feature group level partitioning and you start inserting data to the feature group, there are multiple ways in order to
influence how Hudi will **split the data between parquet files within the feature group partitions**.
influence how the table format (Hudi or Delta) will **split the data between parquet files within the feature group partitions**.
The two things that influence the number of parquet files per partition are

1. The number of feature group partitions written in a single insert
2. The shuffle parallelism used by Hudi
2. The shuffle parallelism used by the table format

In general, the inserted dataframe (unique combination of partition key values) will be parallised according to the following Hudi settings:
For example, the inserted dataframe (unique combination of partition key values) will be parallelized according to the following Hudi settings:
!!! example "Default Hudi partitioning"
```python
write_options = {
@@ -261,7 +268,7 @@ In that case you can increase the Hudi shuffle parallelism accordingly.

When creating a feature group that uses streaming write APIs for data ingestion it is possible to define the Kafka topics that should be utilized.
The default approach of using a project-wide topic functions great for use cases involving little to no overlap when producing data. However,
concurrently inserting into multiple feature groups could cause read amplification for the Hudi delta streamer job. Therefore, it is
concurrently inserting into multiple feature groups could cause read amplification for the offline materialization job (e.g., Hudi Delta Streamer). Therefore, it is
advised to utilize separate topics when ingestions overlap or there is a large frequently running insertion into a specific feature group.
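
As a hedged sketch, a dedicated topic can be requested when creating the feature group; the `topic_name` value below, and the assumption that your hsfs version exposes this parameter, are illustrative:

```python
fg = feature_store.create_feature_group(
    name='high_volume_measurements',  # illustrative name
    version=1,
    primary_key=['location_id'],
    stream=True,
    topic_name='high_volume_measurements_topic',  # dedicated topic instead of the project-wide default
)
```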

### Register the metadata and save the feature data