diff --git a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java
index c4904ddc3..9d3c41ee6 100644
--- a/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java
+++ b/java/beam/src/main/java/com/logicalclocks/hsfs/beam/StreamFeatureGroup.java
@@ -25,6 +25,7 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
+import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.beam.engine.FeatureGroupEngine;
 import com.logicalclocks.hsfs.beam.engine.BeamProducer;
 import com.logicalclocks.hsfs.constructor.QueryBase;
@@ -48,7 +49,7 @@ public class StreamFeatureGroup extends FeatureGroupBase<PCollection<Object>> {
   public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integer version, String description,
       List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
       boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
-      String eventTime, OnlineConfig onlineConfig) {
+      String eventTime, OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -65,6 +66,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
     this.onlineTopicName = onlineTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
+    this.storageConnector = storageConnector;
+    this.path = path;
   }
 
   public StreamFeatureGroup() {
diff --git a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java
index d13a9c8e1..c3cd6cbd0 100644
--- a/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java
+++ b/java/flink/src/main/java/com/logicalclocks/hsfs/flink/StreamFeatureGroup.java
@@ -26,6 +26,7 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
+import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.constructor.QueryBase;
 
 import com.logicalclocks.hsfs.metadata.Statistics;
@@ -54,7 +55,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
       List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
       boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
       String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
-      OnlineConfig onlineConfig) {
+      OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -73,6 +74,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
     this.notificationTopicName = notificationTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
+    this.storageConnector = storageConnector;
+    this.path = path;
   }
 
   public StreamFeatureGroup() {
diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java
index 585f4780e..61ca2f5a9 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/FeatureGroupBase.java
@@ -132,6 +132,14 @@ public abstract class FeatureGroupBase<T> {
   @Setter
   protected OnlineConfig onlineConfig;
 
+  @Getter
+  @Setter
+  protected StorageConnector storageConnector;
+
+  @Getter
+  @Setter
+  protected String path;
+
   @JsonIgnore
   // These are only used in the client. In the server they are aggregated in the `features` field
   protected List<String> partitionKeys;
diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java
index 71be4d995..91bd36819 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/StorageConnector.java
@@ -124,6 +124,9 @@ public static class S3Connector extends StorageConnector {
     @Getter @Setter
     protected String bucket;
 
+    @Getter @Setter
+    protected String region;
+
     @Getter @Setter
     protected String sessionToken;
 
diff --git a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java
index da4bf655a..a937cca1a 100644
--- a/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java
+++ b/java/hsfs/src/main/java/com/logicalclocks/hsfs/engine/FeatureGroupUtils.java
@@ -128,7 +128,7 @@ public static Long getTimeStampFromDateString(String inputDate) throws FeatureSt
     }
 
     SimpleDateFormat dateFormat = new SimpleDateFormat(dateFormatPattern);
-    Long commitTimeStamp = dateFormat.parse(tempDate).getTime();;
+    Long commitTimeStamp = dateFormat.parse(tempDate).getTime();
 
     return commitTimeStamp;
   }
diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java
index 13f907ff4..e62a5e83e 100644
--- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java
+++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/ExternalFeatureGroup.java
@@ -57,10 +57,6 @@
 @JsonIgnoreProperties(ignoreUnknown = true)
 public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
 
-  @Getter
-  @Setter
-  private StorageConnector storageConnector;
-
   @Getter
   @Setter
   private String query;
@@ -69,10 +65,6 @@ public class ExternalFeatureGroup extends FeatureGroupBase<Dataset<Row>> {
   @Setter
   private ExternalDataFormat dataFormat;
 
-  @Getter
-  @Setter
-  private String path;
-
   @Getter
   @Setter
   private List<OnDemandOptions> options;
diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java
index 9c968b3ba..07fe98ff6 100644
--- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java
+++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/FeatureGroup.java
@@ -31,6 +31,7 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
+import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.TimeTravelFormat;
 import com.logicalclocks.hsfs.FeatureGroupBase;
 import com.logicalclocks.hsfs.metadata.Statistics;
@@ -64,7 +65,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
                       String description, List<String> primaryKeys, List<String> partitionKeys,
                       String hudiPrecombineKey, boolean onlineEnabled, TimeTravelFormat timeTravelFormat,
                       List<Feature> features, StatisticsConfig statisticsConfig, String onlineTopicName,
-                      String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig) {
+                      String topicName, String notificationTopicName, String eventTime, OnlineConfig onlineConfig,
+                      StorageConnector storageConnector, String path) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -85,6 +87,8 @@ public FeatureGroup(FeatureStore featureStore, @NonNull String name, Integer ver
     this.notificationTopicName = notificationTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
+    this.storageConnector = storageConnector;
+    this.path = path;
   }
 
   public FeatureGroup() {
diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java
index 597e6e3ad..0c8b9bae3 100644
--- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java
+++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/StreamFeatureGroup.java
@@ -30,6 +30,7 @@
 import com.logicalclocks.hsfs.OnlineConfig;
 import com.logicalclocks.hsfs.StatisticsConfig;
 import com.logicalclocks.hsfs.Storage;
+import com.logicalclocks.hsfs.StorageConnector;
 import com.logicalclocks.hsfs.FeatureGroupBase;
 import com.logicalclocks.hsfs.metadata.Statistics;
 
@@ -62,7 +63,7 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
                             List<String> primaryKeys, List<String> partitionKeys, String hudiPrecombineKey,
                             boolean onlineEnabled, List<Feature> features, StatisticsConfig statisticsConfig,
                             String onlineTopicName, String topicName, String notificationTopicName, String eventTime,
-                            OnlineConfig onlineConfig) {
+                            OnlineConfig onlineConfig, StorageConnector storageConnector, String path) {
     this();
     this.featureStore = featureStore;
     this.name = name;
@@ -81,6 +82,8 @@ public StreamFeatureGroup(FeatureStore featureStore, @NonNull String name, Integ
     this.notificationTopicName = notificationTopicName;
     this.eventTime = eventTime;
     this.onlineConfig = onlineConfig;
+    this.storageConnector = storageConnector;
+    this.path = path;
   }
 
   public StreamFeatureGroup() {
diff --git a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java
index b30bca98c..1aed9a650 100644
--- a/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java
+++ b/java/spark/src/main/java/com/logicalclocks/hsfs/spark/engine/SparkEngine.java
@@ -219,10 +219,6 @@ public Dataset<Row> registerOnDemandTemporaryTable(ExternalFeatureGroup onDemand
             ? onDemandFeatureGroup.getDataFormat().toString() : null, getOnDemandOptions(onDemandFeatureGroup),
         onDemandFeatureGroup.getStorageConnector().getPath(onDemandFeatureGroup.getPath()));
 
-    if (!Strings.isNullOrEmpty(onDemandFeatureGroup.getLocation())) {
-      sparkSession.sparkContext().textFile(onDemandFeatureGroup.getLocation(), 0).collect();
-    }
-
     dataset.createOrReplaceTempView(alias);
     return dataset;
   }
diff --git a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java
index adddd8310..bedd9716e 100644
--- a/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java
+++ b/java/spark/src/test/java/com/logicalclocks/hsfs/spark/TestFeatureGroup.java
@@ -67,11 +67,11 @@ public void testFeatureGroupPrimaryKey() {
 
     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("primaryKey"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
-        true, features, null, "onlineTopicName", null, null, null, null);
+        true, features, null, "onlineTopicName", null, null, null, null, null, null);
 
     Exception pkException = assertThrows(FeatureStoreException.class, () -> {
       featureGroupEngine.saveFeatureGroupMetaData(featureGroup,
-         null, null, null, null, null);;;
+         null, null, null, null, null);
     });
 
     // Assert
@@ -93,11 +93,11 @@ public void testFeatureGroupEventTimeFeature() {
 
     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("featureA"), null, null,
-        true, features, null, "onlineTopicName", null, null, "eventTime", null);
+        true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
 
     Exception eventTimeException = assertThrows(FeatureStoreException.class, () -> {
       streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
-          null, null, null, null, null);;;
+          null, null, null, null, null);
     });
 
     // Assert
@@ -119,7 +119,7 @@ public void testFeatureGroupPartitionPrecombineKeys() {
 
     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("featureA"), Collections.singletonList("partitionKey"), "hudiPrecombineKey",
-        true, features, null, "onlineTopicName", null, null, null, null);
+        true, features, null, "onlineTopicName", null, null, null, null, null, null);
 
     Exception partitionException = assertThrows(FeatureStoreException.class, () -> {
       streamFeatureGroupEngine.saveFeatureGroupMetaData(featureGroup,
@@ -164,7 +164,7 @@ public void testFeatureGroupAppendFeaturesResetSubject() throws FeatureStoreExce
 
     StreamFeatureGroup featureGroup = new StreamFeatureGroup(featureStore, "fgName", 1, "description",
         Collections.singletonList("featureA"), null, null,
-        true, features, null, "onlineTopicName", null, null, "eventTime", null);
+        true, features, null, "onlineTopicName", null, null, "eventTime", null, null, null);
     featureGroup.featureGroupEngine = featureGroupEngine;
 
     // Act
diff --git a/python/hsfs/core/delta_engine.py b/python/hsfs/core/delta_engine.py
index 95ea7f37b..0032daddf 100644
--- a/python/hsfs/core/delta_engine.py
+++ b/python/hsfs/core/delta_engine.py
@@ -52,10 +52,12 @@ def save_delta_fg(self, dataset, write_options, validation_id=None):
         return self._feature_group_api.commit(self._feature_group, fg_commit)
 
     def register_temporary_table(self, delta_fg_alias, read_options):
+        location = self._feature_group.prepare_spark_location()
+
         delta_options = self._setup_delta_read_opts(delta_fg_alias, read_options)
         self._spark_session.read.format(self.DELTA_SPARK_FORMAT).options(
             **delta_options
-        ).load(self._feature_group.location).createOrReplaceTempView(
+        ).load(location).createOrReplaceTempView(
             delta_fg_alias.alias
         )
 
@@ -85,15 +87,17 @@ def _setup_delta_read_opts(self, delta_fg_alias, read_options):
         return delta_options
 
     def delete_record(self, delete_df):
+        location = self._feature_group.prepare_spark_location()
+
         if not DeltaTable.isDeltaTable(
-            self._spark_session, self._feature_group.location
+            self._spark_session, location
         ):
             raise FeatureStoreException(
                 f"This is no data available in Feature group {self._feature_group.name}, or it not DELTA enabled "
             )
         else:
             fg_source_table = DeltaTable.forPath(
-                self._spark_session, self._feature_group.location
+                self._spark_session, location
             )
 
             source_alias = (
@@ -109,16 +113,18 @@ def delete_record(self, delete_df):
             ).whenMatchedDelete().execute()
 
         fg_commit = self._get_last_commit_metadata(
-            self._spark_session, self._feature_group.location
+            self._spark_session, location
         )
         return self._feature_group_api.commit(self._feature_group, fg_commit)
 
     def _write_delta_dataset(self, dataset, write_options):
+        location = self._feature_group.prepare_spark_location()
+
         if write_options is None:
             write_options = {}
 
         if not DeltaTable.isDeltaTable(
-            self._spark_session, self._feature_group.location
+            self._spark_session, location
         ):
             (
                 dataset.write.format(DeltaEngine.DELTA_SPARK_FORMAT)
@@ -129,11 +135,11 @@ def _write_delta_dataset(self, dataset, write_options):
                     else []
                 )
                 .mode("append")
-                .save(self._feature_group.location)
+                .save(location)
             )
         else:
             fg_source_table = DeltaTable.forPath(
-                self._spark_session, self._feature_group.location
+                self._spark_session, location
             )
 
             source_alias = (
@@ -149,9 +155,18 @@ def _write_delta_dataset(self, dataset, write_options):
             ).whenMatchedUpdateAll().whenNotMatchedInsertAll().execute()
 
         return self._get_last_commit_metadata(
-            self._spark_session, self._feature_group.location
+            self._spark_session, location
         )
 
+    def vacuum(self, retention_hours):
+        location = self._feature_group.prepare_spark_location()
+
+        delta_table = DeltaTable.forPath(self._spark_session, location)
+
+        # Vacuum the table
+        # https://docs.delta.io/1.0.1/api/python/index.html#delta.tables.DeltaTable.vacuum
+        delta_table.vacuum(retention_hours)
+
     def _generate_merge_query(self, source_alias, updates_alias):
         merge_query_list = []
         primary_key = self._feature_group.primary_key
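Outside the patch, a minimal usage sketch of how the new vacuum hook is reached through the public API; it assumes `fs` is a connected feature store handle and that the feature group was created with `time_travel_format="DELTA"` (names are illustrative):

# Hypothetical usage, not part of the patch.
fg = fs.get_feature_group(name="transactions", version=1)

# Delegates to DeltaTable.vacuum(retention_hours), removing data files that are
# no longer referenced by the Delta log and older than the retention period.
fg.clean(write_options={"retention_hours": 168})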
diff --git a/python/hsfs/core/feature_group_engine.py b/python/hsfs/core/feature_group_engine.py
index a06d12ca9..234829879 100644
--- a/python/hsfs/core/feature_group_engine.py
+++ b/python/hsfs/core/feature_group_engine.py
@@ -246,6 +246,20 @@ def commit_delete(feature_group, delete_df, write_options):
             )
             return hudi_engine_instance.delete_record(delete_df, write_options)
 
+    @staticmethod
+    def clean(feature_group, write_options):
+        if feature_group.time_travel_format == "DELTA":
+            delta_engine_instance = delta_engine.DeltaEngine(
+                feature_group.feature_store_id,
+                feature_group.feature_store_name,
+                feature_group,
+                engine.get_instance()._spark_session,
+                engine.get_instance()._spark_context,
+            )
+            return delta_engine_instance.vacuum(write_options.get("retention_hours", None))
+        else:
+            return None
+
     def sql(self, query, feature_store_name, dataframe_type, online, read_options):
         if online and self._online_conn is None:
             self._online_conn = self._storage_connector_api.get_online_connector(
@@ -285,7 +299,7 @@ def append_features(self, feature_group, new_features):
         if feature_group.time_travel_format == "DELTA":
             engine.get_instance().add_cols_to_delta_table(feature_group, new_features)
         else:
-            engine.get_instance().save_empty_dataframe(feature_group)
+            engine.get_instance().save_empty_dataframe(feature_group, new_features=new_features)
 
     def update_description(self, feature_group, description):
         """Updates the description of a feature group."""
diff --git a/python/hsfs/core/hudi_engine.py b/python/hsfs/core/hudi_engine.py
index 04c2c5a6b..4492f0a19 100644
--- a/python/hsfs/core/hudi_engine.py
+++ b/python/hsfs/core/hudi_engine.py
@@ -20,6 +20,10 @@
 
 
 class HudiEngine:
+
+    HUDI_SPEC_FEATURE_NAMES = ["_hoodie_record_key", "_hoodie_partition_path",
+                               "_hoodie_commit_time", "_hoodie_file_name", "_hoodie_commit_seqno"]
+
     HUDI_SPARK_FORMAT = "org.apache.hudi"
     HUDI_TABLE_NAME = "hoodie.table.name"
     HUDI_TABLE_STORAGE_TYPE = "hoodie.datasource.write.storage.type"
@@ -100,21 +104,25 @@ def delete_record(self, delete_df, write_options):
         return self._feature_group_api.commit(self._feature_group, fg_commit)
 
     def register_temporary_table(self, hudi_fg_alias, read_options):
+        location = self._feature_group.prepare_spark_location()
+
         hudi_options = self._setup_hudi_read_opts(hudi_fg_alias, read_options)
         self._spark_session.read.format(self.HUDI_SPARK_FORMAT).options(
             **hudi_options
-        ).load(self._feature_group.location).createOrReplaceTempView(
+        ).load(location).createOrReplaceTempView(
             hudi_fg_alias.alias
         )
 
     def _write_hudi_dataset(self, dataset, save_mode, operation, write_options):
+        location = self._feature_group.prepare_spark_location()
+
         hudi_options = self._setup_hudi_write_opts(operation, write_options)
         dataset.write.format(HudiEngine.HUDI_SPARK_FORMAT).options(**hudi_options).mode(
             save_mode
-        ).save(self._feature_group.location)
+        ).save(location)
 
         feature_group_commit = self._get_last_commit_metadata(
-            self._spark_context, self._feature_group.location
+            self._spark_context, location
         )
 
         return feature_group_commit
@@ -144,6 +152,9 @@ def _setup_hudi_write_opts(self, operation, write_options):
             else self._feature_group.primary_key[0]
         )
 
+        # only enable Hive sync for managed feature groups (no external storage connector)
+        hive_sync = self._feature_group.storage_connector is None
+
         hudi_options = {
             self.HUDI_KEY_GENERATOR_OPT_KEY: self.HUDI_COMPLEX_KEY_GENERATOR_OPT_VAL,
             self.HUDI_PRECOMBINE_FIELD: pre_combine_key,
@@ -153,7 +164,7 @@ def _setup_hudi_write_opts(self, operation, write_options):
             self.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY: self.DEFAULT_HIVE_PARTITION_EXTRACTOR_CLASS_OPT_VAL
             if len(partition_key) >= 1
             else self.HIVE_NON_PARTITION_EXTRACTOR_CLASS_OPT_VAL,
-            self.HUDI_HIVE_SYNC_ENABLE: "true",
+            self.HUDI_HIVE_SYNC_ENABLE: str(hive_sync).lower(),
             self.HUDI_HIVE_SYNC_MODE: self.HUDI_HIVE_SYNC_MODE_VAL,
             self.HUDI_HIVE_SYNC_DB: self._feature_store_name,
             self.HUDI_HIVE_SYNC_TABLE: table_name,
@@ -226,9 +237,8 @@ def _setup_hudi_read_opts(self, hudi_fg_alias, read_options):
     def reconcile_hudi_schema(
         self, save_empty_dataframe_callback, hudi_fg_alias, read_options
     ):
-        fg_table_name = hudi_fg_alias.feature_group._get_table_name()
         if sorted(self._spark_session.table(hudi_fg_alias.alias).columns) != sorted(
-            self._spark_session.table(fg_table_name).columns
+            [feature.name for feature in hudi_fg_alias.feature_group._features] + self.HUDI_SPEC_FEATURE_NAMES
         ):
             full_fg = self._feature_group_api.get(
                 feature_store_id=hudi_fg_alias.feature_group._feature_store_id,
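A small sketch of the Hive-sync toggle introduced in `_setup_hudi_write_opts`, written with standalone names (`feature_group`, `hudi_options`) for clarity; `HUDI_HIVE_SYNC_ENABLE` is the class constant for Hudi's `hoodie.datasource.hive_sync.enable` option:

# Managed feature groups (no storage connector) keep Hive sync enabled;
# groups written through an external storage connector switch it off.
hive_sync = feature_group.storage_connector is None
hudi_options[HudiEngine.HUDI_HIVE_SYNC_ENABLE] = str(hive_sync).lower()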
diff --git a/python/hsfs/engine/python.py b/python/hsfs/engine/python.py
index 96c4f9ecc..512803ebd 100644
--- a/python/hsfs/engine/python.py
+++ b/python/hsfs/engine/python.py
@@ -481,6 +481,12 @@ def register_external_temporary_table(
         # No op to avoid query failure
         pass
 
+    def register_delta_temporary_table(
+        self, delta_fg_alias, feature_store_id, feature_store_name, read_options
+    ):
+        # No op to avoid query failure
+        pass
+
     def register_hudi_temporary_table(
         self,
         hudi_fg_alias: "hsfs.constructor.hudi_feature_group_alias.HudiFeatureGroupAlias",
@@ -1203,7 +1209,7 @@ def save_stream_dataframe(
         )
 
     def save_empty_dataframe(
-        self, feature_group: Union[FeatureGroup, ExternalFeatureGroup]
+        self, feature_group: Union[FeatureGroup, ExternalFeatureGroup], new_features=None
     ) -> None:
         """Wrapper around save_dataframe in order to provide no-op."""
         pass
diff --git a/python/hsfs/engine/spark.py b/python/hsfs/engine/spark.py
index 53ea4dc0f..85071df22 100644
--- a/python/hsfs/engine/spark.py
+++ b/python/hsfs/engine/spark.py
@@ -193,12 +193,10 @@ def register_external_temporary_table(self, external_fg, alias):
                 external_fg.query,
                 external_fg.data_format,
                 external_fg.options,
-                external_fg.storage_connector._get_path(external_fg.path),
+                external_fg.prepare_spark_location(),
             )
         else:
             external_dataset = external_fg.dataframe
-        if external_fg.location:
-            self._spark_session.sparkContext.textFile(external_fg.location).collect()
 
         external_dataset.createOrReplaceTempView(alias)
         return external_dataset
@@ -213,10 +211,12 @@ def register_hudi_temporary_table(
             self._spark_context,
             self._spark_session,
         )
+
         hudi_engine_instance.register_temporary_table(
             hudi_fg_alias,
             read_options,
         )
+
         hudi_engine_instance.reconcile_hudi_schema(
             self.save_empty_dataframe, hudi_fg_alias, read_options
         )
@@ -1196,7 +1196,7 @@ def _setup_s3_hadoop_conf(self, storage_connector, path):
         self._set_s3_hadoop_conf(
             storage_connector, f"fs.s3a.bucket.{storage_connector.bucket}"
         )
-        return path.replace("s3", "s3a", 1) if path is not None else None
+        return path.replace("s3://", "s3a://", 1) if path is not None else None
 
     def _set_s3_hadoop_conf(self, storage_connector, prefix):
         if storage_connector.access_key:
@@ -1247,13 +1247,22 @@ def is_spark_dataframe(self, dataframe):
             return True
         return False
 
-    def save_empty_dataframe(self, feature_group):
-        fg_table_name = feature_group._get_table_name()
-        dataframe = self._spark_session.table(fg_table_name).limit(0)
+    def save_empty_dataframe(self, feature_group, new_features=None):
+        location = feature_group.prepare_spark_location()
+
+        dataframe = self._spark_session.read.format("hudi").load(location)
+
+        if new_features is not None:
+            if isinstance(new_features, list):
+                for new_feature in new_features:
+                    dataframe = dataframe.withColumn(new_feature.name, lit(None).cast(new_feature.type))
+            else:
+                dataframe = dataframe.withColumn(new_features.name, lit(None).cast(new_features.type))
 
         self.save_dataframe(
             feature_group,
-            dataframe,
+            dataframe.limit(0),
             "upsert",
             feature_group.online_enabled,
             "offline",
@@ -1262,20 +1271,22 @@ def save_empty_dataframe(self, feature_group):
         )
 
     def add_cols_to_delta_table(self, feature_group, new_features):
-        new_features_map = {}
-        if isinstance(new_features, list):
-            for new_feature in new_features:
-                new_features_map[new_feature.name] = lit("").cast(new_feature.type)
-        else:
-            new_features_map[new_features.name] = lit("").cast(new_features.type)
+        location = feature_group.prepare_spark_location()
+
+        dataframe = self._spark_session.read.format("delta").load(location)
+
+        if new_features is not None:
+            if isinstance(new_features, list):
+                for new_feature in new_features:
+                    dataframe = dataframe.withColumn(new_feature.name, lit("").cast(new_feature.type))
+            else:
+                dataframe = dataframe.withColumn(new_features.name, lit("").cast(new_features.type))
 
-        self._spark_session.read.format("delta").load(
-            feature_group.location
-        ).withColumns(new_features_map).limit(0).write.format("delta").mode(
+        dataframe.limit(0).write.format("delta").mode(
             "append"
         ).option("mergeSchema", "true").option(
             "spark.databricks.delta.schema.autoMerge.enabled", "true"
-        ).save(feature_group.location)
+        ).save(location)
 
     def _apply_transformation_function(
         self,
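To illustrate how the reworked `save_empty_dataframe` is exercised, a hedged end-to-end sketch for a Hudi-backed feature group; `fg` is assumed to already exist and the column name is made up:

from hsfs.feature import Feature

# append_features() on a Hudi feature group ends up in save_empty_dataframe:
# the table is loaded from its prepared location, the new column is added as a
# typed null, and an empty upsert is written so only the schema evolves.
fg.append_features([Feature("discount", type="double")])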
diff --git a/python/hsfs/feature_group.py b/python/hsfs/feature_group.py
index c3407ea2f..f47f1d962 100644
--- a/python/hsfs/feature_group.py
+++ b/python/hsfs/feature_group.py
@@ -140,6 +140,8 @@ def __init__(
                 Dict[str, Any],
             ]
         ] = None,
+        storage_connector: Union[sc.StorageConnector, Dict[str, Any]] = None,
+        path: Optional[str] = None,
         **kwargs,
     ) -> None:
         self._version = version
@@ -156,6 +158,14 @@ def __init__(
         self._feature_store_id = featurestore_id
         self._feature_store = None
         self._variable_api: VariableApi = VariableApi()
+        self._path = path
+
+        if storage_connector is not None and isinstance(storage_connector, dict):
+            self._storage_connector = sc.StorageConnector.from_response_json(
+                storage_connector
+            )
+        else:
+            self._storage_connector: "sc.StorageConnector" = storage_connector
 
         self._online_config = (
             OnlineConfig.from_response_json(online_config)
@@ -2049,6 +2059,20 @@ def online_enabled(self) -> bool:
     def online_enabled(self, online_enabled: bool) -> None:
         self._online_enabled = online_enabled
 
+    @property
+    def path(self) -> Optional[str]:
+        return self._path
+
+    @property
+    def storage_connector(self) -> "sc.StorageConnector":
+        return self._storage_connector
+
+    def prepare_spark_location(self) -> str:
+        location = self.location
+        if self.storage_connector is not None:
+            location = self.storage_connector.prepare_spark(location)
+        return location
+
     @property
     def topic_name(self) -> Optional[str]:
         """The topic used for feature group data ingestion."""
@@ -2235,6 +2259,8 @@ def __init__(
             ]
         ] = None,
         offline_backfill_every_hr: Optional[Union[str, int]] = None,
+        storage_connector: Union[sc.StorageConnector, Dict[str, Any]] = None,
+        path: Optional[str] = None,
         **kwargs,
     ) -> None:
         super().__init__(
@@ -2252,6 +2278,8 @@ def __init__(
             notification_topic_name=notification_topic_name,
             deprecated=deprecated,
             online_config=online_config,
+            storage_connector=storage_connector,
+            path=path,
         )
         self._feature_store_name: Optional[str] = featurestore_name
         self._description: Optional[str] = description
@@ -3249,6 +3277,30 @@ def commit_delete_record(
         """
         self._feature_group_engine.commit_delete(self, delete_df, write_options or {})
 
+    def clean(
+        self,
+        write_options: Optional[Dict[Any, Any]] = None,
+    ) -> None:
+        """ Clean up old files. This method can only be used on feature groups stored as DELTA.
+
+        !!! example
+            ```python
+            # connect to the Feature Store
+            fs = ...
+
+            # get the Feature Group instance
+            fg = fs.get_or_create_feature_group(...)
+
+            fg.clean(write_options = {"retention_hours": 100})
+            ```
+        # Arguments
+            write_options: User provided write options; currently only the
+                `retention_hours` key is used. Defaults to `{}`.
+
+        # Raises
+            `hsfs.client.exceptions.RestAPIError`.
+        """
+        self._feature_group_engine.clean(self, write_options or {})
+
     def as_of(
         self,
         wallclock_time: Optional[Union[str, int, datetime, date]] = None,
@@ -3557,6 +3609,7 @@ def to_dict(self) -> Dict[str, Any]:
             "transformationFunctions": [
                 tf.to_dict() for tf in self._transformation_functions
             ],
+            "path": self._path,
         }
         if self._online_config:
             fg_meta_dict["onlineConfig"] = self._online_config.to_dict()
@@ -3564,6 +3617,8 @@ def to_dict(self) -> Dict[str, Any]:
             fg_meta_dict["embeddingIndex"] = self.embedding_index.to_dict()
         if self._stream:
             fg_meta_dict["deltaStreamerJobConf"] = self._deltastreamer_jobconf
+        if self._storage_connector:
+            fg_meta_dict["storageConnector"] = self._storage_connector.to_dict()
         return fg_meta_dict
 
     def _get_table_name(self) -> str:
@@ -3801,6 +3856,8 @@ def __init__(
             notification_topic_name=notification_topic_name,
             deprecated=deprecated,
             online_config=online_config,
+            storage_connector=storage_connector,
+            path=path,
         )
 
         self._feature_store_name = featurestore_name
@@ -3809,7 +3866,6 @@ def __init__(
         self._creator = user.User.from_response_json(creator)
         self._query = query
         self._data_format = data_format.upper() if data_format else None
-        self._path = path
 
         self._features = [
             feature.Feature.from_response_json(feat) if isinstance(feat, dict) else feat
@@ -3850,12 +3906,6 @@ def __init__(
             self._features = features
             self._options = options or {}
 
-        if storage_connector is not None and isinstance(storage_connector, dict):
-            self._storage_connector = sc.StorageConnector.from_response_json(
-                storage_connector
-            )
-        else:
-            self._storage_connector: "sc.StorageConnector" = storage_connector
         self._vector_db_client: Optional["VectorDbClient"] = None
         self._href: Optional[str] = href
 
@@ -4245,18 +4295,10 @@ def query(self) -> Optional[str]:
     def data_format(self) -> Optional[str]:
         return self._data_format
 
-    @property
-    def path(self) -> Optional[str]:
-        return self._path
-
     @property
     def options(self) -> Optional[Dict[str, Any]]:
         return self._options
 
-    @property
-    def storage_connector(self) -> "sc.StorageConnector":
-        return self._storage_connector
-
     @property
     def creator(self) -> Optional["user.User"]:
         return self._creator
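A short sketch of what the new `prepare_spark_location` helper yields, assuming a feature group backed by an S3 storage connector; the URIs are illustrative and the scheme rewrite relies on `_setup_s3_hadoop_conf` shown earlier in this patch:

location = fg.prepare_spark_location()
# Without a storage connector this is simply fg.location (managed Hopsworks storage).
# With an S3 connector, StorageConnector.prepare_spark() is expected to set the
# Hadoop S3A options and switch the scheme, e.g.
#   "s3://my-bucket/project_featurestore/fg_1" -> "s3a://my-bucket/project_featurestore/fg_1"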
diff --git a/python/hsfs/feature_store.py b/python/hsfs/feature_store.py
index 2a384c961..d5c85cc52 100644
--- a/python/hsfs/feature_store.py
+++ b/python/hsfs/feature_store.py
@@ -524,6 +524,8 @@ def create_feature_group(
             ]
         ] = None,
         offline_backfill_every_hr: Optional[Union[int, str]] = None,
+        storage_connector: Union[storage_connector.StorageConnector, Dict[str, Any]] = None,
+        path: Optional[str] = None,
     ) -> feature_group.FeatureGroup:
         """Create a feature group metadata object.
 
@@ -628,6 +630,10 @@ def plus_two(value):
                 periodically. The value can be either an integer representing the number of hours between each run
                 or a string representing a cron expression. Set the value to None to avoid scheduling the materialization
                 job. Defaults to None (i.e no scheduling).
+            storage_connector: The storage connector to use to establish connectivity
+                with the data source.
+            path: The location within the scope of the storage connector where the
+                feature group data is stored.
 
         # Returns
             `FeatureGroup`. The feature group metadata object.
@@ -655,6 +661,8 @@ def plus_two(value):
             transformation_functions=transformation_functions,
             online_config=online_config,
             offline_backfill_every_hr=offline_backfill_every_hr,
+            storage_connector=storage_connector,
+            path=path,
         )
         feature_group_object.feature_store = self
         return feature_group_object
@@ -689,6 +697,8 @@ def get_or_create_feature_group(
         ] = None,
         online_config: Optional[Union[OnlineConfig, Dict[str, Any]]] = None,
         offline_backfill_every_hr: Optional[Union[int, str]] = None,
+        storage_connector: Union[storage_connector.StorageConnector, Dict[str, Any]] = None,
+        path: Optional[str] = None,
     ) -> Union[
         feature_group.FeatureGroup,
         feature_group.ExternalFeatureGroup,
@@ -783,6 +793,10 @@ def get_or_create_feature_group(
                 periodically. The value can be either an integer representing the number of hours between each run
                 or a string representing a cron expression. Set the value to None to avoid scheduling the materialization
                 job. Defaults to None (i.e no automatic scheduling). Applies only on Feature Group creation.
+            storage_connector: The storage connector to use to establish connectivity
+                with the data source.
+            path: The location within the scope of the storage connector where the
+                feature group data is stored.
 
         # Returns
             `FeatureGroup`. The feature group metadata object.
@@ -819,6 +833,8 @@ def get_or_create_feature_group(
                     transformation_functions=transformation_functions,
                     online_config=online_config,
                     offline_backfill_every_hr=offline_backfill_every_hr,
+                    storage_connector=storage_connector,
+                    path=path,
                 )
                 feature_group_object.feature_store = self
                 return feature_group_object
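A hedged usage example of the two new arguments; the connector name, keys, and path are placeholders and assume an S3 connector already registered in the project:

s3_conn = fs.get_storage_connector("my_s3_connector")

fg = fs.get_or_create_feature_group(
    name="transactions",
    version=1,
    primary_key=["tid"],
    time_travel_format="DELTA",
    storage_connector=s3_conn,  # offline data is written through this connector
    path="transactions_fg",     # prefix inside the connector's default bucket
)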
diff --git a/python/hsfs/storage_connector.py b/python/hsfs/storage_connector.py
index 3618e2e68..c7ba70dc2 100644
--- a/python/hsfs/storage_connector.py
+++ b/python/hsfs/storage_connector.py
@@ -134,6 +134,12 @@ def description(self) -> Optional[str]:
     def spark_options(self) -> None:
         pass
 
+    def prepare_spark(self, path: Optional[str] = None) -> Optional[str]:
+        _logger.info(
+            "This Storage Connector cannot be prepare for Spark."
+        )
+        return path
+
     def read(
         self,
         query: Optional[str] = None,
@@ -270,6 +276,7 @@ def __init__(
         server_encryption_algorithm: Optional[str] = None,
         server_encryption_key: Optional[str] = None,
         bucket: Optional[str] = None,
+        region: Optional[str] = None,
         session_token: Optional[str] = None,
         iam_role: Optional[str] = None,
         arguments: Optional[Dict[str, Any]] = None,
@@ -283,6 +290,7 @@ def __init__(
         self._server_encryption_algorithm = server_encryption_algorithm
         self._server_encryption_key = server_encryption_key
         self._bucket = bucket
+        self._region = region
         self._session_token = session_token
         self._iam_role = iam_role
         self._arguments = (
@@ -314,6 +322,11 @@ def bucket(self) -> Optional[str]:
         """Return the bucket for S3 connectors."""
         return self._bucket
 
+    @property
+    def region(self) -> Optional[str]:
+        """Return the region for S3 connectors."""
+        return self._region
+
     @property
     def session_token(self) -> Optional[str]:
         """Session token."""
@@ -393,7 +406,7 @@ def read(
             if options is not None
             else self.spark_options()
         )
-        if not path.startswith("s3://"):
+        if not path.startswith(("s3://", "s3a://")):
             path = self._get_path(path)
             print(
                 "Prepending default bucket specified on connector, final path: {}".format(
diff --git a/python/tests/engine/test_spark.py b/python/tests/engine/test_spark.py
index 3c7fa999c..488232dee 100644
--- a/python/tests/engine/test_spark.py
+++ b/python/tests/engine/test_spark.py
@@ -174,43 +174,6 @@ def test_register_external_temporary_table(self, mocker):
         # Arrange
         mocker.patch("hopsworks_common.client.get_instance")
         mock_sc_read = mocker.patch("hsfs.storage_connector.JdbcConnector.read")
-        mock_pyspark_getOrCreate = mocker.patch(
-            "pyspark.sql.session.SparkSession.builder.getOrCreate"
-        )
-
-        spark_engine = spark.Engine()
-
-        jdbc_connector = storage_connector.JdbcConnector(
-            id=1,
-            name="test_connector",
-            featurestore_id=1,
-            connection_string="",
-            arguments="",
-        )
-
-        external_fg = feature_group.ExternalFeatureGroup(
-            storage_connector=jdbc_connector, id=10
-        )
-
-        # Act
-        spark_engine.register_external_temporary_table(
-            external_fg=external_fg,
-            alias=None,
-        )
-
-        # Assert
-        assert (
-            mock_pyspark_getOrCreate.return_value.sparkContext.textFile.call_count == 0
-        )
-        assert mock_sc_read.return_value.createOrReplaceTempView.call_count == 1
-
-    def test_register_external_temporary_table_external_fg_location(self, mocker):
-        # Arrange
-        mocker.patch("hopsworks_common.client.get_instance")
-        mock_sc_read = mocker.patch("hsfs.storage_connector.JdbcConnector.read")
-        mock_pyspark_getOrCreate = mocker.patch(
-            "pyspark.sql.session.SparkSession.builder.getOrCreate"
-        )
 
         spark_engine = spark.Engine()
 
@@ -233,9 +196,6 @@ def test_register_external_temporary_table_external_fg_location(self, mocker):
         )
 
         # Assert
-        assert (
-            mock_pyspark_getOrCreate.return_value.sparkContext.textFile.call_count == 1
-        )
         assert mock_sc_read.return_value.createOrReplaceTempView.call_count == 1
 
     def test_register_hudi_temporary_table(self, mocker):
@@ -4396,11 +4356,11 @@ def test_setup_s3_hadoop_conf_legacy(self, mocker):
         # Act
         result = spark_engine._setup_s3_hadoop_conf(
             storage_connector=s3_connector,
-            path="s3_test_path",
+            path="s3://_test_path",
         )
 
         # Assert
-        assert result == "s3a_test_path"
+        assert result == "s3a://_test_path"
         assert (
             mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count
             == 14
@@ -4453,11 +4413,11 @@ def test_setup_s3_hadoop_conf_bucket_scope(self, mocker):
         # Act
         result = spark_engine._setup_s3_hadoop_conf(
             storage_connector=s3_connector,
-            path="s3_test_path",
+            path="s3://_test_path",
         )
 
         # Assert
-        assert result == "s3a_test_path"
+        assert result == "s3a://_test_path"
         assert (
             mock_pyspark_getOrCreate.return_value.sparkContext._jsc.hadoopConfiguration.return_value.set.call_count
             == 14
@@ -4559,7 +4519,9 @@ def test_save_empty_dataframe(self, mocker):
         mock_spark_engine_save_dataframe = mocker.patch(
             "hsfs.engine.spark.Engine.save_dataframe"
         )
-        mock_spark_table = mocker.patch("pyspark.sql.session.SparkSession.table")
+        mock_spark_read = mocker.patch("pyspark.sql.SparkSession.read")
+        mock_format = mocker.Mock()
+        mock_spark_read.format.return_value = mock_format
 
         # Arrange
         spark_engine = spark.Engine()
@@ -4579,7 +4541,7 @@ def test_save_empty_dataframe(self, mocker):
 
         # Assert
         assert mock_spark_engine_save_dataframe.call_count == 1
-        assert mock_spark_table.call_count == 1
+        assert mock_spark_read.format.call_count == 1
 
     def test_apply_transformation_function_single_output(self, mocker):
         # Arrange
diff --git a/utils/python/hsfs_utils.py b/utils/python/hsfs_utils.py
index 5af468873..4e73f1604 100644
--- a/utils/python/hsfs_utils.py
+++ b/utils/python/hsfs_utils.py
@@ -247,6 +247,18 @@ def run_feature_monitoring(job_conf: Dict[str, str]) -> None:
         raise e
 
 
+def clean_fg(spark: SparkSession, job_conf: Dict[Any, Any]) -> None:
+    """
+    Run clean on a feature group.
+    """
+    feature_store = job_conf.pop("feature_store")
+    fs = get_feature_store_handle(feature_store)
+
+    entity = fs.get_feature_group(name=job_conf["name"], version=job_conf["version"])
+
+    entity.clean()
+
+
 if __name__ == "__main__":
     # Setup spark first so it fails faster in case of args errors
     # Otherwise the resource manager will wait until the spark application master
@@ -265,6 +277,7 @@ def run_feature_monitoring(job_conf: Dict[str, str]) -> None:
             "ge_validate",
             "import_fg",
             "run_feature_monitoring",
+            "clean_fg",
         ],
         help="Operation type",
     )
@@ -303,6 +316,8 @@ def parse_isoformat_date(da: str) -> datetime:
             import_fg(job_conf)
         elif args.op == "run_feature_monitoring":
             run_feature_monitoring(job_conf)
+        elif args.op == "clean_fg":
+            clean_fg(spark, job_conf)
 
         success = True
     except Exception:
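An illustrative job configuration for the new operation, limited to the keys `clean_fg` actually reads; the values are placeholders:

job_conf = {
    "feature_store": "dev_featurestore",  # popped and used to resolve the feature store handle
    "name": "transactions",               # feature group name
    "version": 1,                         # feature group version
}
# hsfs_utils is then launched with the "clean_fg" operation and this configuration,
# which calls fg.clean() and, for Delta feature groups, DeltaTable.vacuum with the
# default retention period.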